Posted to commits@hive.apache.org by se...@apache.org on 2016/12/17 02:59:10 UTC

[2/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
new file mode 100644
index 0000000..a70545e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -0,0 +1,1248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.ConsumerFeedback;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.FileData;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.StripeData;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.TextStripeMetadata;
+import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
+import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.hive.common.util.Ref;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.OrcFile.EncodingStrategy;
+import org.apache.orc.OrcFile.Version;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.OutStream.OutputReceiver;
+import org.apache.orc.impl.PhysicalWriter;
+import org.apache.orc.impl.StreamName;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.counters.TezCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class SerDeEncodedDataReader extends CallableWithNdc<Void>
+    implements ConsumerFeedback<OrcEncodedColumnBatch> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SerDeEncodedDataReader.class);
+  public static final FixedSizedObjectPool<ColumnStreamData> CSD_POOL =
+      new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
+        @Override
+        public ColumnStreamData create() {
+          return new ColumnStreamData();
+        }
+        @Override
+        public void resetBeforeOffer(ColumnStreamData t) {
+          t.reset();
+        }
+      });
+  public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL =
+      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() {
+        @Override
+        public OrcEncodedColumnBatch create() {
+          return new OrcEncodedColumnBatch();
+        }
+        @Override
+        public void resetBeforeOffer(OrcEncodedColumnBatch t) {
+          t.reset();
+        }
+      });
+  public static final FixedSizedObjectPool<CacheChunk> TCC_POOL =
+      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<CacheChunk>() {
+      @Override
+      public CacheChunk create() {
+        return new CacheChunk();
+      }
+      @Override
+      public void resetBeforeOffer(CacheChunk t) {
+        t.reset();
+      }
+    });
+  private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
+    @Override
+    public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
+      CacheChunk tcc = TCC_POOL.take();
+      tcc.init(buffer, offset, end);
+      return tcc;
+    }
+  };
+
+
+  private final SerDeLowLevelCacheImpl cache;
+  private final BufferUsageManager bufferManager;
+  private final Configuration conf;
+  private final FileSplit split;
+  private List<Integer> columnIds;
+  private final OrcEncodedDataConsumer consumer;
+  private final QueryFragmentCounters counters;
+  private final UserGroupInformation ugi;
+
+  private final Object fileKey;
+  private final FileSystem fs;
+
+  private volatile boolean isStopped = false;
+  private final Deserializer sourceSerDe;
+  private final InputFormat<?, ?> sourceInputFormat;
+  private final Reporter reporter;
+  private final JobConf jobConf;
+  private final int allocSize;
+  private final int targetSliceRowCount;
+
+  private final boolean[] writerIncludes;
+  private WriterImpl orcWriter = null;
+  private CacheWriter cacheWriter = null;
+  /**
+   * Data from cache currently being processed. We store it here so that we could decref
+   * it in case of failures. We remove each slice from the data after it has been sent to
+   * the consumer, at which point the consumer is responsible for it.
+   */
+  private FileData cachedData;
+
+  public SerDeEncodedDataReader(SerDeLowLevelCacheImpl cache,
+      BufferUsageManager bufferManager, Configuration daemonConf, FileSplit split,
+      List<Integer> columnIds, OrcEncodedDataConsumer consumer,
+      JobConf jobConf, Reporter reporter, InputFormat<?, ?> sourceInputFormat,
+      Deserializer sourceSerDe, QueryFragmentCounters counters, TypeDescription schema)
+          throws IOException {
+    this.cache = cache;
+    this.bufferManager = bufferManager;
+    this.conf = daemonConf;
+    this.split = split;
+    this.columnIds = columnIds;
+    this.allocSize = determineAllocSize(bufferManager, daemonConf);
+    boolean isInTest = HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_IN_TEST);
+    this.targetSliceRowCount = HiveConf.getIntVar(
+        isInTest ? jobConf : daemonConf, ConfVars.LLAP_IO_ENCODE_SLICE_ROW_COUNT);
+    LOG.info("TODO# targetSliceRowCount = " + targetSliceRowCount);
+    if (this.columnIds != null) {
+      Collections.sort(this.columnIds);
+    }
+    this.consumer = consumer;
+    this.counters = counters;
+    try {
+      this.ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    fs = split.getPath().getFileSystem(daemonConf);
+    fileKey = determineFileId(fs, split,
+        HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
+    this.sourceInputFormat = sourceInputFormat;
+    this.sourceSerDe = sourceSerDe;
+    this.reporter = reporter;
+    this.jobConf = jobConf;
+    this.writerIncludes = OrcInputFormat.genIncludedColumns(schema, columnIds);
+  }
+
+  private static int determineAllocSize(BufferUsageManager bufferManager, Configuration conf) {
+    long allocSize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_ENCODE_ALLOC_SIZE);
+    int maxAllocSize = bufferManager.getAllocator().getMaxAllocation();
+    if (allocSize > maxAllocSize) {
+      LOG.error("Encode allocation size " + allocSize + " is being capped to the maximum "
+          + "allocation size " + bufferManager.getAllocator().getMaxAllocation());
+      allocSize = maxAllocSize;
+    }
+    return (int)allocSize;
+  }
+
+  @Override
+  public void stop() {
+    LOG.debug("Encoded reader is being stopped");
+    isStopped = true;
+  }
+
+  @Override
+  public void pause() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void unpause() {
+    throw new UnsupportedOperationException();
+  }
+
+  // TODO: move to base class?
+  @Override
+  protected Void callInternal() throws IOException, InterruptedException {
+    return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        return performDataRead();
+      }
+    });
+  }
+
+  private static final class LineRrOffsetReader extends PassThruOffsetReader {
+    private static final Method isCompressedMethod;
+    private final LineRecordReader lrReader;
+    private final LongWritable posKey;
+
+    static {
+      Method isCompressedMethodTmp;
+      try {
+        isCompressedMethodTmp = LineRecordReader.class.getDeclaredMethod("isCompressedInput");
+        isCompressedMethodTmp.setAccessible(true);
+      } catch (Throwable t) {
+        isCompressedMethodTmp = null;
+        LOG.info("TODO# cannot get LineRecordReader isCompressedInput method", t);
+      }
+      isCompressedMethod = isCompressedMethodTmp;
+    }
+
+    static ReaderWithOffsets create(LineRecordReader sourceReader) {
+      if (isCompressedMethod == null) return null;
+      Boolean isCompressed = null;
+      try {
+        isCompressed = (Boolean)isCompressedMethod.invoke(sourceReader);
+      } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+        LOG.info("TODO# cannot check the reader for compression", e);
+        return new PassThruOffsetReader(sourceReader);
+      }
+      if (isCompressed) return null; // Cannot slice compressed files.
+      return new LineRrOffsetReader(sourceReader);
+    }
+
+    private LineRrOffsetReader(LineRecordReader sourceReader) {
+      super(sourceReader);
+      this.lrReader = sourceReader;
+      this.posKey = (LongWritable)key;
+    }
+
+    @Override
+    public long getCurrentRowStartOffset() {
+      return posKey.get();
+    }
+
+    @Override
+    public long getCurrentFileOffset() {
+      try {
+        return lrReader.getPos();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+
+  @SuppressWarnings("rawtypes")
+  private static class PassThruOffsetReader implements ReaderWithOffsets {
+    protected final RecordReader sourceReader;
+    protected final Object key;
+    protected final Writable value;
+
+    private PassThruOffsetReader(RecordReader sourceReader) {
+      this.sourceReader = sourceReader;
+      key = sourceReader.createKey();
+      value = (Writable)sourceReader.createValue();
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      return sourceReader.next(key, value);
+    }
+
+    @Override
+    public Writable getCurrentValue() {
+      return value;
+    }
+
+    @Override
+    public void close() throws IOException {
+      sourceReader.close();
+    }
+
+    @Override
+    public long getCurrentRowStartOffset() {
+      return -1;
+    }
+
+    @Override
+    public long getCurrentFileOffset() {
+      return -1;
+    }
+  }
+
+  public static class CacheOutStream extends OutStream {
+    private final CacheOutputReceiver receiver;
+    public CacheOutStream(String name, int bufferSize, CompressionCodec codec,
+        CacheOutputReceiver receiver) throws IOException {
+      super(name, bufferSize, codec, receiver);
+      this.receiver = receiver;
+    }
+
+    @Override
+    public void clear() throws IOException {
+      super.clear();
+      receiver.clear();
+    }
+  }
+
+  private interface ReaderWithOffsets {
+    boolean next() throws IOException;
+    Writable getCurrentValue();
+    long getCurrentRowStartOffset();
+    void close() throws IOException;
+    long getCurrentFileOffset();
+  }
+
+  public static class CacheWriter implements PhysicalWriter {
+    // Struct.
+    private static class CacheStreamData {
+      private final List<MemoryBuffer> data;
+      private final boolean isSuppressed;
+      private final StreamName name;
+      public CacheStreamData(boolean isSuppressed, StreamName name, List<MemoryBuffer> data) {
+        this.isSuppressed = isSuppressed;
+        this.name = name;
+        this.data = data;
+      }
+      @Override
+      public String toString() {
+        return "CacheStreamData [name=" + name + ", isSuppressed="
+            + isSuppressed + ", data=" + toString(data) + "]";
+      }
+      private static String toString(List<MemoryBuffer> data) {
+        String s = "";
+        for (MemoryBuffer buffer : data) {
+          s += LlapDataBuffer.toDataString(buffer) + ", ";
+        }
+        return s;
+      }
+    }
+
+    private static class CacheStripeData {
+      private List<ColumnEncoding> encodings;
+      private long rowCount = -1;
+      private long knownTornStart, firstRowStart, lastRowStart, lastRowEnd;
+      private Map<Integer, List<CacheStreamData>> colStreams = new HashMap<>();
+      @Override
+      public String toString() {
+        return ("{disk data knownTornStart=" + knownTornStart
+            + ", firstRowStart=" + firstRowStart + ", lastRowStart="
+            + lastRowStart + ", lastRowEnd=" + lastRowEnd + ", rowCount=" + rowCount
+            + ", encodings=" + encodings + ", streams=" + colStreams + "}").replace('\n', ' ');
+      }
+
+      public String toCoordinateString() {
+        return "knownTornStart=" + knownTornStart + ", firstRowStart=" + firstRowStart
+            + ", lastRowStart=" + lastRowStart + ", lastRowEnd=" + lastRowEnd;
+      }
+    }
+
+    private CacheStripeData currentStripe;
+    private final List<CacheStripeData> stripes = new ArrayList<>();
+    private final BufferUsageManager bufferManager;
+    private final int bufferSize;
+    private final List<Integer> columnIds;
+    private final boolean[] writerIncludes;
+    // These are global since ORC reuses objects between stripes.
+    private final Map<StreamName, OutStream> streams = new HashMap<>();
+    private final Map<Integer, List<CacheOutStream>> colStreams = new HashMap<>();
+
+    public CacheWriter(BufferUsageManager bufferManager, int bufferSize, List<Integer> columnIds,
+        boolean[] writerIncludes) {
+      this.bufferManager = bufferManager;
+      this.bufferSize = bufferSize;
+      this.columnIds = columnIds;
+      this.writerIncludes = writerIncludes;
+      startStripe();
+    }
+
+    private void startStripe() {
+      if (currentStripe != null) {
+        stripes.add(currentStripe);
+      }
+      currentStripe = new CacheStripeData();
+    }
+
+    @Override
+    public void initialize() throws IOException {
+    }
+
+    @Override
+    public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
+      OrcProto.Metadata metadata = builder.build();
+      // LOG.info("TODO# Processing file metadata " + metadata);
+    }
+
+    @Override
+    public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
+      OrcProto.Footer footer = builder.build();
+      // LOG.info("TODO# Processing file footer " + footer);
+      validateIncludes(footer);
+    }
+
+    public void validateIncludes(OrcProto.Footer footer) throws IOException {
+      boolean[] translatedIncludes = columnIds == null ? null : OrcInputFormat.genIncludedColumns(
+          OrcUtils.convertTypeFromProtobuf(footer.getTypesList(), 0), columnIds);
+      if (translatedIncludes == null && writerIncludes == null) return;
+      if (translatedIncludes == null || writerIncludes == null) {
+        throwIncludesMismatchError(translatedIncludes);
+      }
+      int len = Math.min(translatedIncludes.length, writerIncludes.length);
+      for (int i = 0; i < len; ++i) {
+        // Translated includes may be a superset of writer includes due to cache.
+        if (!translatedIncludes[i] && writerIncludes[i]) {
+          throwIncludesMismatchError(translatedIncludes);
+        }
+      }
+      if (translatedIncludes.length < writerIncludes.length) {
+        for (int i = len; i < writerIncludes.length; ++i) {
+          if (writerIncludes[i]) {
+            throwIncludesMismatchError(translatedIncludes);
+          }
+        }
+      }
+
+    }
+
+    private String throwIncludesMismatchError(boolean[] translated) throws IOException {
+      String s = "Includes derived from the original table: " + DebugUtils.toString(writerIncludes)
+          + " but the ones derived from writer types are: " + DebugUtils.toString(translated);
+      LOG.error(s);
+      throw new IOException(s);
+    }
+
+    @Override
+    public void writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
+      // LOG.info("TODO# Ignoring post script " + builder.build());
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Closed from ORC writer, we still need the data. Do not discard anything.
+    }
+
+    public void discardData() {
+      LOG.info("TODO# discarding disk data (if any wasn't cached)");
+      for (CacheStripeData stripe : stripes) {
+        if (stripe.colStreams == null || stripe.colStreams.isEmpty()) continue;
+        for (List<CacheStreamData> streams : stripe.colStreams.values()) {
+          for (CacheStreamData cos : streams) {
+            for (MemoryBuffer buffer : cos.data) {
+              bufferManager.getAllocator().deallocate(buffer);
+            }
+          }
+        }
+        stripe.colStreams.clear();
+      }
+    }
+
+    @Override
+    public long getPhysicalStripeSize() {
+      return 0; // Always 0, no memory checks.
+    }
+
+    @Override
+    public boolean isCompressed() {
+      return false;
+    }
+
+    @Override
+    public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException {
+      OutStream os = streams.get(name);
+      if (os != null) return os;
+      if (isNeeded(name)) {
+        LOG.info("Creating cache receiver for " + name);
+        CacheOutputReceiver or = new CacheOutputReceiver(bufferManager, name);
+        CacheOutStream cos = new CacheOutStream(name.toString(), bufferSize, null, or);
+        os = cos;
+        List<CacheOutStream> list = colStreams.get(name.getColumn());
+        if (list == null) {
+          list = new ArrayList<>();
+          colStreams.put(name.getColumn(), list);
+        }
+        list.add(cos);
+      } else {
+        LOG.info("Creating null receiver for " + name);
+        OutputReceiver or = new NullOutputReceiver(name);
+        os = new OutStream(name.toString(), bufferSize, null, or);
+      }
+      streams.put(name, os);
+      return os;
+    }
+
+    @Override
+    public void finalizeStripe(
+        OrcProto.StripeFooter.Builder footer,
+        OrcProto.StripeInformation.Builder dirEntry)
+        throws IOException {
+      List<ColumnEncoding> allEnc = footer.getColumnsList();
+      OrcProto.StripeInformation si = dirEntry.build();
+      LOG.info(("TODO# Finalizing stripe " + footer.build() + " => " + si).replace('\n', ' '));
+      currentStripe.encodings = new ArrayList<>(allEnc);
+      for (int i = 0; i < currentStripe.encodings.size(); ++i) {
+        // Don't record encodings for unneeded columns.
+        if (writerIncludes == null || writerIncludes[i]) continue;
+        currentStripe.encodings.set(i, null);
+      }
+      currentStripe.rowCount = si.getNumberOfRows();
+      // ORC writer reuses streams, so we need to clean them here and extract data.
+      for (Map.Entry<Integer, List<CacheOutStream>> e : colStreams.entrySet()) {
+        int colIx = e.getKey();
+        List<CacheOutStream> streams = e.getValue();
+        List<CacheStreamData> data = new ArrayList<>(streams.size());
+        for (CacheOutStream stream : streams) {
+          stream.flush();
+          List<MemoryBuffer> buffers = stream.receiver.buffers;
+          if (buffers == null) {
+            LOG.info("TODO# buffers are null for " + stream.receiver.name);
+          }
+          data.add(new CacheStreamData(stream.isSuppressed(), stream.receiver.name,
+              buffers == null ? new ArrayList<MemoryBuffer>() : new ArrayList<>(buffers)));
+          stream.clear();
+        }
+        currentStripe.colStreams.put(colIx, data);
+      }
+      startStripe();
+    }
+
+    @Override
+    public long estimateMemory() {
+      return 0; // We never ever use any memory.
+    }
+
+    @Override
+    public void writeIndexStream(StreamName name,
+        OrcProto.RowIndex.Builder rowIndex) throws IOException {
+      if (isNeeded(name)) {
+        // LOG.info("TODO# Saving index " + name);
+        // currentStripe.indexes.put(name.getColumn(), rowIndex.build());
+      } else {
+        // LOG.info("TODO# Ignoring index " + name + " => " + rowIndex);
+      }
+    }
+
+    private boolean isNeeded(StreamName name) {
+      return writerIncludes == null || writerIncludes[name.getColumn()];
+    }
+
+    @Override
+    public void writeBloomFilterStream(StreamName streamName,
+        OrcProto.BloomFilterIndex.Builder bloomFilterIndex) throws IOException {
+      // LOG.info("TODO# Ignoring bloom filter " + streamName + " => " + bloomFilterIndex);
+    }
+
+
+    @Override
+    public void flush() throws IOException {
+    }
+
+    @Override
+    public long getRawWriterPosition() throws IOException {
+      return -1; // Meaningless for this writer.
+    }
+
+    @Override
+    public void appendRawStripe(byte[] stripe, int offset, int length,
+        OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+      throw new UnsupportedOperationException(); // Only used in ACID writer.
+    }
+
+    public void setCurrentStripeOffsets(long currentKnownTornStart,
+        long firstStartOffset, long lastStartOffset, long currentFileOffset) {
+      currentStripe.knownTornStart = currentKnownTornStart;
+      currentStripe.firstRowStart = firstStartOffset;
+      currentStripe.lastRowStart = lastStartOffset;
+      currentStripe.lastRowEnd = currentFileOffset;
+    }
+  }
+
+  private interface CacheOutput {
+    List<MemoryBuffer> getData();
+    StreamName getName();
+  }
+
+  private static final class CacheOutputReceiver implements CacheOutput, OutputReceiver {
+    private final BufferUsageManager bufferManager;
+    private final StreamName name;
+    private List<MemoryBuffer> buffers = null;
+    private int lastBufferPos = -1;
+
+    public CacheOutputReceiver(BufferUsageManager bufferManager, StreamName name) {
+      this.bufferManager = bufferManager;
+      this.name = name;
+    }
+
+    public void clear() {
+      buffers = null;
+      lastBufferPos = -1;
+    }
+
+    @Override
+    public void output(ByteBuffer buffer) throws IOException {
+      // TODO: avoid put() by working directly in OutStream?
+      LOG.info(name + " receiving a buffer of size " + buffer.remaining());
+      int size = buffer.remaining();
+      ByteBuffer bb = null;
+      if (buffers == null) {
+        buffers = new ArrayList<>();
+      }
+      if (!buffers.isEmpty()) {
+        MemoryBuffer lastBuffer = buffers.get(buffers.size() - 1);
+        bb = lastBuffer.getByteBufferRaw();
+        int written = lastBufferPos - bb.position();
+        if (bb.remaining() - written < size) {
+          lastBufferPos = -1;
+          bb = null;
+        }
+      }
+      boolean isNewBuffer = (lastBufferPos == -1);
+      if (isNewBuffer) {
+        MemoryBuffer[] dest = new MemoryBuffer[1];
+        bufferManager.getAllocator().allocateMultiple(dest, size);
+        LlapDataBuffer newBuffer = (LlapDataBuffer)dest[0];
+        bb = newBuffer.getByteBufferRaw();
+        lastBufferPos = bb.position();
+        buffers.add(newBuffer);
+      }
+      // Since there's no close() here, maintain the initial read position between writes.
+      int pos = bb.position();
+      bb.position(lastBufferPos);
+      bb.put(buffer);
+      lastBufferPos = bb.position();
+      bb.position(pos);
+    }
+
+    @Override
+    public List<MemoryBuffer> getData() {
+      return buffers;
+    }
+
+    @Override
+    public StreamName getName() {
+      return name;
+    }
+  }
+
+  private static class NullOutputReceiver implements OutputReceiver {
+    private final StreamName name;
+
+    public NullOutputReceiver(StreamName name) {
+      this.name = name;
+    }
+
+    @Override
+    public void output(ByteBuffer buffer) throws IOException {
+      LOG.info(name + " discarding a buffer of size " + buffer.remaining());
+    }
+  }
+
+  protected Void performDataRead() throws IOException {
+    try {
+      long startTime = counters.startTimeCounter();
+      LlapIoImpl.LOG.info("Processing data for {}", split.getPath());
+      if (processStop()) {
+        recordReaderTime(startTime);
+        return null;
+      }
+
+      Boolean isFromCache = null;
+      try {
+        isFromCache = readFileWithCache(startTime);
+      } finally {
+        if (cachedData != null && cachedData.getData() != null) {
+          for (StripeData sd : cachedData.getData()) {
+            unlockAllBuffers(sd);
+          }
+        }
+      }
+      if (isFromCache == null) return null; // Stop requested, and handled inside.
+      if (!isFromCache) {
+        if (!processOneFileSplit(split, startTime, Ref.from(0), null)) return null;
+      }
+
+      // Done with all the things.
+      recordReaderTime(startTime);
+      LOG.info("TODO# calling setDone");
+      consumer.setDone();
+
+      LlapIoImpl.LOG.trace("done processing {}", split);
+      return null;
+    } catch (Throwable e) {
+      LOG.error("TODO# threw", e);
+      consumer.setError(e);
+      throw e;
+    } finally {
+      cleanupReaders();
+    }
+  }
+
+  private void unlockAllBuffers(StripeData si) {
+    for (int i = 0; i < si.getData().length; ++i) {
+      LlapDataBuffer[][] colData = si.getData()[i];
+      if (colData == null) continue;
+      for (int j = 0; j < colData.length; ++j) {
+        LlapDataBuffer[] streamData = colData[j];
+        if (streamData == null) continue;
+        for (int k = 0; k < streamData.length; ++k) {
+          bufferManager.decRefBuffer(streamData[k]);
+        }
+      }
+    }
+  }
+
+  public void cacheFileData(StripeData sd) {
+    if (sd == null || sd.getEncodings() == null) return;
+    if (fileKey != null) {
+      // Note that we cache each slice separately. We could cache them together at the end, but
+      // then we won't be able to pass them to users without inc-refing explicitly.
+      ColumnEncoding[] encodings = sd.getEncodings();
+      for (int i = 0; i < encodings.length; ++i) {
+        // Make data consistent with encodings, don't store useless information.
+        if (sd.getData()[i] == null) {
+          encodings[i] = null;
+        }
+      }
+      FileData fd = new FileData(fileKey, encodings.length);
+      fd.addStripe(sd);
+      cache.putFileData(fd, Priority.NORMAL, counters);
+    } else {
+      lockAllBuffers(sd);
+    }
+    // We assume that if put/lock throws in the middle, it's ok to treat buffers as not being
+    // locked and to blindly deallocate them, since they are not going to be used. Therefore
+    // we don't remove them from the cleanup list - we will do it after sending to consumer.
+    // This relies on sequence of calls to cacheFileData and sendEcb..
+  }
+
+
+  private void lockAllBuffers(StripeData sd) {
+    for (int i = 0; i < sd.getData().length; ++i) {
+      LlapDataBuffer[][] colData = sd.getData()[i];
+      if (colData == null) continue;
+      for (int j = 0; j < colData.length; ++j) {
+        LlapDataBuffer[] streamData = colData[j];
+        if (streamData == null) continue;
+        for (int k = 0; k < streamData.length; ++k) {
+          boolean canLock = bufferManager.incRefBuffer(streamData[k]);
+          assert canLock;
+        }
+      }
+    }
+  }
+
+  public Boolean readFileWithCache(long startTime) throws IOException {
+    if (fileKey == null) return false;
+    BooleanRef gotAllData = new BooleanRef();
+    long endOfSplit = split.getStart() + split.getLength();
+    this.cachedData = cache.getFileData(fileKey, split.getStart(),
+        endOfSplit, writerIncludes, CC_FACTORY, counters, gotAllData);
+    if (cachedData == null) {
+      LOG.info("TODO# no data for the split found in cache");
+      return false;
+    }
+    String[] hosts = extractHosts(split, false), inMemoryHosts = extractHosts(split, true);
+    List<StripeData> slices = cachedData.getData();
+    if (slices.isEmpty()) return false;
+    long uncachedPrefixEnd = slices.get(0).getKnownTornStart(),
+        uncachedSuffixStart = slices.get(slices.size() - 1).getLastEnd();
+    Ref<Integer> stripeIx = Ref.from(0);
+    if (uncachedPrefixEnd > split.getStart()) {
+      // TODO: can we merge neighboring splits? So we don't init so many readers.
+      FileSplit sliceSplit = new FileSplit(split.getPath(), split.getStart(),
+          uncachedPrefixEnd - split.getStart(), hosts, inMemoryHosts);
+      if (!processOneFileSplit(sliceSplit, startTime, stripeIx, null)) return null;
+    }
+    while (!slices.isEmpty()) {
+      StripeData slice = slices.get(0);
+      long start = slice.getKnownTornStart();
+      long len = slice.getLastStart() - start; // Will also read the last row.
+      FileSplit sliceSplit = new FileSplit(split.getPath(), start, len, hosts, inMemoryHosts);
+      if (!processOneFileSplit(sliceSplit, startTime, stripeIx, slice)) return null;
+    }
+    boolean isUnfortunate = false;
+    if (uncachedSuffixStart == endOfSplit) {
+      // This is rather obscure. The end of last row cached is precisely at the split end offset.
+      // If the split is in the middle of the file, LRR would read one more row after that,
+      // therefore as unfortunate as it is, we have to do a one-row read. However, for that to
+      // have happened, someone should have supplied a split that ends inside the last row, i.e.
+      // a few bytes earlier than the current split, which is pretty unlikely. What is more likely
+      // is that the split, and the last row, both end at the end of file. Check for this.
+      long size =  split.getPath().getFileSystem(conf).getFileStatus(split.getPath()).getLen();
+      isUnfortunate = size > endOfSplit;
+      if (isUnfortunate) {
+        LOG.info("TODO# one-row mismatch at the end of split " + split.getPath() + " at "
+            + endOfSplit + "; file size is " + size);
+      }
+    }
+
+    if (uncachedSuffixStart < endOfSplit || isUnfortunate) {
+      // TODO: will 0-length split work? should we assume 1+ chars and add 1 for isUnfortunate?
+      FileSplit splitPart = new FileSplit(split.getPath(), uncachedSuffixStart,
+          endOfSplit - uncachedSuffixStart, hosts, inMemoryHosts);
+      if (!processOneFileSplit(splitPart, startTime, stripeIx, null)) return null;
+    }
+    return true;
+  }
+
+  public boolean processOneFileSplit(FileSplit split, long startTime,
+      Ref<Integer> stripeIxRef, StripeData slice) throws IOException {
+    ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
+    LlapIoImpl.LOG.info("TODO# Processing one split {" + split.getPath() + ", "
+        + split.getStart() + ", " + split.getLength() + "}; cache data " + slice);
+    boolean[] splitIncludes = writerIncludes;
+    boolean hasAllData = false;
+    if (cacheEncodings != null) {
+      hasAllData = true;
+      splitIncludes = Arrays.copyOf(writerIncludes, writerIncludes.length);
+      for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
+        if (!splitIncludes[colIx]) continue;
+        assert (cacheEncodings[colIx] != null) == (slice.getData()[colIx] != null);
+        if (cacheEncodings[colIx] != null) {
+          splitIncludes[colIx] = false;
+        } else {
+          hasAllData = false;
+        }
+      }
+    }
+    LOG.info("TODO# includes accounting for cached data: before " + DebugUtils.toString(
+        writerIncludes) + ", after " + DebugUtils.toString(splitIncludes));
+
+    // We have 3 cases here:
+    // 1) All the data is in the cache. Always a single slice, no disk read, no cache puts.
+    // 2) Some data is in the cache. Always a single slice, disk read and a single cache put.
+    // 3) No data is in the cache. Multiple slices, disk read and multiple cache puts.
+    if (!hasAllData) {
+      // This initializes cacheWriter with data.
+      readSplitFromFile(split, splitIncludes, slice);
+      assert cacheWriter != null;
+    }
+    if (slice != null) {
+      // If we had a cache range already, it should not have been split.
+      assert cacheWriter == null || cacheWriter.stripes.size() == 1;
+      CacheWriter.CacheStripeData csd = hasAllData ? null : cacheWriter.stripes.get(0);
+      boolean result = processOneSlice(csd, splitIncludes, stripeIxRef.value, slice, startTime);
+      ++stripeIxRef.value;
+      return result;
+    } else {
+      for (CacheWriter.CacheStripeData csd : cacheWriter.stripes) {
+        if (!processOneSlice(csd, splitIncludes, stripeIxRef.value, null, startTime)) {
+          return false;
+        }
+        ++stripeIxRef.value;
+      }
+      return true;
+    }
+  }
+
+  private boolean processOneSlice(CacheWriter.CacheStripeData csd, boolean[] splitIncludes,
+      int stripeIx, StripeData slice, long startTime) throws IOException {
+    String sliceStr = slice == null ? "null" : slice.toCoordinateString();
+    LOG.info("TODO# processing slice #" + stripeIx + " " + sliceStr + "; has"
+        + ((slice == null) ? " no" : "") + " cache data; has" + ((csd == null) ? " no" : "")
+        + " disk data");
+
+    ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
+    LlapDataBuffer[][][] cacheData = slice == null ? null : slice.getData();
+    long cacheRowCount = slice == null ? -1L : slice.getRowCount();
+    TextStripeMetadata metadata = new TextStripeMetadata(stripeIx);
+    StripeData sliceToCache = null;
+    boolean hasAllData = csd == null;
+    if (!hasAllData) {
+      if (slice == null) {
+        sliceToCache = new StripeData(
+            csd.knownTornStart, csd.firstRowStart, csd.lastRowStart, csd.lastRowEnd,
+            csd.rowCount, csd.encodings.toArray(new ColumnEncoding[csd.encodings.size()]));
+      } else {
+        if (csd.rowCount != slice.getRowCount()) {
+          throw new IOException("Row count mismatch; disk " + csd.rowCount + ", cache "
+              + slice.getRowCount() + " from " + csd + " and " + slice);
+        }
+        if (csd.encodings.size() != slice.getEncodings().length) {
+          throw new IOException("Column count mismatch; disk " + csd.encodings.size()
+              + ", cache " + slice.getEncodings().length + " from " + csd + " and " + slice);
+        }
+        LOG.info("TODO# creating slice to cache in addition to an existing slice "
+            + slice.toCoordinateString() + "; disk offsets were " + csd.toCoordinateString());
+        sliceToCache = StripeData.duplicateForResults(slice);
+        for (int i = 0; i < csd.encodings.size(); ++i) {
+          sliceToCache.getEncodings()[i] = csd.encodings.get(i);
+        }
+        sliceToCache.setKnownTornStart(Math.min(csd.knownTornStart, slice.getKnownTornStart()));
+      }
+      metadata.setEncodings(combineCacheAndWriterEncodings(cacheEncodings, csd.encodings));
+      metadata.setRowCount(csd.rowCount);
+    } else {
+      assert cacheWriter == null;
+      metadata.setEncodings(Lists.newArrayList(cacheEncodings));
+      metadata.setRowCount(cacheRowCount);
+    }
+    LOG.info("TODO# derived stripe metadata for this split is " + metadata);
+    consumer.setStripeMetadata(metadata);
+
+    OrcEncodedColumnBatch ecb = ECB_POOL.take();
+    ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length);
+    for (int colIx = 0; colIx < writerIncludes.length; ++colIx) {
+      if (!writerIncludes[colIx]) continue;
+      ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
+      if (!hasAllData && splitIncludes[colIx]) {
+        // The column has been read from disk.
+        List<CacheWriter.CacheStreamData> streams = csd.colStreams.get(colIx);
+        LOG.info("TODO# processing streams for column " + colIx + ": " + streams);
+        LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
+            = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+        if (streams == null) continue; // Struct column, such as root?
+        Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
+        while (iter.hasNext()) {
+          CacheWriter.CacheStreamData stream = iter.next();
+          if (stream.isSuppressed) {
+            LOG.info("TODO# removing a suppressed stream " + stream.name);
+            iter.remove();
+            discardUncachedBuffers(stream.data);
+            continue;
+          }
+          // TODO: We write each slice using a separate writer, so we don't share dictionaries. Fix?
+          ColumnStreamData cb = CSD_POOL.take();
+          cb.incRef();
+          int streamIx = stream.name.getKind().getNumber();
+          cb.setCacheBuffers(stream.data);
+          // This is kinda hacky - we "know" these are LlapDataBuffer-s.
+          newCacheDataForCol[streamIx] = stream.data.toArray(
+              new LlapDataBuffer[stream.data.size()]);
+          ecb.setStreamData(colIx, streamIx, cb);
+        }
+      } else {
+        // The column has been obtained from cache.
+        LlapDataBuffer[][] colData = cacheData[colIx];
+        LOG.info("TODO# processing cache data for column " + colIx + ": " + SerDeLowLevelCacheImpl.toString(colData));
+        for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
+          if (colData[streamIx] == null) continue;
+          ColumnStreamData cb = CSD_POOL.take();
+          cb.incRef();
+          cb.setCacheBuffers(Lists.<MemoryBuffer>newArrayList(colData[streamIx]));
+          ecb.setStreamData(colIx, streamIx, cb);
+        }
+      }
+    }
+    if (processStop()) {
+      recordReaderTime(startTime);
+      return false;
+    }
+    // Note: we cache slices one by one since we need to lock them before sending to consumer.
+    //       We could lock here, then cache them together, then unlock here and in return,
+    //       but for now just rely on the cache put to lock them before we send them over.
+    LOG.info("TODO# Data to cache from the read " + sliceToCache);
+    cacheFileData(sliceToCache);
+    return sendEcbToConsumer(ecb, slice != null, csd);
+  }
+
+  private void discardUncachedBuffers(List<MemoryBuffer> list) {
+    for (MemoryBuffer buffer : list) {
+      boolean isInvalidated = ((LlapDataBuffer)buffer).invalidate();
+      assert isInvalidated;
+      bufferManager.getAllocator().deallocate(buffer);
+    }
+  }
+
+  private static List<ColumnEncoding> combineCacheAndWriterEncodings(
+      ColumnEncoding[] cacheEncodings, List<ColumnEncoding> writerEncodings) throws IOException {
+    // TODO: refactor with cache impl? it has the same merge logic
+    if (cacheEncodings == null) {
+      return new ArrayList<>(writerEncodings);
+    }
+    if (cacheEncodings.length != writerEncodings.size()) {
+      throw new IOException("Incompatible encoding lengths: "
+          + Arrays.toString(cacheEncodings) + " vs " + writerEncodings);
+    }
+    ColumnEncoding[] combinedEncodings = Arrays.copyOf(cacheEncodings, cacheEncodings.length);
+    for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
+      ColumnEncoding newEncoding = writerEncodings.get(colIx);
+      if (newEncoding == null) continue;
+      if (combinedEncodings[colIx] != null && !newEncoding.equals(combinedEncodings[colIx])) {
+        throw new IOException("Incompatible encodings at " + colIx + ": "
+            + Arrays.toString(cacheEncodings) + " vs " + writerEncodings);
+      }
+      combinedEncodings[colIx] = newEncoding;
+    }
+    return Lists.newArrayList(combinedEncodings);
+  }
+
+
+  public void readSplitFromFile(FileSplit split, boolean[] splitIncludes, StripeData slice)
+      throws IOException {
+    boolean maySplitTheSplit = slice == null;
+    ReaderWithOffsets offsetReader = null;
+    @SuppressWarnings("rawtypes")
+    RecordReader sourceReader = sourceInputFormat.getRecordReader(split, jobConf, reporter);
+    try {
+      LOG.info("TODO# using " + sourceReader.getClass().getSimpleName() + " to read data");
+      // TODO# need a more general approach to this. At least, need to factor this out and add configs.
+      if (sourceReader instanceof LineRecordReader) {
+        offsetReader = LineRrOffsetReader.create((LineRecordReader)sourceReader);
+        maySplitTheSplit = maySplitTheSplit && offsetReader != null;
+        sourceReader = null;
+      } else {
+        offsetReader = new PassThruOffsetReader(sourceReader);
+        sourceReader = null;
+      }
+      ObjectInspector sourceOi;
+      try {
+        sourceOi = sourceSerDe.getObjectInspector();
+      } catch (SerDeException e) {
+        throw new IOException(e);
+      }
+
+      // TODO: ideally, we want to transform the rows to only have the included columns, and
+      //       only write those to the writer, with modified schema; then map back to full set later.
+      WriterOptions opts = OrcFile.writerOptions(conf)
+          .stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE)
+          .rowIndexStride(Integer.MAX_VALUE) // For now, do not limit this - one RG per split
+          .blockPadding(false).compress(CompressionKind.NONE).version(Version.CURRENT)
+          .encodingStrategy(EncodingStrategy.SPEED).bloomFilterColumns(null).inspector(sourceOi);
+
+
+      // Column IDs are only used to verify eventual writer includes.
+      cacheWriter = new CacheWriter(bufferManager, allocSize, columnIds, splitIncludes);
+      orcWriter = new WriterImpl(cacheWriter, null, opts);
+      int rowsPerSlice = 0;
+      long currentKnownTornStart = split.getStart();
+      long lastStartOffset = Long.MIN_VALUE, firstStartOffset = Long.MIN_VALUE;
+      while (offsetReader.next()) {
+        Writable value = offsetReader.getCurrentValue();
+        lastStartOffset = offsetReader.getCurrentRowStartOffset();
+        if (firstStartOffset == Long.MIN_VALUE) {
+          firstStartOffset = lastStartOffset;
+        }
+        Object row = null;
+        try {
+          row = sourceSerDe.deserialize(value);
+        } catch (SerDeException e) {
+          throw new IOException(e);
+        }
+        orcWriter.addRow(row);
+        if (maySplitTheSplit && ++rowsPerSlice == targetSliceRowCount) {
+          long fileOffset = offsetReader.getCurrentFileOffset();
+          // Must support offsets to be able to split.
+          if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
+            throw new AssertionError("Unable to get offsets from "
+                + offsetReader.getClass().getSimpleName());
+          }
+          cacheWriter.setCurrentStripeOffsets(
+              currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+          // Split starting at row start will not read that row.
+          currentKnownTornStart = lastStartOffset;
+          // Row offsets will be determined from the reader (we could set the first from last).
+          lastStartOffset = Long.MIN_VALUE;
+          firstStartOffset = Long.MIN_VALUE;
+          rowsPerSlice = 0;
+          orcWriter.writeIntermediateFooter();
+        }
+      }
+      if (rowsPerSlice > 0) {
+        long fileOffset = offsetReader.getCurrentFileOffset();
+        if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
+          // The reader doesn't support offsets.
+          // TODO## Double-check if these shenanigans are correct wrt cache matching.
+          //       We want to match the exact same splits, and not match anything else ever.
+          //       Perhaps we should just add a flag that would allow us to match exactly.
+          // If cached split was starting at row start, that row would be skipped, so +1
+          firstStartOffset = split.getStart() + 1;
+          // Last row starting at the end of the split would be read.
+          lastStartOffset = split.getStart() + split.getLength();
+          // However, it must end after the split end, otherwise the next one would have been read.
+          fileOffset = lastStartOffset + 1;
+          LOG.info("TODO# setting fake cache offsets based on split offsets - 'first row' at "
+              + firstStartOffset + "; 'last row' at " + lastStartOffset + ", " + fileOffset);
+        }
+        cacheWriter.setCurrentStripeOffsets(
+            currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+      }
+      // Close the writer to finalize the metadata. No catch since we cannot go on if this throws.
+      orcWriter.close();
+      orcWriter = null;
+    } finally {
+      // We don't need the source reader anymore.
+      if (offsetReader != null) {
+        try {
+          offsetReader.close();
+        } catch (Exception ex) {
+          LOG.error("Failed to close source reader", ex);
+        }
+      } else {
+        assert sourceReader != null;
+        try {
+          sourceReader.close();
+        } catch (Exception ex) {
+          LOG.error("Failed to close source reader", ex);
+        }
+      }
+    }
+  }
+
+  private static String[] extractHosts(FileSplit split, boolean isInMemory) throws IOException {
+    SplitLocationInfo[] locInfo = split.getLocationInfo();
+    if (locInfo == null) return new String[0];
+    List<String> hosts = null; // TODO: most of the time, there's no in-memory. Use an array?
+    for (int i = 0; i < locInfo.length; i++) {
+      if (locInfo[i].isInMemory() != isInMemory) continue;
+      if (hosts == null) {
+        hosts = new ArrayList<>();
+      }
+      hosts.add(locInfo[i].getLocation());
+    }
+    if (hosts == null) return new String[0];
+    return hosts.toArray(new String[hosts.size()]);
+  }
+
+  private boolean sendEcbToConsumer(OrcEncodedColumnBatch ecb,
+      boolean hasCachedSlice, CacheWriter.CacheStripeData writerData) {
+    if (ecb == null) { // This basically means stop has been called.
+      cleanupReaders();
+      return false;
+    }
+    LOG.info("TODO# Sending over the ecb");
+    try {
+      consumer.consumeData(ecb);
+    } catch (Throwable ex) {
+      LOG.info("TODO# threw", ex);
+      consumer.setError(ex); // TODO## this is wrong, it shouldn't throw
+    }
+    if (hasCachedSlice) {
+      cachedData.getData().remove(0); // See javadoc - no need to clean up the cache data anymore.
+    }
+    if (writerData != null) {
+      writerData.colStreams.clear();
+    }
+    return true;
+  }
+
+
+  private void cleanupReaders() {
+    if (orcWriter != null) {
+      try {
+        orcWriter.close();
+        orcWriter = null;
+      } catch (Exception ex) {
+        LOG.error("Failed to close ORC writer", ex);
+      }
+    }
+    if (cacheWriter != null) {
+      try {
+        cacheWriter.discardData();
+        cacheWriter = null;
+      } catch (Exception ex) {
+        LOG.error("Failed to close cache writer", ex);
+      }
+    }
+  }
+
+  private void recordReaderTime(long startTime) {
+    counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
+  }
+
+  private boolean processStop() {
+    if (!isStopped) return false;
+    LOG.info("SerDe-based data reader is stopping");
+    cleanupReaders();
+    return true;
+  }
+
+  private static Object determineFileId(FileSystem fs, FileSplit split,
+      boolean allowSynthetic) throws IOException {
+    /* TODO: support this optionally? this is not OrcSplit, but we could add a custom split.
+      Object fileKey = ((OrcSplit)split).getFileKey();
+      if (fileKey != null) return fileKey; */
+    LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
+    return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic);
+  }
+
+  // TODO: move to a superclass?
+  @Override
+  public void returnData(OrcEncodedColumnBatch ecb) {
+    for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
+      if (!ecb.hasData(colIx)) continue;
+      ColumnStreamData[] datas = ecb.getColumnData(colIx);
+      for (ColumnStreamData data : datas) {
+        if (data == null || data.decRef() != 0) continue;
+        if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+          for (MemoryBuffer buf : data.getCacheBuffers()) {
+            LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} at the end of processing", buf);
+          }
+        }
+        bufferManager.decRefBuffers(data.getCacheBuffers());
+        CSD_POOL.offer(data);
+      }
+    }
+    // We can offer ECB even with some streams not discarded; reset() will clear the arrays.
+    ECB_POOL.offer(ecb);
+  }
+
+  public TezCounters getTezCounters() {
+    return counters.getTezCounters();
+  }
+}
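
For context on the buffer handling in CacheOutputReceiver.output() above: each ByteBuffer handed over
by the ORC OutStream is appended into allocator-provided buffers, and the destination buffer's read
position is restored after every write so that bytes appended earlier stay readable. The standalone
sketch below reproduces that append pattern with plain JDK types only; the BufferSink name and the
256 KB chunk size are illustrative and not part of this patch.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative only: accumulates incoming buffers into fixed-size chunks. */
    final class BufferSink {
      private static final int CHUNK_SIZE = 256 * 1024; // assumed size, for illustration
      private final List<ByteBuffer> chunks = new ArrayList<>();
      private int lastWritePos = -1; // append offset within the last chunk; -1 means none open

      void output(ByteBuffer data) {
        int size = data.remaining();
        ByteBuffer target = null;
        if (!chunks.isEmpty()) {
          ByteBuffer last = chunks.get(chunks.size() - 1);
          // Bytes already appended past the chunk's read position.
          int written = lastWritePos - last.position();
          if (last.remaining() - written < size) {
            lastWritePos = -1; // does not fit; start a new chunk
          } else {
            target = last;
          }
        }
        if (lastWritePos == -1) {
          target = ByteBuffer.allocate(Math.max(CHUNK_SIZE, size));
          lastWritePos = target.position();
          chunks.add(target);
        }
        // Write at the append offset, then restore the read position so earlier
        // bytes in the chunk remain readable without an explicit flip().
        int readPos = target.position();
        target.position(lastWritePos);
        target.put(data);
        lastWritePos = target.position();
        target.position(readPos);
      }

      List<ByteBuffer> getChunks() {
        return chunks;
      }
    }

Restoring the position after every put() is what lets the same chunk be appended to (via lastWritePos)
and later read from its original position, which mirrors the "maintain the initial read position between
writes" comment in the receiver above.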

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
new file mode 100644
index 0000000..040f1a7
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.List;
+
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcProto.Type;
+import org.apache.orc.TypeDescription;
+
+public interface ConsumerFileMetadata {
+  int getStripeCount();
+  CompressionKind getCompressionKind();
+  List<Type> getTypes();
+  TypeDescription getSchema();
+}
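
A minimal sketch of how a non-ORC reader might satisfy this interface; the SerDeFileMetadata name, its
constructor, and the assumption that the re-encoded data is uncompressed (matching the
CompressionKind.NONE writer options used by the SerDe reader above) are illustrative, not taken from
the commit.

    package org.apache.hadoop.hive.llap.io.metadata;

    import java.util.List;

    import org.apache.orc.CompressionKind;
    import org.apache.orc.OrcProto.Type;
    import org.apache.orc.TypeDescription;

    /** Hypothetical sketch: file-level metadata for data re-encoded from a SerDe source. */
    final class SerDeFileMetadata implements ConsumerFileMetadata {
      private final TypeDescription schema;
      private final List<Type> types;
      private final int stripeCount;

      SerDeFileMetadata(TypeDescription schema, List<Type> types, int stripeCount) {
        this.schema = schema;
        this.types = types;
        this.stripeCount = stripeCount;
      }

      @Override
      public int getStripeCount() {
        return stripeCount;
      }

      @Override
      public CompressionKind getCompressionKind() {
        return CompressionKind.NONE; // the cache writer above produces uncompressed streams
      }

      @Override
      public List<Type> getTypes() {
        return types;
      }

      @Override
      public TypeDescription getSchema() {
        return schema;
      }
    }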

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
new file mode 100644
index 0000000..1e28f5f
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.List;
+
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.RowIndexEntry;
+
+public interface ConsumerStripeMetadata {
+  int getStripeIx();
+  long getRowCount();
+  List<ColumnEncoding> getEncodings();
+  String getWriterTimezone();
+  RowIndexEntry getRowIndexEntry(int colIx, int rgIx); // TODO: remove?
+  RowIndex[] getRowIndexes();
+  boolean supportsRowIndexes();
+}
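
The SerDe path re-encodes row data without ORC row indexes, so its stripe metadata cannot serve index
entries. The hypothetical implementation below sketches that shape; the commit's actual
TextStripeMetadata (in GenericColumnVectorProducer, not shown in this diff) plays this role, so the
NoIndexStripeMetadata name and details here are assumptions.

    package org.apache.hadoop.hive.llap.io.metadata;

    import java.util.List;

    import org.apache.orc.OrcProto.ColumnEncoding;
    import org.apache.orc.OrcProto.RowIndex;
    import org.apache.orc.OrcProto.RowIndexEntry;

    /** Hypothetical sketch: stripe metadata for re-encoded SerDe data, which has no row indexes. */
    final class NoIndexStripeMetadata implements ConsumerStripeMetadata {
      private final int stripeIx;
      private List<ColumnEncoding> encodings;
      private long rowCount = -1;

      NoIndexStripeMetadata(int stripeIx) {
        this.stripeIx = stripeIx;
      }

      public void setEncodings(List<ColumnEncoding> encodings) {
        this.encodings = encodings;
      }

      public void setRowCount(long rowCount) {
        this.rowCount = rowCount;
      }

      @Override
      public int getStripeIx() {
        return stripeIx;
      }

      @Override
      public long getRowCount() {
        return rowCount;
      }

      @Override
      public List<ColumnEncoding> getEncodings() {
        return encodings;
      }

      @Override
      public String getWriterTimezone() {
        return null; // no timezone metadata for re-encoded text data
      }

      @Override
      public RowIndexEntry getRowIndexEntry(int colIx, int rgIx) {
        throw new UnsupportedOperationException("Row indexes are not available");
      }

      @Override
      public RowIndex[] getRowIndexes() {
        return null;
      }

      @Override
      public boolean supportsRowIndexes() {
        return false;
      }
    }

The setEncodings/setRowCount setters mirror how SerDeEncodedDataReader populates TextStripeMetadata
before passing it to the consumer via setStripeMetadata().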

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
index 70cba05..2c7a234 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
@@ -41,7 +41,8 @@ import org.apache.orc.impl.ReaderImpl;
  * of ORC use different info. Ideally we would get rid of protobuf structs in code beyond reading,
  * or instead use protobuf structs everywhere instead of the mix of things like now.
  */
-public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMetadata {
+public final class OrcFileMetadata extends LlapCacheableBuffer
+    implements FileMetadata, ConsumerFileMetadata {
   private final List<StripeInformation> stripes;
   private final List<Integer> versionList;
   private final List<OrcProto.StripeStatistics> stripeStats;
@@ -225,6 +226,11 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
     return fileStats;
   }
 
+  @Override
+  public int getStripeCount() {
+    return stripes.size();
+  }
+
   public TypeDescription getSchema() {
     return OrcUtils.convertTypeFromProtobuf(this.types, 0);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
index 3f4f43b..73a1721 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
 import org.apache.hadoop.hive.llap.cache.MemoryManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 
 public class OrcMetadataCache {
   private final ConcurrentHashMap<Object, OrcFileMetadata> metadata = new ConcurrentHashMap<>();
@@ -116,7 +117,6 @@ public class OrcMetadataCache {
     return touchOnGet(metadata.get(fileKey));
   }
 
-
   private <T extends LlapCacheableBuffer> T touchOnGet(T result) {
     if (result != null) {
       policy.notifyLock(result);

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 6f0b9ff..5ef1678 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -32,10 +32,11 @@ import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
 import org.apache.orc.DataReader;
 import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.RowIndexEntry;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.OrcIndex;
 
-public class OrcStripeMetadata extends LlapCacheableBuffer {
+public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerStripeMetadata {
   private final OrcBatchKey stripeKey;
   private final List<OrcProto.ColumnEncoding> encodings;
   private final List<OrcProto.Stream> streams;
@@ -172,4 +173,14 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
   public void resetRowIndex() {
     rowIndex = null;
   }
+
+  @Override
+  public RowIndexEntry getRowIndexEntry(int colIx, int rgIx) {
+    return rowIndex.getRowGroupIndex()[colIx].getEntry(rgIx);
+  }
+
+  @Override
+  public boolean supportsRowIndexes() {
+    return true;
+  }
 }
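
With both index-backed (this class) and index-less implementations now possible, consumers are
expected to gate row-index access on supportsRowIndexes(); a hedged fragment of that pattern
(stripeMetadata, colIx and rgIx are illustrative reader state):

// Illustrative consumer-side guard over any ConsumerStripeMetadata.
if (stripeMetadata.supportsRowIndexes()) {
  OrcProto.RowIndexEntry entry = stripeMetadata.getRowIndexEntry(colIx, rgIx);
  // entry.getPositionsList() gives the stream positions to seek to for this row group.
} else {
  // No row indexes (e.g. SerDe-based data): the stripe has to be read sequentially.
}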

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/OrcUtils.java b/orc/src/java/org/apache/orc/OrcUtils.java
index dc83b9c..4f02926 100644
--- a/orc/src/java/org/apache/orc/OrcUtils.java
+++ b/orc/src/java/org/apache/orc/OrcUtils.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.orc.impl.ReaderImpl;
 
 import com.google.common.collect.Lists;
@@ -538,4 +539,86 @@ public class OrcUtils {
     }
     return result;
   }
+
+  public static List<TypeDescription> setTypeBuilderFromSchema(
+      OrcProto.Type.Builder type, TypeDescription schema) {
+    List<TypeDescription> children = schema.getChildren();
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        type.setKind(OrcProto.Type.Kind.BOOLEAN);
+        break;
+      case BYTE:
+        type.setKind(OrcProto.Type.Kind.BYTE);
+        break;
+      case SHORT:
+        type.setKind(OrcProto.Type.Kind.SHORT);
+        break;
+      case INT:
+        type.setKind(OrcProto.Type.Kind.INT);
+        break;
+      case LONG:
+        type.setKind(OrcProto.Type.Kind.LONG);
+        break;
+      case FLOAT:
+        type.setKind(OrcProto.Type.Kind.FLOAT);
+        break;
+      case DOUBLE:
+        type.setKind(OrcProto.Type.Kind.DOUBLE);
+        break;
+      case STRING:
+        type.setKind(OrcProto.Type.Kind.STRING);
+        break;
+      case CHAR:
+        type.setKind(OrcProto.Type.Kind.CHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case VARCHAR:
+        type.setKind(OrcProto.Type.Kind.VARCHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case BINARY:
+        type.setKind(OrcProto.Type.Kind.BINARY);
+        break;
+      case TIMESTAMP:
+        type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+        break;
+      case DATE:
+        type.setKind(OrcProto.Type.Kind.DATE);
+        break;
+      case DECIMAL:
+        type.setKind(OrcProto.Type.Kind.DECIMAL);
+        type.setPrecision(schema.getPrecision());
+        type.setScale(schema.getScale());
+        break;
+      case LIST:
+        type.setKind(OrcProto.Type.Kind.LIST);
+        type.addSubtypes(children.get(0).getId());
+        break;
+      case MAP:
+        type.setKind(OrcProto.Type.Kind.MAP);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
+        }
+        break;
+      case STRUCT:
+        type.setKind(OrcProto.Type.Kind.STRUCT);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
+        }
+        for(String field: schema.getFieldNames()) {
+          type.addFieldNames(field);
+        }
+        break;
+      case UNION:
+        type.setKind(OrcProto.Type.Kind.UNION);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown category: " +
+          schema.getCategory());
+    }
+    return children;
+  }
 }
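
The new helper fills one OrcProto.Type builder from the matching level of a TypeDescription and
returns that level's children so the caller can recurse over the whole schema. A hedged sketch of
the recursion (the method name addTypes is illustrative; writeTypes in WriterImpl below does the
equivalent):

import java.util.List;

import org.apache.orc.OrcProto;
import org.apache.orc.OrcUtils;
import org.apache.orc.TypeDescription;

// Flattens a TypeDescription tree into OrcProto.Type entries in pre-order,
// mirroring how the writer's footer types are built further down in this patch.
static void addTypes(OrcProto.Footer.Builder builder, TypeDescription schema) {
  OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
  List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, schema);
  builder.addTypes(type);
  if (children != null) {
    for (TypeDescription child : children) {
      addTypes(builder, child);
    }
  }
}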

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
index 83742e4..5ba1b9b 100644
--- a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
+++ b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
@@ -19,7 +19,6 @@
 package org.apache.orc.impl;
 
 import java.io.IOException;
-import java.util.EnumSet;
 
 import org.apache.orc.OrcProto.BloomFilterIndex;
 import org.apache.orc.OrcProto.Footer;

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 975804b..9433e54 100644
--- a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -232,6 +232,11 @@ public class RecordReaderImpl implements RecordReader {
     public long getNext() {
       return entry.getPositions(index++);
     }
+
+    @Override
+    public String toString() {
+      return "{" + entry.getPositionsList() + "; " + index + "}";
+    }
   }
 
   public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 484209b..4bb51c3 100644
--- a/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -1223,6 +1223,7 @@ public class TreeReaderFactory {
     }
   }
 
+  private static final org.slf4j.Logger LOG =
+      org.slf4j.LoggerFactory.getLogger(TreeReaderFactory.class);
   // This class collects together very similar methods for reading an ORC vector of byte arrays and
   // creating the BytesColumnVector.
   //

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
index b17fb41..e0fcae7 100644
--- a/orc/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -125,10 +125,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   public WriterImpl(FileSystem fs,
                     Path path,
                     OrcFile.WriterOptions opts) throws IOException {
-    this.path = path;
+    this(new PhysicalFsWriter(fs, path, opts.getSchema().getMaximumId() + 1, opts), path, opts);
+  }
+
+  public WriterImpl(PhysicalWriter writer,
+                    Path pathForMem,
+                    OrcFile.WriterOptions opts) throws IOException {
+    this.physWriter = writer;
+    this.path = pathForMem;
     this.conf = opts.getConfiguration();
-    this.callback = opts.getCallback();
     this.schema = opts.getSchema();
+    this.callback = opts.getCallback();
     if (callback != null) {
       callbackContext = new OrcFile.WriterContext(){
 
@@ -153,8 +160,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
     }
     this.bloomFilterFpp = opts.getBloomFilterFpp();
-    int numColumns = schema.getMaximumId() + 1;
-    physWriter = new PhysicalFsWriter(fs, path, numColumns, opts);
     treeWriter = createTreeWriter(schema, streamFactory, false);
     if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
       throw new IllegalArgumentException("Row stride must be at least " +
@@ -162,7 +167,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     // ensure that we are able to handle callbacks before we register ourselves
-    memoryManager.addWriter(path, opts.getStripeSize(), this);
+    if (path != null) {
+      memoryManager.addWriter(path, opts.getStripeSize(), this);
+    }
   }
 
   @Override
@@ -2112,83 +2119,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private static void writeTypes(OrcProto.Footer.Builder builder,
                                  TypeDescription schema) {
     OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
-    List<TypeDescription> children = schema.getChildren();
-    switch (schema.getCategory()) {
-      case BOOLEAN:
-        type.setKind(OrcProto.Type.Kind.BOOLEAN);
-        break;
-      case BYTE:
-        type.setKind(OrcProto.Type.Kind.BYTE);
-        break;
-      case SHORT:
-        type.setKind(OrcProto.Type.Kind.SHORT);
-        break;
-      case INT:
-        type.setKind(OrcProto.Type.Kind.INT);
-        break;
-      case LONG:
-        type.setKind(OrcProto.Type.Kind.LONG);
-        break;
-      case FLOAT:
-        type.setKind(OrcProto.Type.Kind.FLOAT);
-        break;
-      case DOUBLE:
-        type.setKind(OrcProto.Type.Kind.DOUBLE);
-        break;
-      case STRING:
-        type.setKind(OrcProto.Type.Kind.STRING);
-        break;
-      case CHAR:
-        type.setKind(OrcProto.Type.Kind.CHAR);
-        type.setMaximumLength(schema.getMaxLength());
-        break;
-      case VARCHAR:
-        type.setKind(OrcProto.Type.Kind.VARCHAR);
-        type.setMaximumLength(schema.getMaxLength());
-        break;
-      case BINARY:
-        type.setKind(OrcProto.Type.Kind.BINARY);
-        break;
-      case TIMESTAMP:
-        type.setKind(OrcProto.Type.Kind.TIMESTAMP);
-        break;
-      case DATE:
-        type.setKind(OrcProto.Type.Kind.DATE);
-        break;
-      case DECIMAL:
-        type.setKind(OrcProto.Type.Kind.DECIMAL);
-        type.setPrecision(schema.getPrecision());
-        type.setScale(schema.getScale());
-        break;
-      case LIST:
-        type.setKind(OrcProto.Type.Kind.LIST);
-        type.addSubtypes(children.get(0).getId());
-        break;
-      case MAP:
-        type.setKind(OrcProto.Type.Kind.MAP);
-        for(TypeDescription t: children) {
-          type.addSubtypes(t.getId());
-        }
-        break;
-      case STRUCT:
-        type.setKind(OrcProto.Type.Kind.STRUCT);
-        for(TypeDescription t: children) {
-          type.addSubtypes(t.getId());
-        }
-        for(String field: schema.getFieldNames()) {
-          type.addFieldNames(field);
-        }
-        break;
-      case UNION:
-        type.setKind(OrcProto.Type.Kind.UNION);
-        for(TypeDescription t: children) {
-          type.addSubtypes(t.getId());
-        }
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown category: " +
-          schema.getCategory());
-    }
+    List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, schema);
     builder.addTypes(type);
     if (children != null) {
       for(TypeDescription child: children) {
@@ -2380,7 +2311,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       callback.preFooterWrite(callbackContext);
     }
     // remove us from the memory manager so that we don't get any callbacks
-    memoryManager.removeWriter(path);
+    if (path != null) {
+      memoryManager.removeWriter(path);
+    }
     // actually close the file
     flushStripe();
     writeMetadata();
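
A hedged sketch (not from this patch) of the new entry point: any PhysicalWriter implementation
can be plugged in, and passing a null path keeps the writer out of the memory manager, per the
null checks added above. The schema and the physicalWriter argument are placeholders.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.impl.PhysicalWriter;
import org.apache.orc.impl.WriterImpl;

// Constructs the ORC writer over a caller-supplied PhysicalWriter; the null path
// skips the memoryManager.addWriter/removeWriter registration.
static Writer createWriter(PhysicalWriter physicalWriter, Configuration conf) throws IOException {
  TypeDescription schema = TypeDescription.createStruct()
      .addField("a", TypeDescription.createInt())
      .addField("b", TypeDescription.createString());
  OrcFile.WriterOptions opts = OrcFile.writerOptions(conf).setSchema(schema);
  return new WriterImpl(physicalWriter, null, opts);
}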

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
index 3d81e43..b9c39de 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
@@ -24,6 +24,7 @@ package org.apache.hadoop.hive.llap;
  */
 public class DebugUtils {
   public static String toString(boolean[] a) {
+    if (a == null) return "null";
     StringBuilder b = new StringBuilder();
     b.append('[');
     for (int i = 0; i < a.length; ++i) {

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 601ad08..004bb2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -225,7 +225,7 @@ public class FetchOperator implements Serializable {
             + inputFormatClass.getName() + " as specified in mapredWork!", e);
       }
     }
-    return HiveInputFormat.wrapForLlap(format, conf);
+    return format;
   }
 
   private StructObjectInspector getPartitionKeyOI(TableDesc tableDesc) throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 37e4b9b..46270bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -327,7 +327,7 @@ public final class Utilities {
     return null;
   }
 
-  public static BaseWork getMergeWork(JobConf jconf) {
+  public static BaseWork getMergeWork(Configuration jconf) {
     if ((jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX) == null)
         || (jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX).isEmpty())) {
       return null;
@@ -335,7 +335,7 @@ public final class Utilities {
     return getMergeWork(jconf, jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX));
   }
 
-  public static BaseWork getMergeWork(JobConf jconf, String prefix) {
+  public static BaseWork getMergeWork(Configuration jconf, String prefix) {
     if (prefix == null || prefix.isEmpty()) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
index ba25573..de36f2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
@@ -19,11 +19,20 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.CombineHiveInputSplit;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileSplit;
@@ -42,6 +51,9 @@ import org.apache.hadoop.mapred.lib.CombineFileSplit;
  */
 public class CombineHiveRecordReader<K extends WritableComparable, V extends Writable>
     extends HiveContextAwareRecordReader<K, V> {
+  private static final org.slf4j.Logger LOG =
+      org.slf4j.LoggerFactory.getLogger(CombineHiveRecordReader.class);
+
+  private LinkedHashMap<Path, PartitionDesc> pathToPartInfo;
 
   public CombineHiveRecordReader(InputSplit split, Configuration conf,
       Reporter reporter, Integer partition, RecordReader preReader) throws IOException {
@@ -57,8 +69,27 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri
       throw new IOException("CombineHiveRecordReader: class not found "
           + inputFormatClassName);
     }
-    InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(
-        inputFormatClass, jobConf);
+    InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(inputFormatClass, jobConf);
+    try {
+      // TODO: refactor this out
+      if (pathToPartInfo == null) {
+        MapWork mrwork;
+        if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+          mrwork = (MapWork) Utilities.getMergeWork(jobConf);
+          if (mrwork == null) {
+            mrwork = Utilities.getMapWork(jobConf);
+          }
+        } else {
+          mrwork = Utilities.getMapWork(jobConf);
+        }
+        pathToPartInfo = mrwork.getPathToPartitionInfo();
+      }
+
+      PartitionDesc part = extractSinglePartSpec(hsplit);
+      inputFormat = HiveInputFormat.wrapForLlap(inputFormat, jobConf, part);
+    } catch (HiveException e) {
+      throw new IOException(e);
+    }
 
     // create a split for the given partition
     FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit
@@ -79,6 +110,26 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri
 
   }
 
+  private PartitionDesc extractSinglePartSpec(CombineHiveInputSplit hsplit) throws IOException {
+    PartitionDesc part = null;
+    Map<Map<Path, PartitionDesc>, Map<Path, PartitionDesc>> cache = new HashMap<>();
+    for (Path path : hsplit.getPaths()) {
+      PartitionDesc otherPart = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+          pathToPartInfo, path, cache);
+      LOG.debug("Found spec for " + path + " " + otherPart + " from " + pathToPartInfo);
+      if (part == null) {
+        part = otherPart;
+      } else if (otherPart != part) { // Assume we should have the exact same object.
+        // TODO: we could also compare the schema and SerDe, and pass only those to the call
+        //       instead; most of the time these would be the same and LLAP IO can handle that.
+        LOG.warn("Multiple partitions found; not going to pass a part spec to LLAP IO: {"
+            + part.getPartSpec() + "} and {" + otherPart.getPartSpec() + "}");
+        return null;
+      }
+    }
+    return part;
+  }
+
   @Override
   public void doClose() throws IOException {
     recordReader.close();

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 94fcd60..f8391e0 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -202,7 +204,8 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
   }
 
   public static InputFormat<WritableComparable, Writable> wrapForLlap(
-      InputFormat<WritableComparable, Writable> inputFormat, Configuration conf) {
+      InputFormat<WritableComparable, Writable> inputFormat, Configuration conf,
+      PartitionDesc part) throws HiveException {
     if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())) {
       return inputFormat; // LLAP not enabled, no-op.
     }
@@ -227,7 +230,20 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       LOG.info("Not using LLAP IO because it is not initialized");
       return inputFormat;
     }
-    return castInputFormat(llapIo.getInputFormat(inputFormat));
+    LlapWrappableInputFormatInterface llapIf = (LlapWrappableInputFormatInterface)inputFormat;
+    Deserializer serde = null;
+    if (llapIf.isSerdeBased()) {
+      if (part == null) {
+        LOG.info("Not using LLAP IO because there's no partition spec for SerDe-based IF", new Exception());
+        return inputFormat;
+      }
+      try {
+        serde = part.getDeserializer(conf);
+      } catch (Exception e) {
+        throw new HiveException("Error creating SerDe for LLAP IO", e);
+      }
+    }
+    return castInputFormat(llapIo.getInputFormat(inputFormat, serde));
   }
 
 
@@ -248,7 +264,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     return (InputFormat<T, U>)from;
   }
 
-
+  /** NOTE: this no longer wraps the IF for LLAP. Call wrapForLlap manually if needed. */
   public static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
     Class inputFormatClass, JobConf job) throws IOException {
     InputFormat<WritableComparable, Writable> instance = inputFormats.get(inputFormatClass);
@@ -266,7 +282,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
             + inputFormatClass.getName() + " as specified in mapredWork!", e);
       }
     }
-    return wrapForLlap(instance, job);
+    return instance;
   }
 
   public RecordReader getRecordReader(InputSplit split, JobConf job,
@@ -287,15 +303,24 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
 
     boolean nonNative = false;
-    PartitionDesc part = pathToPartitionInfo.get(hsplit.getPath());
+    PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+        pathToPartitionInfo, hsplit.getPath(), null);
+    LOG.debug("Found spec for " + hsplit.getPath() + " " + part + " from " + pathToPartitionInfo);
+
     if ((part != null) && (part.getTableDesc() != null)) {
       Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), job);
       nonNative = part.getTableDesc().isNonNative();
     }
 
-    pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath(), nonNative);
+    Path splitPath = hsplit.getPath();
+    pushProjectionsAndFilters(job, inputFormatClass, splitPath, nonNative);
 
     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+    try {
+      inputFormat = HiveInputFormat.wrapForLlap(inputFormat, job, part);
+    } catch (HiveException e) {
+      throw new IOException(e);
+    }
     RecordReader innerReader = null;
     try {
       innerReader = inputFormat.getRecordReader(inputSplit, job, reporter);
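
Since getInputFormatFromCache no longer wraps for LLAP, any caller that wants LLAP IO now wraps
explicitly with the partition spec it resolved for the split, as getRecordReader above does. The
same pattern for another caller (variable names mirror the ones above and are illustrative):

// Resolve the input format, then wrap for LLAP explicitly; the partition spec may be
// null, in which case SerDe-based formats will simply not be wrapped.
InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(inputFormatClass, job);
try {
  inputFormat = HiveInputFormat.wrapForLlap(inputFormat, job, part);
} catch (HiveException e) {
  throw new IOException(e);
}
RecordReader innerReader = inputFormat.getRecordReader(inputSplit, job, reporter);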

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
index 66e1f90..f168f3c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
@@ -18,5 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-/** Marker interface for LLAP; serves no other purpose. */
-public interface LlapWrappableInputFormatInterface {}
+/** Interface for input formats that LLAP IO can wrap. */
+public interface LlapWrappableInputFormatInterface {
+  /** @return true if LLAP IO should read this format's data through its SerDe. */
+  boolean isSerdeBased();
+}
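
A hedged sketch of a SerDe-based implementor (the class name is hypothetical); OrcInputFormat
below returns false here because it has its own columnar path.

import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
import org.apache.hadoop.mapred.TextInputFormat;

// Hypothetical example: a row-oriented format that asks LLAP IO to go through its
// SerDe, so wrapForLlap must be handed the partition spec to instantiate one.
public class LlapWrappableTextInputFormat extends TextInputFormat
    implements LlapWrappableInputFormatInterface {
  @Override
  public boolean isSerdeBased() {
    return true;
  }
}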

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 361901e..2a89c03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -332,6 +332,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                                              List<Integer> included) {
 
     boolean[] result = new boolean[readerSchema.getMaximumId() + 1];
+    if (included == null) {
+      Arrays.fill(result, true);
+      return result;
+    }
     result[0] = true;
     List<TypeDescription> children = readerSchema.getChildren();
     for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) {
@@ -2482,4 +2486,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) {
     return new OrcOiBatchToRowReader(vrr, vrbCtx, includedCols);
   }
+
+  @Override
+  public boolean isSerdeBased() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 075c3b4..3e4ec2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -93,14 +93,28 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer
              OrcFile.WriterOptions opts) throws IOException {
     super(fs, path, opts);
     this.inspector = opts.getInspector();
-    internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+    this.internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+    this.fields = initializeFieldsFromOi(inspector);
+  }
+
+  public WriterImpl(PhysicalWriter writer,
+                    Path pathForMem,
+                    OrcFile.WriterOptions opts) throws IOException {
+    super(writer, pathForMem, opts);
+    this.inspector = opts.getInspector();
+    this.internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+    this.fields = initializeFieldsFromOi(inspector);
+  }
+
+  private static StructField[] initializeFieldsFromOi(ObjectInspector inspector) {
     if (inspector instanceof StructObjectInspector) {
       List<? extends StructField> fieldList =
           ((StructObjectInspector) inspector).getAllStructFieldRefs();
-      fields = new StructField[fieldList.size()];
+      StructField[] fields = new StructField[fieldList.size()];
       fieldList.toArray(fields);
+      return fields;
     } else {
-      fields = null;
+      return null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
index 2325140..8857d3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
@@ -83,4 +83,8 @@ public class CacheChunk extends DiskRangeList {
   public void reset() {
     init(null, -1, -1);
   }
+
+  /** Moves the end offset of this chunk by the given delta. */
+  public void adjustEnd(long delta) {
+    this.end += delta;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index d5f5f9d..0dba1a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -1775,7 +1775,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       LOG.debug("columnIndex: {} columnType: {} streamBuffers.length: {} columnEncoding: {}" +
           " present: {} data: {} dictionary: {} lengths: {} secondary: {} tz: {}",
           columnIndex, schema, streamBuffers.length, columnEncoding, present != null,
-          data != null, dictionary != null, lengths != null, secondary != null, tz);
+          data, dictionary != null, lengths != null, secondary != null, tz);
     }
     switch (schema.getCategory()) {
       case BINARY: