You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/09/16 19:50:04 UTC

svn commit: r1625344 [3/4] - in /hive/branches/llap: ./ common/src/java/org/apache/hadoop/hive/conf/ data/conf/ data/conf/tez/ itests/qtest/ llap-client/ llap-client/src/ llap-client/src/java/ llap-client/src/java/org/ llap-client/src/java/org/apache/ ...

Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkProducerFeedback.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkProducerFeedback.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkProducerFeedback.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkProducerFeedback.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.processor;
+
+import org.apache.hadoop.hive.llap.api.Vector;
+
+/**
+ * A feedback interface for chunk producer-consumer pipeline.
+ */
+public interface ChunkProducerFeedback {
+  void returnCompleteVector(Vector vector);
+  void stop();
+  // We may add throttling here later.
+}

Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Pool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Pool.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Pool.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Pool.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.llap.processor;
+
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.api.impl.RequestImpl;
+import org.apache.hadoop.hive.llap.loader.Loader;
+import org.apache.hadoop.hive.llap.loader.OrcLoader;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// TODO: write unit tests if this class becomes less primitive.
+public class Pool {
+  // TODO: for now, pool is of dubious value. There's one processor per request.
+  //       So, this provides thread safety that may or may not be needed.
+  private final LinkedList<Processor> processors = new LinkedList<Processor>();
+  private static final int POOL_LIMIT = 10;
+  // There's only one loader, assumed to be thread safe.
+  private final Loader loader;
+  private final ExecutorService threadPool;
+
+  public Pool(Loader loader, Configuration conf) {
+    this.loader = loader;
+    int threadCount = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_REQUEST_THREAD_COUNT);
+    this.threadPool = Executors.newFixedThreadPool(threadCount,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Llap thread %d").build());
+  }
+
+  public void enqueue(RequestImpl request, ChunkConsumer consumer) {
+    Processor proc = null;
+    synchronized (processors) {
+      proc = processors.poll();
+    }
+    if (proc == null) {
+      proc = new Processor(this, loader);
+    }
+    proc.setRequest(request, consumer);
+    threadPool.submit(proc);
+  }
+
+  void returnProcessor(Processor proc) {
+    synchronized (processors) {
+      if (processors.size() < POOL_LIMIT) {
+        processors.add(proc);
+      }
+    }
+  }
+}

Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Processor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Processor.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Processor.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/Processor.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.llap.processor;
+
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.api.impl.RequestImpl;
+import org.apache.hadoop.hive.llap.loader.Loader;
+
+/**
+ * Request processor class. Currently, of dubious value.
+ */
+public class Processor implements Runnable {
+  private final Pool parent;
+  private final Loader loader;
+  private RequestImpl request;
+  private ChunkConsumer consumer;
+
+  public Processor(Pool pool, Loader loader) {
+    this.parent = pool;
+    this.loader = loader;
+  }
+
+  public void setRequest(RequestImpl request, ChunkConsumer consumer) {
+    this.request = request;
+    this.consumer = consumer;
+  }
+
+  @Override
+  public void run() {
+    try {
+      loader.load(request, consumer); // Synchronous load call that return results via consumer.
+    } catch (Throwable t) {
+      Llap.LOG.error("Load failed", t);
+      consumer.setError(t);
+    }
+    parent.returnProcessor(this);
+  }
+}

Added: hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/cache/TestLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/cache/TestLrfuCachePolicy.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/cache/TestLrfuCachePolicy.java (added)
+++ hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/cache/TestLrfuCachePolicy.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.junit.Assume;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestLrfuCachePolicy {
+  private static final Log LOG = LogFactory.getLog(TestLrfuCachePolicy.class);
+
+  @Test
+  public void testHeapSize1() {
+    testHeapSize(1);
+  }
+
+  @Test
+  public void testHeapSize2() {
+    testHeapSize(2);
+  }
+
+  @Test
+  public void testHeapSize7() {
+    testHeapSize(7);
+  }
+
+  @Test
+  public void testHeapSize8() {
+    testHeapSize(8);
+  }
+
+  @Test
+  public void testHeapSize30() {
+    testHeapSize(30);
+  }
+
+  @Test
+  public void testLfuExtreme() {
+    int heapSize = 4;
+    LOG.info("Testing lambda 0 (LFU)");
+    Random rdm = new Random(1234);
+    HiveConf conf = new HiveConf();
+    ArrayList<WeakBuffer> inserted = new ArrayList<WeakBuffer>(heapSize);
+    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f);
+    LrfuCachePolicy lfu = new LrfuCachePolicy(conf, 1, heapSize);
+    for (int i = 0; i < heapSize; ++i) {
+      WeakBuffer buffer = BufferPool.allocateFake();
+      assertNull(cache(lfu, buffer));
+      inserted.add(buffer);
+    }
+    Collections.shuffle(inserted, rdm);
+    // LFU extreme, order of accesses should be ignored, only frequency matters.
+    // We touch first elements later, but do it less times, so they will be evicted first.
+    for (int i = inserted.size() - 1; i >= 0; --i) {
+      for (int j = 0; j < i + 1; ++j) {
+        lfu.notifyLock(inserted.get(i));
+        lfu.notifyUnlock(inserted.get(i));
+      }
+    }
+    verifyOrder(lfu, inserted);
+  }
+
+  @Test
+  public void testLruExtreme() {
+    int heapSize = 4;
+    LOG.info("Testing lambda 1 (LRU)");
+    Random rdm = new Random(1234);
+    HiveConf conf = new HiveConf();
+    ArrayList<WeakBuffer> inserted = new ArrayList<WeakBuffer>(heapSize);
+    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+    LrfuCachePolicy lru = new LrfuCachePolicy(conf, 1, heapSize);
+    for (int i = 0; i < heapSize; ++i) {
+      WeakBuffer buffer = BufferPool.allocateFake();
+      assertNull(cache(lru, buffer));
+      inserted.add(buffer);
+    }
+    Collections.shuffle(inserted, rdm);
+    // LRU extreme, frequency of accesses should be ignored, only order matters.
+    for (int i = 0; i < inserted.size(); ++i) {
+      for (int j = 0; j < (inserted.size() - i); ++j) {
+        lru.notifyLock(inserted.get(i));
+        lru.notifyUnlock(inserted.get(i));
+      }
+    }
+    verifyOrder(lru, inserted);
+  }
+
+  @Test
+  public void testDeadlockResolution() {
+    int heapSize = 4;
+    LOG.info("Testing deadlock resolution");
+    ArrayList<WeakBuffer> inserted = new ArrayList<WeakBuffer>(heapSize);
+    LrfuCachePolicy lrfu = new LrfuCachePolicy(new HiveConf(), 1, heapSize);
+    for (int i = 0; i < heapSize; ++i) {
+      WeakBuffer buffer = BufferPool.allocateFake();
+      assertNull(cache(lrfu, buffer));
+      inserted.add(buffer);
+    }
+    // Lock the lowest priority buffer; try to evict - we'll evict some other buffer.
+    WeakBuffer locked = inserted.get(0);
+    lock(lrfu, locked);
+    WeakBuffer evicted = lrfu.evictOneMoreBlock();
+    assertNotNull(evicted);
+    assertTrue(evicted.isInvalid());
+    assertNotSame(locked, evicted);
+    unlock(lrfu, locked);
+  }
+
+  // Buffers in test are fakes not linked to cache; notify cache policy explicitly.
+  public WeakBuffer cache(LrfuCachePolicy lrfu, WeakBuffer buffer) {
+    buffer.lock(false);
+    WeakBuffer result = lrfu.cache(buffer);
+    buffer.unlock();
+    if (result != CachePolicy.CANNOT_EVICT) {
+      lrfu.notifyUnlock(buffer);
+    }
+    return result;
+  }
+
+  private static void lock(LrfuCachePolicy lrfu, WeakBuffer locked) {
+    locked.lock(false);
+    lrfu.notifyLock(locked);
+  }
+
+  private static void unlock(LrfuCachePolicy lrfu, WeakBuffer locked) {
+    locked.unlock();
+    lrfu.notifyUnlock(locked);
+  }
+
+  private void testHeapSize(int heapSize) {
+    LOG.info("Testing heap size " + heapSize);
+    Random rdm = new Random(1234);
+    HiveConf conf = new HiveConf();
+    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.01f);
+    LrfuCachePolicy lrfu = new LrfuCachePolicy(conf, 1, heapSize);
+    // Insert the number of elements plus 2, to trigger 2 evictions.
+    int toEvict = 2;
+    ArrayList<WeakBuffer> inserted = new ArrayList<WeakBuffer>(heapSize);
+    WeakBuffer[] evicted = new WeakBuffer[toEvict];
+    Assume.assumeTrue(toEvict <= heapSize);
+    for (int i = 0; i < heapSize + toEvict; ++i) {
+      WeakBuffer buffer = BufferPool.allocateFake();
+      WeakBuffer evictedBuf = cache(lrfu, buffer);
+      if (i < toEvict) {
+        evicted[i] = buffer;
+      } else {
+        if (i >= heapSize) {
+          assertSame(evicted[i - heapSize], evictedBuf);
+          assertTrue(evictedBuf.isInvalid());
+        } else {
+          assertNull(evictedBuf);
+        }
+        inserted.add(buffer);
+      }
+    }
+    LOG.info("Inserted " + dumpInserted(inserted));
+    // We will touch all blocks in random order.
+    Collections.shuffle(inserted, rdm);
+    LOG.info("Touch order " + dumpInserted(inserted));
+    // Lock entire heap; heap is still full; we should not be able to evict or insert.
+    for (WeakBuffer buf : inserted) {
+      lock(lrfu, buf);
+    }
+    WeakBuffer block = lrfu.evictOneMoreBlock();
+    assertNull("Got " + block, block);
+    assertSame(CachePolicy.CANNOT_EVICT, cache(lrfu, BufferPool.allocateFake()));
+    for (WeakBuffer buf : inserted) {
+      unlock(lrfu, buf);
+    }
+    // To make (almost) sure we get definite order, touch blocks in order large number of times.
+    for (WeakBuffer buf : inserted) {
+      // TODO: this seems to indicate that priorities change too little...
+      //       perhaps we need to adjust the policy.
+      for (int j = 0; j < 10; ++j) {
+        lrfu.notifyLock(buf);
+        lrfu.notifyUnlock(buf);
+      }
+    }
+    verifyOrder(lrfu, inserted);
+  }
+
+  private void verifyOrder(LrfuCachePolicy lrfu, ArrayList<WeakBuffer> inserted) {
+    WeakBuffer block;
+    // Evict all blocks.
+    for (int i = 0; i < inserted.size(); ++i) {
+      block = lrfu.evictOneMoreBlock();
+      assertTrue(block.isInvalid());
+      assertSame(inserted.get(i), block);
+    }
+    // The map should now be empty.
+    assertNull(lrfu.evictOneMoreBlock());
+  }
+
+  private String dumpInserted(ArrayList<WeakBuffer> inserted) {
+    String debugStr = "";
+    for (int i = 0; i < inserted.size(); ++i) {
+      if (i != 0) debugStr += ", ";
+      debugStr += inserted.get(i);
+    }
+    return debugStr;
+  }
+}

