Posted to commits@hive.apache.org by mm...@apache.org on 2018/08/06 23:52:33 UTC

hive git commit: HIVE-20300: VectorFileSinkArrowOperator (Eric Wohlstadter, reviewed by Jason Dere, Matt McCline, Teddy Choi)

Repository: hive
Updated Branches:
  refs/heads/master 42bf02e33 -> a8ef2147f


HIVE-20300: VectorFileSinkArrowOperator (Eric Wohlstadter, reviewed by Jason Dere, Matt McCline, Teddy Choi)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a8ef2147
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a8ef2147
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a8ef2147

Branch: refs/heads/master
Commit: a8ef2147fad5aeaaf01279230da9c584db6a2337
Parents: 42bf02e
Author: Matt McCline <mm...@hortonworks.com>
Authored: Mon Aug 6 18:52:24 2018 -0500
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Mon Aug 6 18:52:24 2018 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   6 +-
 .../apache/hive/jdbc/BaseJdbcWithMiniLlap.java  |  38 +-
 .../hive/jdbc/TestJdbcWithMiniLlapArrow.java    |   6 +-
 .../hive/jdbc/TestJdbcWithMiniLlapRow.java      |   6 +-
 .../jdbc/TestJdbcWithMiniLlapVectorArrow.java   | 235 ++++++
 .../filesink/VectorFileSinkArrowOperator.java   | 180 +++++
 .../hadoop/hive/ql/io/arrow/Serializer.java     | 754 +++++++++++++------
 .../hive/ql/optimizer/physical/Vectorizer.java  |  60 +-
 8 files changed, 1032 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 535a56b..e251920 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3662,7 +3662,11 @@ public class HiveConf extends Configuration {
         "internal use only. When false, don't suppress fatal exceptions like\n" +
         "NullPointerException, etc so the query will fail and assure it will be noticed",
         true),
-
+    HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED(
+        "hive.vectorized.execution.filesink.arrow.native.enabled", false,
+        "This flag should be set to true to enable the native vectorization\n" +
+        "of queries using the Arrow SerDe and FileSink.\n" +
+        "The default value is false."),
     HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true, "This property has been extended to control "
         + "whether to check, convert, and normalize partition value to conform to its column type in "
         + "partition operations including but not limited to insert, such as alter, describe etc."),

http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 280119b..98f4729 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -110,29 +110,10 @@ public abstract class BaseJdbcWithMiniLlap {
   private static Connection hs2Conn = null;
 
   // This method should be called by sub-classes in a @BeforeClass initializer
-  public static void beforeTest(boolean useArrow) throws Exception {
+  public static void beforeTest(HiveConf inputConf) throws Exception {
+    conf = inputConf;
     Class.forName(MiniHS2.getJdbcDriverName());
-
-    String confDir = "../../data/conf/llap/";
-    if (confDir != null && !confDir.isEmpty()) {
-      HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
-      System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
-    }
-
-    conf = new HiveConf();
-    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-    if(useArrow) {
-      conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
-    } else {
-      conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
-    }
-
-    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
-        + "/tez-site.xml"));
-
     miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
-
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
     kvDataFilePath = new Path(dataFileDir, "kv1.txt");
     dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
@@ -141,6 +122,19 @@ public abstract class BaseJdbcWithMiniLlap {
     miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
   }
 
+  static HiveConf defaultConf() throws Exception {
+    String confDir = "../../data/conf/llap/";
+    if (confDir != null && !confDir.isEmpty()) {
+      HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
+      System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+    }
+    HiveConf defaultConf = new HiveConf();
+    defaultConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    defaultConf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml"));
+    return defaultConf;
+  }
+
   @Before
   public void setUp() throws Exception {
     hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
@@ -546,6 +540,8 @@ public abstract class BaseJdbcWithMiniLlap {
         rowProcessor.process(row);
         ++rowCount;
       }
+      //In arrow-mode this will throw an exception unless all buffers have been released
+      //See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader
       reader.close();
     }
     LlapBaseInputFormat.close(handleId);
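
The refactor above replaces the boolean useArrow parameter with an injected
HiveConf: defaultConf() builds the shared LLAP/Tez configuration once, and each
subclass layers its own settings on top before calling beforeTest(HiveConf).
A sketch of a further subclass following the same pattern (the class name is
illustrative; the three real subclasses are diffed below):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
    import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
    import org.apache.hadoop.hive.llap.Row;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.InputFormat;
    import org.junit.BeforeClass;

    // Hypothetical test class, shown only to illustrate the new setup pattern.
    public class TestJdbcWithMiniLlapExample extends BaseJdbcWithMiniLlap {
      @BeforeClass
      public static void beforeTest() throws Exception {
        HiveConf conf = defaultConf();                             // shared LLAP/Tez setup
        conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);  // per-test overrides
        BaseJdbcWithMiniLlap.beforeTest(conf);                     // boots MiniHS2 with it
      }

      @Override
      protected InputFormat<NullWritable, Row> getInputFormat() {
        return new LlapArrowRowInputFormat(Long.MAX_VALUE);        // as in the Arrow tests
      }
    }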

http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index e69c686..c02980b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.Row;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.BeforeClass;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
@@ -40,7 +42,9 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
 
   @BeforeClass
   public static void beforeTest() throws Exception {
-    BaseJdbcWithMiniLlap.beforeTest(true);
+    HiveConf conf = defaultConf();
+    conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+    BaseJdbcWithMiniLlap.beforeTest(conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
index 809068f..d954d0e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
@@ -25,6 +25,8 @@ import org.junit.BeforeClass;
 import org.junit.Before;
 import org.junit.After;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
 /**
  * TestJdbcWithMiniLlap for llap Row format.
@@ -33,7 +35,9 @@ public class TestJdbcWithMiniLlapRow extends BaseJdbcWithMiniLlap {
 
   @BeforeClass
   public static void beforeTest() throws Exception {
-    BaseJdbcWithMiniLlap.beforeTest(false);
+    HiveConf conf = defaultConf();
+    conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
+    BaseJdbcWithMiniLlap.beforeTest(conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
new file mode 100644
index 0000000..55a2df8
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import java.math.BigDecimal;
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import java.util.List;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.BeforeClass;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+
+/**
+ * TestJdbcWithMiniLlap for Arrow format with vectorized output sink.
+ */
+public class TestJdbcWithMiniLlapVectorArrow extends BaseJdbcWithMiniLlap {
+
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    HiveConf conf = defaultConf();
+    conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+    conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true);
+    BaseJdbcWithMiniLlap.beforeTest(conf);
+  }
+
+  @Override
+  protected InputFormat<NullWritable, Row> getInputFormat() {
+    //For unit testing, no harm in hard-coding allocator ceiling to Long.MAX_VALUE
+    return new LlapArrowRowInputFormat(Long.MAX_VALUE);
+  }
+
+  // Currently MAP type is not supported. Add it back when Arrow 1.0 is released.
+  // See: SPARK-21187
+  @Override
+  public void testDataTypes() throws Exception {
+    createDataTypesTable("datatypes");
+    RowCollector2 rowCollector = new RowCollector2();
+    String query = "select * from datatypes";
+    int rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(3, rowCount);
+
+    // Verify schema
+    String[][] colNameTypes = new String[][] {
+        {"datatypes.c1", "int"},
+        {"datatypes.c2", "boolean"},
+        {"datatypes.c3", "double"},
+        {"datatypes.c4", "string"},
+        {"datatypes.c5", "array<int>"},
+        {"datatypes.c6", "map<int,string>"},
+        {"datatypes.c7", "map<string,string>"},
+        {"datatypes.c8", "struct<r:string,s:int,t:double>"},
+        {"datatypes.c9", "tinyint"},
+        {"datatypes.c10", "smallint"},
+        {"datatypes.c11", "float"},
+        {"datatypes.c12", "bigint"},
+        {"datatypes.c13", "array<array<string>>"},
+        {"datatypes.c14", "map<int,map<int,int>>"},
+        {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
+        {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
+        {"datatypes.c17", "timestamp"},
+        {"datatypes.c18", "decimal(16,7)"},
+        {"datatypes.c19", "binary"},
+        {"datatypes.c20", "date"},
+        {"datatypes.c21", "varchar(20)"},
+        {"datatypes.c22", "char(15)"},
+        {"datatypes.c23", "binary"},
+    };
+    FieldDesc fieldDesc;
+    assertEquals(23, rowCollector.numColumns);
+    for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+      fieldDesc = rowCollector.schema.getColumns().get(idx);
+      assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
+      assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
+    }
+
+    // First row is all nulls
+    Object[] rowValues = rowCollector.rows.get(0);
+    for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+      assertEquals("idx=" + idx, null, rowValues[idx]);
+    }
+
+    // Second Row
+    rowValues = rowCollector.rows.get(1);
+    assertEquals(Integer.valueOf(-1), rowValues[0]);
+    assertEquals(Boolean.FALSE, rowValues[1]);
+    assertEquals(Double.valueOf(-1.1d), rowValues[2]);
+    assertEquals("", rowValues[3]);
+
+    List<?> c5Value = (List<?>) rowValues[4];
+    assertEquals(0, c5Value.size());
+
+    //Map<?,?> c6Value = (Map<?,?>) rowValues[5];
+    //assertEquals(0, c6Value.size());
+
+    //Map<?,?> c7Value = (Map<?,?>) rowValues[6];
+    //assertEquals(0, c7Value.size());
+
+    List<?> c8Value = (List<?>) rowValues[7];
+    assertEquals(null, c8Value.get(0));
+    assertEquals(null, c8Value.get(1));
+    assertEquals(null, c8Value.get(2));
+
+    assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
+    assertEquals(Short.valueOf((short) -1), rowValues[9]);
+    assertEquals(Float.valueOf(-1.0f), rowValues[10]);
+    assertEquals(Long.valueOf(-1l), rowValues[11]);
+
+    List<?> c13Value = (List<?>) rowValues[12];
+    assertEquals(0, c13Value.size());
+
+    //Map<?,?> c14Value = (Map<?,?>) rowValues[13];
+    //assertEquals(0, c14Value.size());
+
+    List<?> c15Value = (List<?>) rowValues[14];
+    assertEquals(null, c15Value.get(0));
+    assertEquals(null, c15Value.get(1));
+
+    //List<?> c16Value = (List<?>) rowValues[15];
+    //assertEquals(0, c16Value.size());
+
+    assertEquals(null, rowValues[16]);
+    assertEquals(null, rowValues[17]);
+    assertEquals(null, rowValues[18]);
+    assertEquals(null, rowValues[19]);
+    assertEquals(null, rowValues[20]);
+    assertEquals(null, rowValues[21]);
+    assertEquals(null, rowValues[22]);
+
+    // Third row
+    rowValues = rowCollector.rows.get(2);
+    assertEquals(Integer.valueOf(1), rowValues[0]);
+    assertEquals(Boolean.TRUE, rowValues[1]);
+    assertEquals(Double.valueOf(1.1d), rowValues[2]);
+    assertEquals("1", rowValues[3]);
+
+    c5Value = (List<?>) rowValues[4];
+    assertEquals(2, c5Value.size());
+    assertEquals(Integer.valueOf(1), c5Value.get(0));
+    assertEquals(Integer.valueOf(2), c5Value.get(1));
+
+    //c6Value = (Map<?,?>) rowValues[5];
+    //assertEquals(2, c6Value.size());
+    //assertEquals("x", c6Value.get(Integer.valueOf(1)));
+    //assertEquals("y", c6Value.get(Integer.valueOf(2)));
+
+    //c7Value = (Map<?,?>) rowValues[6];
+    //assertEquals(1, c7Value.size());
+    //assertEquals("v", c7Value.get("k"));
+
+    c8Value = (List<?>) rowValues[7];
+    assertEquals("a", c8Value.get(0));
+    assertEquals(Integer.valueOf(9), c8Value.get(1));
+    assertEquals(Double.valueOf(2.2d), c8Value.get(2));
+
+    assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
+    assertEquals(Short.valueOf((short) 1), rowValues[9]);
+    assertEquals(Float.valueOf(1.0f), rowValues[10]);
+    assertEquals(Long.valueOf(1l), rowValues[11]);
+
+    c13Value = (List<?>) rowValues[12];
+    assertEquals(2, c13Value.size());
+    List<?> listVal = (List<?>) c13Value.get(0);
+    assertEquals("a", listVal.get(0));
+    assertEquals("b", listVal.get(1));
+    listVal = (List<?>) c13Value.get(1);
+    assertEquals("c", listVal.get(0));
+    assertEquals("d", listVal.get(1));
+
+    //c14Value = (Map<?,?>) rowValues[13];
+    //assertEquals(2, c14Value.size());
+    //Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+    //assertEquals(2, mapVal.size());
+    //assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
+    //assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
+    //mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
+    //assertEquals(1, mapVal.size());
+    //assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
+
+    c15Value = (List<?>) rowValues[14];
+    assertEquals(Integer.valueOf(1), c15Value.get(0));
+    listVal = (List<?>) c15Value.get(1);
+    assertEquals(2, listVal.size());
+    assertEquals(Integer.valueOf(2), listVal.get(0));
+    assertEquals("x", listVal.get(1));
+
+    //c16Value = (List<?>) rowValues[15];
+    //assertEquals(2, c16Value.size());
+    //listVal = (List<?>) c16Value.get(0);
+    //assertEquals(2, listVal.size());
+    //mapVal = (Map<?,?>) listVal.get(0);
+    //assertEquals(0, mapVal.size());
+    //assertEquals(Integer.valueOf(1), listVal.get(1));
+    //listVal = (List<?>) c16Value.get(1);
+    //mapVal = (Map<?,?>) listVal.get(0);
+    //assertEquals(2, mapVal.size());
+    //assertEquals("b", mapVal.get("a"));
+    //assertEquals("d", mapVal.get("c"));
+    //assertEquals(Integer.valueOf(2), listVal.get(1));
+
+    assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456"), rowValues[16]);
+    assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
+    assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
+    assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
+    assertEquals("abc123", rowValues[20]);
+    assertEquals("abc123         ", rowValues[21]);
+    assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
+  }
+
+}
+
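
The test above hard-codes the reader-side Arrow allocator ceiling to
Long.MAX_VALUE, as the comment in getInputFormat() notes. A client that wants
real memory accounting would pass a finite bound through the same single-argument
constructor; a minimal sketch (the class name and the 1 GiB figure are
illustrative, not part of the patch):

    import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
    import org.apache.hadoop.hive.llap.Row;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.InputFormat;

    // Hypothetical helper; caps Arrow direct memory per reader instead of Long.MAX_VALUE.
    public class BoundedArrowReaderExample {
      public static InputFormat<NullWritable, Row> boundedInputFormat() {
        long arrowAllocatorCeiling = 1L << 30;   // 1 GiB, an arbitrary illustrative bound
        return new LlapArrowRowInputFormat(arrowAllocatorCeiling);
      }
    }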

http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java
new file mode 100644
index 0000000..1603703
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.filesink;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.hive.llap.LlapOutputFormatService;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import java.util.List;
+import java.util.ArrayList;
+import org.apache.hadoop.hive.ql.io.arrow.Serializer;
+import static org.apache.hadoop.hive.llap.LlapOutputFormat.LLAP_OF_ID_KEY;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.core.layout.AbstractStringLayout;
+
+/**
+ * Native Vectorized File Sink operator implementation for Arrow.
+ * Assumes output to LlapOutputFormatService.
+ */
+public class VectorFileSinkArrowOperator extends TerminalOperator<FileSinkDesc>
+    implements Serializable, VectorizationOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorizationContext vContext;
+  private VectorFileSinkDesc vectorDesc;
+  public static final Logger LOG = LoggerFactory.getLogger(VectorFileSinkArrowOperator.class.getName());
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+  
+  private transient Serializer converter;
+  private transient RecordWriter recordWriter;
+  private transient boolean wroteData;
+  private transient String attemptId;
+
+  public VectorFileSinkArrowOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) {
+    this(ctx);
+    this.conf = (FileSinkDesc) conf;
+    this.vContext = vContext;
+    this.vectorDesc = (VectorFileSinkDesc) vectorDesc;
+  }
+
+  /** Kryo ctor. */
+  @VisibleForTesting
+  public VectorFileSinkArrowOperator() {
+    super();
+  }
+
+  public VectorFileSinkArrowOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+    //attemptId identifies a RecordWriter initialized by LlapOutputFormatService
+    this.attemptId = hconf.get(LLAP_OF_ID_KEY);
+    try {
+      //Initialize column names and types
+      List<TypeInfo> typeInfos = new ArrayList<>();
+      List<String> fieldNames = new ArrayList<>();
+      StructObjectInspector schema = (StructObjectInspector) inputObjInspectors[0];
+      for(int i = 0; i < schema.getAllStructFieldRefs().size(); i++) {
+        StructField structField = schema.getAllStructFieldRefs().get(i);
+        fieldNames.add(structField.getFieldName());
+        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(structField.getFieldObjectInspector());
+        typeInfos.add(typeInfo);
+      }
+      //Initialize an Arrow serializer
+      converter = new Serializer(hconf, attemptId, typeInfos, fieldNames);
+    } catch (Exception e) {
+      LOG.error("Unable to initialize VectorFileSinkArrowOperator");
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void process(Object data, int tag) throws HiveException {
+    //ArrowStreamReader expects at least the schema metadata; if this op writes no data,
+    //we need to send the schema to close the stream gracefully
+    VectorizedRowBatch batch = (VectorizedRowBatch) data;
+    try {
+      if(recordWriter == null) {
+        recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId);
+      }
+      //Convert the VectorizedRowBatch to a handle for the Arrow batch
+      ArrowWrapperWritable writable = converter.serializeBatch(batch, true);
+      //Pass the handle to the LlapOutputFormatService recordWriter
+      recordWriter.write(null, writable);
+      this.wroteData = true;
+    } catch(Exception e) {
+      LOG.error("Failed to convert VectorizedRowBatch to Arrow batch");
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void closeOp(boolean abort) throws HiveException {
+    try {
+      if(!wroteData) {
+        //Send a schema only batch to signal EOS with no data written
+        ArrowWrapperWritable writable = converter.emptyBatch();
+        if(recordWriter == null) {
+          recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId);
+        }
+        recordWriter.write(null, writable);
+      }
+    } catch(Exception e) {
+      LOG.error("Failed to write Arrow stream schema");
+      throw new RuntimeException(e);
+    } finally {
+      try {
+        //Close the recordWriter with null Reporter
+        recordWriter.close(null);
+      } catch(Exception e) {
+        LOG.error("Failed to close Arrow stream");
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.FILESINK;
+  }
+}
+
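
The operator's interaction with the newly public Serializer (diffed next)
reduces to three calls: serializeBatch(batch, true) for each incoming
VectorizedRowBatch, emptyBatch() when the operator closes without having written
any data, and recordWriter.close(null) at the end. A condensed sketch of that
write path, with the operator's error handling elided (not a drop-in replacement
for the code above):

    // Setup, as in initializeOp(): one Serializer per task attempt.
    Serializer converter = new Serializer(hconf, attemptId, typeInfos, fieldNames);
    RecordWriter recordWriter = LlapOutputFormatService.get().getWriter(attemptId);

    // Per batch, as in process(): isNative=true because the batch comes from
    // upstream vectorized operators, not from row-by-row batching in the serde.
    ArrowWrapperWritable writable = converter.serializeBatch(batch, true);
    recordWriter.write(null, writable);

    // On close with no rows written, as in closeOp(): a schema-only batch lets
    // the client-side ArrowStreamReader terminate the stream gracefully.
    recordWriter.write(null, converter.emptyBatch());
    recordWriter.close(null);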

http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 65a889e..08e0fb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -42,10 +42,12 @@ import org.apache.arrow.vector.types.TimeUnit;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
@@ -70,6 +72,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -87,21 +90,39 @@ import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStruc
 import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE;
 import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromObjectInspector;
 
-class Serializer {
+public class Serializer {
   private final int MAX_BUFFERED_ROWS;
 
-  // Schema
-  private final StructTypeInfo structTypeInfo;
-  private final int fieldSize;
-
   // Hive columns
   private final VectorizedRowBatch vectorizedRowBatch;
   private final VectorAssignRow vectorAssignRow;
   private int batchSize;
   private BufferAllocator allocator;
+  private List<TypeInfo> fieldTypeInfos;
+  private List<String> fieldNames;
+  private int fieldSize;
 
   private final NullableMapVector rootVector;
 
+  //Constructor for non-serde serialization
+  public Serializer(Configuration conf, String attemptId, List<TypeInfo> typeInfos, List<String> fieldNames) {
+    this.fieldTypeInfos = typeInfos;
+    this.fieldNames = fieldNames;
+    long childAllocatorLimit = HiveConf.getLongVar(conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
+    //Use per-task allocator for accounting only, no need to reserve per-task memory
+    long childAllocatorReservation = 0L;
+    //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed
+    allocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf).newChildAllocator(
+        attemptId,
+        childAllocatorReservation,
+        childAllocatorLimit);
+    rootVector = NullableMapVector.empty(null, allocator);
+    //These last fields are unused in non-serde usage
+    vectorizedRowBatch = null;
+    vectorAssignRow = null;
+    MAX_BUFFERED_ROWS = 0;
+  }
+
   Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException {
     MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE);
     long childAllocatorLimit = HiveConf.getLongVar(serDe.conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
@@ -111,13 +132,14 @@ class Serializer {
     long childAllocatorReservation = 0L;
     //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed
     allocator = serDe.rootAllocator.newChildAllocator(
-       childAllocatorName,
-       childAllocatorReservation,
-       childAllocatorLimit);
+        childAllocatorName,
+        childAllocatorReservation,
+        childAllocatorLimit);
 
     // Schema
-    structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
-    List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+    StructTypeInfo structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
+    fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+    fieldNames = structTypeInfo.getAllStructFieldNames();
     fieldSize = fieldTypeInfos.size();
     // Init Arrow stuffs
     rootVector = NullableMapVector.empty(null, allocator);
@@ -138,33 +160,66 @@ class Serializer {
     }
   }
 
-  private ArrowWrapperWritable serializeBatch() {
+  //Construct an emptyBatch which contains schema-only info
+  public ArrowWrapperWritable emptyBatch() {
+    rootVector.setValueCount(0);
+    for (int fieldIndex = 0; fieldIndex < fieldTypeInfos.size(); fieldIndex++) {
+      final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+      final String fieldName = fieldNames.get(fieldIndex);
+      final FieldType fieldType = toFieldType(fieldTypeInfo);
+      final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class);
+      arrowVector.setInitialCapacity(0);
+      arrowVector.allocateNew();
+    }
+    VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
+    return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector);
+  }
+
+  //Used for both:
+  //1. VectorizedRowBatch constructed by batching rows
+  //2. VectorizedRowBatch provided from upstream (isNative)
+  public ArrowWrapperWritable serializeBatch(VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     rootVector.setValueCount(0);
 
     for (int fieldIndex = 0; fieldIndex < vectorizedRowBatch.projectionSize; fieldIndex++) {
       final int projectedColumn = vectorizedRowBatch.projectedColumns[fieldIndex];
       final ColumnVector hiveVector = vectorizedRowBatch.cols[projectedColumn];
-      final TypeInfo fieldTypeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(fieldIndex);
-      final String fieldName = structTypeInfo.getAllStructFieldNames().get(fieldIndex);
+      final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+      final String fieldName = fieldNames.get(fieldIndex);
       final FieldType fieldType = toFieldType(fieldTypeInfo);
+      //Reuse existing FieldVector buffers
+      //since we always call setValue or setNull for each row
+      boolean fieldExists = false;
+      if(rootVector.getChild(fieldName) != null) {
+        fieldExists = true;
+      }
       final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class);
-      arrowVector.setInitialCapacity(batchSize);
-      arrowVector.allocateNew();
-      write(arrowVector, hiveVector, fieldTypeInfo, batchSize);
+      if(fieldExists) {
+        arrowVector.setValueCount(isNative ? vectorizedRowBatch.size : batchSize);
+      } else {
+        arrowVector.setInitialCapacity(isNative ? vectorizedRowBatch.size : batchSize);
+        arrowVector.allocateNew();
+      }
+      write(arrowVector, hiveVector, fieldTypeInfo, isNative ? vectorizedRowBatch.size : batchSize, vectorizedRowBatch, isNative);
+    }
+    if(!isNative) {
+      //Only mutate batches that are constructed by this serde
+      vectorizedRowBatch.reset();
+      rootVector.setValueCount(batchSize);
+    } else {
+      rootVector.setValueCount(vectorizedRowBatch.size);
     }
-    vectorizedRowBatch.reset();
-    rootVector.setValueCount(batchSize);
 
     batchSize = 0;
     VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
     return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector);
   }
 
-  private FieldType toFieldType(TypeInfo typeInfo) {
+  private static FieldType toFieldType(TypeInfo typeInfo) {
     return new FieldType(true, toArrowType(typeInfo), null);
   }
 
-  private ArrowType toArrowType(TypeInfo typeInfo) {
+  private static ArrowType toArrowType(TypeInfo typeInfo) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE:
         switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
@@ -218,34 +273,35 @@ class Serializer {
     }
   }
 
-  private void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size) {
+  private static void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size,
+      VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE:
-        writePrimitive(arrowVector, hiveVector, typeInfo, size);
+        writePrimitive(arrowVector, hiveVector, typeInfo, size, vectorizedRowBatch, isNative);
         break;
       case LIST:
-        writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size);
+        writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
         break;
       case STRUCT:
-        writeStruct((MapVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size);
+        writeStruct((MapVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
         break;
       case UNION:
-        writeUnion(arrowVector, hiveVector, typeInfo, size);
+        writeUnion(arrowVector, hiveVector, typeInfo, size, vectorizedRowBatch, isNative);
         break;
       case MAP:
-        writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size);
+        writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
         break;
       default:
         throw new IllegalArgumentException();
-    }
+      }
   }
 
-  private void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo,
-      int size) {
+  private static void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo,
+      int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     final ListTypeInfo structListTypeInfo = toStructListTypeInfo(typeInfo);
     final ListColumnVector structListVector = toStructListVector(hiveVector);
 
-    write(arrowVector, structListVector, structListTypeInfo, size);
+    write(arrowVector, structListVector, structListTypeInfo, size, vectorizedRowBatch, isNative);
 
     final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
     for (int rowIndex = 0; rowIndex < size; rowIndex++) {
@@ -257,8 +313,8 @@ class Serializer {
     }
   }
 
-  private void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
-      int size) {
+  private static void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
+      int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
     final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
     final UnionColumnVector hiveUnionVector = (UnionColumnVector) hiveVector;
@@ -268,11 +324,11 @@ class Serializer {
     final ColumnVector hiveObjectVector = hiveObjectVectors[tag];
     final TypeInfo objectTypeInfo = objectTypeInfos.get(tag);
 
-    write(arrowVector, hiveObjectVector, objectTypeInfo, size);
+    write(arrowVector, hiveObjectVector, objectTypeInfo, size, vectorizedRowBatch, isNative);
   }
 
-  private void writeStruct(MapVector arrowVector, StructColumnVector hiveVector,
-      StructTypeInfo typeInfo, int size) {
+  private static void writeStruct(MapVector arrowVector, StructColumnVector hiveVector,
+      StructTypeInfo typeInfo, int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     final List<String> fieldNames = typeInfo.getAllStructFieldNames();
     final List<TypeInfo> fieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();
     final ColumnVector[] hiveFieldVectors = hiveVector.fields;
@@ -287,7 +343,7 @@ class Serializer {
               toFieldType(fieldTypeInfos.get(fieldIndex)), FieldVector.class);
       arrowFieldVector.setInitialCapacity(size);
       arrowFieldVector.allocateNew();
-      write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size);
+      write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size, vectorizedRowBatch, isNative);
     }
 
     final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
@@ -300,8 +356,8 @@ class Serializer {
     }
   }
 
-  private void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo,
-      int size) {
+  private static void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo, int size,
+      VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     final int OFFSET_WIDTH = 4;
     final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo();
     final ColumnVector hiveElementVector = hiveVector.child;
@@ -310,7 +366,7 @@ class Serializer {
     arrowElementVector.setInitialCapacity(hiveVector.childCount);
     arrowElementVector.allocateNew();
 
-    write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount);
+    write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount, vectorizedRowBatch, isNative);
 
     final ArrowBuf offsetBuffer = arrowVector.getOffsetBuffer();
     int nextOffset = 0;
@@ -327,208 +383,244 @@ class Serializer {
     offsetBuffer.setInt(size * OFFSET_WIDTH, nextOffset);
   }
 
-  private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
-      int size) {
+  //Handle cases for both internally constructed
+  //and externally provided (isNative) VectorizedRowBatch
+  private static void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size,
+      VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
         ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
     switch (primitiveCategory) {
-      case BOOLEAN:
-        {
-          final BitVector bitVector = (BitVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              bitVector.setNull(i);
-            } else {
-              bitVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
-            }
-          }
+    case BOOLEAN:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, boolNullSetter, boolValueSetter);
+        return;
+      }
+      final BitVector bitVector = (BitVector) arrowVector;
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          boolNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          boolValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case BYTE:
-        {
-          final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              tinyIntVector.setNull(i);
-            } else {
-              tinyIntVector.set(i, (byte) ((LongColumnVector) hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case BYTE:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, byteNullSetter, byteValueSetter);
+        return;
+      }
+      final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector;
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          byteNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          byteValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case SHORT:
-        {
-          final SmallIntVector smallIntVector = (SmallIntVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              smallIntVector.setNull(i);
-            } else {
-              smallIntVector.set(i, (short) ((LongColumnVector) hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case SHORT:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, shortNullSetter, shortValueSetter);
+        return;
+      }
+      final SmallIntVector smallIntVector = (SmallIntVector) arrowVector;
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          shortNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          shortValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case INT:
-        {
-          final IntVector intVector = (IntVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              intVector.setNull(i);
-            } else {
-              intVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case INT:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intNullSetter, intValueSetter);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          intNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          intValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case LONG:
-        {
-          final BigIntVector bigIntVector = (BigIntVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              bigIntVector.setNull(i);
-            } else {
-              bigIntVector.set(i, ((LongColumnVector) hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case LONG:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, longNullSetter, longValueSetter);
+        return;
+      }
+      final BigIntVector bigIntVector = (BigIntVector) arrowVector;
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          longNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          longValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case FLOAT:
-        {
-          final Float4Vector float4Vector = (Float4Vector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              float4Vector.setNull(i);
-            } else {
-              float4Vector.set(i, (float) ((DoubleColumnVector) hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case FLOAT:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, floatNullSetter, floatValueSetter);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          floatNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          floatValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case DOUBLE:
-        {
-          final Float8Vector float8Vector = (Float8Vector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              float8Vector.setNull(i);
-            } else {
-              float8Vector.set(i, ((DoubleColumnVector) hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case DOUBLE:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, doubleNullSetter, doubleValueSetter);
+        return;
+      }
+      final Float8Vector float8Vector = (Float8Vector) arrowVector;
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          doubleNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          doubleValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case STRING:
-      case VARCHAR:
-      case CHAR:
-        {
-          final VarCharVector varCharVector = (VarCharVector) arrowVector;
-          final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              varCharVector.setNull(i);
-            } else {
-              varCharVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]);
-            }
-          }
+      }
+    }
+    break;
+    //TODO Add CHAR padding conversion
+    case STRING:
+    case VARCHAR:
+    case CHAR:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, stringNullSetter, stringValueSetter);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          stringNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          stringValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case DATE:
-        {
-          final DateDayVector dateDayVector = (DateDayVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              dateDayVector.setNull(i);
-            } else {
-              dateDayVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case DATE:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, dateNullSetter, dateValueSetter);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          dateNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          dateValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case TIMESTAMP:
-        {
-          final TimeStampMicroTZVector timeStampMicroTZVector = (TimeStampMicroTZVector) arrowVector;
-          final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              timeStampMicroTZVector.setNull(i);
-            } else {
-              // Time = second + sub-second
-              final long secondInMillis = timestampColumnVector.getTime(i);
-              final long secondInMicros = (secondInMillis - secondInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
-              final long subSecondInMicros = timestampColumnVector.getNanos(i) / NS_PER_MICROS;
-
-              if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) {
-                // If the timestamp cannot be represented in long microsecond, set it as a null value
-                timeStampMicroTZVector.setNull(i);
-              } else {
-                timeStampMicroTZVector.set(i, secondInMicros + subSecondInMicros);
-              }
-            }
-          }
+      }
+    }
+    break;
+    case TIMESTAMP:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, timestampNullSetter, timestampValueSetter);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          timestampNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          timestampValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case BINARY:
-        {
-          final VarBinaryVector varBinaryVector = (VarBinaryVector) arrowVector;
-          final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              varBinaryVector.setNull(i);
-            } else {
-              varBinaryVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]);
-            }
-          }
+      }
+    }
+    break;
+    case BINARY:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, binaryNullSetter, binaryValueSetter);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          binaryNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          binaryValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case DECIMAL:
-        {
-          final DecimalVector decimalVector = (DecimalVector) arrowVector;
-          final int scale = decimalVector.getScale();
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              decimalVector.setNull(i);
-            } else {
-              decimalVector.set(i,
-                  ((DecimalColumnVector) hiveVector).vector[i].getHiveDecimal().bigDecimalValue().setScale(scale));
-            }
-          }
+      }
+    }
+    break;
+    case DECIMAL:
+    {
+      if(isNative) {
+        if(hiveVector instanceof DecimalColumnVector) {
+          writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, decimalNullSetter, decimalValueSetter);
+        } else {
+          writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, decimalNullSetter, decimal64ValueSetter);
         }
-        break;
-      case INTERVAL_YEAR_MONTH:
-        {
-          final IntervalYearVector intervalYearVector = (IntervalYearVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              intervalYearVector.setNull(i);
-            } else {
-              intervalYearVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
-            }
-          }
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          decimalNullSetter.accept(i, arrowVector, hiveVector);
+        } else if(hiveVector instanceof DecimalColumnVector) {
+          decimalValueSetter.accept(i, i, arrowVector, hiveVector);
+        } else if(hiveVector instanceof Decimal64ColumnVector) {
+          decimal64ValueSetter.accept(i, i, arrowVector, hiveVector);
+        } else {
+          throw new IllegalArgumentException("Unsupported vector column type: " + hiveVector.getClass().getName());
         }
-        break;
-      case INTERVAL_DAY_TIME:
-        {
-          final IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector;
-          final IntervalDayTimeColumnVector intervalDayTimeColumnVector =
-              (IntervalDayTimeColumnVector) hiveVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              intervalDayVector.setNull(i);
-            } else {
-              final long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(i);
-              final long days = totalSeconds / SECOND_PER_DAY;
-              final long millis =
-                  (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND +
-                      intervalDayTimeColumnVector.getNanos(i) / NS_PER_MILLIS;
-              intervalDayVector.set(i, (int) days, (int) millis);
-            }
-          }
+      }
+    }
+    break;
+    case INTERVAL_YEAR_MONTH:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intervalYearMonthNullSetter, intervalYearMonthValueSetter);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          intervalYearMonthNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          intervalYearMonthValueSetter.accept(i, i, arrowVector, hiveVector);
         }
-        break;
-      case VOID:
-      case UNKNOWN:
-      case TIMESTAMPLOCALTZ:
-      default:
-        throw new IllegalArgumentException();
+      }
+    }
+    break;
+    case INTERVAL_DAY_TIME:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intervalDayTimeNullSetter, intervalDayTimeValueSetter);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          intervalDayTimeNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          intervalDayTimeValueSetter.accept(i, i, arrowVector, hiveVector);
+        }
+      }
+    }
+    break;
+    case VOID:
+    case UNKNOWN:
+    case TIMESTAMPLOCALTZ:
+    default:
+      throw new IllegalArgumentException();
     }
   }
 
@@ -536,7 +628,7 @@ class Serializer {
     // if row is null, it means there are no more rows (closeOp()).
     // another case can be that the buffer is full.
     if (obj == null) {
-      return serializeBatch();
+      return serializeBatch(vectorizedRowBatch, false);
     }
     List<Object> standardObjects = new ArrayList<Object>();
     ObjectInspectorUtils.copyToStandardObject(standardObjects, obj,
@@ -545,8 +637,218 @@ class Serializer {
     vectorAssignRow.assignRow(vectorizedRowBatch, batchSize, standardObjects, fieldSize);
     batchSize++;
     if (batchSize == MAX_BUFFERED_ROWS) {
-      return serializeBatch();
+      return serializeBatch(vectorizedRowBatch, false);
     }
     return null;
   }
+
+  //Use a provided nullSetter and valueSetter function to populate
+  //fieldVector from hiveVector
+  private static void writeGeneric(final FieldVector fieldVector, final ColumnVector hiveVector, final int size,
+      final boolean selectedInUse, final int[] selected, final IntAndVectorsConsumer nullSetter,
+      final IntIntAndVectorsConsumer valueSetter) {
+    final boolean[] inputIsNull = hiveVector.isNull;
+    final int[] sel = selected;
+
+    if (hiveVector.isRepeating) {
+      if (hiveVector.noNulls || !inputIsNull[0]) {
+        for(int i = 0; i < size; i++) {
+          //Fill n rows with value in row 0
+          valueSetter.accept(i, 0, fieldVector, hiveVector);
+        }
+      } else {
+        for(int i = 0; i < size; i++) {
+          //Fill n rows with NULL
+          nullSetter.accept(i, fieldVector, hiveVector);
+        }
+      }
+      return;
+    }
+
+    if (hiveVector.noNulls) {
+      if (selectedInUse) {
+        for(int logical = 0; logical < size; logical++) {
+          final int batchIndex = sel[logical];
+          //Add row batchIndex
+          valueSetter.accept(logical, batchIndex, fieldVector, hiveVector);
+        }
+      } else {
+        for(int batchIndex = 0; batchIndex < size; batchIndex++) {
+          //Add row batchIndex
+          valueSetter.accept(batchIndex, batchIndex, fieldVector, hiveVector);
+        }
+      }
+    } else {
+      if (selectedInUse) {
+        for(int logical = 0; logical < size; logical++) {
+          final int batchIndex = sel[logical];
+          if (inputIsNull[batchIndex]) {
+            //Add NULL
+            nullSetter.accept(batchIndex, fieldVector, hiveVector);
+          } else {
+            //Add row batchIndex
+            valueSetter.accept(logical, batchIndex, fieldVector, hiveVector);
+          }
+        }
+      } else {
+        for(int batchIndex = 0; batchIndex < size; batchIndex++) {
+          if (inputIsNull[batchIndex]) {
+            //Add NULL
+            nullSetter.accept(batchIndex, fieldVector, hiveVector);
+          } else {
+            //Add row batchIndex
+            valueSetter.accept(batchIndex, batchIndex, fieldVector, hiveVector);
+          }
+        }
+      }
+    }
+  }
+
+  //nullSetters and valueSetter for each type
+
+  //bool
+  private static final IntAndVectorsConsumer boolNullSetter = (i, arrowVector, hiveVector)
+      -> ((BitVector) arrowVector).setNull(i);
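+  //Hive encodes booleans as 0/1 longs in a LongColumnVector; narrow to the int Arrow's BitVector expects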
+  private static final IntIntAndVectorsConsumer boolValueSetter = (i, j, arrowVector, hiveVector)
+      -> ((BitVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]);
+
+  //byte
+  private static final IntAndVectorsConsumer byteNullSetter = (i, arrowVector, hiveVector)
+      -> ((TinyIntVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer byteValueSetter = (i, j, arrowVector, hiveVector)
+      -> ((TinyIntVector) arrowVector).set(i, (byte) ((LongColumnVector) hiveVector).vector[j]);
+
+  //short
+  private static final IntAndVectorsConsumer shortNullSetter = (i, arrowVector, hiveVector)
+      -> ((SmallIntVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer shortValueSetter = (i, j, arrowVector, hiveVector)
+      -> ((SmallIntVector) arrowVector).set(i, (short) ((LongColumnVector) hiveVector).vector[j]);
+
+  //int
+  private static final IntAndVectorsConsumer intNullSetter = (i, arrowVector, hiveVector)
+      -> ((IntVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer intValueSetter = (i, j, arrowVector, hiveVector)
+      -> ((IntVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]);
+
+  //long
+  private static final IntAndVectorsConsumer longNullSetter = (i, arrowVector, hiveVector)
+      -> ((BigIntVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer longValueSetter = (i, j, arrowVector, hiveVector)
+      -> ((BigIntVector) arrowVector).set(i, ((LongColumnVector) hiveVector).vector[j]);
+
+  //float
+  private static final IntAndVectorsConsumer floatNullSetter = (i, arrowVector, hiveVector)
+      -> ((Float4Vector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer floatValueSetter = (i, j, arrowVector, hiveVector)
+      -> ((Float4Vector) arrowVector).set(i, (float) ((DoubleColumnVector) hiveVector).vector[j]);
+
+  //double
+  private static final IntAndVectorsConsumer doubleNullSetter = (i, arrowVector, hiveVector)
+      -> ((Float8Vector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer doubleValueSetter = (i, j, arrowVector, hiveVector)
+      -> ((Float8Vector) arrowVector).set(i, ((DoubleColumnVector) hiveVector).vector[j]);
+
+  //string/varchar
+  private static final IntAndVectorsConsumer stringNullSetter = (i, arrowVector, hiveVector)
+      -> ((VarCharVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer stringValueSetter = (i, j, arrowVector, hiveVector)
+      -> {
+    BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
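+    //setSafe re-allocates the variable-width buffers if needed before copying the bytes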
+    ((VarCharVector) arrowVector).setSafe(i, bytesVector.vector[j], bytesVector.start[j], bytesVector.length[j]);
+  };
+
+  //fixed-length CHAR
+  //TODO Add padding conversion
+  private static final IntAndVectorsConsumer charNullSetter = (i, arrowVector, hiveVector)
+      -> ((VarCharVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer charValueSetter = (i, j, arrowVector, hiveVector)
+      -> {
+    BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+    ((VarCharVector) arrowVector).setSafe(i, bytesVector.vector[j], bytesVector.start[j], bytesVector.length[j]);
+  };
+
+  //date
+  private static final IntAndVectorsConsumer dateNullSetter = (i, arrowVector, hiveVector)
+      -> ((DateDayVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer dateValueSetter = (i, j, arrowVector, hiveVector)
+      -> ((DateDayVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]);
+
+  //timestamp
+  private static final IntAndVectorsConsumer timestampNullSetter = (i, arrowVector, hiveVector)
+      -> ((TimeStampMicroTZVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer timestampValueSetter = (i, j, arrowVector, hiveVector)
+      -> {
+    final TimeStampMicroTZVector timeStampMicroTZVector = (TimeStampMicroTZVector) arrowVector;
+    final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
+    // Time = second + sub-second
+    final long secondInMillis = timestampColumnVector.getTime(j);
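+    //getTime() includes the millisecond part that getNanos() also covers; strip it to avoid double counting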
+    final long secondInMicros = (secondInMillis - secondInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
+    final long subSecondInMicros = timestampColumnVector.getNanos(j) / NS_PER_MICROS;
+    if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) {
+      // If the timestamp cannot be represented as microseconds in a long (overflow), set NULL
+      timeStampMicroTZVector.setNull(i);
+    } else {
+      timeStampMicroTZVector.set(i, secondInMicros + subSecondInMicros);
+    }
+  };
+
+  //binary
+  private static final IntAndVectorsConsumer binaryNullSetter = (i, arrowVector, hiveVector)
+      -> ((VarBinaryVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer binaryValueSetter = (i, j, arrowVector, hiveVector)
+      -> {
+    BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+    ((VarBinaryVector) arrowVector).setSafe(i, bytesVector.vector[j], bytesVector.start[j], bytesVector.length[j]);
+  };
+
+  //decimal and decimal64
+  private static final IntAndVectorsConsumer decimalNullSetter = (i, arrowVector, hiveVector)
+      -> ((DecimalVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer decimalValueSetter = (i, j, arrowVector, hiveVector)
+      -> {
+    final DecimalVector decimalVector = (DecimalVector) arrowVector;
+    final int scale = decimalVector.getScale();
+    decimalVector.set(i, ((DecimalColumnVector) hiveVector).vector[j].getHiveDecimal().bigDecimalValue().setScale(scale));
+  };
+  private static final IntIntAndVectorsConsumer decimal64ValueSetter = (i, j, arrowVector, hiveVector)
+      -> {
+    final DecimalVector decimalVector = (DecimalVector) arrowVector;
+    final int scale = decimalVector.getScale();
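+    //Decimal64ColumnVector stores the value as a scaled long; rehydrate it through HiveDecimalWritable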
+    HiveDecimalWritable decimalHolder = new HiveDecimalWritable();
+    decimalHolder.setFromLongAndScale(((Decimal64ColumnVector) hiveVector).vector[j], scale);
+    decimalVector.set(i, decimalHolder.getHiveDecimal().bigDecimalValue().setScale(scale));
+  };
+
+  //interval year
+  private static final IntAndVectorsConsumer intervalYearMonthNullSetter = (i, arrowVector, hiveVector)
+      -> ((IntervalYearVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer intervalYearMonthValueSetter = (i, j, arrowVector, hiveVector)
+      -> ((IntervalYearVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]);
+
+  //interval day
+  private static final IntAndVectorsConsumer intervalDayTimeNullSetter = (i, arrowVector, hiveVector)
+      -> ((IntervalDayVector) arrowVector).setNull(i);
+  private static final IntIntAndVectorsConsumer intervalDayTimeValueSetter = (i, j, arrowVector, hiveVector)
+      -> {
+    final IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector;
+    final IntervalDayTimeColumnVector intervalDayTimeColumnVector =
+        (IntervalDayTimeColumnVector) hiveVector;
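+    //Arrow's IntervalDayVector stores (days, milliseconds); split Hive's total seconds plus nanos accordingly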
+    final long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(j);
+    final long days = totalSeconds / SECOND_PER_DAY;
+    final long millis =
+        (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND +
+            intervalDayTimeColumnVector.getNanos(j) / NS_PER_MILLIS;
+    intervalDayVector.set(i, (int) days, (int) millis);
+  };
+
+  //Used for setting null at arrowVector[i]
+  private interface IntAndVectorsConsumer {
+    void accept(int i, FieldVector arrowVector, ColumnVector hiveVector);
+  }
+
+  //Used to copy a value from hiveVector[j] to arrowVector[i],
+  //since hiveVector rows may be addressed indirectly through the batch's selected[] indices
+  private interface IntIntAndVectorsConsumer {
+    void accept(int i, int j, FieldVector arrowVector, ColumnVector hiveVector);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index eb5b1a8..9bb104d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.filesink.VectorFileSinkArrowOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyLongOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyMultiKeyOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyStringOperator;
@@ -4119,6 +4120,48 @@ public class Vectorizer implements PhysicalPlanResolver {
     return true;
   }
 
+  private boolean checkForArrowFileSink(FileSinkDesc fileSinkDesc,
+      boolean isTezOrSpark, VectorizationContext vContext,
+      VectorFileSinkDesc vectorDesc) throws HiveException {
+
+    // Restrictions: the table must use the Arrow SerDe, the native Arrow file sink
+    // flag must be enabled, and the execution engine must be Tez.
+
+    boolean isVectorizationFileSinkArrowNativeEnabled =
+        HiveConf.getBoolVar(hiveConf,
+            HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED);
+
+    String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+
+    String serdeClassName = fileSinkDesc.getTableInfo().getSerdeClassName();
+
+    boolean isOkArrowFileSink =
+        serdeClassName.equals("org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe") &&
+        isVectorizationFileSinkArrowNativeEnabled &&
+        engine.equalsIgnoreCase("tez");
+
+    return isOkArrowFileSink;
+  }
+
+  private Operator<? extends OperatorDesc> specializeArrowFileSinkOperator(
+      Operator<? extends OperatorDesc> op, VectorizationContext vContext, FileSinkDesc desc,
+      VectorFileSinkDesc vectorDesc) throws HiveException {
+
+    Class<? extends Operator<?>> opClass = VectorFileSinkArrowOperator.class;
+
+    Operator<? extends OperatorDesc> vectorOp = null;
+    try {
+      vectorOp = OperatorFactory.getVectorOperator(
+          opClass, op.getCompilationOpContext(), op.getConf(),
+          vContext, vectorDesc);
+    } catch (Exception e) {
+      LOG.info("Vectorizer vectorizeOperator file sink class exception " + opClass.getSimpleName() +
+          " exception " + e);
+      throw new HiveException(e);
+    }
+
+    return vectorOp;
+  }
+
   private boolean usesVectorUDFAdaptor(VectorExpression vecExpr) {
     if (vecExpr == null) {
       return false;
@@ -5145,9 +5188,20 @@ public class Vectorizer implements PhysicalPlanResolver {
             FileSinkDesc fileSinkDesc = (FileSinkDesc) op.getConf();
 
             VectorFileSinkDesc vectorFileSinkDesc = new VectorFileSinkDesc();
-            vectorOp = OperatorFactory.getVectorOperator(
-                op.getCompilationOpContext(), fileSinkDesc, vContext, vectorFileSinkDesc);
-            isNative = false;
+            boolean isArrowSpecialization =
+                checkForArrowFileSink(fileSinkDesc, isTezOrSpark, vContext, vectorFileSinkDesc);
+
+            if (isArrowSpecialization) {
+              vectorOp =
+                  specializeArrowFileSinkOperator(
+                      op, vContext, fileSinkDesc, vectorFileSinkDesc);
+              isNative = true;
+            } else {
+              vectorOp =
+                  OperatorFactory.getVectorOperator(
+                      op.getCompilationOpContext(), fileSinkDesc, vContext, vectorFileSinkDesc);
+              isNative = false;
+            }
           }
           break;
         case LIMIT: