Posted to commits@hive.apache.org by se...@apache.org on 2014/09/16 19:50:04 UTC
svn commit: r1625344 [3/4] - in /hive/branches/llap: ./
common/src/java/org/apache/hadoop/hive/conf/ data/conf/ data/conf/tez/
itests/qtest/ llap-client/ llap-client/src/ llap-client/src/java/
llap-client/src/java/org/ llap-client/src/java/org/apache/ ...
Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkProducerFeedback.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkProducerFeedback.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkProducerFeedback.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkProducerFeedback.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.processor;
+
+import org.apache.hadoop.hive.llap.api.Vector;
+
+/**
+ * A feedback interface for chunk producer-consumer pipeline.
+ */
+public interface ChunkProducerFeedback {
+ void returnCompleteVector(Vector vector);
+ void stop();
+ // We may add throttling here later.
+}
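For context: the chunk producer hands Vector objects to a consumer, and this interface is the consumer's channel back to the producer, returning fully processed vectors for reuse and optionally stopping production early. A minimal sketch of a consumer using it (the SketchConsumer class and its consume method are made up for illustration; only ChunkProducerFeedback and Vector come from this commit):

    // Illustrative only; assumes the llap.api.Vector and ChunkProducerFeedback types above.
    class SketchConsumer {
      private final ChunkProducerFeedback feedback;
      SketchConsumer(ChunkProducerFeedback feedback) { this.feedback = feedback; }

      void consume(Vector vector, boolean abortAfterThis) {
        try {
          // ... read the vector's columns/rows here ...
        } finally {
          feedback.returnCompleteVector(vector); // let the producer reclaim the buffers
        }
        if (abortAfterThis) {
          feedback.stop(); // ask the producer to stop generating further chunks
        }
      }
    }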
Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Pool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Pool.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Pool.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Pool.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.llap.processor;
+
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.api.impl.RequestImpl;
+import org.apache.hadoop.hive.llap.loader.Loader;
+import org.apache.hadoop.hive.llap.loader.OrcLoader;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// TODO: write unit tests if this class becomes less primitive.
+public class Pool {
+ // TODO: for now, pool is of dubious value. There's one processor per request.
+ // So, this provides thread safety that may or may not be needed.
+ private final LinkedList<Processor> processors = new LinkedList<Processor>();
+ private static final int POOL_LIMIT = 10;
+ // There's only one loader, assumed to be thread safe.
+ private final Loader loader;
+ private final ExecutorService threadPool;
+
+ public Pool(Loader loader, Configuration conf) {
+ this.loader = loader;
+ int threadCount = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_REQUEST_THREAD_COUNT);
+ this.threadPool = Executors.newFixedThreadPool(threadCount,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Llap thread %d").build());
+ }
+
+ public void enqueue(RequestImpl request, ChunkConsumer consumer) {
+ Processor proc = null;
+ synchronized (processors) {
+ proc = processors.poll();
+ }
+ if (proc == null) {
+ proc = new Processor(this, loader);
+ }
+ proc.setRequest(request, consumer);
+ threadPool.submit(proc);
+ }
+
+ void returnProcessor(Processor proc) {
+ synchronized (processors) {
+ if (processors.size() < POOL_LIMIT) {
+ processors.add(proc);
+ }
+ }
+ }
+}
Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Processor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Processor.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Processor.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Processor.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.llap.processor;
+
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.api.impl.RequestImpl;
+import org.apache.hadoop.hive.llap.loader.Loader;
+
+/**
+ * Request processor class. Currently, of dubious value.
+ */
+public class Processor implements Runnable {
+ private final Pool parent;
+ private final Loader loader;
+ private RequestImpl request;
+ private ChunkConsumer consumer;
+
+ public Processor(Pool pool, Loader loader) {
+ this.parent = pool;
+ this.loader = loader;
+ }
+
+ public void setRequest(RequestImpl request, ChunkConsumer consumer) {
+ this.request = request;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run() {
+ try {
+ loader.load(request, consumer); // Synchronous load call that returns results via the consumer.
+ } catch (Throwable t) {
+ Llap.LOG.error("Load failed", t);
+ consumer.setError(t);
+ }
+ parent.returnProcessor(this);
+ }
+}
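Together with Pool above, the lifecycle is: enqueue() takes a pooled Processor (or creates one), binds the request and consumer to it, and submits it to the daemon thread pool; run() performs the synchronous load, reports any failure through consumer.setError, and finally returns the Processor to the pool for reuse. A rough caller-side sketch (the loader, request, and consumer instances are assumed to be built elsewhere in the branch and are not shown here):

    // Sketch only; LLAP_REQUEST_THREAD_COUNT is read from the conf in Pool's constructor.
    Configuration conf = new HiveConf();
    Pool pool = new Pool(orcLoader, conf);   // one shared, thread-safe loader
    pool.enqueue(request, consumer);         // Processor.run() executes on an "Llap thread N"
    // The consumer then receives chunks asynchronously while this thread continues.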
Added: hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/cache/TestLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/cache/TestLrfuCachePolicy.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/cache/TestLrfuCachePolicy.java (added)
+++ hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/cache/TestLrfuCachePolicy.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.junit.Assume;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestLrfuCachePolicy {
+ private static final Log LOG = LogFactory.getLog(TestLrfuCachePolicy.class);
+
+ @Test
+ public void testHeapSize1() {
+ testHeapSize(1);
+ }
+
+ @Test
+ public void testHeapSize2() {
+ testHeapSize(2);
+ }
+
+ @Test
+ public void testHeapSize7() {
+ testHeapSize(7);
+ }
+
+ @Test
+ public void testHeapSize8() {
+ testHeapSize(8);
+ }
+
+ @Test
+ public void testHeapSize30() {
+ testHeapSize(30);
+ }
+
+ @Test
+ public void testLfuExtreme() {
+ int heapSize = 4;
+ LOG.info("Testing lambda 0 (LFU)");
+ Random rdm = new Random(1234);
+ HiveConf conf = new HiveConf();
+ ArrayList<WeakBuffer> inserted = new ArrayList<WeakBuffer>(heapSize);
+ conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f);
+ LrfuCachePolicy lfu = new LrfuCachePolicy(conf, 1, heapSize);
+ for (int i = 0; i < heapSize; ++i) {
+ WeakBuffer buffer = BufferPool.allocateFake();
+ assertNull(cache(lfu, buffer));
+ inserted.add(buffer);
+ }
+ Collections.shuffle(inserted, rdm);
+ // LFU extreme, order of accesses should be ignored, only frequency matters.
+ // We touch the first elements later, but fewer times, so they will be evicted first.
+ for (int i = inserted.size() - 1; i >= 0; --i) {
+ for (int j = 0; j < i + 1; ++j) {
+ lfu.notifyLock(inserted.get(i));
+ lfu.notifyUnlock(inserted.get(i));
+ }
+ }
+ verifyOrder(lfu, inserted);
+ }
+
+ @Test
+ public void testLruExtreme() {
+ int heapSize = 4;
+ LOG.info("Testing lambda 1 (LRU)");
+ Random rdm = new Random(1234);
+ HiveConf conf = new HiveConf();
+ ArrayList<WeakBuffer> inserted = new ArrayList<WeakBuffer>(heapSize);
+ conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+ LrfuCachePolicy lru = new LrfuCachePolicy(conf, 1, heapSize);
+ for (int i = 0; i < heapSize; ++i) {
+ WeakBuffer buffer = BufferPool.allocateFake();
+ assertNull(cache(lru, buffer));
+ inserted.add(buffer);
+ }
+ Collections.shuffle(inserted, rdm);
+ // LRU extreme, frequency of accesses should be ignored, only order matters.
+ for (int i = 0; i < inserted.size(); ++i) {
+ for (int j = 0; j < (inserted.size() - i); ++j) {
+ lru.notifyLock(inserted.get(i));
+ lru.notifyUnlock(inserted.get(i));
+ }
+ }
+ verifyOrder(lru, inserted);
+ }
+
+ @Test
+ public void testDeadlockResolution() {
+ int heapSize = 4;
+ LOG.info("Testing deadlock resolution");
+ ArrayList<WeakBuffer> inserted = new ArrayList<WeakBuffer>(heapSize);
+ LrfuCachePolicy lrfu = new LrfuCachePolicy(new HiveConf(), 1, heapSize);
+ for (int i = 0; i < heapSize; ++i) {
+ WeakBuffer buffer = BufferPool.allocateFake();
+ assertNull(cache(lrfu, buffer));
+ inserted.add(buffer);
+ }
+ // Lock the lowest priority buffer; try to evict - we'll evict some other buffer.
+ WeakBuffer locked = inserted.get(0);
+ lock(lrfu, locked);
+ WeakBuffer evicted = lrfu.evictOneMoreBlock();
+ assertNotNull(evicted);
+ assertTrue(evicted.isInvalid());
+ assertNotSame(locked, evicted);
+ unlock(lrfu, locked);
+ }
+
+ // Buffers in the test are fakes not linked to the cache; notify the cache policy explicitly.
+ public WeakBuffer cache(LrfuCachePolicy lrfu, WeakBuffer buffer) {
+ buffer.lock(false);
+ WeakBuffer result = lrfu.cache(buffer);
+ buffer.unlock();
+ if (result != CachePolicy.CANNOT_EVICT) {
+ lrfu.notifyUnlock(buffer);
+ }
+ return result;
+ }
+
+ private static void lock(LrfuCachePolicy lrfu, WeakBuffer locked) {
+ locked.lock(false);
+ lrfu.notifyLock(locked);
+ }
+
+ private static void unlock(LrfuCachePolicy lrfu, WeakBuffer locked) {
+ locked.unlock();
+ lrfu.notifyUnlock(locked);
+ }
+
+ private void testHeapSize(int heapSize) {
+ LOG.info("Testing heap size " + heapSize);
+ Random rdm = new Random(1234);
+ HiveConf conf = new HiveConf();
+ conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.01f);
+ LrfuCachePolicy lrfu = new LrfuCachePolicy(conf, 1, heapSize);
+ // Insert the number of elements plus 2, to trigger 2 evictions.
+ int toEvict = 2;
+ ArrayList<WeakBuffer> inserted = new ArrayList<WeakBuffer>(heapSize);
+ WeakBuffer[] evicted = new WeakBuffer[toEvict];
+ Assume.assumeTrue(toEvict <= heapSize);
+ for (int i = 0; i < heapSize + toEvict; ++i) {
+ WeakBuffer buffer = BufferPool.allocateFake();
+ WeakBuffer evictedBuf = cache(lrfu, buffer);
+ if (i < toEvict) {
+ evicted[i] = buffer;
+ } else {
+ if (i >= heapSize) {
+ assertSame(evicted[i - heapSize], evictedBuf);
+ assertTrue(evictedBuf.isInvalid());
+ } else {
+ assertNull(evictedBuf);
+ }
+ inserted.add(buffer);
+ }
+ }
+ LOG.info("Inserted " + dumpInserted(inserted));
+ // We will touch all blocks in random order.
+ Collections.shuffle(inserted, rdm);
+ LOG.info("Touch order " + dumpInserted(inserted));
+ // Lock entire heap; heap is still full; we should not be able to evict or insert.
+ for (WeakBuffer buf : inserted) {
+ lock(lrfu, buf);
+ }
+ WeakBuffer block = lrfu.evictOneMoreBlock();
+ assertNull("Got " + block, block);
+ assertSame(CachePolicy.CANNOT_EVICT, cache(lrfu, BufferPool.allocateFake()));
+ for (WeakBuffer buf : inserted) {
+ unlock(lrfu, buf);
+ }
+ // To make (almost) sure we get a definite order, touch blocks in order a large number of times.
+ for (WeakBuffer buf : inserted) {
+ // TODO: this seems to indicate that priorities change too little...
+ // perhaps we need to adjust the policy.
+ for (int j = 0; j < 10; ++j) {
+ lrfu.notifyLock(buf);
+ lrfu.notifyUnlock(buf);
+ }
+ }
+ verifyOrder(lrfu, inserted);
+ }
+
+ private void verifyOrder(LrfuCachePolicy lrfu, ArrayList<WeakBuffer> inserted) {
+ WeakBuffer block;
+ // Evict all blocks.
+ for (int i = 0; i < inserted.size(); ++i) {
+ block = lrfu.evictOneMoreBlock();
+ assertTrue(block.isInvalid());
+ assertSame(inserted.get(i), block);
+ }
+ // The map should now be empty.
+ assertNull(lrfu.evictOneMoreBlock());
+ }
+
+ private String dumpInserted(ArrayList<WeakBuffer> inserted) {
+ String debugStr = "";
+ for (int i = 0; i < inserted.size(); ++i) {
+ if (i != 0) debugStr += ", ";
+ debugStr += inserted.get(i);
+ }
+ return debugStr;
+ }
+}
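The lambda knob exercised by these tests interpolates between LFU (lambda = 0, only access counts matter) and LRU (lambda = 1, only recency matters). In the standard LRFU scheme each past access is weighted by 0.5^(lambda * age), and the resulting combined recency-frequency priority can be maintained incrementally per buffer. The LrfuCachePolicy implementation itself is not part of this portion of the diff; the sketch below only shows the textbook update the tests appear to assume:

    // Textbook LRFU priority bookkeeping (illustrative; not the actual LrfuCachePolicy code).
    final class LrfuPrioritySketch {
      private final double lambda;      // 0 => pure LFU, 1 => (essentially) LRU
      private double priority = 0;      // combined recency-frequency value
      private long lastAccess = 0;      // logical time of the last touch

      LrfuPrioritySketch(double lambda) { this.lambda = lambda; }

      void touch(long now) {
        // Decay the accumulated priority by the elapsed time, then add 1 for this access.
        priority = 1 + Math.pow(0.5, lambda * (now - lastAccess)) * priority;
        lastAccess = now;
      }

      double priorityAt(long now) {
        return Math.pow(0.5, lambda * (now - lastAccess)) * priority;
      }
    }

With lambda = 0 the priority reduces to a plain access count, matching testLfuExtreme; with lambda = 1 all older accesses together contribute less than the most recent one, so ordering degenerates to recency, matching testLruExtreme.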
Added: hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/chunk/TestChunkReaderAndWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/chunk/TestChunkReaderAndWriter.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/chunk/TestChunkReaderAndWriter.java (added)
+++ hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/chunk/TestChunkReaderAndWriter.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.chunk;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.cache.BufferPool;
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriterImpl;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
+import org.apache.hadoop.hive.llap.loader.BufferInProgress;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestChunkReaderAndWriter {
+ private static final Log LOG = LogFactory.getLog(TestChunkReaderAndWriter.class);
+ private static final int LARGE_BUFFER = 1024 * 1024;
+
+ @Test
+ public void testSimpleValues() throws Exception {
+ testValues(0.0);
+ }
+
+ @Test
+ public void testValuesWithNulls() throws Exception {
+ testValues(0.02);
+ testValues(0.5);
+ testValues(0.98);
+ }
+
+ @Test
+ public void testNullSegmentTransitions() throws Exception {
+ Random random = new Random(1234);
+ ChunkWriterImpl writer = new ChunkWriterImpl();
+ BufferInProgress buf = new BufferInProgress(createBuffer(LARGE_BUFFER));
+ writer.prepare(buf);
+ int ncount1 = 1, ncount2 = 3, ncount3 = 60, ncount4 = 3;
+ Long[] src = new Long[ncount1 + ncount2 + ncount3 + ncount4 + 1];
+ int offset = 0;
+
+ // Try various combinations of repeating and non-repeating segments.
+ offset = writeNulls(writer, ncount1, src, offset);
+ long[] tmp = new long[1];
+ tmp[0] = random.nextLong();
+ src[offset] = tmp[0];
+ writer.writeLongs(tmp, 0, 1, NullsState.NEXT_NULL);
+ ++offset;
+ offset = writeNulls(writer, ncount2, src, offset);
+ offset = writeNulls(writer, ncount3, src, offset);
+ offset = writeNulls(writer, ncount4, src, offset);
+
+ writer.finishCurrentSegment();
+ verifyValues(completeChunk(writer, buf), src);
+ }
+
+ @Test
+ public void testRepeatingSegmentTransitions() throws Exception {
+ Random random = new Random(1234);
+ ChunkWriterImpl writer = new ChunkWriterImpl();
+ BufferInProgress buf = new BufferInProgress(createBuffer(LARGE_BUFFER));
+ writer.prepare(buf);
+ int repeating1 = 10, rcount1 = 5, repeating2 = -10, rcount2 = 5, repeating3 = 4, rcount3 = 15;
+ int nrcount1 = 30, nrcount2 = 30, nrcount3 = 30;
+ long[] src = new long[rcount1 + rcount2 + rcount3 + nrcount1 + nrcount2 + nrcount3];
+ int offset = 0;
+
+ // Try various combinations of repeating and non-repeating segments.
+ offset = writeRepeatedValues(writer, repeating1, rcount1, src, offset);
+ offset = writeLongs(random, writer, nrcount1, src, offset);
+ offset = writeRepeatedValues(writer, repeating2, rcount2, src, offset);
+ offset = writeLongs(random, writer, nrcount2, src, offset);
+ offset = writeRepeatedValues(writer, repeating3, rcount3, src, offset);
+ offset = writeLongs(random, writer, nrcount3, src, offset);
+
+ writer.finishCurrentSegment();
+ verifyValues(completeChunk(writer, buf), src);
+ }
+
+ @Test
+ public void testBufferBoundary() throws Exception {
+ Random random = new Random(1234);
+ ChunkWriterImpl writer = new ChunkWriterImpl();
+ BufferInProgress buf = new BufferInProgress(createBuffer(128 * 8));
+ writer.prepare(buf);
+ long[] tmp = new long[122];
+ Long[] src = new Long[tmp.length + 2];
+ src[0] = tmp[0] = 3;
+ // This should start a segment with bitmasks and use up 4 * 8 bytes total.
+ // Plus, 8 bytes are already used up for the chunk header.
+ writer.writeLongs(tmp, 0, 1, NullsState.NEXT_NULL);
+ src[1] = null;
+ writer.writeNulls(1, true);
+ // Now we have 123 * 8 bytes; 122 values would not fit w/bitmask, but they will w/o one.
+ for (int i = 0; i < tmp.length; ++i) {
+ src[i + 2] = tmp[i] = random.nextLong();
+ }
+ writer.writeLongs(tmp, 0, tmp.length, NullsState.NEXT_NULL);
+ writer.finishCurrentSegment();
+ verifyValues(completeChunk(writer, buf), src);
+ }
+
+ @Test
+ public void testBoundaryValues() throws Exception {
+ ChunkWriterImpl writer = new ChunkWriterImpl();
+ BufferInProgress buf = new BufferInProgress(createBuffer(LARGE_BUFFER));
+ writer.prepare(buf);
+ long[] src = new long[9];
+ Arrays.fill(src, 0l);
+ src[2] = Long.MIN_VALUE;
+ src[6] = Long.MAX_VALUE;
+ src[8] = Integer.MIN_VALUE;
+ writer.writeLongs(src, 0, src.length, NullsState.NEXT_NULL);
+ writer.finishCurrentSegment();
+ verifyValues(completeChunk(writer, buf), src);
+ }
+
+ /**
+ * A method for generic non-scenario tests.
+ * @param nullFraction Fraction of null values (0.0 to 1.0).
+ */
+ private void testValues(double nullFraction) throws Exception {
+ Random random = new Random(1234);
+ ChunkWriterImpl writer = new ChunkWriterImpl();
+ WeakBuffer wb = createBuffer(LARGE_BUFFER);
+ // Value counts to test
+ int[] valueCounts = new int[] { 1, 3, 11, 64, 65, 2048 };
+ int totalCount = 0;
+ for (int valueCount : valueCounts) {
+ LOG.info("Testing for value count " + valueCount);
+ BufferInProgress buf = new BufferInProgress(wb);
+ writer.prepare(buf);
+ totalCount += valueCount;
+ Long[] src = new Long[valueCount];
+ writeLongs(writer, random, nullFraction, valueCount, src, 0);
+ writer.finishCurrentSegment();
+ verifyValues(completeChunk(writer, buf), src);
+ }
+
+ // Then try all together
+ BufferInProgress buf = new BufferInProgress(wb);
+ writer.prepare(buf);
+ Long[] src = new Long[totalCount];
+ int offset = 0;
+ LOG.info("Testing for total count " + totalCount);
+ for (int valueCount : valueCounts) {
+ writeLongs(writer, random, nullFraction, valueCount, src, offset);
+ offset += valueCount;
+ }
+ writer.finishCurrentSegment();
+ verifyValues(completeChunk(writer, buf), src);
+ }
+
+ private static void writeLongs(ChunkWriterImpl writer,
+ Random rdm, double nullFraction, int count, Long[] src, int srcOffset) {
+ NullsState nullState = nullFraction == 0 ? NullsState.NO_NULLS : NullsState.NEXT_NULL;
+ long[] srcPart = new long[count];
+ int offset = 0;
+ boolean isFirst = true, isNull = true;
+ while (offset < count) {
+ int runLength = 0;
+ if (!isFirst) {
+ isNull = !isNull;
+ if (!isNull) {
+ srcPart[0] = rdm.nextLong();
+ }
+ src[srcOffset + offset] = isNull ? null : srcPart[0];
+ runLength = 1;
+ ++offset;
+ }
+ while (offset < count) {
+ boolean curIsNull = (rdm.nextDouble() <= nullFraction);
+ if (!isFirst && curIsNull != isNull) break;
+ isNull = curIsNull;
+ if (!isNull) {
+ srcPart[runLength] = rdm.nextLong();
+ }
+ src[srcOffset + offset] = isNull ? null : srcPart[runLength];
+ ++runLength;
+ ++offset;
+ isFirst = false;
+ }
+ if (isNull) {
+ LOG.info("Writing " + runLength + " nulls");
+ writer.writeNulls(runLength, true);
+ } else {
+ LOG.info("Writing " + runLength + " values");
+ writer.writeLongs(srcPart, 0, runLength, nullState);
+ }
+ }
+ }
+
+ private static int writeLongs(
+ Random random, ChunkWriterImpl writer, int count, long[] src, int offset) {
+ for (int i = 0; i < count; ++i) {
+ src[offset + i] = random.nextLong();
+ }
+ writer.writeLongs(src, offset, count, NullsState.NO_NULLS);
+ return offset + count;
+ }
+
+ private static int writeRepeatedValues(
+ ChunkWriterImpl writer, int val, int count, long[] src, int offset) {
+ for (int i = 0; i < count; ++i) {
+ src[offset + i] = val;
+ }
+ writer.writeRepeatedLongs(val, count, NullsState.NO_NULLS);
+ return offset + count;
+ }
+
+ private static int writeNulls(
+ ChunkWriterImpl writer, int count, Long[] src, int offset) {
+ for (int i = 0; i < count; ++i) {
+ src[offset + i] = null;
+ }
+ writer.writeNulls(count, false);
+ return offset + count;
+ }
+
+ private static void verifyValues(Chunk chunk, Long[] src) throws Exception {
+ verifyValues(chunk, src, src.length);
+ }
+
+ private static void verifyValues(Chunk chunk, long[] src) throws Exception {
+ verifyValues(chunk, src, src.length);
+ }
+
+ private static void verifyValues(Chunk chunk, Object src, int srcLength) throws Exception {
+ boolean nullable = src instanceof Long[];
+ Long[] src0 = nullable ? (Long[])src : null;
+ long[] src1 = nullable ? null : (long[])src;
+ int[] stepsToVerify = new int[] { srcLength, srcLength / 2, 63, 5, 1 };
+ long[] dest = new long[srcLength];
+ boolean[] isNull = new boolean[srcLength];
+ int lastStep = -1;
+ for (int step : stepsToVerify) {
+ if (step > srcLength || step == lastStep || step == 0) continue;
+ if ((srcLength / step) > 50) continue; // too slow
+ LOG.info("Verifying value count " + srcLength + " w/step " + step);
+ Arrays.fill(dest, -1);
+ int offset = 0;
+ ChunkReader reader = new ChunkReader(Type.LONG, chunk);
+ while (offset < srcLength) {
+ int adjStep = Math.min(srcLength - offset, step);
+ try {
+ reader.next(adjStep);
+ reader.copyLongs(dest, isNull, offset);
+ } catch (Exception ex) {
+ LOG.error("srcLength " + srcLength + ", step "
+ + adjStep + "/" + step + " offset " + offset, ex);
+ throw ex;
+ }
+ offset += adjStep;
+ }
+ if (nullable) {
+ verifyArrays(src0, dest, isNull);
+ } else {
+ verifyArrays(src1, dest, isNull);
+ }
+ lastStep = step;
+ }
+ }
+
+ private static void verifyArrays(Long[] src, long[] dest, boolean[] isNull) {
+ for (int i = 0; i < src.length; ++i) {
+ boolean curIsNull = (src[i] == null);
+ assertTrue(i + ": " + src[i] + " got " + dest[i] + "/" + isNull[i], curIsNull == isNull[i]);
+ if (!curIsNull) {
+ assertEquals(i + ": " + src[i] + " got " + dest[i], src[i].longValue(), dest[i]);
+ }
+ }
+ }
+
+ private static void verifyArrays(long[] src, long[] dest, boolean[] isNull) {
+ for (int i = 0; i < src.length; ++i) {
+ assertEquals(i + ": " + src[i] + " got " + dest[i], src[i], dest[i]);
+ assertFalse(i + ": unexpected null", isNull[i]);
+ }
+ }
+
+ private static Chunk completeChunk(ChunkWriterImpl writer, BufferInProgress buf) {
+ int rows = buf.getChunkInProgressRows();
+ Chunk c = buf.extractChunk();
+ writer.finishChunk(c, rows);
+ return c;
+ }
+
+ private static WeakBuffer createBuffer(int bufferSize) {
+ HiveConf hc = new HiveConf();
+ hc.setInt(HiveConf.ConfVars.LLAP_BUFFER_SIZE.varname, bufferSize);
+ BufferPool bp = new BufferPool(hc, null);
+ try {
+ return bp.allocateBuffer();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
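The round trip these tests exercise boils down to: prepare a BufferInProgress, emit runs of values and nulls, close the segment, extract and finish the chunk, then replay it with a ChunkReader. A condensed sketch using only calls that appear in the test above (buffer sizing as in createBuffer):

    // Condensed write-then-read round trip, same API as the test above.
    ChunkWriterImpl writer = new ChunkWriterImpl();
    BufferInProgress buf = new BufferInProgress(createBuffer(1024 * 1024));
    writer.prepare(buf);
    writer.writeLongs(new long[] { 1, 2, 3 }, 0, 3, NullsState.NEXT_NULL); // a run of values
    writer.writeNulls(2, true);                                            // then a run of nulls
    writer.finishCurrentSegment();

    int rows = buf.getChunkInProgressRows();
    Chunk chunk = buf.extractChunk();
    writer.finishChunk(chunk, rows);

    ChunkReader reader = new ChunkReader(Type.LONG, chunk);
    long[] dest = new long[rows];
    boolean[] isNull = new boolean[rows];
    reader.next(rows);                 // advance over all rows at once
    reader.copyLongs(dest, isNull, 0); // materialize values plus the null mask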
Modified: hive/branches/llap/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/llap/pom.xml?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/pom.xml (original)
+++ hive/branches/llap/pom.xml Tue Sep 16 17:50:02 2014
@@ -42,6 +42,8 @@
<module>hwi</module>
<module>jdbc</module>
<module>metastore</module>
+ <module>llap-client</module>
+ <module>llap</module>
<module>odbc</module>
<module>ql</module>
<module>serde</module>
Modified: hive/branches/llap/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/pom.xml?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/pom.xml (original)
+++ hive/branches/llap/ql/pom.xml Tue Sep 16 17:50:02 2014
@@ -57,6 +57,11 @@
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
<version>${project.version}</version>
</dependency>
@@ -566,6 +571,8 @@
<include>org.apache.hive:hive-common</include>
<include>org.apache.hive:hive-exec</include>
<include>org.apache.hive:hive-serde</include>
+ <include>org.apache.hive:hive-llap-client</include>
+ <include>org.apache.hive:hive-llap</include>
<include>com.esotericsoftware.kryo:kryo</include>
<include>com.twitter:parquet-hadoop-bundle</include>
<include>org.apache.thrift:libthrift</include>
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Tue Sep 16 17:50:02 2014
@@ -225,6 +225,7 @@ public class FetchOperator implements Se
@SuppressWarnings("unchecked")
static InputFormat<WritableComparable, Writable> getInputFormatFromCache(Class inputFormatClass,
Configuration conf) throws IOException {
+ // TODO: why is this copy-pasted from HiveInputFormat?
if (!inputFormats.containsKey(inputFormatClass)) {
try {
InputFormat<WritableComparable, Writable> newInstance = (InputFormat<WritableComparable, Writable>) ReflectionUtils
@@ -235,7 +236,7 @@ public class FetchOperator implements Se
+ inputFormatClass.getName() + " as specified in mapredWork!", e);
}
}
- return inputFormats.get(inputFormatClass);
+ return HiveInputFormat.wrapForLlap(inputFormats.get(inputFormatClass), conf);
}
private StructObjectInspector getRowInspectorFromTable(TableDesc table) throws Exception {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java Tue Sep 16 17:50:02 2014
@@ -239,6 +239,7 @@ public class StatsNoJobTask extends Task
boolean statsAvailable = false;
for(FileStatus file: fileList) {
if (!file.isDir()) {
+ // TODO: do we need to wrap for Llap here? probably later when stats are cached?
InputFormat<?, ?> inputFormat = (InputFormat<?, ?>) ReflectionUtils.newInstance(
table.getInputFormatClass(), jc);
InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[] { table
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Tue Sep 16 17:50:02 2014
@@ -213,6 +213,7 @@ public class RowContainer<ROW extends Li
JobConf localJc = getLocalFSJobConfClone(jc);
if (inputSplits == null) {
if (this.inputFormat == null) {
+ // TODO: do we need to wrap here?
inputFormat = ReflectionUtils.newInstance(
tblDesc.getInputFileFormatClass(), localJc);
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Tue Sep 16 17:50:02 2014
@@ -132,6 +132,8 @@ public class TezProcessor extends Abstra
rproc = new MapRecordProcessor(jobConf);
MRInputLegacy mrInput = getMRInput(inputs);
try {
+ // TODO: This might create oldInputFormat in MRInput.
+ // We are assuming we don't need to wrap it for Llap.
mrInput.init();
} catch (IOException e) {
throw new RuntimeException("Failed while initializing MRInput", e);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java Tue Sep 16 17:50:02 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.vector;
+import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.NullWritable;
@@ -311,6 +312,14 @@ public class BytesColumnVector extends C
setRef(0, value, 0, value.length);
}
+ // Fill the column vector with nulls
+ public void fillWithNulls() {
+ noNulls = false;
+ isRepeating = true;
+ vector[0] = null;
+ isNull[0] = true;
+ }
+
@Override
public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
BytesColumnVector in = (BytesColumnVector) inputVector;
@@ -321,4 +330,9 @@ public class BytesColumnVector extends C
public void init() {
initBuffer(0);
}
+
+ @Override
+ public void visit(ColumnVectorVisitor v) throws IOException {
+ v.visit(this);
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java Tue Sep 16 17:50:02 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.vector;
+import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.Writable;
@@ -156,5 +157,7 @@ public abstract class ColumnVector {
public void init() {
// Do nothing by default
}
+
+ public abstract void visit(ColumnVectorVisitor v) throws IOException;
}
Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVectorVisitor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVectorVisitor.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVectorVisitor.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVectorVisitor.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.io.IOException;
+
+/**
+ * Visitor interface over ColumnVector subclasses, providing double dispatch for per-type logic.
+ */
+public interface ColumnVectorVisitor {
+ void visit(LongColumnVector v) throws IOException;
+ void visit(DoubleColumnVector v) throws IOException;
+ void visit(DecimalColumnVector v) throws IOException;
+ void visit(BytesColumnVector v) throws IOException;
+}
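Each ColumnVector subclass gains a matching visit() override in this commit, so per-type logic can live in a visitor instead of instanceof chains. As a minimal illustration (not part of the commit), a visitor that blanks out whichever column it is handed, using the fillWithNulls methods also added in this change:

    // Illustrative visitor; relies only on methods added or already present in these classes.
    public class NullFillingVisitor implements ColumnVectorVisitor {
      @Override public void visit(LongColumnVector v) { v.fillWithNulls(); }
      @Override public void visit(DoubleColumnVector v) { v.fillWithNulls(); }
      @Override public void visit(DecimalColumnVector v) { v.fillWithNulls(); }
      @Override public void visit(BytesColumnVector v) { v.fillWithNulls(); }
    }

    // Double dispatch: the concrete vector type selects the right overload.
    // for (ColumnVector col : batch.cols) { col.visit(new NullFillingVisitor()); }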
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java Tue Sep 16 17:50:02 2014
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.exec.vector;
+import java.io.IOException;
+
import org.apache.hadoop.hive.common.type.Decimal128;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -71,6 +73,21 @@ public class DecimalColumnVector extends
}
}
+ // Fill all the vector entries with the provided value
+ public void fill(Decimal128 value) {
+ noNulls = true;
+ isRepeating = true;
+ vector[0] = value;
+ }
+
+ // Fill the column vector with nulls
+ public void fillWithNulls() {
+ noNulls = false;
+ isRepeating = true;
+ vector[0] = null;
+ isNull[0] = true;
+ }
+
@Override
public void flatten(boolean selectedInUse, int[] sel, int size) {
// TODO Auto-generated method stub
@@ -96,4 +113,9 @@ public class DecimalColumnVector extends
isNull[i] = true;
}
}
+
+ @Override
+ public void visit(ColumnVectorVisitor v) throws IOException {
+ v.visit(this);
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java Tue Sep 16 17:50:02 2014
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.exec.vector;
+import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -121,6 +122,14 @@ public class DoubleColumnVector extends
vector[0] = value;
}
+ // Fill the column vector with nulls
+ public void fillWithNulls() {
+ noNulls = false;
+ isRepeating = true;
+ vector[0] = NULL_VALUE;
+ isNull[0] = true;
+ }
+
// Simplify vector by brute-force flattening noNulls and isRepeating
// This can be used to reduce combinatorial explosion of code paths in VectorExpressions
// with many arguments.
@@ -146,4 +155,9 @@ public class DoubleColumnVector extends
public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
vector[outElementNum] = ((DoubleColumnVector) inputVector).vector[inputElementNum];
}
+
+ @Override
+ public void visit(ColumnVectorVisitor v) throws IOException {
+ v.visit(this);
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java Tue Sep 16 17:50:02 2014
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.exec.vector;
+import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.LongWritable;
@@ -165,6 +166,14 @@ public class LongColumnVector extends Co
vector[0] = value;
}
+ // Fill the column vector with nulls
+ public void fillWithNulls() {
+ noNulls = false;
+ isRepeating = true;
+ vector[0] = NULL_VALUE;
+ isNull[0] = true;
+ }
+
// Simplify vector by brute-force flattening noNulls and isRepeating
// This can be used to reduce combinatorial explosion of code paths in VectorExpressions
// with many arguments.
@@ -190,4 +199,9 @@ public class LongColumnVector extends Co
public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
vector[outElementNum] = ((LongColumnVector) inputVector).vector[inputElementNum];
}
+
+ @Override
+ public void visit(ColumnVectorVisitor v) throws IOException {
+ v.visit(this);
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Tue Sep 16 17:50:02 2014
@@ -268,8 +268,7 @@ public class VectorizedRowBatchCtx {
* @return VectorizedRowBatch
* @throws HiveException
*/
- public VectorizedRowBatch createVectorizedRowBatch() throws HiveException
- {
+ public VectorizedRowBatch createVectorizedRowBatch() throws HiveException {
List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size());
for (int j = 0; j < fieldRefs.size(); j++) {
@@ -394,8 +393,7 @@ public class VectorizedRowBatchCtx {
* @param batch
* @throws HiveException
*/
- public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException
- {
+ public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException {
int colIndex;
Object value;
PrimitiveCategory pCategory;
@@ -640,4 +638,21 @@ public class VectorizedRowBatchCtx {
}
return assigners;
}
+
+ public int[] getIncludedColumnIndexes() {
+ int colCount = (colsToInclude == null)
+ ? rowOI.getAllStructFieldRefs().size() : colsToInclude.size();
+ int[] result = new int[colCount];
+ if (colsToInclude == null) {
+ for (int i = 0; i < result.length; ++i) {
+ result[i] = i;
+ }
+ } else {
+ int i = -1;
+ for (int colIx : colsToInclude) {
+ result[++i] = colIx;
+ }
+ }
+ return result;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/NullUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/NullUtil.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/NullUtil.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/NullUtil.java Tue Sep 16 17:50:02 2014
@@ -19,6 +19,8 @@
package org.apache.hadoop.hive.ql.exec.vector.expressions;
import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -54,6 +56,31 @@ public class NullUtil {
}
}
+ /**
+ * Set the data reference for all NULL entries to null.
+ */
+ public static void setNullDataEntriesBytes(
+ BytesColumnVector v, boolean selectedInUse, int[] sel, int n) {
+ if (v.noNulls) {
+ return;
+ } else if (v.isRepeating && v.isNull[0]) {
+ v.vector[0] = null;
+ } else if (selectedInUse) {
+ for (int j = 0; j != n; j++) {
+ int i = sel[j];
+ if(v.isNull[i]) {
+ v.vector[i] = null;
+ }
+ }
+ } else {
+ for (int i = 0; i != n; i++) {
+ if(v.isNull[i]) {
+ v.vector[i] = null;
+ }
+ }
+ }
+ }
+
// for use by Column-Scalar and Scalar-Column arithmetic for null propagation
public static void setNullOutputEntriesColScalar(
ColumnVector v, boolean selectedInUse, int[] sel, int n) {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Tue Sep 16 17:50:02 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configurab
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.plan.Pa
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -194,6 +196,30 @@ public class HiveInputFormat<K extends W
this.job = job;
}
+ public static InputFormat<WritableComparable, Writable> wrapForLlap(
+ InputFormat<WritableComparable, Writable> inputFormat, Configuration conf) {
+ if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_ENABLED)) return inputFormat;
+ boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface,
+ isVector = Utilities.isVectorMode(conf);
+ if (!isSupported || !isVector) {
+ LOG.info("Not using llap for " + inputFormat + ": " + isSupported + ", " + isVector);
+ return inputFormat;
+ }
+ LOG.info("Wrapping " + inputFormat);
+ // TODO: we'd actually need a more specific template parameter for non-vectorized one...
+ // no idea how this is going to work at this point.
+ InputFormat<NullWritable, Writable> inputToWrap = castInputFormat(inputFormat);
+ return castInputFormat(new LlapInputFormat<Writable>(inputToWrap, conf));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T, U, V> InputFormat<T, V> castInputFormat(InputFormat<U, V> from) {
+ // We assume that LlapWrappableInputFormatInterface has NullWritable as its first type parameter.
+ // Java's type erasure means there is no way to verify this at runtime.
+ return (InputFormat<T, V>)from;
+ }
+
+
public static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
Class inputFormatClass, JobConf job) throws IOException {
@@ -207,7 +233,7 @@ public class HiveInputFormat<K extends W
+ inputFormatClass.getName() + " as specified in mapredWork!", e);
}
}
- return inputFormats.get(inputFormatClass);
+ return wrapForLlap(inputFormats.get(inputFormatClass), job);
}
public RecordReader getRecordReader(InputSplit split, JobConf job,
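From the caller's side nothing changes: getInputFormatFromCache is still the entry point, and wrapForLlap transparently decides whether to return the cached format or an LLAP wrapper. A hypothetical call site (using OrcInputFormat purely as an example of a format that might implement LlapWrappableInputFormatInterface):

    // Sketch of the caller-side view; the call site itself is unchanged by this commit.
    InputFormat<WritableComparable, Writable> f =
        HiveInputFormat.getInputFormatFromCache(OrcInputFormat.class, job);
    // If LLAP_ENABLED is set, the query is vectorized, and the format opts in via
    // LlapWrappableInputFormatInterface, f is a LlapInputFormat wrapper; otherwise
    // it is the cached instance returned exactly as before.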
Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapInputFormat.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapInputFormat.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapInputFormat.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.api.Reader;
+import org.apache.hadoop.hive.llap.api.RequestFactory;
+import org.apache.hadoop.hive.llap.api.Vector;
+import org.apache.hadoop.hive.llap.api.Vector.ColumnReader;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVectorVisitor;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.NullUtil;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.collect.Lists;
+
+public class LlapInputFormat<T>
+ implements InputFormat<NullWritable, T>, VectorizedInputFormatInterface {
+ /** See RequestFactory class documentation on why this is necessary */
+ private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.api.Llap";
+
+ private final InputFormat<NullWritable, T> realInputFormat;
+ private final RequestFactory reqFactory;
+ private static final Log LOG = LogFactory.getLog(LlapInputFormat.class);
+
+ public LlapInputFormat(InputFormat<NullWritable, T> realInputFormat, Configuration conf) {
+ this.realInputFormat = realInputFormat;
+ try {
+ reqFactory = (RequestFactory)ReflectionUtils.newInstance(Class.forName(IMPL_CLASS), conf);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Failed to initialize impl", e);
+ }
+ }
+
+ @Override
+ public RecordReader<NullWritable, T> getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+ boolean isVectorMode = Utilities.isVectorMode(job);
+ if (!isVectorMode) {
+ LOG.error("No llap in non-vectorized mode; falling back to original");
+ throw new UnsupportedOperationException("No llap in non-vectorized mode");
+ // return realInputFormat.getRecordReader(split, job, reporter);
+ }
+ FileSplit fileSplit = (FileSplit)split; // should work
+ reporter.setStatus(fileSplit.toString());
+ try {
+ List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
+ ? null : ColumnProjectionUtils.getReadColumnIDs(job);
+ if (includedCols != null && includedCols.isEmpty()) {
+ includedCols = null; // An empty include list also means "read all columns".
+ }
+ Reader reader = reqFactory.createLocalRequest()
+ .setSplit(split)
+ .setSarg(SearchArgumentFactory.createFromConf(job))
+ .setColumns(includedCols).submit();
+ // TODO: presumably, we'll also pass the means to create original RecordReader
+ // for failover to LlapRecordReader somewhere around here. This will
+ // actually be quite complex because we'd somehow have to track which parts
+ // of the file Llap has already returned, and skip those in the fallback reader.
+ // We are actually returning a completely wrong thing here wrt template parameters.
+ // This is how vectorization does it presently; we hope the caller knows what it is doing.
+ return createRecordReaderUnsafe(job, fileSplit, reader);
+ } catch (Exception ex) {
+ LOG.error("Local request failed; falling back to original", ex);
+ throw new IOException(ex); // just rethrow for now, for clarity
+ // return realInputFormat.getRecordReader(split, job, reporter);
+ }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private RecordReader<NullWritable, T> createRecordReaderUnsafe(
+ JobConf job, FileSplit fileSplit, Reader reader) {
+ return (RecordReader)new VectorizedLlapRecordReader(reader, job, fileSplit);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ return realInputFormat.getSplits(job, numSplits);
+ }
+
+ private static class VectorizedLlapRecordReader
+ implements RecordReader<NullWritable, VectorizedRowBatch> {
+ private final Reader reader;
+ private Vector currentVector;
+ private ColumnReader currentVectorSlice; // see VectorImpl, really just currentVector
+ private int currentVectorOffset = -1; // number of rows read from current vector
+
+ private VectorizedRowBatchCtx rbCtx;
+ private final VrbHelper vrbHelper = new VrbHelper();
+ private boolean addPartitionCols = true;
+
+ public VectorizedLlapRecordReader(Reader reader, JobConf job, FileSplit split) {
+ this.reader = reader;
+ try {
+ rbCtx = new VectorizedRowBatchCtx();
+ rbCtx.init(job, split);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+ try {
+ assert value != null;
+ if (currentVector == null) {
+ currentVector = reader.next();
+ if (currentVector == null) {
+ return false;
+ }
+ currentVectorOffset = 0;
+ }
+ // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
+ if (addPartitionCols) {
+ rbCtx.addPartitionColsToBatch(value);
+ addPartitionCols = false;
+ }
+ populateBatchFromVector(value);
+ traceLogFirstRow(value);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (HiveException e) {
+ throw new IOException(e);
+ }
+ return true;
+ }
+
+ private void traceLogFirstRow(VectorizedRowBatch value) {
+ if (!LOG.isTraceEnabled()) return;
+ String tmp = "First row is [";
+ for (ColumnVector v : value.cols) {
+ if (v instanceof LongColumnVector) {
+ tmp += ((LongColumnVector)v).vector[0] + ", ";
+ } else if (v instanceof DoubleColumnVector) {
+ tmp += ((DoubleColumnVector)v).vector[0] + ", ";
+ } else if (v == null) {
+ tmp += "null, ";
+ } else {
+ tmp += "(something), ";
+ }
+ }
+ LOG.trace(tmp + "]");
+ }
+
+ private void populateBatchFromVector(VectorizedRowBatch target) throws IOException {
+ // TODO: eventually, when vectorized pipeline can work directly
+ // on vectors instead of VRB, this will be a noop.
+ // TODO: track time spent building VRBs as opposed to processing.
+ int rowCount = VectorizedRowBatch.DEFAULT_SIZE,
+ rowsRemaining = currentVector.getNumberOfRows() - currentVectorOffset;
+ if (rowsRemaining <= rowCount) {
+ rowCount = rowsRemaining;
+ }
+ int[] columnMap = rbCtx.getIncludedColumnIndexes();
+ if (columnMap.length != currentVector.getNumberOfColumns()) {
+ throw new RuntimeException("Unexpected number of columns, VRB has " + columnMap.length
+ + " included, but vector has " + currentVector.getNumberOfColumns());
+ }
+ vrbHelper.prepare(rowCount);
+ // VRB was created from VrbCtx, so we already have pre-allocated column vectors
+ for (int vectorIx = 0; vectorIx < currentVector.getNumberOfColumns(); ++vectorIx) {
+ int colIx = columnMap[vectorIx];
+ currentVectorSlice = currentVector.next(colIx, rowCount);
+ target.cols[colIx].visit(vrbHelper);
+ }
+ target.selectedInUse = false;
+ target.size = rowCount;
+
+ if (rowsRemaining == rowCount) {
+ currentVector = null;
+ currentVectorOffset = -1;
+ } else {
+ currentVectorOffset += rowCount;
+ }
+ }
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public VectorizedRowBatch createValue() {
+ try {
+ return rbCtx.createVectorizedRowBatch();
+ } catch (HiveException e) {
+ throw new RuntimeException("Error creating a batch", e);
+ }
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return -1; // Position doesn't make sense for async reader, chunk order is arbitrary.
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ // TODO: plumb progress info thru the reader if we can get metadata from loader first.
+ return 0.0f;
+ }
+
+ /**
+ * There's one VrbHelper object per RecordReader object. We could just implement
+ * the visitor in RecordReader itself, but it's a bit cleaner like this.
+ */
+ private final class VrbHelper implements ColumnVectorVisitor {
+ private int rowCount = -1;
+ public void prepare(int rowCount) {
+ this.rowCount = rowCount;
+ }
+
+ @Override
+ public void visit(LongColumnVector c) throws IOException {
+ boolean hasNulls = currentVectorSlice.hasNulls();
+ boolean isSameValue = currentVectorSlice.isSameValue();
+ if (isSameValue) {
+ if (hasNulls) {
+ c.fillWithNulls();
+ } else {
+ c.fill(currentVectorSlice.getLong());
+ }
+ } else {
+ c.reset();
+ c.noNulls = !hasNulls;
+ currentVectorSlice.copyLongs(c.vector, hasNulls ? c.isNull : null, 0);
+ if (hasNulls) {
+ NullUtil.setNullDataEntriesLong(c, false, null, rowCount);
+ }
+ }
+ }
+
+ @Override
+ public void visit(DoubleColumnVector c) throws IOException {
+ boolean hasNulls = currentVectorSlice.hasNulls();
+ boolean isSameValue = currentVectorSlice.isSameValue();
+ if (isSameValue) {
+ if (hasNulls) {
+ c.fillWithNulls();
+ } else {
+ c.fill(currentVectorSlice.getDouble());
+ }
+ } else {
+ c.reset();
+ c.noNulls = !hasNulls;
+ currentVectorSlice.copyDoubles(c.vector, hasNulls ? c.isNull : null, 0);
+ if (hasNulls) {
+ NullUtil.setNullDataEntriesDouble(c, false, null, rowCount);
+ }
+ }
+ }
+
+ @Override
+ public void visit(DecimalColumnVector c) throws IOException {
+ boolean hasNulls = currentVectorSlice.hasNulls();
+ boolean isSameValue = currentVectorSlice.isSameValue();
+ if (isSameValue) {
+ if (hasNulls) {
+ c.fillWithNulls();
+ } else {
+ c.fill(currentVectorSlice.getDecimal());
+ }
+ } else {
+ c.reset();
+ c.noNulls = !hasNulls;
+ currentVectorSlice.copyDecimals(c.vector, hasNulls ? c.isNull : null, 0);
+ if (hasNulls) {
+ NullUtil.setNullDataEntriesDecimal(c, false, null, rowCount);
+ }
+ }
+ }
+
+ @Override
+ public void visit(BytesColumnVector c) throws IOException {
+ boolean hasNulls = currentVectorSlice.hasNulls();
+ boolean isSameValue = currentVectorSlice.isSameValue();
+ if (isSameValue) {
+ if (hasNulls) {
+ c.fillWithNulls();
+ } else {
+ c.fill(currentVectorSlice.getBytes());
+ }
+ } else {
+ c.reset();
+ c.noNulls = !hasNulls;
+ currentVectorSlice.copyBytes(c.vector, c.start, c.length, hasNulls ? c.isNull : null, 0);
+ if (hasNulls) {
+ NullUtil.setNullDataEntriesBytes(c, false, null, rowCount);
+ }
+ }
+ }
+ }
+ }
+}
Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+/** Marker interface to indicate a given input format can be wrapped by Llap. */
+public interface LlapWrappableInputFormatInterface {
+
+}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java Tue Sep 16 17:50:02 2014
@@ -20,10 +20,15 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.EOFException;
import java.io.IOException;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
class BitFieldReader {
private final RunLengthByteReader input;
+ /** The number of bits in one item. Non-test code always uses 1. */
private final int bitSize;
private int current;
private int bitsLeft;
@@ -62,9 +67,112 @@ class BitFieldReader {
return result & mask;
}
- void nextVector(LongColumnVector previous, long previousLen)
- throws IOException {
+ /**
+ * Unlike integer readers, where runs are encoded explicitly, in this one we have to read ahead
+ * to figure out whether we have a run. Given that runs in boolean streams are likely, it's worth it.
+ * However, it means we'd need to keep track of how many bytes we read, and next/nextVector won't
+ * work anymore once this is called. That would be trivial to fix, but these calls are never interspersed.
+ */
+ private boolean lastRunValue;
+ private int lastRunLength = -1;
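+ // Illustrative example of the run detection below: suppose bitSize == 1, bitsLeft == 3 and the
+ // three unread bits of "current" are all set. readNextRun(100) takes those 3 bits, then every
+ // whole 0xff byte that follows (8 bits each), then the leading set bits of the first byte that
+ // is not 0xff; if that byte is 0xf0, the result is lastRunValue == true and
+ // lastRunLength == 3 + 8 * k + 4, where k is the number of 0xff bytes consumed.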
+ private void readNextRun(int maxRunLength) throws IOException {
+ assert bitSize == 1;
+ if (lastRunLength > 0) return; // last run is not exhausted yet
+ if (bitsLeft == 0) {
+ readByte();
+ }
+ // First take care of the partial bits.
+ boolean hasVal = false;
+ int runLength = 0;
+ if (bitsLeft != 8) {
+ int partialBitsMask = (1 << bitsLeft) - 1;
+ int partialBits = current & partialBitsMask;
+ if (partialBits == partialBitsMask || partialBits == 0) {
+ lastRunValue = (partialBits == partialBitsMask);
+ if (maxRunLength <= bitsLeft) {
+ lastRunLength = maxRunLength;
+ return;
+ }
+ maxRunLength -= bitsLeft;
+ hasVal = true;
+ runLength = bitsLeft;
+ bitsLeft = 0;
+ } else {
+ // There's no run in partial bits. Return whatever we have.
+ int prefixBitsCount = 32 - bitsLeft;
+ runLength = Integer.numberOfLeadingZeros(partialBits) - prefixBitsCount;
+ lastRunValue = (runLength > 0);
+ lastRunLength = Math.min(maxRunLength, lastRunValue ? runLength :
+ (Integer.numberOfLeadingZeros(~(partialBits | ~partialBitsMask)) - prefixBitsCount));
+ return;
+ }
+ assert bitsLeft == 0;
+ readByte();
+ }
+ if (!hasVal) {
+ lastRunValue = ((current >> 7) == 1);
+ hasVal = true;
+ }
+ // Read full bytes until the run ends.
+ assert bitsLeft == 8;
+ while (maxRunLength >= 8
+ && ((lastRunValue && (current == 0xff)) || (!lastRunValue && (current == 0)))) {
+ runLength += 8;
+ maxRunLength -= 8;
+ readByte();
+ }
+ if (maxRunLength > 0) {
+ int extraBits = Integer.numberOfLeadingZeros(
+ lastRunValue ? (~(current | ~255)) : current) - 24;
+ bitsLeft -= extraBits;
+ runLength += extraBits;
+ }
+ lastRunLength = runLength;
+ }
+
+ private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
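+ /**
+ * Writes boolean runs from this stream into the chunk writer as repeated longs, taking null
+ * runs from the optional present stream, until either rowsLeftToRead rows have been consumed
+ * or the writer has no more space.
+ * @return The number of rows (values and nulls) transferred to the writer.
+ */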
+ int nextChunk(
+ ChunkWriter writer, BitFieldReader present, long rowsLeftToRead) throws IOException {
+ if (bitSize != 1) {
+ throw new AssertionError("Bit size should always be 1");
+ }
+ boolean mayHaveNulls = present != null;
+ NullsState nullState = mayHaveNulls ? NullsState.HAS_NULLS : NullsState.NO_NULLS;
+ int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+ if (rowsLeftToWrite == 0) {
+ return 0; // Cannot write any rows into this writer.
+ }
+ long originalRowsLeft = rowsLeftToRead;
+ // Start the big loop to read rows until we run out of either input or space.
+ while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+ int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+ readNextRun(rowsToTransfer);
+ presentHelper.availLength = lastRunLength;
+ if (mayHaveNulls) {
+ LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+ if (!presentHelper.isNullsRun) {
+ assert lastRunLength >= presentHelper.availLength;
+ lastRunLength -= presentHelper.availLength;
+ }
+ } else {
+ lastRunLength = 0;
+ }
+
+ assert rowsLeftToRead >= presentHelper.availLength;
+ if (presentHelper.isNullsRun) {
+ writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+ } else {
+ writer.writeRepeatedLongs(lastRunValue ? 1 : 0, presentHelper.availLength,
+ presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : nullState);
+ }
+ rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+ rowsLeftToRead -= presentHelper.availLength;
+ } // End of big loop.
+ writer.finishCurrentSegment();
+ return (int)(originalRowsLeft - rowsLeftToRead);
+ }
+ void nextVector(LongColumnVector previous, long previousLen) throws IOException {
previous.isRepeating = true;
for (int i = 0; i < previousLen; i++) {
if (!previous.isNull[i]) {
@@ -117,4 +225,30 @@ class BitFieldReader {
return "bit reader current: " + current + " bits left: " + bitsLeft +
" bit size: " + bitSize + " from " + input;
}
+
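+ // Low-level helpers for scanning the present stream one bit or one byte at a time; these are
+ // used by LlapUtils.readPresentStream.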
+ boolean hasFullByte() {
+ return bitsLeft == 8 || bitsLeft == 0;
+ }
+
+ int peekOneBit() throws IOException {
+ assert bitSize == 1;
+ if (bitsLeft == 0) {
+ readByte();
+ }
+ return (current >>> (bitsLeft - 1)) & 1;
+ }
+
+ int peekFullByte() throws IOException {
+ assert bitSize == 1;
+ assert bitsLeft == 8 || bitsLeft == 0;
+ if (bitsLeft == 0) {
+ readByte();
+ }
+ return current;
+ }
+
+ void skipInCurrentByte(int bits) throws IOException {
+ assert bitsLeft >= bits;
+ bitsLeft -= bits;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Tue Sep 16 17:50:02 2014
@@ -29,8 +29,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
/**
* A tool for printing out the file structure of ORC files.
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java Tue Sep 16 17:50:02 2014
@@ -19,6 +19,8 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
/**
@@ -61,4 +63,6 @@ interface IntegerReader {
*/
void nextVector(LongColumnVector previous, long previousLen)
throws IOException;
+
+ int nextChunk(ChunkWriter writer, BitFieldReader present, long rowsLeft) throws IOException;
}
Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LlapUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LlapUtils.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LlapUtils.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LlapUtils.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.orc.BitFieldReader;
+
+public class LlapUtils {
+ public static final Log LOG = LogFactory.getLog(LlapUtils.class);
+ public static final int DOUBLE_GROUP_SIZE = 64; // just happens to be equal to bitmask size
+
+ /** Helper for readPresentStream. */
+ public static class PresentStreamReadResult {
+ public int availLength;
+ public boolean isNullsRun = false;
+ public boolean isFollowedByOther = false;
+ public void reset() {
+ isFollowedByOther = isNullsRun = false;
+ }
+ }
+
+ /**
+ * Helper method that reads the present stream to find the size of a run of nulls, or the
+ * count of contiguous non-null values, based on the run length from the main stream.
+ * @param r Result is returned via this because java is not a real language; on input,
+ *          r.availLength holds the run length from the main stream.
+ * @param present The present stream.
+ * @param rowsLeftToRead Total number of rows that may be read from the main stream.
+ */
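+ // Worked example: with r.availLength == 5 on entry, rowsLeftToRead == 100, and the next
+ // present bits being 1 1 1 0 ..., the first bit makes this a non-null run capped at 5; after
+ // three 1s a 0 is seen, so the method returns r.availLength == 3, isNullsRun == false and
+ // isFollowedByOther == true (three values come from the main stream, then a null run starts).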
+ public static void readPresentStream(PresentStreamReadResult r,
+ BitFieldReader present, long rowsLeftToRead) throws IOException {
+ int presentBitsRead = 0;
+ r.reset();
+ // We are looking for a run of nulls no longer than rowsLeftToRead (presuming
+ // they will all fit in the writer), OR a run of non-nulls no longer than availLength.
+
+ // If there's a partial byte in present stream, we will read bits from it.
+ boolean doneWithPresent = false;
+ int rowLimit = r.availLength;
+ while (!present.hasFullByte() && !doneWithPresent
+ && (presentBitsRead == 0 || (presentBitsRead < rowLimit))) {
+ int bit = present.peekOneBit();
+ if (presentBitsRead == 0) {
+ r.isNullsRun = (bit == 0);
+ rowLimit = (int)(r.isNullsRun ? rowsLeftToRead : r.availLength);
+ } else if (r.isNullsRun != (bit == 0)) {
+ doneWithPresent = r.isFollowedByOther = true;
+ break;
+ }
+ present.skipInCurrentByte(1);
+ ++presentBitsRead;
+ }
+ // Now, if we are not done, read the full bytes.
+ // TODO: we could ask the underlying byte stream of the "present" reader for runs;
+ // many bitmasks might have long sequences of 0x00 or 0xff.
+ while (!doneWithPresent && (presentBitsRead == 0 || (presentBitsRead < rowLimit))) {
+ int bits = present.peekFullByte();
+ if (presentBitsRead == 0) {
+ r.isNullsRun = (bits & (1 << 7)) == 0;
+ rowLimit = (int)(r.isNullsRun ? rowsLeftToRead : r.availLength);
+ }
+ int bitsToTake = 0;
+ if ((bits == 0 && r.isNullsRun) || (bits == 0xff && !r.isNullsRun)) {
+ bitsToTake = 8;
+ } else {
+ doneWithPresent = r.isFollowedByOther = true;
+ // Get the number of leading 0s or 1s in this byte.
+ bitsToTake = Integer.numberOfLeadingZeros(r.isNullsRun ? bits : (~(bits | ~255))) - 24;
+ }
+ bitsToTake = Math.min(bitsToTake, rowLimit - presentBitsRead);
+ presentBitsRead += bitsToTake;
+ present.skipInCurrentByte(bitsToTake);
+ } // End of the loop reading full bytes.
+ assert presentBitsRead <= rowLimit : "Read " + presentBitsRead + " bits for "
+ + (r.isNullsRun ? "" : "non-") + "null run: " + rowsLeftToRead + ", " + r.availLength;
+ assert presentBitsRead > 0;
+ r.availLength = presentBitsRead;
+ }
+}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Sep 16 17:50:02 2014
@@ -42,11 +42,15 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -99,7 +103,7 @@ import com.google.common.util.concurrent
*/
public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
InputFormatChecker, VectorizedInputFormatInterface,
- AcidInputFormat<NullWritable, OrcStruct> {
+ AcidInputFormat<NullWritable, OrcStruct>, LlapWrappableInputFormatInterface {
private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
@@ -107,7 +111,6 @@ public class OrcInputFormat implements
SHIMS.getHadoopConfNames().get("MAPREDMINSPLITSIZE");
static final String MAX_SPLIT_SIZE =
SHIMS.getHadoopConfNames().get("MAPREDMAXSPLITSIZE");
- static final String SARG_PUSHDOWN = "sarg.pushdown";
private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
@@ -210,7 +213,7 @@ public class OrcInputFormat implements
boolean isOriginal =
!file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME);
List<OrcProto.Type> types = file.getTypes();
- setIncludedColumns(options, types, conf, isOriginal);
+ options.include(genIncludedColumns(types, conf, isOriginal));
setSearchArgument(options, types, conf, isOriginal);
return file.rowsOptions(options);
}
@@ -234,6 +237,21 @@ public class OrcInputFormat implements
}
}
+ public static boolean[] genIncludedColumns(
+ List<OrcProto.Type> types, List<Integer> included, boolean isOriginal) {
+ int rootColumn = getRootColumn(isOriginal);
+ int numColumns = types.size() - rootColumn;
+ boolean[] result = new boolean[numColumns];
+ result[0] = true;
+ OrcProto.Type root = types.get(rootColumn);
+ for(int i=0; i < root.getSubtypesCount(); ++i) {
+ if (included.contains(i)) {
+ includeColumnRecursive(types, result, root.getSubtypes(i),
+ rootColumn);
+ }
+ }
+ return result;
+ }
/**
* Take the configuration and figure out which columns we need to include.
* @param options the options to update
@@ -241,26 +259,13 @@ public class OrcInputFormat implements
* @param conf the configuration
* @param isOriginal is the file in the original format?
*/
- static void setIncludedColumns(Reader.Options options,
- List<OrcProto.Type> types,
- Configuration conf,
- boolean isOriginal) {
- int rootColumn = getRootColumn(isOriginal);
- if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
- int numColumns = types.size() - rootColumn;
- boolean[] result = new boolean[numColumns];
- result[0] = true;
- OrcProto.Type root = types.get(rootColumn);
+ public static boolean[] genIncludedColumns(
+ List<OrcProto.Type> types, Configuration conf, boolean isOriginal) {
+ if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
- for(int i=0; i < root.getSubtypesCount(); ++i) {
- if (included.contains(i)) {
- includeColumnRecursive(types, result, root.getSubtypes(i),
- rootColumn);
- }
- }
- options.include(result);
+ return genIncludedColumns(types, included, isOriginal);
} else {
- options.include(null);
+ return null;
}
}
@@ -268,37 +273,33 @@ public class OrcInputFormat implements
List<OrcProto.Type> types,
Configuration conf,
boolean isOriginal) {
- int rootColumn = getRootColumn(isOriginal);
- String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
- String sargPushdown = conf.get(SARG_PUSHDOWN);
- String columnNamesString =
- conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
- if ((sargPushdown == null && serializedPushdown == null)
- || columnNamesString == null) {
+ String columnNamesString = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+ if (columnNamesString == null) {
+ LOG.debug("No ORC pushdown predicate - no column names");
+ options.searchArgument(null, null);
+ return;
+ }
+ SearchArgument sarg = SearchArgumentFactory.createFromConf(conf);
+ if (sarg == null) {
LOG.debug("No ORC pushdown predicate");
options.searchArgument(null, null);
- } else {
- SearchArgument sarg;
- if (serializedPushdown != null) {
- sarg = SearchArgumentFactory.create
- (Utilities.deserializeExpression(serializedPushdown));
- } else {
- sarg = SearchArgumentFactory.create(sargPushdown);
- }
- LOG.info("ORC pushdown predicate: " + sarg);
- String[] neededColumnNames = columnNamesString.split(",");
- String[] columnNames = new String[types.size() - rootColumn];
- boolean[] includedColumns = options.getInclude();
- int i = 0;
- for(int columnId: types.get(rootColumn).getSubtypesList()) {
- if (includedColumns == null || includedColumns[columnId - rootColumn]) {
- // this is guaranteed to be positive because types only have children
- // ids greater than their own id.
- columnNames[columnId - rootColumn] = neededColumnNames[i++];
- }
+ return;
+ }
+
+ LOG.info("ORC pushdown predicate: " + sarg);
+ int rootColumn = getRootColumn(isOriginal);
+ String[] neededColumnNames = columnNamesString.split(",");
+ String[] columnNames = new String[types.size() - rootColumn];
+ boolean[] includedColumns = options.getInclude();
+ int i = 0;
+ for(int columnId: types.get(rootColumn).getSubtypesList()) {
+ if (includedColumns == null || includedColumns[columnId - rootColumn]) {
+ // this is guaranteed to be positive because types only have children
+ // ids greater than their own id.
+ columnNames[columnId - rootColumn] = neededColumnNames[i++];
}
- options.searchArgument(sarg, columnNames);
}
+ options.searchArgument(sarg, columnNames);
}
@Override
@@ -752,7 +753,7 @@ public class OrcInputFormat implements
// deltas may change the rows making them match the predicate.
if (deltas.isEmpty()) {
Reader.Options options = new Reader.Options();
- setIncludedColumns(options, types, context.conf, isOriginal);
+ options.include(genIncludedColumns(types, context.conf, isOriginal));
setSearchArgument(options, types, context.conf, isOriginal);
if (options.getSearchArgument() != null) {
SearchArgument sarg = options.getSearchArgument();
@@ -1097,7 +1098,7 @@ public class OrcInputFormat implements
.getBucket();
reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
final List<OrcProto.Type> types = reader.getTypes();
- setIncludedColumns(readOptions, types, conf, split.isOriginal());
+ readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
setSearchArgument(readOptions, types, conf, split.isOriginal());
} else {
bucket = (int) split.getStart();
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Tue Sep 16 17:50:02 2014
@@ -34,8 +34,8 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Text;