Added: hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/chunk/TestChunkReaderAndWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/chunk/TestChunkReaderAndWriter.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/chunk/TestChunkReaderAndWriter.java (added)
+++ hive/branches/llap/llap/src/test/org/apache/hadoop/hive/llap/chunk/TestChunkReaderAndWriter.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.chunk;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.cache.BufferPool;
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriterImpl;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
+import org.apache.hadoop.hive.llap.loader.BufferInProgress;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestChunkReaderAndWriter {
+  private static final Log LOG = LogFactory.getLog(TestChunkReaderAndWriter.class);
+  private static final int LARGE_BUFFER = 1024 * 1024;
+
+  @Test
+  public void testSimpleValues() throws Exception {
+    testValues(0.0);
+  }
+
+  @Test
+  public void testValuesWithNulls() throws Exception {
+    testValues(0.02);
+    testValues(0.5);
+    testValues(0.98);
+  }
+
+  @Test
+  public void testNullSegmentTransitions() throws Exception {
+    Random random = new Random(1234);
+    ChunkWriterImpl writer = new ChunkWriterImpl();
+    BufferInProgress buf = new BufferInProgress(createBuffer(LARGE_BUFFER));
+    writer.prepare(buf);
+    int ncount1 = 1, ncount2 = 3, ncount3 = 60, ncount4 = 3;
+    Long[] src = new Long[ncount1 + ncount2 + ncount3 + ncount4 + 1];
+    int offset = 0;
+
+    // Try various combinations of repeating and non-repeating segments.
+    offset = writeNulls(writer, ncount1, src, offset);
+    long[] tmp = new long[1];
+    tmp[0] = random.nextLong();
+    src[offset] = tmp[0];
+    writer.writeLongs(tmp, 0, 1, NullsState.NEXT_NULL);
+    ++offset;
+    offset = writeNulls(writer, ncount2, src, offset);
+    offset = writeNulls(writer, ncount3, src, offset);
+    offset = writeNulls(writer, ncount4, src, offset);
+
+    writer.finishCurrentSegment();
+    verifyValues(completeChunk(writer, buf), src);
+  }
+
+  @Test
+  public void testRepeatingSegmentTransitions() throws Exception {
+    Random random = new Random(1234);
+    ChunkWriterImpl writer = new ChunkWriterImpl();
+    BufferInProgress buf = new BufferInProgress(createBuffer(LARGE_BUFFER));
+    writer.prepare(buf);
+    int repeating1 = 10, rcount1 = 5, repeating2 = -10, rcount2 = 5, repeating3 = 4, rcount3 = 15;
+    int nrcount1 = 30, nrcount2 = 30, nrcount3 = 30;
+    long[] src = new long[rcount1 + rcount2 + rcount3 + nrcount1 + nrcount2 + nrcount3];
+    int offset = 0;
+
+    // Try various combinations of repeating and non-repeating segments.
+    offset = writeRepeatedValues(writer, repeating1, rcount1, src, offset);
+    offset = writeLongs(random, writer, nrcount1, src, offset);
+    offset = writeRepeatedValues(writer, repeating2, rcount2, src, offset);
+    offset = writeLongs(random, writer, nrcount2, src, offset);
+    offset = writeRepeatedValues(writer, repeating3, rcount3, src, offset);
+    offset = writeLongs(random, writer, nrcount3, src, offset);
+
+    writer.finishCurrentSegment();
+    verifyValues(completeChunk(writer, buf), src);
+  }
+
+  @Test
+  public void testBufferBoundary() throws Exception {
+    Random random = new Random(1234);
+    ChunkWriterImpl writer = new ChunkWriterImpl();
+    BufferInProgress buf = new BufferInProgress(createBuffer(128 * 8));
+    writer.prepare(buf);
+    long[] tmp = new long[122];
+    Long[] src = new Long[tmp.length + 2];
+    src[0] = tmp[0] = 3;
+    // This should start a segment with bitmasks and use up 4 * 8 bytes total.
+    // Plus, 8 bytes are already used up for the chunk header.
+    writer.writeLongs(tmp, 0, 1, NullsState.NEXT_NULL);
+    src[1] = null;
+    writer.writeNulls(1, true);
+    // Now we have 123 * 8 bytes; 122 values would not fit w/bitmask, but they will w/o one.
+    for (int i = 0; i < tmp.length; ++i) {
+      src[i + 2] = tmp[i] = random.nextLong();
+    }
+    writer.writeLongs(tmp, 0, tmp.length, NullsState.NEXT_NULL);
+    writer.finishCurrentSegment();
+    verifyValues(completeChunk(writer, buf), src);
+  }
+
+  @Test
+  public void testBoundaryValues() throws Exception {
+    ChunkWriterImpl writer = new ChunkWriterImpl();
+    BufferInProgress buf = new BufferInProgress(createBuffer(LARGE_BUFFER));
+    writer.prepare(buf);
+    long[] src = new long[9];
+    Arrays.fill(src, 0l);
+    src[2] = Long.MIN_VALUE;
+    src[6] = Long.MAX_VALUE;
+    src[8] = Integer.MIN_VALUE;
+    writer.writeLongs(src, 0, src.length, NullsState.NEXT_NULL);
+    writer.finishCurrentSegment();
+    verifyValues(completeChunk(writer, buf), src);
+  }
+
+  /**
+   * A method for generic non-scenario tests.
+   * @param nullFraction Percentage of nulls in values.
+   */
+  private void testValues(double nullFraction) throws Exception {
+    Random random = new Random(1234);
+    ChunkWriterImpl writer = new ChunkWriterImpl();
+    WeakBuffer wb = createBuffer(LARGE_BUFFER);
+    // Value counts to test
+    int[] valueCounts = new int[] { 1, 3, 11, 64, 65, 2048 };
+    int totalCount = 0;
+    for (int valueCount : valueCounts) {
+      LOG.info("Testing for value count " + valueCount);
+      BufferInProgress buf = new BufferInProgress(wb);
+      writer.prepare(buf);
+      totalCount += valueCount;
+      Long[] src = new Long[valueCount];
+      writeLongs(writer, random, nullFraction, valueCount, src, 0);
+      writer.finishCurrentSegment();
+      verifyValues(completeChunk(writer, buf), src);
+    }
+
+    // Then try all together
+    BufferInProgress buf = new BufferInProgress(wb);
+    writer.prepare(buf);
+    Long[] src = new Long[totalCount];
+    int offset = 0;
+    LOG.info("Testing for total count " + totalCount);
+    for (int valueCount : valueCounts) {
+      writeLongs(writer, random, nullFraction, valueCount, src, offset);
+      offset += valueCount;
+    }
+    writer.finishCurrentSegment();
+    verifyValues(completeChunk(writer, buf), src);
+  }
+
+  private static void writeLongs(ChunkWriterImpl writer,
+      Random rdm, double nullFraction, int count, Long[] src, int srcOffset) {
+    NullsState nullState = nullFraction == 0 ? NullsState.NO_NULLS : NullsState.NEXT_NULL;
+    long[] srcPart = new long[count];
+    int offset = 0;
+    boolean isFirst = true, isNull = true;
+    while (offset < count) {
+      int runLength = 0;
+      if (!isFirst) {
+        isNull = !isNull;
+        if (!isNull) {
+          srcPart[0] = rdm.nextLong();
+        }
+        src[srcOffset + offset] = isNull ? null : srcPart[0];
+        runLength = 1;
+        ++offset;
+      }
+      while (offset < count) {
+        boolean curIsNull = (rdm.nextDouble() <= nullFraction);
+        if (!isFirst && curIsNull != isNull) break;
+        isNull = curIsNull;
+        if (!isNull) {
+          srcPart[runLength] = rdm.nextLong();
+        }
+        src[srcOffset + offset] = isNull ? null : srcPart[runLength];
+        ++runLength;
+        ++offset;
+        isFirst = false;
+      }
+      if (isNull) {
+        LOG.info("Writing " + runLength + " nulls");
+        writer.writeNulls(runLength, true);
+      } else {
+        LOG.info("Writing " + runLength + " values");
+        writer.writeLongs(srcPart, 0, runLength, nullState);
+      }
+    }
+  }
+
+  private static int writeLongs(
+      Random random, ChunkWriterImpl writer, int count, long[] src, int offset) {
+    for (int i = 0; i < count; ++i) {
+      src[offset + i] = random.nextLong();
+    }
+    writer.writeLongs(src, offset, count, NullsState.NO_NULLS);
+    return offset + count;
+  }
+
+  private static int writeRepeatedValues(
+      ChunkWriterImpl writer, int val, int count, long[] src, int offset) {
+    for (int i = 0; i < count; ++i) {
+      src[offset + i] = val;
+    }
+    writer.writeRepeatedLongs(val, count, NullsState.NO_NULLS);
+    return offset + count;
+  }
+
+  private static int writeNulls(
+      ChunkWriterImpl writer, int count, Long[] src, int offset) {
+    for (int i = 0; i < count; ++i) {
+      src[offset + i] = null;
+    }
+    writer.writeNulls(count, false);
+    return offset + count;
+  }
+
+  private static void verifyValues(Chunk chunk, Long[] src) throws Exception {
+    verifyValues(chunk, src, src.length);
+  }
+
+  private static void verifyValues(Chunk chunk, long[] src) throws Exception {
+    verifyValues(chunk, src, src.length);
+  }
+
+  private static void verifyValues(Chunk chunk, Object src, int srcLength) throws Exception {
+    boolean nullable = src instanceof Long[];
+    Long[] src0 = nullable ? (Long[])src : null;
+    long[] src1 = nullable ? null : (long[])src;
+    int[] stepsToVerify = new int[] { srcLength, srcLength / 2, 63, 5, 1 };
+    long[] dest = new long[srcLength];
+    boolean[] isNull = new boolean[srcLength];
+    int lastStep = -1;
+    for (int step : stepsToVerify) {
+      if (step > srcLength || step == lastStep || step == 0) continue;
+      if ((srcLength / step) > 50) continue; // too slow
+      LOG.info("Verifying value count " + srcLength + " w/step " + step);
+      Arrays.fill(dest, -1);
+      int offset = 0;
+      ChunkReader reader = new ChunkReader(Type.LONG, chunk);
+      while (offset < srcLength) {
+        int adjStep = Math.min(srcLength - offset, step);
+        try {
+          reader.next(adjStep);
+          reader.copyLongs(dest, isNull, offset);
+        } catch (Exception ex) {
+          LOG.error("srcLength " + srcLength + ", step "
+              + adjStep + "/" + step + " offset " + offset, ex);
+          throw ex;
+        }
+        offset += adjStep;
+      }
+      if (nullable) {
+        verifyArrays(src0, dest, isNull);
+      } else {
+        verifyArrays(src1, dest, isNull);
+      }
+      lastStep = step;
+    }
+  }
+
+  private static void verifyArrays(Long[] src, long[] dest, boolean[] isNull) {
+    for (int i = 0; i < src.length; ++i) {
+      boolean curIsNull = (src[i] == null);
+      assertTrue(i + ": " + src[i] + " got " + dest[i] + "/" + isNull[i], curIsNull == isNull[i]);
+      if (!curIsNull) {
+        assertEquals(i + ": " + src[i] + " got " + dest[i], src[i].longValue(), dest[i]);
+      }
+    }
+  }
+
+  private static void verifyArrays(long[] src, long[] dest, boolean[] isNull) {
+    for (int i = 0; i < src.length; ++i) {
+      assertEquals(i + ": " + src[i] + " got " + dest[i], src[i], dest[i]);
+      assertFalse(i + ": unexpected null", isNull[i]);
+    }
+  }
+
+  private static Chunk completeChunk(ChunkWriterImpl writer, BufferInProgress buf) {
+    int rows = buf.getChunkInProgressRows();
+    Chunk c = buf.extractChunk();
+    writer.finishChunk(c, rows);
+    return c;
+  }
+
+  private static WeakBuffer createBuffer(int bufferSize) {
+    HiveConf hc = new HiveConf();
+    hc.setInt(HiveConf.ConfVars.LLAP_BUFFER_SIZE.varname, bufferSize);
+    BufferPool bp = new BufferPool(hc, null);
+    try {
+      return bp.allocateBuffer();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

Modified: hive/branches/llap/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/llap/pom.xml?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/pom.xml (original)
+++ hive/branches/llap/pom.xml Tue Sep 16 17:50:02 2014
@@ -42,6 +42,8 @@
     <module>hwi</module>
     <module>jdbc</module>
     <module>metastore</module>
+    <module>llap-client</module>
+    <module>llap</module>
     <module>odbc</module>
     <module>ql</module>
     <module>serde</module>

Modified: hive/branches/llap/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/pom.xml?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/pom.xml (original)
+++ hive/branches/llap/ql/pom.xml Tue Sep 16 17:50:02 2014
@@ -57,6 +57,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
       <artifactId>hive-shims</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -566,6 +571,8 @@
                   <include>org.apache.hive:hive-common</include>
                   <include>org.apache.hive:hive-exec</include>
                   <include>org.apache.hive:hive-serde</include>
+                  <include>org.apache.hive:hive-llap-client</include>
+                  <include>org.apache.hive:hive-llap</include>
                   <include>com.esotericsoftware.kryo:kryo</include>
                   <include>com.twitter:parquet-hadoop-bundle</include>
                   <include>org.apache.thrift:libthrift</include>

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Tue Sep 16 17:50:02 2014
@@ -225,6 +225,7 @@ public class FetchOperator implements Se
   @SuppressWarnings("unchecked")
   static InputFormat<WritableComparable, Writable> getInputFormatFromCache(Class inputFormatClass,
       Configuration conf) throws IOException {
+    // TODO: why is this copy-pasted from HiveInputFormat?
     if (!inputFormats.containsKey(inputFormatClass)) {
       try {
         InputFormat<WritableComparable, Writable> newInstance = (InputFormat<WritableComparable, Writable>) ReflectionUtils
@@ -235,7 +236,7 @@ public class FetchOperator implements Se
             + inputFormatClass.getName() + " as specified in mapredWork!", e);
       }
     }
-    return inputFormats.get(inputFormatClass);
+    return HiveInputFormat.wrapForLlap(inputFormats.get(inputFormatClass), conf);
   }
 
   private StructObjectInspector getRowInspectorFromTable(TableDesc table) throws Exception {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java Tue Sep 16 17:50:02 2014
@@ -239,6 +239,7 @@ public class StatsNoJobTask extends Task
           boolean statsAvailable = false;
           for(FileStatus file: fileList) {
             if (!file.isDir()) {
+              // TODO: do we need to wrap for Llap here? probably later when stats are cached?
               InputFormat<?, ?> inputFormat = (InputFormat<?, ?>) ReflectionUtils.newInstance(
                   table.getInputFormatClass(), jc);
               InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[] { table

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Tue Sep 16 17:50:02 2014
@@ -213,6 +213,7 @@ public class RowContainer<ROW extends Li
         JobConf localJc = getLocalFSJobConfClone(jc);
         if (inputSplits == null) {
           if (this.inputFormat == null) {
+            // TODO: do we need to wrap here?
             inputFormat = ReflectionUtils.newInstance(
                 tblDesc.getInputFileFormatClass(), localJc);
           }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Tue Sep 16 17:50:02 2014
@@ -132,6 +132,8 @@ public class TezProcessor extends Abstra
         rproc = new MapRecordProcessor(jobConf);
         MRInputLegacy mrInput = getMRInput(inputs);
         try {
+          // TODO: This might create oldInputFormat in MRInput.
+          //       We are assuming we don't need to wrap it for Llap.
           mrInput.init();
         } catch (IOException e) {
           throw new RuntimeException("Failed while initializing MRInput", e);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java Tue Sep 16 17:50:02 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.hadoop.io.NullWritable;
@@ -311,6 +312,14 @@ public class BytesColumnVector extends C
     setRef(0, value, 0, value.length);
   }
 
+  // Fill the column vector with nulls
+  public void fillWithNulls() {
+    noNulls = false;
+    isRepeating = true;
+    vector[0] = null;
+    isNull[0] = true;
+  }
+
   @Override
   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
     BytesColumnVector in = (BytesColumnVector) inputVector;
@@ -321,4 +330,9 @@ public class BytesColumnVector extends C
   public void init() {
     initBuffer(0);
   }
+
+  @Override
+  public void visit(ColumnVectorVisitor v) throws IOException {
+    v.visit(this);
+  }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java Tue Sep 16 17:50:02 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.hadoop.io.Writable;
@@ -156,5 +157,7 @@ public abstract class ColumnVector {
     public void init() {
       // Do nothing by default
     }
+
+    public abstract void visit(ColumnVectorVisitor v) throws IOException;
   }
 

Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVectorVisitor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVectorVisitor.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVectorVisitor.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVectorVisitor.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.io.IOException;
+
+/**
+ * I can has dynamic dispatch?
+ */
+public interface ColumnVectorVisitor {
+  void visit(LongColumnVector v) throws IOException;
+  void visit(DoubleColumnVector v) throws IOException;
+  void visit(DecimalColumnVector v) throws IOException;
+  void visit(BytesColumnVector v) throws IOException;
+}

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java Tue Sep 16 17:50:02 2014
@@ -17,6 +17,8 @@
  */
 
 package org.apache.hadoop.hive.ql.exec.vector;
+import java.io.IOException;
+
 import org.apache.hadoop.hive.common.type.Decimal128;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -71,6 +73,21 @@ public class DecimalColumnVector extends
     }
   }
 
+  // Fill the all the vector entries with provided value
+  public void fill(Decimal128 value) {
+    noNulls = true;
+    isRepeating = true;
+    vector[0] = value;
+  }
+
+  // Fill the column vector with nulls
+  public void fillWithNulls() {
+    noNulls = false;
+    isRepeating = true;
+    vector[0] = null;
+    isNull[0] = true;
+  }
+
   @Override
   public void flatten(boolean selectedInUse, int[] sel, int size) {
     // TODO Auto-generated method stub
@@ -96,4 +113,9 @@ public class DecimalColumnVector extends
       isNull[i] = true;
     }
   }
+
+  @Override
+  public void visit(ColumnVectorVisitor v) throws IOException {
+    v.visit(this);
+  }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java Tue Sep 16 17:50:02 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -121,6 +122,14 @@ public class DoubleColumnVector extends 
     vector[0] = value;
   }
 
+  // Fill the column vector with nulls
+  public void fillWithNulls() {
+    noNulls = false;
+    isRepeating = true;
+    vector[0] = NULL_VALUE;
+    isNull[0] = true;
+  }
+
   // Simplify vector by brute-force flattening noNulls and isRepeating
   // This can be used to reduce combinatorial explosion of code paths in VectorExpressions
   // with many arguments.
@@ -146,4 +155,9 @@ public class DoubleColumnVector extends 
   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
     vector[outElementNum] = ((DoubleColumnVector) inputVector).vector[inputElementNum];
   }
+
+  @Override
+  public void visit(ColumnVectorVisitor v) throws IOException {
+    v.visit(this);
+  }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java Tue Sep 16 17:50:02 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.hadoop.io.LongWritable;
@@ -165,6 +166,14 @@ public class LongColumnVector extends Co
     vector[0] = value;
   }
 
+  // Fill the column vector with nulls
+  public void fillWithNulls() {
+    noNulls = false;
+    isRepeating = true;
+    vector[0] = NULL_VALUE;
+    isNull[0] = true;
+  }
+
   // Simplify vector by brute-force flattening noNulls and isRepeating
   // This can be used to reduce combinatorial explosion of code paths in VectorExpressions
   // with many arguments.
@@ -190,4 +199,9 @@ public class LongColumnVector extends Co
   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
     vector[outElementNum] = ((LongColumnVector) inputVector).vector[inputElementNum];
   }
