Posted to commits@hive.apache.org by se...@apache.org on 2016/12/17 02:59:11 UTC

[3/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
new file mode 100644
index 0000000..e84f5cc
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.api.impl;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.ConsumerFeedback;
+import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
+import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
+import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.SchemaEvolution;
+import org.apache.tez.common.counters.TezCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+class LlapRecordReader
+    implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
+
+  private final FileSplit split;
+  private final List<Integer> columnIds;
+  private final SearchArgument sarg;
+  private final String[] columnNames;
+  private final VectorizedRowBatchCtx rbCtx;
+  private final Object[] partitionValues;
+
+  private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
+  /** Vector that is currently being processed by our user. */
+  private ColumnVectorBatch lastCvb = null;
+  private boolean isFirst = true;
+
+  private Throwable pendingError = null;
+  private boolean isDone = false;
+  private final boolean isClosed = false;
+  private final ConsumerFeedback<ColumnVectorBatch> feedback;
+  private final QueryFragmentCounters counters;
+  private long firstReturnTime;
+
+  private final JobConf jobConf;
+  private final boolean[] includedColumns;
+  private final ReadPipeline rp;
+  private final ExecutorService executor;
+
+  private TypeDescription fileSchema;
+
+  public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols,
+      String hostName, ColumnVectorProducer cvp, ExecutorService executor,
+      InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter)
+          throws IOException, HiveException {
+    this.executor = executor;
+    this.jobConf = job;
+    this.split = split;
+    this.columnIds = includedCols;
+    this.sarg = ConvertAstToSearchArg.createFromConf(job);
+    this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
+    final String fragmentId = LlapTezUtils.getFragmentId(job);
+    final String dagId = LlapTezUtils.getDagId(job);
+    final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
+    MDC.put("dagId", dagId);
+    MDC.put("queryId", queryId);
+    TezCounters taskCounters = null;
+    if (fragmentId != null) {
+      MDC.put("fragmentId", fragmentId);
+      taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
+      LOG.info("Received fragment id: {}", fragmentId);
+    } else {
+      LOG.warn("Not using tez counters as fragment id string is null");
+    }
+    this.counters = new QueryFragmentCounters(job, taskCounters);
+    this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
+
+    MapWork mapWork = Utilities.getMapWork(job);
+    VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
+    rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork);
+
+    int partitionColumnCount = rbCtx.getPartitionColumnCount();
+    if (partitionColumnCount > 0) {
+      partitionValues = new Object[partitionColumnCount];
+      VectorizedRowBatchCtx.getPartitionValues(rbCtx, job, split, partitionValues);
+    } else {
+      partitionValues = null;
+    }
+
+    boolean isAcidScan = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+    TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(
+        job, isAcidScan, Integer.MAX_VALUE);
+
+    // Create the consumer of encoded data; it will coordinate decoding to CVBs.
+    feedback = rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames,
+        counters, schema, sourceInputFormat, sourceSerDe, reporter, job);
+    fileSchema = rp.getFileSchema();
+    includedColumns = rp.getIncludedColumns();
+  }
+
+  /**
+   * Starts the data read pipeline. Returns false if LLAP IO cannot handle this split (e.g. unsupported schema evolution).
+   */
+  public boolean init() {
+    if (!checkOrcSchemaEvolution()) return false;
+
+    // perform the data read asynchronously
+    if (executor instanceof StatsRecordingThreadPool) {
+      // Every thread created by this thread pool will use the same handler
+      ((StatsRecordingThreadPool) executor).setUncaughtExceptionHandler(
+          new IOUncaughtExceptionHandler());
+    }
+    executor.submit(rp.getReadCallable());
+    return true;
+  }
+
+  private boolean checkOrcSchemaEvolution() {
+    SchemaEvolution schemaEvolution = new SchemaEvolution(
+        fileSchema, rp.getReaderSchema(), includedColumns);
+    for (Integer colId : columnIds) {
+      if (!schemaEvolution.isPPDSafeConversion(colId)) {
+        LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+    assert value != null;
+    if (isClosed) {
+      throw new AssertionError("next called after close");
+    }
+    // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
+    boolean wasFirst = isFirst;
+    if (isFirst) {
+      if (partitionValues != null) {
+        rbCtx.addPartitionColsToBatch(value, partitionValues);
+      }
+      isFirst = false;
+    }
+    ColumnVectorBatch cvb = null;
+    try {
+      cvb = nextCvb();
+    } catch (InterruptedException e) {
+      // Query might have been canceled. Stop the background processing.
+      feedback.stop();
+      throw new IOException(e);
+    }
+    if (cvb == null) {
+      if (wasFirst) {
+        firstReturnTime = counters.startTimeCounter();
+      }
+      counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
+      return false;
+    }
+    if (columnIds.size() != cvb.cols.length) {
+      throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size()
+          + " included, but the reader returned " + cvb.cols.length);
+    }
+    // VRB was created from VrbCtx, so we already have pre-allocated column vectors
+    for (int i = 0; i < cvb.cols.length; ++i) {
+      // Return old CVs (if any) to caller. We assume these things all have the same schema.
+      cvb.swapColumnVector(i, value.cols, columnIds.get(i));
+    }
+    value.selectedInUse = false;
+    value.size = cvb.size;
+    if (wasFirst) {
+      firstReturnTime = counters.startTimeCounter();
+    }
+    return true;
+  }
+
+  public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
+    return rbCtx;
+  }
+
+  private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+    @Override
+    public void uncaughtException(final Thread t, final Throwable e) {
+      LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}",
+          t.getName(), t.getId(), e);
+      setError(e);
+    }
+  }
+
+  ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
+    boolean isFirst = (lastCvb == null);
+    if (!isFirst) {
+      feedback.returnData(lastCvb);
+    }
+    synchronized (pendingData) {
+      // We are waiting for next block. Either we will get it, or be told we are done.
+      boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && isNothingToReport();
+      if (doLogBlocking) {
+        LlapIoImpl.LOG.trace("next will block");
+      }
+      while (isNothingToReport()) {
+        pendingData.wait(100);
+      }
+      if (doLogBlocking) {
+        LlapIoImpl.LOG.trace("next is unblocked");
+      }
+      rethrowErrorIfAny();
+      lastCvb = pendingData.poll();
+    }
+    if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
+      LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb);
+    }
+    return lastCvb;
+  }
+
+  private boolean isNothingToReport() {
+    return !isDone && pendingData.isEmpty() && pendingError == null;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public VectorizedRowBatch createValue() {
+    return rbCtx.createVectorizedRowBatch();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return -1; // Position doesn't make sense for async reader, chunk order is arbitrary.
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (LlapIoImpl.LOG.isTraceEnabled()) {
+      LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
+          isClosed, isDone, pendingError, pendingData.size());
+    }
+    LlapIoImpl.LOG.info("Llap counters: {}", counters); // This is where counters are logged!
+    feedback.stop();
+    rethrowErrorIfAny();
+    MDC.clear();
+  }
+
+  private void rethrowErrorIfAny() throws IOException {
+    if (pendingError == null) return;
+    if (pendingError instanceof IOException) {
+      throw (IOException)pendingError;
+    }
+    throw new IOException(pendingError);
+  }
+
+  @Override
+  public void setDone() {
+    if (LlapIoImpl.LOG.isInfoEnabled()) {
+      LlapIoImpl.LOG.info("setDone called; closed {}, done {}, err {}, pending {}",
+          isClosed, isDone, pendingError, pendingData.size());
+    }
+    synchronized (pendingData) {
+      isDone = true;
+      pendingData.notifyAll();
+    }
+  }
+
+  @Override
+  public void consumeData(ColumnVectorBatch data) {
+    if (LlapIoImpl.LOG.isInfoEnabled()) {
+      LlapIoImpl.LOG.info("consume called; closed {}, done {}, err {}, pending {}",
+          isClosed, isDone, pendingError, pendingData.size());
+    }
+    synchronized (pendingData) {
+      if (isClosed) {
+        return;
+      }
+      pendingData.add(data);
+      pendingData.notifyAll();
+    }
+  }
+
+  @Override
+  public void setError(Throwable t) {
+    counters.incrCounter(LlapIOCounters.NUM_ERRORS);
+    LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}",
+        isClosed, isDone, pendingError, pendingData.size());
+    assert t != null;
+    synchronized (pendingData) {
+      pendingError = t;
+      pendingData.notifyAll();
+    }
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    // TODO: plumb progress info thru the reader if we can get metadata from loader first.
+    return 0.0f;
+  }
+}
\ No newline at end of file
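
The record reader above is the consumer half of an asynchronous handoff: the read pipeline, submitted to the executor in init(), pushes data through consumeData()/setDone()/setError(), while next() drains it through nextCvb(), blocking on pendingData with wait/notify and returning each finished batch upstream via feedback.returnData(). The stand-alone sketch below shows the same wait/notify protocol in isolation; the class and member names are invented for illustration and are not part of the patch.

import java.util.LinkedList;

class BatchHandoff<T> {
  private final LinkedList<T> pending = new LinkedList<>();
  private boolean done = false;
  private Throwable error = null;

  // Producer side (the read pipeline thread): hand over a batch.
  void offer(T batch) {
    synchronized (pending) {
      pending.add(batch);
      pending.notifyAll();
    }
  }

  // Producer side: no more batches will arrive.
  void finish() {
    synchronized (pending) {
      done = true;
      pending.notifyAll();
    }
  }

  // Producer side: report a failure to the consumer.
  void fail(Throwable t) {
    synchronized (pending) {
      error = t;
      pending.notifyAll();
    }
  }

  // Consumer side (the analogue of nextCvb()): block until a batch, completion, or an error.
  T take() throws InterruptedException {
    synchronized (pending) {
      while (pending.isEmpty() && !done && error == null) {
        pending.wait(100);
      }
      if (error != null) {
        throw new RuntimeException(error);
      }
      return pending.poll(); // null once the producer has called finish()
    }
  }
}

As in nextCvb() above, the bounded wait simply re-checks the exit condition every 100 ms rather than relying on a single notification.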

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index db86296..2e4f2ba 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -25,7 +25,11 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
 
 /**
@@ -33,7 +37,8 @@ import org.apache.orc.TypeDescription;
  */
 public interface ColumnVectorProducer {
   ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
-                                  List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
-                                  QueryFragmentCounters counters,
-                                  TypeDescription readerSchema) throws IOException;
+      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+      QueryFragmentCounters counters, TypeDescription readerSchema,
+      InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter,
+      JobConf job) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 6b54b30..04fed44 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.TypeDescription;
 
 public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedColumnBatch<BatchKey>>
   implements Consumer<BatchType>, ReadPipeline {
@@ -122,4 +123,10 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
     // We are just a relay; send unpause to encoded data producer.
     upstreamFeedback.unpause();
   }
+
+  @Override
+  public TypeDescription getFileSchema() {
+    // TODO: the ORC-specific method should be removed from the interface instead.
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
new file mode 100644
index 0000000..d384b85
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.decode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.OrcProto.ColumnEncoding;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.RowIndexEntry;
+import org.apache.orc.OrcProto.Type;
+import org.apache.orc.TypeDescription;
+
+public class GenericColumnVectorProducer implements ColumnVectorProducer {
+  private final SerDeLowLevelCacheImpl cache;
+  private final BufferUsageManager bufferManager;
+  private final Configuration conf;
+  private final LlapDaemonCacheMetrics cacheMetrics;
+  private final LlapDaemonIOMetrics ioMetrics;
+
+  public GenericColumnVectorProducer(SerDeLowLevelCacheImpl serdeCache,
+      BufferUsageManager bufferManager, Configuration conf, LlapDaemonCacheMetrics cacheMetrics,
+      LlapDaemonIOMetrics ioMetrics) {
+    LlapIoImpl.LOG.info("Initializing serde-based (generic) column vector producer");
+    this.cache = serdeCache;
+    this.bufferManager = bufferManager;
+    this.conf = conf;
+    this.cacheMetrics = cacheMetrics;
+    this.ioMetrics = ioMetrics;
+  }
+
+  @Override
+  public ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
+      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+      QueryFragmentCounters counters, TypeDescription schema, InputFormat<?, ?> sourceInputFormat,
+      Deserializer sourceSerDe, Reporter reporter, JobConf job) throws IOException {
+    cacheMetrics.incrCacheReadRequests();
+    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
+        consumer, columnIds.size(), false, counters, ioMetrics);
+    TextFileMetadata fm;
+    try {
+      fm = new TextFileMetadata(sourceSerDe);
+    } catch (SerDeException e) {
+      throw new IOException(e);
+    }
+    edc.setFileMetadata(fm);
+    // Note that we pass job config to the record reader, but use global config for LLAP IO.
+    SerDeEncodedDataReader reader = new SerDeEncodedDataReader(cache,
+        bufferManager, conf, split, columnIds, edc, job, reporter, sourceInputFormat,
+        sourceSerDe, counters, fm.getSchema());
+    edc.init(reader, reader);
+    LlapIoImpl.LOG.info("Ignoring schema: " + schema);
+    return edc;
+  }
+
+
+  public static final class TextStripeMetadata implements ConsumerStripeMetadata {
+    // The writer is local to the process.
+    private final String writerTimezone = TimeZone.getDefault().getID();
+    private List<ColumnEncoding> encodings;
+    private final int stripeIx;
+    private long rowCount = -1;
+
+    public TextStripeMetadata(int stripeIx) {
+      this.stripeIx = stripeIx;
+    }
+
+    @Override
+    public String getWriterTimezone() {
+      return writerTimezone;
+    }
+
+    @Override
+    public int getStripeIx() {
+      return stripeIx;
+    }
+
+    @Override
+    public long getRowCount() {
+      return rowCount;
+    }
+
+    @Override
+    public List<ColumnEncoding> getEncodings() {
+      return encodings;
+    }
+
+    @Override
+    public RowIndexEntry getRowIndexEntry(int colIx, int rgIx) {
+      throw new UnsupportedOperationException();
+    }
+
+    public void setEncodings(List<ColumnEncoding> encodings) {
+      this.encodings = encodings;
+    }
+
+    @Override
+    public RowIndex[] getRowIndexes() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean supportsRowIndexes() {
+      return false;
+    }
+
+    public void setRowCount(long value) {
+      rowCount = value;
+    }
+
+    @Override
+    public String toString() {
+      return "[stripeIx=" + stripeIx + ", rowCount=" + rowCount + ", encodings=" + encodings + "]".replace('\n', ' ');
+    }
+  }
+
+
+  private static final class TextFileMetadata implements ConsumerFileMetadata {
+    private final List<Type> orcTypes = new ArrayList<>();
+    private final TypeDescription schema;
+    public TextFileMetadata(Deserializer sourceSerDe) throws SerDeException {
+      TypeDescription schema = OrcInputFormat.convertTypeInfo(
+          TypeInfoUtils.getTypeInfoFromObjectInspector(sourceSerDe.getObjectInspector()));
+      this.schema = schema;
+      addTypesFromSchema(schema);
+    }
+
+    private void addTypesFromSchema(TypeDescription schema) {
+      // The same thing that WriterImpl does when writing the footer, but w/o the footer.
+      OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+      List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, schema);
+      orcTypes.add(type.build());
+      if (children == null) return;
+      for(TypeDescription child : children) {
+        addTypesFromSchema(child);
+      }
+    }
+
+    @Override
+    public List<Type> getTypes() {
+      return orcTypes;
+    }
+
+    @Override
+    public int getStripeCount() {
+      return 1;
+    }
+
+    @Override
+    public CompressionKind getCompressionKind() {
+      return CompressionKind.NONE;
+    }
+
+    @Override
+    public TypeDescription getSchema() {
+      return schema;
+    }
+  }
+}
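
TextFileMetadata above synthesizes, for a serde-based (non-ORC) file, the flat list of OrcProto.Type entries that an ORC writer would normally put in the file footer: addTypesFromSchema() walks the TypeDescription in pre-order (each type before its children), which is the order in which ORC assigns column ids. The stand-alone sketch below shows that traversal using only the public TypeDescription API; the class and method names are invented for illustration and are not part of the patch.

import java.util.ArrayList;
import java.util.List;
import org.apache.orc.TypeDescription;

class SchemaFlattenExample {
  // Pre-order walk: the parent type is emitted before its children, mirroring addTypesFromSchema().
  static void flatten(TypeDescription type, List<TypeDescription> out) {
    out.add(type);
    List<TypeDescription> children = type.getChildren();
    if (children == null) {
      return;
    }
    for (TypeDescription child : children) {
      flatten(child, out);
    }
  }

  public static void main(String[] args) {
    TypeDescription schema = TypeDescription.fromString("struct<a:int,b:array<string>>");
    List<TypeDescription> flat = new ArrayList<>();
    flatten(schema, flat);
    // Prints 4: the root struct, column a (int), column b (array), and b's element (string).
    System.out.println(flat.size());
  }
}

The resulting order matches ORC's column-id numbering, which is why the constructor above can build one OrcProto.Type per entry as it recurses.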

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java
new file mode 100644
index 0000000..7fecdaa
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.decode;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
+
+// TODO# VectorizedInputFormatInterface is a hack... only "vectorized" in LLAP IO.
+//       How to resolve optimizer dependency?
+public class LlapTextInputFormat extends org.apache.hadoop.mapred.TextInputFormat
+  implements LlapWrappableInputFormatInterface, VectorizedInputFormatInterface {
+
+  @Override
+  public boolean isSerdeBased() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 2e9b9c3..565e3d2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -34,7 +34,11 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
 
 public class OrcColumnVectorProducer implements ColumnVectorProducer {
@@ -63,12 +67,14 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
 
   @Override
   public ReadPipeline createReadPipeline(
-      Consumer<ColumnVectorBatch> consumer, FileSplit split,
-      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
-      QueryFragmentCounters counters, TypeDescription readerSchema) throws IOException {
+      Consumer<ColumnVectorBatch> consumer, FileSplit split, List<Integer> columnIds,
+      SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters,
+      TypeDescription readerSchema, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe,
+      Reporter reporter, JobConf job) throws IOException {
     cacheMetrics.incrCacheReadRequests();
     OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
         _skipCorrupt, counters, ioMetrics);
+    // Note: we use global conf here and ignore JobConf.
     OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
         metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema);
     edc.init(reader, reader);

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 29f1ba8..eb40d1f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -16,16 +16,16 @@
 package org.apache.hadoop.hive.llap.io.decode;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
-import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
-import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
-import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -40,7 +40,6 @@ import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.impl.PositionProvider;
-import org.apache.orc.OrcProto.RowIndexEntry;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedTreeReaderFactory;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedTreeReaderFactory.SettableTreeReader;
@@ -53,11 +52,12 @@ import org.apache.orc.impl.PhysicalFsWriter;
 import org.apache.orc.impl.TreeReaderFactory;
 import org.apache.orc.impl.TreeReaderFactory.StructTreeReader;
 import org.apache.orc.impl.TreeReaderFactory.TreeReader;
-import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
 import org.apache.orc.OrcProto;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 
 public class OrcEncodedDataConsumer
   extends EncodedDataConsumer<OrcBatchKey, OrcEncodedColumnBatch> {
@@ -65,9 +65,9 @@ public class OrcEncodedDataConsumer
   private TreeReaderFactory.TreeReader[] columnReaders;
   private int[] columnMapping; // Mapping from columnReaders (by index) to columns in file schema.
   private int previousStripeIndex = -1;
-  private OrcFileMetadata fileMetadata; // We assume one request is only for one file.
+  private ConsumerFileMetadata fileMetadata; // We assume one request is only for one file.
   private CompressionCodec codec;
-  private OrcStripeMetadata[] stripes;
+  private List<ConsumerStripeMetadata> stripes;
   private final boolean skipCorrupt; // TODO: get rid of this
   private final QueryFragmentCounters counters;
   private boolean[] includedColumns;
@@ -82,16 +82,21 @@ public class OrcEncodedDataConsumer
     this.counters = counters;
   }
 
-  public void setFileMetadata(OrcFileMetadata f) {
+  public void setFileMetadata(ConsumerFileMetadata f) {
     assert fileMetadata == null;
     fileMetadata = f;
-    stripes = new OrcStripeMetadata[f.getStripes().size()];
-    codec = PhysicalFsWriter.createCodec(fileMetadata.getCompressionKind());
+    stripes = new ArrayList<>(f.getStripeCount());
+    codec = PhysicalFsWriter.createCodec(f.getCompressionKind());
   }
 
-  public void setStripeMetadata(OrcStripeMetadata m) {
+  public void setStripeMetadata(ConsumerStripeMetadata m) {
     assert stripes != null;
-    stripes[m.getStripeIx()] = m;
+    int newIx = m.getStripeIx();
+    for (int i = stripes.size(); i <= newIx; ++i) {
+      stripes.add(null);
+    }
+    assert stripes.get(newIx) == null;
+    stripes.set(newIx, m);
   }
 
   @Override
@@ -103,17 +108,18 @@ public class OrcEncodedDataConsumer
     boolean sameStripe = currentStripeIndex == previousStripeIndex;
 
     try {
-      OrcStripeMetadata stripeMetadata = stripes[currentStripeIndex];
+      ConsumerStripeMetadata stripeMetadata = stripes.get(currentStripeIndex);
       // Get non null row count from root column, to get max vector batches
       int rgIdx = batch.getBatchKey().rgIx;
       long nonNullRowCount = -1;
       if (rgIdx == OrcEncodedColumnBatch.ALL_RGS) {
         nonNullRowCount = stripeMetadata.getRowCount();
       } else {
-        OrcProto.RowIndexEntry rowIndex = stripeMetadata.getRowIndexes()[0].getEntry(rgIdx);
+        OrcProto.RowIndexEntry rowIndex = stripeMetadata.getRowIndexEntry(0, rgIdx);
         nonNullRowCount = getRowCount(rowIndex);
       }
       int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1);
+      // LOG.info("TODO# expecting " + nonNullRowCount + " rows with at most " + maxBatchesRG);
       int batchSize = VectorizedRowBatch.DEFAULT_SIZE;
       TypeDescription schema = fileMetadata.getSchema();
 
@@ -136,6 +142,7 @@ public class OrcEncodedDataConsumer
           batchSize = (int) (nonNullRowCount % VectorizedRowBatch.DEFAULT_SIZE);
           if (batchSize == 0) break;
         }
+        // LOG.info("TODO# batch " + i + " of " + batchSize);
 
         ColumnVectorBatch cvb = cvbPool.take();
         // assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split.
@@ -148,6 +155,7 @@ public class OrcEncodedDataConsumer
             cvb.cols[idx] = createColumn(schema.getChildren().get(columnMapping[idx]), batchSize);
           }
           cvb.cols[idx].ensureSize(batchSize, false);
+          LOG.info("TODO# nextVector on " + idx + "; "+ reader + " for " + columnMapping[idx]);
           reader.nextVector(cvb.cols[idx], null, batchSize);
         }
 
@@ -155,6 +163,7 @@ public class OrcEncodedDataConsumer
         downstreamConsumer.consumeData(cvb);
         counters.incrCounter(LlapIOCounters.ROWS_EMITTED, batchSize);
       }
+      LOG.info("TODO# done with decode");
       counters.incrTimeCounter(LlapIOCounters.DECODE_TIME_NS, startTime);
       counters.incrCounter(LlapIOCounters.NUM_VECTOR_BATCHES, maxBatchesRG);
       counters.incrCounter(LlapIOCounters.NUM_DECODED_BATCHES);
@@ -214,8 +223,9 @@ public class OrcEncodedDataConsumer
   }
 
   private void positionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
-      EncodedColumnBatch<OrcBatchKey> batch, OrcStripeMetadata stripeMetadata) throws IOException {
+      EncodedColumnBatch<OrcBatchKey> batch, ConsumerStripeMetadata stripeMetadata) throws IOException {
     PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+    // LOG.info("TODO# positionInStreams pps " + Lists.newArrayList(pps));
     if (pps == null) return;
     for (int i = 0; i < columnReaders.length; i++) {
       columnReaders[i].seek(pps);
@@ -224,8 +234,9 @@ public class OrcEncodedDataConsumer
 
   private void repositionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
       EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe,
-      OrcStripeMetadata stripeMetadata) throws IOException {
+      ConsumerStripeMetadata stripeMetadata) throws IOException {
     PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+    // LOG.info("TODO# repositionInStreams pps " + Lists.newArrayList(pps));
     if (pps == null) return;
     for (int i = 0; i < columnReaders.length; i++) {
       TreeReader reader = columnReaders[i];
@@ -240,20 +251,46 @@ public class OrcEncodedDataConsumer
     }
   }
 
-  private PositionProvider[] createPositionProviders(TreeReaderFactory.TreeReader[] columnReaders,
-      EncodedColumnBatch<OrcBatchKey> batch, OrcStripeMetadata stripeMetadata) throws IOException {
-    if (columnReaders.length == 0) return null;
-    int rowGroupIndex = batch.getBatchKey().rgIx;
-    if (rowGroupIndex == OrcEncodedColumnBatch.ALL_RGS) {
-      throw new IOException("Cannot position readers without RG information");
+  /**
+   * Position provider used in absence of indexes, e.g. for serde-based reader, where each stream
+   * is in its own physical 'container', always starting at 0, and there are no RGs.
+   */
+  private final static class IndexlessPositionProvider implements PositionProvider {
+    @Override
+    public long getNext() {
+      return 0;
     }
-    // TODO: this assumes indexes in getRowIndexes would match column IDs
-    OrcProto.RowIndex[] ris = stripeMetadata.getRowIndexes();
-    PositionProvider[] pps = new PositionProvider[ris.length];
-    for (int i = 0; i < ris.length; ++i) {
-      OrcProto.RowIndex ri = ris[i];
-      if (ri == null) continue;
-      pps[i] = new RecordReaderImpl.PositionProviderImpl(ri.getEntry(rowGroupIndex));
+
+    @Override
+    public String toString() {
+      return "indexes not supported";
+    }
+  }
+
+  private PositionProvider[] createPositionProviders(
+      TreeReaderFactory.TreeReader[] columnReaders, EncodedColumnBatch<OrcBatchKey> batch,
+      ConsumerStripeMetadata stripeMetadata) throws IOException {
+    if (columnReaders.length == 0) return null;
+    PositionProvider[] pps = null;
+    if (!stripeMetadata.supportsRowIndexes()) {
+      PositionProvider singleRgPp = new IndexlessPositionProvider();
+      pps = new PositionProvider[stripeMetadata.getEncodings().size()];
+      for (int i = 0; i < pps.length; ++i) {
+        pps[i] = singleRgPp;
+      }
+    } else {
+      int rowGroupIndex = batch.getBatchKey().rgIx;
+      if (rowGroupIndex == OrcEncodedColumnBatch.ALL_RGS) {
+        throw new IOException("Cannot position readers without RG information");
+      }
+      // TODO: this assumes indexes in getRowIndexes would match column IDs
+      OrcProto.RowIndex[] ris = stripeMetadata.getRowIndexes();
+      pps = new PositionProvider[ris.length];
+      for (int i = 0; i < ris.length; ++i) {
+        OrcProto.RowIndex ri = ris[i];
+        if (ri == null) continue;
+        pps[i] = new RecordReaderImpl.PositionProviderImpl(ri.getEntry(rowGroupIndex));
+      }
     }
     return pps;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
index 4e1b851..36f6c9c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
@@ -25,7 +25,7 @@ import org.apache.orc.TypeDescription;
 
 public interface ReadPipeline extends ConsumerFeedback<ColumnVectorBatch> {
   public Callable<Void> getReadCallable();
-  TypeDescription getFileSchema();
+  TypeDescription getFileSchema(); // TODO: this is ORC-specific and should be removed
   TypeDescription getReaderSchema();
   boolean[] getIncludedColumns();
 }
\ No newline at end of file