Posted to commits@hive.apache.org by se...@apache.org on 2017/02/14 03:09:19 UTC

hive git commit: HIVE-15799 : LLAP: rename VertorDeserializeOrcWriter (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 694372770 -> 170637386


HIVE-15799 : LLAP: rename VertorDeserializeOrcWriter (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/17063738
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/17063738
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/17063738

Branch: refs/heads/master
Commit: 170637386d0cf2a1c1849dc9eb2af594a6275b50
Parents: 6943727
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Feb 13 19:09:13 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Feb 13 19:09:13 2017 -0800

----------------------------------------------------------------------
 .../llap/io/encoded/SerDeEncodedDataReader.java |  14 +-
 .../io/encoded/VectorDeserializeOrcWriter.java  | 458 +++++++++++++++++++
 .../io/encoded/VertorDeserializeOrcWriter.java  | 458 -------------------
 3 files changed, 465 insertions(+), 465 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/17063738/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index f6531e8..221c99e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.SerDeStripeMetadata;
 import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
 import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter;
-import org.apache.hadoop.hive.llap.io.encoded.VertorDeserializeOrcWriter.AsyncCallback;
+import org.apache.hadoop.hive.llap.io.encoded.VectorDeserializeOrcWriter.AsyncCallback;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
@@ -176,7 +176,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
    * the consumer, at which point the consumer is responsible for it.
    */
   private FileData cachedData;
-  private List<VertorDeserializeOrcWriter> asyncWriters = new ArrayList<>();
+  private List<VectorDeserializeOrcWriter> asyncWriters = new ArrayList<>();
 
   public SerDeEncodedDataReader(SerDeLowLevelCacheImpl cache,
       BufferUsageManager bufferManager, Configuration daemonConf, FileSplit split,
@@ -1375,15 +1375,15 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       List<Integer> splitColumnIds = OrcInputFormat.genIncludedColumnsReverse(
           schema, splitIncludes, false);
       // fileread writes to the writer, which writes to orcWriter, which writes to cacheWriter
-      EncodingWriter writer = VertorDeserializeOrcWriter.create(
+      EncodingWriter writer = VectorDeserializeOrcWriter.create(
           sourceInputFormat, sourceSerDe, parts, daemonConf, jobConf, split.getPath(), originalOi,
           splitColumnIds, splitIncludes, allocSize);
       // TODO: move this into ctor? EW would need to create CacheWriter then
       List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds;
       writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes,
           writer.isOnlyWritingIncludedColumns()), daemonConf, split.getPath());
-      if (writer instanceof VertorDeserializeOrcWriter) {
-        VertorDeserializeOrcWriter asyncWriter = (VertorDeserializeOrcWriter)writer;
+      if (writer instanceof VectorDeserializeOrcWriter) {
+        VectorDeserializeOrcWriter asyncWriter = (VectorDeserializeOrcWriter)writer;
         asyncWriter.startAsync(new AsyncCacheDataCallback());
         this.asyncWriters.add(asyncWriter);
       }
@@ -1403,7 +1403,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
   private class AsyncCacheDataCallback implements AsyncCallback {
     @Override
-    public void onComplete(VertorDeserializeOrcWriter writer) {
+    public void onComplete(VectorDeserializeOrcWriter writer) {
       CacheWriter cacheWriter = null;
       try {
         cacheWriter = writer.getCacheWriter();
@@ -1596,7 +1596,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   private void cleanup(boolean isError) {
     cleanUpCurrentRead();
     if (!isError) return;
-    for (VertorDeserializeOrcWriter asyncWriter : asyncWriters) {
+    for (VectorDeserializeOrcWriter asyncWriter : asyncWriters) {
       try {
         asyncWriter.interrupt();
       } catch (Exception ex) {

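The hunk above keeps the comment that "fileread writes to the writer, which writes to orcWriter, which writes to cacheWriter". A minimal sketch of that layered write path, using illustrative stand-in types (RowSink, EncodingSink, OrcSink, CacheSink and String rows are assumptions, not the actual Hive LLAP classes):

import java.util.ArrayList;
import java.util.List;

public class WritePathSketch {
  interface RowSink {
    void write(String row); // stand-in for writing one Writable row
  }

  // Innermost layer: buffers encoded output for the cache (CacheWriter's role).
  static class CacheSink implements RowSink {
    final List<String> cached = new ArrayList<>();
    @Override public void write(String row) { cached.add("encoded:" + row); }
  }

  // Middle layer: "encodes" rows and forwards them (the ORC writer's role).
  static class OrcSink implements RowSink {
    private final RowSink next;
    OrcSink(RowSink next) { this.next = next; }
    @Override public void write(String row) { next.write(row.toUpperCase()); }
  }

  // Outer layer: what the file read loop hands rows to (EncodingWriter's role).
  static class EncodingSink implements RowSink {
    private final RowSink next;
    EncodingSink(RowSink next) { this.next = next; }
    @Override public void write(String row) { next.write(row.trim()); }
  }

  public static void main(String[] args) {
    CacheSink cache = new CacheSink();
    RowSink writer = new EncodingSink(new OrcSink(cache));
    writer.write("  a,b,c "); // the file read loop would call this per row
    System.out.println(cache.cached); // prints [encoded:A,B,C]
  }
}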
http://git-wip-us.apache.org/repos/asf/hive/blob/17063738/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
new file mode 100644
index 0000000..c9df7d9
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.DeserializerOrcWriter;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.EncodingWriter;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/** The class that writes rows from a text reader to an ORC writer using VectorDeserializeRow. */
+class VectorDeserializeOrcWriter extends EncodingWriter implements Runnable {
+  private final VectorizedRowBatchCtx vrbCtx;
+  private Writer orcWriter;
+  private final LazySimpleDeserializeRead deserializeRead;
+  private final VectorDeserializeRow<?> vectorDeserializeRow;
+  private final StructObjectInspector destinationOi;
+  private final boolean usesSourceIncludes;
+  private final List<Integer> sourceIncludes;
+
+  private final boolean isAsync;
+  private final Thread orcThread;
+  private final ConcurrentLinkedQueue<WriteOperation> queue;
+  private AsyncCallback completion;
+
+  // Stored here only as async operation context.
+  private final boolean[] cacheIncludes;
+
+  private VectorizedRowBatch sourceBatch, destinationBatch;
+  private List<VectorizedRowBatch> currentBatches;
+
+  // TODO: if more writers are added, separate out an EncodingWriterFactory
+  public static EncodingWriter create(InputFormat<?, ?> sourceIf, Deserializer serDe,
+      Map<Path, PartitionDesc> parts, Configuration daemonConf, Configuration jobConf,
+      Path splitPath, StructObjectInspector sourceOi, List<Integer> sourceIncludes,
+      boolean[] cacheIncludes, int allocSize) throws IOException {
+    // Vector SerDe can be disabled both on client and server side.
+    if (!HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED)
+        || !HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED)
+        || !(sourceIf instanceof TextInputFormat) || !(serDe instanceof LazySimpleSerDe)) {
+      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
+    }
+    Path path = splitPath.getFileSystem(daemonConf).makeQualified(splitPath);
+    PartitionDesc partDesc = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+        parts, path, null);
+    if (partDesc == null) {
+      LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: no partition desc for " + path);
+      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
+    }
+    Properties tblProps = partDesc.getTableDesc().getProperties();
+    if ("true".equalsIgnoreCase(tblProps.getProperty(
+        serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST))) {
+      LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter due to "
+        + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST);
+      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
+    }
+    for (StructField sf : sourceOi.getAllStructFieldRefs()) {
+      Category c = sf.getFieldObjectInspector().getCategory();
+      if (c != Category.PRIMITIVE) {
+        LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: " + c + " is not supported");
+        return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
+      }
+    }
+    LlapIoImpl.LOG.info("Creating VertorDeserializeOrcWriter for " + path);
+    return new VectorDeserializeOrcWriter(
+        daemonConf, tblProps, sourceOi, sourceIncludes, cacheIncludes, allocSize);
+  }
+
+  private VectorDeserializeOrcWriter(Configuration conf, Properties tblProps,
+      StructObjectInspector sourceOi, List<Integer> sourceIncludes, boolean[] cacheIncludes,
+      int allocSize) throws IOException {
+    super(sourceOi, allocSize);
+    // See also: the usage of VectorDeserializeType, for binary. For now, we only want text.
+    this.vrbCtx = createVrbCtx(sourceOi);
+    this.sourceIncludes = sourceIncludes;
+    this.cacheIncludes = cacheIncludes;
+    this.sourceBatch = vrbCtx.createVectorizedRowBatch();
+    deserializeRead = new LazySimpleDeserializeRead(vrbCtx.getRowColumnTypeInfos(),
+        /* useExternalBuffer */ true, createSerdeParams(conf, tblProps));
+    vectorDeserializeRow = new VectorDeserializeRow<LazySimpleDeserializeRead>(deserializeRead);
+    int colCount = vrbCtx.getRowColumnTypeInfos().length;
+    boolean[] includes = null;
+    this.usesSourceIncludes = sourceIncludes.size() < colCount;
+    if (usesSourceIncludes) {
+      // VectorDeserializeRow produces "sparse" VRB when includes are used; we need to write the
+      // "dense" VRB to ORC. Ideally, we'd use projection columns, but ORC writer doesn't use them.
+      // In any case, we would also need to build a new OI for OrcWriter config.
+      // This is why OrcWriter is created after this writer, by the way.
+      this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size());
+      includes = new boolean[colCount];
+      int inclBatchIx = 0;
+      List<String> childNames = new ArrayList<>(sourceIncludes.size());
+      List<ObjectInspector> childOis = new ArrayList<>(sourceIncludes.size());
+      List<? extends StructField> sourceFields = sourceOi.getAllStructFieldRefs();
+      for (Integer columnId : sourceIncludes) {
+        includes[columnId] = true;
+        assert inclBatchIx <= columnId;
+        // Note that we use the same vectors in both batches. Clever, very clever.
+        destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId];
+        StructField sourceField = sourceFields.get(columnId);
+        childNames.add(sourceField.getFieldName());
+        childOis.add(sourceField.getFieldObjectInspector());
+      }
+      // This is only used by ORC to derive the structure. Most fields are unused.
+      destinationOi = new LazySimpleStructObjectInspector(
+          childNames, childOis, null, (byte)0, null);
+      destinationBatch.setPartitionInfo(sourceIncludes.size(), 0);
+      if (LlapIoImpl.LOG.isDebugEnabled()) {
+        LlapIoImpl.LOG.debug("Includes for deserializer are " + DebugUtils.toString(includes));
+      }
+      try {
+        vectorDeserializeRow.init(includes);
+      } catch (HiveException e) {
+        throw new IOException(e);
+      }
+    } else {
+      // No includes - use the standard batch.
+      this.destinationBatch = sourceBatch;
+      this.destinationOi = sourceOi;
+      try {
+        vectorDeserializeRow.init();
+      } catch (HiveException e) {
+        throw new IOException(e);
+      }
+    }
+    this.isAsync = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ASYNC_ENABLED);
+    if (isAsync) {
+      currentBatches = new LinkedList<>();
+      queue = new ConcurrentLinkedQueue<>();
+      orcThread = new Thread(this);
+      orcThread.setDaemon(true);
+      orcThread.setName(Thread.currentThread().getName() + "-OrcEncode");
+    } else {
+      queue = null;
+      orcThread = null;
+      currentBatches = null;
+    }
+  }
+
+  public void startAsync(AsyncCallback callback) {
+    this.completion = callback;
+    this.orcThread.start();
+  }
+
+  private static VectorizedRowBatchCtx createVrbCtx(StructObjectInspector oi) throws IOException {
+    VectorizedRowBatchCtx vrbCtx = new VectorizedRowBatchCtx();
+    try {
+      vrbCtx.init(oi, new String[0]);
+    } catch (HiveException e) {
+      throw new IOException(e);
+    }
+    return vrbCtx;
+  }
+
+  private static LazySerDeParameters createSerdeParams(
+      Configuration conf, Properties tblProps) throws IOException {
+    try {
+      return new LazySerDeParameters(conf, tblProps, LazySimpleSerDe.class.getName());
+    } catch (SerDeException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void init(CacheWriter cacheWriter, Configuration conf, Path path) throws IOException {
+    this.orcWriter = super.createOrcWriter(cacheWriter, conf, path, destinationOi);
+    this.cacheWriter = cacheWriter;
+  }
+
+  public interface AsyncCallback {
+    void onComplete(VectorDeserializeOrcWriter writer);
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      WriteOperation op = null;
+      int fallbackMs = 8;
+      while (true) {
+        op = queue.poll();
+        if (op != null) break;
+        if (fallbackMs > 262144) { // Arbitrary... we don't expect caller to hang out for 7+ mins.
+          LlapIoImpl.LOG.error("ORC encoder timed out waiting for input");
+          discardData();
+          return;
+        }
+        try {
+          Thread.sleep(fallbackMs);
+        } catch (InterruptedException e) {
+          LlapIoImpl.LOG.error("ORC encoder interrupted waiting for input");
+          discardData();
+          return;
+        }
+        fallbackMs <<= 1;
+      }
+      try {
+        if (op.apply(orcWriter, cacheWriter)) {
+          LlapIoImpl.LOG.info("ORC encoder received a exit event");
+          completion.onComplete(this);
+          return;
+        }
+      } catch (Exception e) {
+        LlapIoImpl.LOG.error("ORC encoder failed", e);
+        discardData();
+        return;
+      }
+    }
+  }
+
+  private void discardData() {
+    try {
+      cacheWriter.discardData();
+    } catch (Exception ex) {
+      LlapIoImpl.LOG.error("Failed to close an async cache writer", ex);
+    }
+  }
+
+  @Override
+  public void writeOneRow(Writable row) throws IOException {
+    if (sourceBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+      flushBatch();
+    }
+
+    BinaryComparable binComp = (BinaryComparable)row;
+    deserializeRead.set(binComp.getBytes(), 0, binComp.getLength());
+
+    // Deserialize and append new row using the current batch size as the index.
+    try {
+      // Not using ByRef now since it's unsafe for text readers. Might be safe for others.
+      vectorDeserializeRow.deserialize(sourceBatch, sourceBatch.size++);
+    } catch (Exception e) {
+      throw new IOException("DeserializeRead detail: "
+          + vectorDeserializeRow.getDetailedReadPositionString(), e);
+    }
+  }
+
+  private void flushBatch() throws IOException {
+    addBatchToWriter();
+    if (!isAsync) {
+      for (int c = 0; c < sourceBatch.cols.length; ++c) {
+        // This resets vectors in both batches.
+        ColumnVector colVector = sourceBatch.cols[c];
+        if (colVector != null) {
+          colVector.reset();
+          colVector.init();
+        }
+      }
+      sourceBatch.selectedInUse = false;
+      sourceBatch.size = 0;
+      sourceBatch.endOfFile = false;
+      propagateSourceBatchFieldsToDest();
+    } else {
+      // In addBatchToWriter, we have passed the batch to both ORC and the operator pipeline
+      // (neither ever changes the vectors). We'd need a new batch of vectors to write to.
+      // TODO: for now, create this from scratch. Ideally we should return the vectors from ops.
+      //       We could also have the ORC thread create it for us in its spare time...
+      this.sourceBatch = vrbCtx.createVectorizedRowBatch();
+      if (usesSourceIncludes) {
+        this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size());
+        int inclBatchIx = 0;
+        for (Integer columnId : sourceIncludes) {
+          destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId];
+        }
+        destinationBatch.setPartitionInfo(sourceIncludes.size(), 0);
+      } else {
+        this.destinationBatch = sourceBatch;
+      }
+    }
+  }
+
+  private void propagateSourceBatchFieldsToDest() {
+    if (destinationBatch == sourceBatch) return;
+    destinationBatch.selectedInUse = sourceBatch.selectedInUse;
+    destinationBatch.size = sourceBatch.size;
+    destinationBatch.endOfFile = sourceBatch.endOfFile;
+  }
+
+  private void addBatchToWriter() throws IOException {
+    propagateSourceBatchFieldsToDest();
+    if (!isAsync) {
+      orcWriter.addRowBatch(destinationBatch);
+    } else {
+      currentBatches.add(destinationBatch);
+      addWriteOp(new VrbOperation(destinationBatch));
+    }
+  }
+
+  @Override
+  public void flushIntermediateData() throws IOException {
+    if (sourceBatch.size > 0) {
+      flushBatch();
+    }
+  }
+
+  @Override
+  public void writeIntermediateFooter() throws IOException {
+    if (isAsync) {
+      addWriteOp(new IntermediateFooterOperation());
+    } else {
+      orcWriter.writeIntermediateFooter();
+    }
+  }
+
+  private void addWriteOp(WriteOperation wo) throws AssertionError {
+    if (queue.offer(wo)) return;
+    throw new AssertionError("Queue full"); // This should never happen with linked list queue.
+  }
+
+  @Override
+  public void setCurrentStripeOffsets(long currentKnownTornStart,
+      long firstStartOffset, long lastStartOffset, long fileOffset) {
+    if (isAsync) {
+      addWriteOp(new SetStripeDataOperation(
+          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset));
+    } else {
+      cacheWriter.setCurrentStripeOffsets(
+          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (sourceBatch.size > 0) {
+      addBatchToWriter();
+    }
+    if (!isAsync) {
+      orcWriter.close();
+    } else {
+      addWriteOp(new CloseOperation());
+    }
+  }
+
+  public List<VectorizedRowBatch> extractCurrentVrbs() {
+    if (!isAsync) return null;
+    List<VectorizedRowBatch> result = currentBatches;
+    currentBatches = new LinkedList<>();
+    return result;
+  }
+
+  private static interface WriteOperation {
+    boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException;
+  }
+
+  private static class VrbOperation implements WriteOperation {
+    private VectorizedRowBatch batch;
+
+    public VrbOperation(VectorizedRowBatch batch) {
+      // LlapIoImpl.LOG.debug("Adding batch " + batch);
+      this.batch = batch;
+    }
+
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      // LlapIoImpl.LOG.debug("Writing batch " + batch);
+      writer.addRowBatch(batch);
+      return false;
+    }
+  }
+
+  private static class IntermediateFooterOperation implements WriteOperation {
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      writer.writeIntermediateFooter();
+      return false;
+    }
+  }
+
+  private static class SetStripeDataOperation implements WriteOperation {
+    private final long currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset;
+    public SetStripeDataOperation(long currentKnownTornStart,
+        long firstStartOffset, long lastStartOffset, long fileOffset) {
+      this.currentKnownTornStart = currentKnownTornStart;
+      this.firstStartOffset = firstStartOffset;
+      this.lastStartOffset = lastStartOffset;
+      this.fileOffset = fileOffset;
+    }
+
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      cacheWriter.setCurrentStripeOffsets(
+          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+      return false;
+    }
+  }
+
+  private static class CloseOperation implements WriteOperation {
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      writer.close();
+      return true; // The thread should stop after this. 
+    }
+  }
+
+  public boolean[] getOriginalCacheIncludes() {
+    return cacheIncludes;
+  }
+
+  @Override
+  public boolean isOnlyWritingIncludedColumns() {
+    return usesSourceIncludes;
+  }
+
+  public void interrupt() {
+    assert orcThread != null;
+    orcThread.interrupt();
+  }
+}
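
The constructor above builds a "dense" destination batch whose columns alias the included columns of the "sparse" source batch ("we use the same vectors in both batches"). A standalone sketch of that aliasing, with long[] arrays as simplified stand-ins for Hive's ColumnVector and all names hypothetical:

import java.util.Arrays;
import java.util.List;

public class SharedVectorSketch {
  public static void main(String[] args) {
    int colCount = 5;                                      // columns in the file schema
    List<Integer> sourceIncludes = Arrays.asList(1, 3);    // columns the split reads

    long[][] sourceCols = new long[colCount][8];           // "sparse" source batch
    long[][] destCols = new long[sourceIncludes.size()][]; // "dense" destination batch

    int inclBatchIx = 0;
    for (int columnId : sourceIncludes) {
      destCols[inclBatchIx++] = sourceCols[columnId];      // alias, not a copy
    }

    sourceCols[1][0] = 42; // the deserializer writes into the sparse batch...
    System.out.println(destCols[0][0]); // ...and the dense batch sees it: 42
  }
}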

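The async path in the new file combines two mechanics: writer calls are packaged as WriteOperation commands on a ConcurrentLinkedQueue, and the worker thread polls with a doubling sleep capped at an arbitrary bound, with a command returning true stopping the worker (the contract of WriteOperation.apply above). A compact sketch under those assumptions; the names are hypothetical, and the real operations carry the ORC and cache writers:

import java.util.concurrent.ConcurrentLinkedQueue;

public class CommandQueueSketch {
  interface Op { boolean apply() throws Exception; } // true => worker should exit

  public static void main(String[] args) throws Exception {
    ConcurrentLinkedQueue<Op> queue = new ConcurrentLinkedQueue<>();
    Thread worker = new Thread(() -> {
      while (true) {
        Op op;
        int fallbackMs = 8;
        while ((op = queue.poll()) == null) { // spin with capped exponential backoff
          if (fallbackMs > 262144) return;    // give up waiting, like the timeout above
          try { Thread.sleep(fallbackMs); } catch (InterruptedException e) { return; }
          fallbackMs <<= 1;
        }
        try {
          if (op.apply()) return;             // a close-style op ends the worker
        } catch (Exception e) { return; }     // errors abort, like discardData()
      }
    });
    worker.setDaemon(true);
    worker.start();

    queue.offer(() -> { System.out.println("write batch"); return false; });
    queue.offer(() -> { System.out.println("close"); return true; });
    worker.join();
  }
}
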
http://git-wip-us.apache.org/repos/asf/hive/blob/17063738/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
deleted file mode 100644
index 86d9ecc..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.encoded;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
-import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter;
-import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.DeserializerOrcWriter;
-import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.EncodingWriter;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.orc.Writer;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-import org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead;
-import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-/** The class that writes rows from a text reader to an ORC writer using VectorDeserializeRow. */
-class VertorDeserializeOrcWriter extends EncodingWriter implements Runnable {
-  private final VectorizedRowBatchCtx vrbCtx;
-  private Writer orcWriter;
-  private final LazySimpleDeserializeRead deserializeRead;
-  private final VectorDeserializeRow<?> vectorDeserializeRow;
-  private final StructObjectInspector destinationOi;
-  private final boolean usesSourceIncludes;
-  private final List<Integer> sourceIncludes;
-
-  private final boolean isAsync;
-  private final Thread orcThread;
-  private final ConcurrentLinkedQueue<WriteOperation> queue;
-  private AsyncCallback completion;
-
-  // Stored here only as async operation context.
-  private final boolean[] cacheIncludes;
-
-  private VectorizedRowBatch sourceBatch, destinationBatch;
-  private List<VectorizedRowBatch> currentBatches;
-
-  // TODO: if more writers are added, separate out an EncodingWriterFactory
-  public static EncodingWriter create(InputFormat<?, ?> sourceIf, Deserializer serDe,
-      Map<Path, PartitionDesc> parts, Configuration daemonConf, Configuration jobConf,
-      Path splitPath, StructObjectInspector sourceOi, List<Integer> sourceIncludes,
-      boolean[] cacheIncludes, int allocSize) throws IOException {
-    // Vector SerDe can be disabled both on client and server side.
-    if (!HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED)
-        || !HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED)
-        || !(sourceIf instanceof TextInputFormat) || !(serDe instanceof LazySimpleSerDe)) {
-      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
-    }
-    Path path = splitPath.getFileSystem(daemonConf).makeQualified(splitPath);
-    PartitionDesc partDesc = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
-        parts, path, null);
-    if (partDesc == null) {
-      LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: no partition desc for " + path);
-      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
-    }
-    Properties tblProps = partDesc.getTableDesc().getProperties();
-    if ("true".equalsIgnoreCase(tblProps.getProperty(
-        serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST))) {
-      LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter due to "
-        + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST);
-      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
-    }
-    for (StructField sf : sourceOi.getAllStructFieldRefs()) {
-      Category c = sf.getFieldObjectInspector().getCategory();
-      if (c != Category.PRIMITIVE) {
-        LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: " + c + " is not supported");
-        return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
-      }
-    }
-    LlapIoImpl.LOG.info("Creating VertorDeserializeOrcWriter for " + path);
-    return new VertorDeserializeOrcWriter(
-        daemonConf, tblProps, sourceOi, sourceIncludes, cacheIncludes, allocSize);
-  }
-
-  private VertorDeserializeOrcWriter(Configuration conf, Properties tblProps,
-      StructObjectInspector sourceOi, List<Integer> sourceIncludes, boolean[] cacheIncludes,
-      int allocSize) throws IOException {
-    super(sourceOi, allocSize);
-    // See also: the usage of VectorDeserializeType, for binary. For now, we only want text.
-    this.vrbCtx = createVrbCtx(sourceOi);
-    this.sourceIncludes = sourceIncludes;
-    this.cacheIncludes = cacheIncludes;
-    this.sourceBatch = vrbCtx.createVectorizedRowBatch();
-    deserializeRead = new LazySimpleDeserializeRead(vrbCtx.getRowColumnTypeInfos(),
-        /* useExternalBuffer */ true, createSerdeParams(conf, tblProps));
-    vectorDeserializeRow = new VectorDeserializeRow<LazySimpleDeserializeRead>(deserializeRead);
-    int colCount = vrbCtx.getRowColumnTypeInfos().length;
-    boolean[] includes = null;
-    this.usesSourceIncludes = sourceIncludes.size() < colCount;
-    if (usesSourceIncludes) {
-      // VectorDeserializeRow produces "sparse" VRB when includes are used; we need to write the
-      // "dense" VRB to ORC. Ideally, we'd use projection columns, but ORC writer doesn't use them.
-      // In any case, we would also need to build a new OI for OrcWriter config.
-      // This is why OrcWriter is created after this writer, by the way.
-      this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size());
-      includes = new boolean[colCount];
-      int inclBatchIx = 0;
-      List<String> childNames = new ArrayList<>(sourceIncludes.size());
-      List<ObjectInspector> childOis = new ArrayList<>(sourceIncludes.size());
-      List<? extends StructField> sourceFields = sourceOi.getAllStructFieldRefs();
-      for (Integer columnId : sourceIncludes) {
-        includes[columnId] = true;
-        assert inclBatchIx <= columnId;
-        // Note that we use the same vectors in both batches. Clever, very clever.
-        destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId];
-        StructField sourceField = sourceFields.get(columnId);
-        childNames.add(sourceField.getFieldName());
-        childOis.add(sourceField.getFieldObjectInspector());
-      }
-      // This is only used by ORC to derive the structure. Most fields are unused.
-      destinationOi = new LazySimpleStructObjectInspector(
-          childNames, childOis, null, (byte)0, null);
-      destinationBatch.setPartitionInfo(sourceIncludes.size(), 0);
-      if (LlapIoImpl.LOG.isDebugEnabled()) {
-        LlapIoImpl.LOG.debug("Includes for deserializer are " + DebugUtils.toString(includes));
-      }
-      try {
-        vectorDeserializeRow.init(includes);
-      } catch (HiveException e) {
-        throw new IOException(e);
-      }
-    } else {
-      // No includes - use the standard batch.
-      this.destinationBatch = sourceBatch;
-      this.destinationOi = sourceOi;
-      try {
-        vectorDeserializeRow.init();
-      } catch (HiveException e) {
-        throw new IOException(e);
-      }
-    }
-    this.isAsync = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ASYNC_ENABLED);
-    if (isAsync) {
-      currentBatches = new LinkedList<>();
-      queue = new ConcurrentLinkedQueue<>();
-      orcThread = new Thread(this);
-      orcThread.setDaemon(true);
-      orcThread.setName(Thread.currentThread().getName() + "-OrcEncode");
-    } else {
-      queue = null;
-      orcThread = null;
-      currentBatches = null;
-    }
-  }
-
-  public void startAsync(AsyncCallback callback) {
-    this.completion = callback;
-    this.orcThread.start();
-  }
-
-  private static VectorizedRowBatchCtx createVrbCtx(StructObjectInspector oi) throws IOException {
-    VectorizedRowBatchCtx vrbCtx = new VectorizedRowBatchCtx();
-    try {
-      vrbCtx.init(oi, new String[0]);
-    } catch (HiveException e) {
-      throw new IOException(e);
-    }
-    return vrbCtx;
-  }
-
-  private static LazySerDeParameters createSerdeParams(
-      Configuration conf, Properties tblProps) throws IOException {
-    try {
-      return new LazySerDeParameters(conf, tblProps, LazySimpleSerDe.class.getName());
-    } catch (SerDeException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public void init(CacheWriter cacheWriter, Configuration conf, Path path) throws IOException {
-    this.orcWriter = super.createOrcWriter(cacheWriter, conf, path, destinationOi);
-    this.cacheWriter = cacheWriter;
-  }
-
-  public interface AsyncCallback {
-    void onComplete(VertorDeserializeOrcWriter writer);
-  }
-
-  @Override
-  public void run() {
-    while (true) {
-      WriteOperation op = null;
-      int fallbackMs = 8;
-      while (true) {
-        op = queue.poll();
-        if (op != null) break;
-        if (fallbackMs > 262144) { // Arbitrary... we don't expect caller to hang out for 7+ mins.
-          LlapIoImpl.LOG.error("ORC encoder timed out waiting for input");
-          discardData();
-          return;
-        }
-        try {
-          Thread.sleep(fallbackMs);
-        } catch (InterruptedException e) {
-          LlapIoImpl.LOG.error("ORC encoder interrupted waiting for input");
-          discardData();
-          return;
-        }
-        fallbackMs <<= 1;
-      }
-      try {
-        if (op.apply(orcWriter, cacheWriter)) {
-          LlapIoImpl.LOG.info("ORC encoder received a exit event");
-          completion.onComplete(this);
-          return;
-        }
-      } catch (Exception e) {
-        LlapIoImpl.LOG.error("ORC encoder failed", e);
-        discardData();
-        return;
-      }
-    }
-  }
-
-  private void discardData() {
-    try {
-      cacheWriter.discardData();
-    } catch (Exception ex) {
-      LlapIoImpl.LOG.error("Failed to close an async cache writer", ex);
-    }
-  }
-
-  @Override
-  public void writeOneRow(Writable row) throws IOException {
-    if (sourceBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
-      flushBatch();
-    }
-
-    BinaryComparable binComp = (BinaryComparable)row;
-    deserializeRead.set(binComp.getBytes(), 0, binComp.getLength());
-
-    // Deserialize and append new row using the current batch size as the index.
-    try {
-      // Not using ByRef now since it's unsafe for text readers. Might be safe for others.
-      vectorDeserializeRow.deserialize(sourceBatch, sourceBatch.size++);
-    } catch (Exception e) {
-      throw new IOException("DeserializeRead detail: "
-          + vectorDeserializeRow.getDetailedReadPositionString(), e);
-    }
-  }
-
-  private void flushBatch() throws IOException {
-    addBatchToWriter();
-    if (!isAsync) {
-      for (int c = 0; c < sourceBatch.cols.length; ++c) {
-        // This resets vectors in both batches.
-        ColumnVector colVector = sourceBatch.cols[c];
-        if (colVector != null) {
-          colVector.reset();
-          colVector.init();
-        }
-      }
-      sourceBatch.selectedInUse = false;
-      sourceBatch.size = 0;
-      sourceBatch.endOfFile = false;
-      propagateSourceBatchFieldsToDest();
-    } else {
-      // In addBatchToWriter, we have passed the batch to both ORC and operator pipeline
-      // (neither ever changes the vectors). We'd need a set of vectors batch to write to.
-      // TODO: for now, create this from scratch. Ideally we should return the vectors from ops.
-      //       We could also have the ORC thread create it for us in its spare time...
-      this.sourceBatch = vrbCtx.createVectorizedRowBatch();
-      if (usesSourceIncludes) {
-        this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size());
-        int inclBatchIx = 0;
-        for (Integer columnId : sourceIncludes) {
-          destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId];
-        }
-        destinationBatch.setPartitionInfo(sourceIncludes.size(), 0);
-      } else {
-        this.destinationBatch = sourceBatch;
-      }
-    }
-  }
-
-  private void propagateSourceBatchFieldsToDest() {
-    if (destinationBatch == sourceBatch) return;
-    destinationBatch.selectedInUse = sourceBatch.selectedInUse;
-    destinationBatch.size = sourceBatch.size;
-    destinationBatch.endOfFile = sourceBatch.endOfFile;
-  }
-
-  private void addBatchToWriter() throws IOException {
-    propagateSourceBatchFieldsToDest();
-    if (!isAsync) {
-      orcWriter.addRowBatch(destinationBatch);
-    } else {
-      currentBatches.add(destinationBatch);
-      addWriteOp(new VrbOperation(destinationBatch));
-    }
-  }
-
-  @Override
-  public void flushIntermediateData() throws IOException {
-    if (sourceBatch.size > 0) {
-      flushBatch();
-    }
-  }
-
-  @Override
-  public void writeIntermediateFooter() throws IOException {
-    if (isAsync) {
-      addWriteOp(new IntermediateFooterOperation());
-    } else {
-      orcWriter.writeIntermediateFooter();
-    }
-  }
-
-  private void addWriteOp(WriteOperation wo) throws AssertionError {
-    if (queue.offer(wo)) return;
-    throw new AssertionError("Queue full"); // This should never happen with linked list queue.
-  }
-
-  @Override
-  public void setCurrentStripeOffsets(long currentKnownTornStart,
-      long firstStartOffset, long lastStartOffset, long fileOffset) {
-    if (isAsync) {
-      addWriteOp(new SetStripeDataOperation(
-          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset));
-    } else {
-      cacheWriter.setCurrentStripeOffsets(
-          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (sourceBatch.size > 0) {
-      addBatchToWriter();
-    }
-    if (!isAsync) {
-      orcWriter.close();
-    } else {
-      addWriteOp(new CloseOperation());
-    }
-  }
-
-  public List<VectorizedRowBatch> extractCurrentVrbs() {
-    if (!isAsync) return null;
-    List<VectorizedRowBatch> result = currentBatches;
-    currentBatches = new LinkedList<>();
-    return result;
-  }
-
-  private static interface WriteOperation {
-    boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException;
-  }
-
-  private static class VrbOperation implements WriteOperation {
-    private VectorizedRowBatch batch;
-
-    public VrbOperation(VectorizedRowBatch batch) {
-      // LlapIoImpl.LOG.debug("Adding batch " + batch);
-      this.batch = batch;
-    }
-
-    @Override
-    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
-      // LlapIoImpl.LOG.debug("Writing batch " + batch);
-      writer.addRowBatch(batch);
-      return false;
-    }
-  }
-
-  private static class IntermediateFooterOperation implements WriteOperation {
-    @Override
-    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
-      writer.writeIntermediateFooter();
-      return false;
-    }
-  }
-
-  private static class SetStripeDataOperation implements WriteOperation {
-    private final long currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset;
-    public SetStripeDataOperation(long currentKnownTornStart,
-        long firstStartOffset, long lastStartOffset, long fileOffset) {
-      this.currentKnownTornStart = currentKnownTornStart;
-      this.firstStartOffset = firstStartOffset;
-      this.lastStartOffset = lastStartOffset;
-      this.fileOffset = fileOffset;
-    }
-
-    @Override
-    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
-      cacheWriter.setCurrentStripeOffsets(
-          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
-      return false;
-    }
-  }
-
-  private static class CloseOperation implements WriteOperation {
-    @Override
-    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
-      writer.close();
-      return true; // The thread should stop after this. 
-    }
-  }
-
-  public boolean[] getOriginalCacheIncludes() {
-    return cacheIncludes;
-  }
-
-  @Override
-  public boolean isOnlyWritingIncludedColumns() {
-    return usesSourceIncludes;
-  }
-
-  public void interrupt() {
-    assert orcThread != null;
-    orcThread.interrupt();
-  }
-}