+
+  @Override
+  public void visit(ColumnVectorVisitor v) throws IOException {
+    v.visit(this);
+  }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Tue Sep 16 17:50:02 2014
@@ -268,8 +268,7 @@ public class VectorizedRowBatchCtx {
    * @return VectorizedRowBatch
    * @throws HiveException
    */
-  public VectorizedRowBatch createVectorizedRowBatch() throws HiveException
-  {
+  public VectorizedRowBatch createVectorizedRowBatch() throws HiveException {
     List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
     VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size());
     for (int j = 0; j < fieldRefs.size(); j++) {
@@ -394,8 +393,7 @@ public class VectorizedRowBatchCtx {
    * @param batch
    * @throws HiveException
    */
-  public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException
-  {
+  public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException {
     int colIndex;
     Object value;
     PrimitiveCategory pCategory;
@@ -640,4 +638,21 @@ public class VectorizedRowBatchCtx {
     }
     return assigners;
   }
+
+  public int[] getIncludedColumnIndexes() {
+    int colCount = (colsToInclude == null)
+        ? rowOI.getAllStructFieldRefs().size() : colsToInclude.size();
+    int[] result = new int[colCount];
+    if (colsToInclude == null) {
+      for (int i = 0; i < result.length; ++i) {
+        result[i] = i;
+      }
+    } else {
+      int i = -1;
+      for (int colIx : colsToInclude) {
+        result[++i] = colIx;
+      }
+    }
+    return result;
+  }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/NullUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/NullUtil.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/NullUtil.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/NullUtil.java Tue Sep 16 17:50:02 2014
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.exec.vector.expressions;
 
 import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -54,6 +56,31 @@ public class NullUtil {
     }
   }
 
+  /**
+   * Set the data value for all NULL entries to the designated NULL_VALUE.
+   */
+  public static void setNullDataEntriesBytes(
+      BytesColumnVector v, boolean selectedInUse, int[] sel, int n) {
+    if (v.noNulls) {
+      return;
+    } else if (v.isRepeating && v.isNull[0]) {
+      v.vector[0] = null;
+    } else if (selectedInUse) {
+      for (int j = 0; j != n; j++) {
+        int i = sel[j];
+        if(v.isNull[i]) {
+          v.vector[i] = null;
+        }
+      }
+    } else {
+      for (int i = 0; i != n; i++) {
+        if(v.isNull[i]) {
+          v.vector[i] = null;
+        }
+      }
+    }
+  }
+
   // for use by Column-Scalar and Scalar-Column arithmetic for null propagation
   public static void setNullOutputEntriesColScalar(
       ColumnVector v, boolean selectedInUse, int[] sel, int n) {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Tue Sep 16 17:50:02 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.plan.Pa
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -194,6 +196,30 @@ public class HiveInputFormat<K extends W
     this.job = job;
   }
 
+  public static InputFormat<WritableComparable, Writable> wrapForLlap(
+      InputFormat<WritableComparable, Writable> inputFormat, Configuration conf) {
+    if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_ENABLED)) return inputFormat;
+    boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface,
+        isVector = Utilities.isVectorMode(conf);
+    if (!isSupported || !isVector) {
+      LOG.info("Not using llap for " + inputFormat + ": " + isSupported + ", " + isVector);
+      return inputFormat;
+    }
+    LOG.info("Wrapping " + inputFormat);
+    // TODO: we'd actually need a more specific template parameter for non-vectorized one...
+    //       no idea how this is going to work at this point.
+    InputFormat<NullWritable, Writable> inputToWrap = castInputFormat(inputFormat);
+    return castInputFormat(new LlapInputFormat<Writable>(inputToWrap, conf));
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T, U, V> InputFormat<T, V> castInputFormat(InputFormat<U, V> from) {
+    // We assume that LlapWrappableInputFormatInterface has NullWritable as first parameter.
+    // Since we are using Java and not, say, a programming language, there's no way to check.
+    return (InputFormat<T, V>)from;
+  }
+
+
   public static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
     Class inputFormatClass, JobConf job) throws IOException {
 
@@ -207,7 +233,7 @@ public class HiveInputFormat<K extends W
             + inputFormatClass.getName() + " as specified in mapredWork!", e);
       }
     }
-    return inputFormats.get(inputFormatClass);
+    return wrapForLlap(inputFormats.get(inputFormatClass), job);
   }
 
   public RecordReader getRecordReader(InputSplit split, JobConf job,

Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapInputFormat.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapInputFormat.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapInputFormat.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.api.Reader;
+import org.apache.hadoop.hive.llap.api.RequestFactory;
+import org.apache.hadoop.hive.llap.api.Vector;
+import org.apache.hadoop.hive.llap.api.Vector.ColumnReader;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVectorVisitor;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.NullUtil;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.collect.Lists;
+
+public class LlapInputFormat<T>
+  implements InputFormat<NullWritable, T>, VectorizedInputFormatInterface {
+  /** See RequestFactory class documentation on why this is necessary */
+  private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.api.Llap";
+
+  private final InputFormat<NullWritable, T> realInputFormat;
+  private final RequestFactory reqFactory;
+  private static final Log LOG = LogFactory.getLog(LlapInputFormat.class);
+
+  public LlapInputFormat(InputFormat<NullWritable, T> realInputFormat, Configuration conf) {
+    this.realInputFormat = realInputFormat;
+    try {
+      reqFactory = (RequestFactory)ReflectionUtils.newInstance(Class.forName(IMPL_CLASS), conf);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Failed to initialize impl", e);
+    }
+  }
+
+  @Override
+  public RecordReader<NullWritable, T> getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+    boolean isVectorMode = Utilities.isVectorMode(job);
+    if (!isVectorMode) {
+      LOG.error("No llap in non-vectorized mode; falling back to original");
+      throw new UnsupportedOperationException("No llap in non-vectorized mode");
+      // return realInputFormat.getRecordReader(split, job, reporter);
+    }
+    FileSplit fileSplit = (FileSplit)split; // should work
+    reporter.setStatus(fileSplit.toString());
+    try {
+      List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
+          ? null : ColumnProjectionUtils.getReadColumnIDs(job);
+      if (includedCols.isEmpty()) {
+        includedCols = null; // Also means read all columns? WTF?
+      }
+      Reader reader = reqFactory.createLocalRequest()
+          .setSplit(split)
+          .setSarg(SearchArgumentFactory.createFromConf(job))
+          .setColumns(includedCols).submit();
+      // TODO: presumably, we'll also pass the means to create original RecordReader
+      //       for failover to LlapRecordReader somewhere around here. This will
+      //       actually be quite complex because we'd somehow have to track what parts
+      //       of file Llap has already returned, and skip these in fallback reader.
+      // We are actually returning a completely wrong thing here wrt template parameters.
+      // This is how vectorization does it presently; we hope the caller knows what it is doing.
+      return createRecordReaderUnsafe(job, fileSplit, reader);
+    } catch (Exception ex) {
+      LOG.error("Local request failed; falling back to original", ex);
+      throw new IOException(ex); // just rethrow for now, for clarity
+      // return realInputFormat.getRecordReader(split, job, reporter);
+    }
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private RecordReader<NullWritable, T> createRecordReaderUnsafe(
+      JobConf job, FileSplit fileSplit, Reader reader) {
+    return (RecordReader)new VectorizedLlapRecordReader(reader, job, fileSplit);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return realInputFormat.getSplits(job, numSplits);
+  }
+
+  private static class VectorizedLlapRecordReader
+      implements RecordReader<NullWritable, VectorizedRowBatch> {
+    private final Reader reader;
+    private Vector currentVector;
+    private ColumnReader currentVectorSlice; // see VectorImpl, really just currentVector
+    private int currentVectorOffset = -1; // number of rows read from current vector
+
+    private VectorizedRowBatchCtx rbCtx;
+    private final VrbHelper vrbHelper = new VrbHelper();
+    private boolean addPartitionCols = true;
+
+    public VectorizedLlapRecordReader(Reader reader, JobConf job, FileSplit split) {
+      this.reader = reader;
+      try {
+        rbCtx = new VectorizedRowBatchCtx();
+        rbCtx.init(job, split);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+      try {
+        assert value != null;
+        if (currentVector == null) {
+          currentVector = reader.next();
+          if (currentVector == null) {
+            return false;
+          }
+          currentVectorOffset = 0;
+        }
+        // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
+        if (addPartitionCols) {
+          rbCtx.addPartitionColsToBatch(value);
+          addPartitionCols = false;
+        }
+        populateBatchFromVector(value);
+        traceLogFirstRow(value);
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      } catch (HiveException e) {
+        throw new IOException(e);
+      }
+      return true;
+    }
+
+    private void traceLogFirstRow(VectorizedRowBatch value) {
+      if (!LOG.isTraceEnabled()) return;
+      String tmp = "First row is [";
+      for (ColumnVector v : value.cols) {
+        if (v instanceof LongColumnVector) {
+          tmp += ((LongColumnVector)v).vector[0] + ", ";
+        } else if (v instanceof DoubleColumnVector) {
+          tmp += ((DoubleColumnVector)v).vector[0] + ", ";
+        } else if (v == null) {
+          tmp += "null, ";
+        } else {
+          tmp += "(something), ";
+        }
+      }
+      LOG.trace(tmp + "]");
+    }
+
+    private void populateBatchFromVector(VectorizedRowBatch target) throws IOException {
+      // TODO: eventually, when vectorized pipeline can work directly
+        //       on vectors instead of VRB, this will be a noop.
+      // TODO: track time spent building VRBs as opposed to processing.
+      int rowCount = VectorizedRowBatch.DEFAULT_SIZE,
+          rowsRemaining = currentVector.getNumberOfRows() - currentVectorOffset;
+      if (rowsRemaining <= rowCount) {
+        rowCount = rowsRemaining;
+      }
+      int[] columnMap = rbCtx.getIncludedColumnIndexes();
+      if (columnMap.length != currentVector.getNumberOfColumns()) {
+        throw new RuntimeException("Unexpected number of columns, VRB has " + columnMap.length
+            + " included, but vector has " + currentVector.getNumberOfColumns());
+      }
+      vrbHelper.prepare(rowCount);
+      // VRB was created from VrbCtx, so we already have pre-allocated column vectors
+      for (int vectorIx = 0; vectorIx < currentVector.getNumberOfColumns(); ++vectorIx) {
+        int colIx = columnMap[vectorIx];
+        currentVectorSlice = currentVector.next(colIx, rowCount);
+        target.cols[colIx].visit(vrbHelper);
+      }
+      target.selectedInUse = false;
+      target.size = rowCount;
+
+      if (rowsRemaining == rowCount) {
+        currentVector = null;
+        currentVectorOffset = -1;
+      } else {
+        currentVectorOffset += rowCount;
+      }
+    }
+
+    @Override
+    public NullWritable createKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public VectorizedRowBatch createValue() {
+      try {
+        return rbCtx.createVectorizedRowBatch();
+      } catch (HiveException e) {
+        throw new RuntimeException("Error creating a batch", e);
+      }
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return -1; // Position doesn't make sense for async reader, chunk order is arbitrary.
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      // TODO: plumb progress info thru the reader if we can get metadata from loader first.
+      return 0.0f;
+    }
+
+    /**
+     * There's one VrbHelper object per RecordReader object. We could just implement
+     * the visitor in RecordReader itself, but it's a bit cleaner like this.
+     */
+    private final class VrbHelper implements ColumnVectorVisitor {
+      private int rowCount = -1;
+      public void prepare(int rowCount) {
+        this.rowCount = rowCount;
+      }
+
+      @Override
+      public void visit(LongColumnVector c) throws IOException {
+        boolean hasNulls = currentVectorSlice.hasNulls();
+        boolean isSameValue = currentVectorSlice.isSameValue();
+        if (isSameValue) {
+          if (hasNulls) {
+            c.fillWithNulls();
+          } else {
+            c.fill(currentVectorSlice.getLong());
+          }
+        } else {
+          c.reset();
+          c.noNulls = !hasNulls;
+          currentVectorSlice.copyLongs(c.vector, hasNulls ? c.isNull : null, 0);
+          if (hasNulls) {
+            NullUtil.setNullDataEntriesLong(c, false, null, rowCount);
+          }
+        }
+      }
+
+
+      @Override
+      public void visit(DoubleColumnVector c) throws IOException {
+        boolean hasNulls = currentVectorSlice.hasNulls();
+        boolean isSameValue = currentVectorSlice.isSameValue();
+        if (isSameValue) {
+          if (hasNulls) {
+            c.fillWithNulls();
+          } else {
+            c.fill(currentVectorSlice.getDouble());
+          }
+        } else {
+          c.reset();
+          c.noNulls = !hasNulls;
+          currentVectorSlice.copyDoubles(c.vector, hasNulls ? c.isNull : null, 0);
+          if (hasNulls) {
+            NullUtil.setNullDataEntriesDouble(c, false, null, rowCount);
+          }
+        }
+      }
+
+      @Override
+      public void visit(DecimalColumnVector c) throws IOException {
+        boolean hasNulls = currentVectorSlice.hasNulls();
+        boolean isSameValue = currentVectorSlice.isSameValue();
+        if (isSameValue) {
+          if (hasNulls) {
+            c.fillWithNulls();
+          } else {
+            c.fill(currentVectorSlice.getDecimal());
+          }
+        } else {
+          c.reset();
+          c.noNulls = !hasNulls;
+          currentVectorSlice.copyDecimals(c.vector, hasNulls ? c.isNull : null, 0);
+          if (hasNulls) {
+            NullUtil.setNullDataEntriesDecimal(c, false, null, rowCount);
+          }
+        }
+      }
+
+      @Override
+      public void visit(BytesColumnVector c) throws IOException {
+        boolean hasNulls = currentVectorSlice.hasNulls();
+        boolean isSameValue = currentVectorSlice.isSameValue();
+        if (isSameValue) {
+          if (hasNulls) {
+            c.fillWithNulls();
+          } else {
+            c.fill(currentVectorSlice.getBytes());
+          }
+        } else {
+          c.reset();
+          c.noNulls = !hasNulls;
+          currentVectorSlice.copyBytes(c.vector, c.start, c.length, hasNulls ? c.isNull : null, 0);
+          if (hasNulls) {
+            NullUtil.setNullDataEntriesBytes(c, false, null, rowCount);
+          }
+        }
+      }
+    }
+  }
+}

Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+/** Marker interface to indicate a given input format can be wrapped by Llap. */
+public interface LlapWrappableInputFormatInterface {
+
+}

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java Tue Sep 16 17:50:02 2014
@@ -20,10 +20,15 @@ package org.apache.hadoop.hive.ql.io.orc
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
 
 class BitFieldReader {
   private final RunLengthByteReader input;
+  /** The number of bits in one item. Non-test code always uses 1. */
   private final int bitSize;
   private int current;
   private int bitsLeft;
@@ -62,9 +67,112 @@ class BitFieldReader {
     return result & mask;
   }
 
-  void nextVector(LongColumnVector previous, long previousLen)
-      throws IOException {
+  /**
+   * Unlike integer readers, where runs are encoded explicitly, in this one we have to read ahead
+   * to figure out whether we have a run. Given that runs in booleans are likely it's worth it.
+   * However it means we'd need to keep track of how many bytes we read, and next/nextVector won't
+   * work anymore once this is called. These is trivial to fix, but these are never interspersed.
+   */
+  private boolean lastRunValue;
+  private int lastRunLength = -1;
+  private void readNextRun(int maxRunLength) throws IOException {
+    assert bitSize == 1;
+    if (lastRunLength > 0) return; // last run is not exhausted yet
+    if (bitsLeft == 0) {
+      readByte();
+    }
+    // First take care of the partial bits.
+    boolean hasVal = false;
+    int runLength = 0;
+    if (bitsLeft != 8) {
+      int partialBitsMask = (1 << bitsLeft) - 1;
+      int partialBits = current & partialBitsMask;
+      if (partialBits == partialBitsMask || partialBits == 0) {
+        lastRunValue = (partialBits == partialBitsMask);
+        if (maxRunLength <= bitsLeft) {
+          lastRunLength = maxRunLength;
+          return;
+        }
+        maxRunLength -= bitsLeft;
+        hasVal = true;
+        runLength = bitsLeft;
+        bitsLeft = 0;
+      } else {
+        // There's no run in partial bits. Return whatever we have.
+        int prefixBitsCount = 32 - bitsLeft;
+        runLength = Integer.numberOfLeadingZeros(partialBits) - prefixBitsCount;
+        lastRunValue = (runLength > 0);
+        lastRunLength = Math.min(maxRunLength, lastRunValue ? runLength :
+          (Integer.numberOfLeadingZeros(~(partialBits | ~partialBitsMask)) - prefixBitsCount));
+        return;
+      }
+      assert bitsLeft == 0;
+      readByte();
+    }
+    if (!hasVal) {
+      lastRunValue = ((current >> 7) == 1);
+      hasVal = true;
+    }
+    // Read full bytes until the run ends.
+    assert bitsLeft == 8;
+    while (maxRunLength >= 8
+        && ((lastRunValue && (current == 0xff)) || (!lastRunValue && (current == 0)))) {
+      runLength += 8;
+      maxRunLength -= 8;
+      readByte();
+    }
+    if (maxRunLength > 0) {
+      int extraBits = Integer.numberOfLeadingZeros(
+          lastRunValue ? (~(current | ~255)) : current) - 24;
+      bitsLeft -= extraBits;
+      runLength += extraBits;
+    }
+    lastRunLength = runLength;
+  }
+
+  private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
+  int nextChunk(
+      ChunkWriter writer, BitFieldReader present, long rowsLeftToRead) throws IOException {
+    if (bitSize != 1) {
+      throw new AssertionError("Bit size should always be 1");
+    }
+    boolean mayHaveNulls = present != null;
+    NullsState nullState = mayHaveNulls ? NullsState.HAS_NULLS : NullsState.NO_NULLS;
+    int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+    if (rowsLeftToWrite == 0) {
+      return 0; // Cannot write any rows into this writer.
+    }
+    long originalRowsLeft = rowsLeftToRead;
+    // Start the big loop to read rows until we run out of either input or space.
+    while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+      int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+      readNextRun(rowsToTransfer);
+      presentHelper.availLength = lastRunLength;
+      if (mayHaveNulls) {
+        LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+        if (!presentHelper.isNullsRun) {
+          assert lastRunLength >= presentHelper.availLength;
+          lastRunLength -= presentHelper.availLength;
+        }
+      } else {
+        lastRunLength = 0;
+      }
+
+      assert rowsLeftToRead >= presentHelper.availLength;
+      if (presentHelper.isNullsRun) {
+        writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+      } else {
+        writer.writeRepeatedLongs(lastRunValue ? 1 : 0, presentHelper.availLength,
+            presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : nullState);
+      }
+      rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+      rowsLeftToRead -= presentHelper.availLength;
+    } // End of big loop.
+    writer.finishCurrentSegment();
+    return (int)(originalRowsLeft - rowsLeftToRead);
+  }
 
+  void nextVector(LongColumnVector previous, long previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
       if (!previous.isNull[i]) {
@@ -117,4 +225,30 @@ class BitFieldReader {
     return "bit reader current: " + current + " bits left: " + bitsLeft +
         " bit size: " + bitSize + " from " + input;
   }
+
+  boolean hasFullByte() {
+    return bitsLeft == 8 || bitsLeft == 0;
+  }
+
+  int peekOneBit() throws IOException {
+    assert bitSize == 1;
+    if (bitsLeft == 0) {
+      readByte();
+    }
+    return (current >>> (bitsLeft - 1)) & 1;
+  }
+
+  int peekFullByte() throws IOException {
+    assert bitSize == 1;
+    assert bitsLeft == 8 || bitsLeft == 0;
+    if (bitsLeft == 0) {
+      readByte();
+    }
+    return current;
+  }
+
+  void skipInCurrentByte(int bits) throws IOException {
+    assert bitsLeft >= bits;
+    bitsLeft -= bits;
+  }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Tue Sep 16 17:50:02 2014
@@ -29,8 +29,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
 
 /**
  * A tool for printing out the file structure of ORC files.

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java Tue Sep 16 17:50:02 2014
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
 /**
@@ -61,4 +63,6 @@ interface IntegerReader {
    */
    void nextVector(LongColumnVector previous, long previousLen)
       throws IOException;
+
+   int nextChunk(ChunkWriter writer, BitFieldReader present, long rowsLeft) throws IOException;
 }

Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LlapUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LlapUtils.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LlapUtils.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LlapUtils.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.orc.BitFieldReader;
+
+public class LlapUtils {
+  public static final Log LOG = LogFactory.getLog(LlapUtils.class);
+  public static final int DOUBLE_GROUP_SIZE = 64; // just happens to be equal to bitmask size
+
+  /** Helper for readPresentStream. */
+  public static class PresentStreamReadResult {
+    public int availLength;
+    public boolean isNullsRun = false;
+    public boolean isFollowedByOther = false;
+    public void reset() {
+      isFollowedByOther = isNullsRun = false;
+    }
+  }
+
+  /**
+   * Helper method that reads present stream to find the size of the run of nulls, or a
+   * count of contiguous of non-null values based on the run length from the main stream.
+   * @param r Result is returned via this because java is not a real language.
+   * @param present The present stream.
+   * @param availLength The run length from the main stream.
+   * @param rowsLeftToRead Total number of rows that may be read from the main stream.
+   */
+  public static void readPresentStream(PresentStreamReadResult r,
+      BitFieldReader present, long rowsLeftToRead) throws IOException {
+    int presentBitsRead = 0;
+    r.reset();
+    // We are looking for a run of nulls no longer than rows to rowsLeftToRead (presumes
+    // they will all fit in the writer), OR a run of non-nulls no longer than availLength.
+
+    // If there's a partial byte in present stream, we will read bits from it.
+    boolean doneWithPresent = false;
+    int rowLimit = r.availLength;
+    while (!present.hasFullByte() && !doneWithPresent
+        && (presentBitsRead == 0 || (presentBitsRead < rowLimit))) {
+      int bit = present.peekOneBit();
+      if (presentBitsRead == 0) {
+        r.isNullsRun = (bit == 0);
+        rowLimit = (int)(r.isNullsRun ? rowsLeftToRead : r.availLength);
+      } else if (r.isNullsRun != (bit == 0)) {
+        doneWithPresent = r.isFollowedByOther = true;
+        break;
+      }
+      present.skipInCurrentByte(1);
+      ++presentBitsRead;
+    }
+    // Now, if we are not done, read the full bytes.
+    // TODO: we could ask the underlying byte stream of "present" reader for runs;
+    //       many bitmasks might have long sequences of 0x00 or 0xff.
+    while (!doneWithPresent && (presentBitsRead == 0 || (presentBitsRead < rowLimit))) {
+      int bits = present.peekFullByte();
+      if (presentBitsRead == 0) {
+        r.isNullsRun = (bits & (1 << 7)) == 0;
+        rowLimit = (int)(r.isNullsRun ? rowsLeftToRead : r.availLength);
+      }
+      int bitsToTake = 0;
+      if ((bits == 0 && r.isNullsRun) || (bits == 0xff && !r.isNullsRun)) {
+        bitsToTake = 8;
+      } else {
+        doneWithPresent = r.isFollowedByOther = true;
+        // Get the number of leading 0s or 1s in this byte.
+        bitsToTake = Integer.numberOfLeadingZeros(r.isNullsRun ? bits : (~(bits | ~255))) - 24;
+      }
+      bitsToTake = Math.min(bitsToTake, rowLimit - presentBitsRead);
+      presentBitsRead += bitsToTake;
+      present.skipInCurrentByte(bitsToTake);
+    } // End of the loop reading full bytes.
+    assert presentBitsRead <= rowLimit :  "Read " + presentBitsRead + " bits for "
+    + (r.isNullsRun ? "" : "non-") + "null run: " + rowsLeftToRead + ", " + r.availLength;
+    assert presentBitsRead > 0;
+    r.availLength = presentBitsRead;
+  }
+}

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Sep 16 17:50:02 2014
@@ -42,11 +42,15 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -99,7 +103,7 @@ import com.google.common.util.concurrent
  */
 public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
   InputFormatChecker, VectorizedInputFormatInterface,
-    AcidInputFormat<NullWritable, OrcStruct> {
+    AcidInputFormat<NullWritable, OrcStruct>, LlapWrappableInputFormatInterface {
 
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
   static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
@@ -107,7 +111,6 @@ public class OrcInputFormat  implements 
       SHIMS.getHadoopConfNames().get("MAPREDMINSPLITSIZE");
   static final String MAX_SPLIT_SIZE =
       SHIMS.getHadoopConfNames().get("MAPREDMAXSPLITSIZE");
-  static final String SARG_PUSHDOWN = "sarg.pushdown";
 
   private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
   private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
@@ -210,7 +213,7 @@ public class OrcInputFormat  implements 
     boolean isOriginal =
         !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME);
     List<OrcProto.Type> types = file.getTypes();
-    setIncludedColumns(options, types, conf, isOriginal);
+    options.include(genIncludedColumns(types, conf, isOriginal));
     setSearchArgument(options, types, conf, isOriginal);
     return file.rowsOptions(options);
   }
@@ -234,6 +237,21 @@ public class OrcInputFormat  implements 
     }
   }
 
+  public static boolean[] genIncludedColumns(
+      List<OrcProto.Type> types, List<Integer> included, boolean isOriginal) {
+    int rootColumn = getRootColumn(isOriginal);
+    int numColumns = types.size() - rootColumn;
+    boolean[] result = new boolean[numColumns];
+    result[0] = true;
+    OrcProto.Type root = types.get(rootColumn);
+    for(int i=0; i < root.getSubtypesCount(); ++i) {
+      if (included.contains(i)) {
+        includeColumnRecursive(types, result, root.getSubtypes(i),
+            rootColumn);
+      }
+    }
+    return result;
+  }
   /**
    * Take the configuration and figure out which columns we need to include.
    * @param options the options to update
@@ -241,26 +259,13 @@ public class OrcInputFormat  implements 
    * @param conf the configuration
    * @param isOriginal is the file in the original format?
    */
-  static void setIncludedColumns(Reader.Options options,
-                                 List<OrcProto.Type> types,
-                                 Configuration conf,
-                                 boolean isOriginal) {
-    int rootColumn = getRootColumn(isOriginal);
-    if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
-      int numColumns = types.size() - rootColumn;
-      boolean[] result = new boolean[numColumns];
-      result[0] = true;
-      OrcProto.Type root = types.get(rootColumn);
+  public static boolean[] genIncludedColumns(
+      List<OrcProto.Type> types, Configuration conf, boolean isOriginal) {
+     if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
       List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
-      for(int i=0; i < root.getSubtypesCount(); ++i) {
-        if (included.contains(i)) {
-          includeColumnRecursive(types, result, root.getSubtypes(i),
-              rootColumn);
-        }
-      }
-      options.include(result);
+      return genIncludedColumns(types, included, isOriginal);
     } else {
-      options.include(null);
+      return null;
     }
   }
 
@@ -268,37 +273,33 @@ public class OrcInputFormat  implements 
                                 List<OrcProto.Type> types,
                                 Configuration conf,
                                 boolean isOriginal) {
-    int rootColumn = getRootColumn(isOriginal);
-    String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-    String sargPushdown = conf.get(SARG_PUSHDOWN);
-    String columnNamesString =
-        conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
-    if ((sargPushdown == null && serializedPushdown == null)
-        || columnNamesString == null) {
+    String columnNamesString = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+    if (columnNamesString == null) {
+      LOG.debug("No ORC pushdown predicate - no column names");
+      options.searchArgument(null, null);
+      return;
+    }
+    SearchArgument sarg = SearchArgumentFactory.createFromConf(conf);
+    if (sarg == null) {
       LOG.debug("No ORC pushdown predicate");
       options.searchArgument(null, null);
-    } else {
-      SearchArgument sarg;
-      if (serializedPushdown != null) {
-        sarg = SearchArgumentFactory.create
-            (Utilities.deserializeExpression(serializedPushdown));
-      } else {
-        sarg = SearchArgumentFactory.create(sargPushdown);
-      }
-      LOG.info("ORC pushdown predicate: " + sarg);
-      String[] neededColumnNames = columnNamesString.split(",");
-      String[] columnNames = new String[types.size() - rootColumn];
-      boolean[] includedColumns = options.getInclude();
-      int i = 0;
-      for(int columnId: types.get(rootColumn).getSubtypesList()) {
-        if (includedColumns == null || includedColumns[columnId - rootColumn]) {
-          // this is guaranteed to be positive because types only have children
-          // ids greater than their own id.
-          columnNames[columnId - rootColumn] = neededColumnNames[i++];
-        }
+      return;
+    }
+
+    LOG.info("ORC pushdown predicate: " + sarg);
+    int rootColumn = getRootColumn(isOriginal);
+    String[] neededColumnNames = columnNamesString.split(",");
+    String[] columnNames = new String[types.size() - rootColumn];
+    boolean[] includedColumns = options.getInclude();
+    int i = 0;
+    for(int columnId: types.get(rootColumn).getSubtypesList()) {
+      if (includedColumns == null || includedColumns[columnId - rootColumn]) {
+        // this is guaranteed to be positive because types only have children
+        // ids greater than their own id.
+        columnNames[columnId - rootColumn] = neededColumnNames[i++];
       }
-      options.searchArgument(sarg, columnNames);
     }
+    options.searchArgument(sarg, columnNames);
   }
 
   @Override
@@ -752,7 +753,7 @@ public class OrcInputFormat  implements 
         // deltas may change the rows making them match the predicate.
         if (deltas.isEmpty()) {
           Reader.Options options = new Reader.Options();
-          setIncludedColumns(options, types, context.conf, isOriginal);
+          options.include(genIncludedColumns(types, context.conf, isOriginal));
           setSearchArgument(options, types, context.conf, isOriginal);
           if (options.getSearchArgument() != null) {
             SearchArgument sarg = options.getSearchArgument();
@@ -1097,7 +1098,7 @@ public class OrcInputFormat  implements 
           .getBucket();
       reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
       final List<OrcProto.Type> types = reader.getTypes();
-      setIncludedColumns(readOptions, types, conf, split.isOriginal());
+      readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
       setSearchArgument(readOptions, types, conf, split.isOriginal());
     } else {
       bucket = (int) split.getStart();

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Tue Sep 16 17:50:02 2014
@@ -34,8 +34,8 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Text;