You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/08/06 02:50:57 UTC

[51/53] [abbrv] hive git commit: HIVE-11474 : LLAP: merge master into branch (Sergey Shelukhin)

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 333f861,a9d1f8e..3450a26
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@@ -470,4 -472,21 +472,21 @@@ public class GenTezUtils 
  
      curr.removeChild(child);
    }
+ 
+   public static EdgeType determineEdgeType(BaseWork preceedingWork, BaseWork followingWork) {
+     if (followingWork instanceof ReduceWork) {
+       // Ideally there should be a better way to determine that the followingWork contains
+       // a dynamic partitioned hash join, but in some cases (createReduceWork()) it looks like
+       // the work must be created/connected first, before the GenTezProcContext can be updated
+       // with the mapjoin/work relationship.
+       ReduceWork reduceWork = (ReduceWork) followingWork;
+       if (reduceWork.getReducer() instanceof MapJoinOperator) {
+         MapJoinOperator joinOp = (MapJoinOperator) reduceWork.getReducer();
+         if (joinOp.getConf().isDynamicPartitionHashJoin()) {
+           return EdgeType.CUSTOM_SIMPLE_EDGE;
+         }
+       }
+     }
+     return EdgeType.SIMPLE_EDGE;
+   }
 -}
 +}

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
index 0000000,20432c7..cd1301d
mode 000000,100644..100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@@ -1,0 -1,142 +1,141 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.ql.parse.spark;
+ 
+ import java.io.BufferedOutputStream;
+ import java.io.IOException;
+ import java.io.ObjectOutputStream;
+ import java.util.Collection;
+ import java.util.concurrent.Future;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.ql.exec.Operator;
+ import org.apache.hadoop.hive.ql.exec.Utilities;
+ import org.apache.hadoop.hive.ql.metadata.HiveException;
+ import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+ import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+ import org.apache.hadoop.io.DataOutputBuffer;
+ import org.apache.hadoop.io.Writable;
+ import org.apache.hadoop.util.ReflectionUtils;
+ import org.apache.hadoop.hive.serde2.Serializer;
+ 
+ /**
+  * This operator gets partition info from the upstream operators, and writes it
+  * to HDFS. This will later be read at the driver, and used for pruning the partitions
+  * for the big table side.
+  */
+ public class SparkPartitionPruningSinkOperator extends Operator<SparkPartitionPruningSinkDesc> {
+ 
+   @SuppressWarnings("deprecation")
+   protected transient Serializer serializer;
+   protected transient DataOutputBuffer buffer;
+   protected static final Log LOG = LogFactory.getLog(SparkPartitionPruningSinkOperator.class);
+ 
+   @SuppressWarnings("deprecation")
 -  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
 -    Collection<Future<?>> result = super.initializeOp(hconf);
++  public void initializeOp(Configuration hconf) throws HiveException {
++    super.initializeOp(hconf);
+     serializer = (Serializer) ReflectionUtils.newInstance(
+         conf.getTable().getDeserializerClass(), null);
+     buffer = new DataOutputBuffer();
 -    return result;
+   }
+ 
+   @Override
+   public void process(Object row, int tag) throws HiveException {
+     ObjectInspector rowInspector = inputObjInspectors[0];
+     try {
+       Writable writableRow = serializer.serialize(row, rowInspector);
+       writableRow.write(buffer);
+     } catch (Exception e) {
+       throw new HiveException(e);
+     }
+   }
+ 
+   @Override
+   public void closeOp(boolean abort) throws HiveException {
+     if (!abort) {
+       try {
+         flushToFile();
+       } catch (Exception e) {
+         throw new HiveException(e);
+       }
+     }
+   }
+ 
+   private void flushToFile() throws IOException {
+     // write an intermediate file to the specified path
+     // the format of the path is: tmpPath/targetWorkId/sourceWorkId/randInt
+     Path path = conf.getPath();
+     FileSystem fs = path.getFileSystem(this.getConfiguration());
+     fs.mkdirs(path);
+ 
+     while (true) {
+       path = new Path(path, String.valueOf(Utilities.randGen.nextInt()));
+       if (!fs.exists(path)) {
+         break;
+       }
+     }
+ 
+     short numOfRepl = fs.getDefaultReplication(path);
+ 
+     ObjectOutputStream out = null;
+     FSDataOutputStream fsout = null;
+ 
+     try {
+       fsout = fs.create(path, numOfRepl);
+       out = new ObjectOutputStream(new BufferedOutputStream(fsout, 4096));
+       out.writeUTF(conf.getTargetColumnName());
+       buffer.writeTo(out);
+     } catch (Exception e) {
+       try {
+         fs.delete(path, false);
+       } catch (Exception ex) {
+         LOG.warn("Exception happened while trying to clean partial file.");
+       }
+       throw e;
+     } finally {
+       if (out != null) {
+         LOG.info("Flushed to file: " + path);
+         out.close();
+       } else if (fsout != null) {
+         fsout.close();
+       }
+     }
+   }
+ 
+   @Override
+   public OperatorType getType() {
+     return OperatorType.SPARKPRUNINGSINK;
+   }
+ 
+   @Override
+   public String getName() {
+     return getOperatorName();
+   }
+ 
+   public static String getOperatorName() {
+     return "SPARKPRUNINGSINK";
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
----------------------------------------------------------------------
diff --cc ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
index dad5536,4469353..0626c49
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
@@@ -99,81 -99,7 +99,81 @@@ public class TestIOContextMap 
    }
  
    @Test
 -  public void testSparkThreadLocal() throws Exception {
 +  public void testTezLlapAttemptMap() throws Exception {
 +    // Tests that different threads get the same object per attempt per input, and different
 +    // between attempts/inputs; that attempt is inherited between threads; and that clearing
 +    // the attempt produces a different result.
 +    final int THREAD_COUNT = 2, ITER_COUNT = 1000, ATTEMPT_COUNT = 3;
 +    final AtomicInteger countdown = new AtomicInteger(ITER_COUNT);
 +    final IOContext[] results = new IOContext[ITER_COUNT * ATTEMPT_COUNT];
 +    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
 +    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
 +
 +    @SuppressWarnings("unchecked")
 +    FutureTask<Void>[] tasks = new FutureTask[THREAD_COUNT];
 +    for (int i = 0; i < tasks.length; ++i) {
 +      tasks[i] = new FutureTask<Void>(new Callable<Void>() {
 +        public Void call() throws Exception {
 +          final Configuration conf = new Configuration(), conf2 = new Configuration();
 +          syncThreadStart(cdlIn, cdlOut);
 +          while (true) {
 +            int nextIx = countdown.decrementAndGet();
 +            if (nextIx < 0) break;
 +            String input1 = "Input " + nextIx;
 +            conf.set(Utilities.INPUT_NAME, input1);
 +            for (int j = 0; j < ATTEMPT_COUNT; ++j) {
 +              String attemptId = "Attempt " + nextIx + ":" + j;
 +              IOContextMap.setThreadAttemptId(attemptId);
 +              final IOContext r1 = results[(nextIx * ATTEMPT_COUNT) + j] = IOContextMap.get(conf);
 +              // For some attempts, check inheritance.
 +              if ((nextIx % (ITER_COUNT / 10)) == 0) {
 +                String input2 = "Input2 " + nextIx;
 +                conf2.set(Utilities.INPUT_NAME, input2);
 +                final AtomicReference<IOContext> ref2 = new AtomicReference<>();
 +                Thread t = new Thread(new Runnable() {
 +                  public void run() {
 +                    assertSame(r1, IOContextMap.get(conf));
 +                    ref2.set(IOContextMap.get(conf2));
 +                  }
 +                });
 +                t.start();
 +                t.join();
 +                assertSame(ref2.get(), IOContextMap.get(conf2));
 +              }
 +              // Don't clear the attempt ID, or the stuff will be cleared.
 +            }
 +            if (nextIx == 0) break;
 +          }
 +          return null;
 +        }
 +      });
 +      executor.execute(tasks[i]);
 +    }
 +
 +    cdlIn.await(); // Wait for all threads to be ready.
 +    cdlOut.countDown(); // Release them at the same time.
 +    for (int i = 0; i < tasks.length; ++i) {
 +      tasks[i].get();
 +    }
 +    Configuration conf = new Configuration();
 +    Set<IOContext> resultSet = Sets.newIdentityHashSet();
 +    for (int i = 0; i < ITER_COUNT; ++i) {
 +      conf.set(Utilities.INPUT_NAME, "Input " + i);
 +      for (int j = 0; j < ATTEMPT_COUNT; ++j) {
 +        String attemptId = "Attempt " + i + ":" + j;
 +        IOContext result = results[(i * ATTEMPT_COUNT) + j];
 +        assertTrue(resultSet.add(result)); // All the objects must be different.
 +        IOContextMap.setThreadAttemptId(attemptId);
 +        assertSame(result, IOContextMap.get(conf)); // Matching result for attemptId + input.
 +        IOContextMap.clearThreadAttempt(attemptId);
 +        IOContextMap.setThreadAttemptId(attemptId);
 +        assertNotSame(result, IOContextMap.get(conf)); // Different result after clearing.
 +      }
 +    }
 +  }
 +
 +  @Test
-   public void testSparkThreadLocal() throws Exception {
++    public void testSparkThreadLocal() throws Exception {
      // Test that input name does not change IOContext returned, and that each thread gets its own.
      final Configuration conf1 = new Configuration();
      conf1.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "spark");

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --cc ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index c667732,6cb8529..ad3199b
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@@ -59,8 -63,8 +63,9 @@@ import org.apache.hadoop.hive.ql.exec.v
  import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+ import org.apache.hadoop.hive.ql.io.AcidInputFormat;
  import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 +import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
  import org.apache.hadoop.hive.ql.io.HiveInputFormat;
  import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@@ -927,8 -939,8 +940,8 @@@ public class TestInputOutputFormat 
      OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
      OrcInputFormat.SplitGenerator splitter =
          new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
 -            fs.getFileStatus(new Path("/a/file")), null, true,
 +            AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true,
-             new ArrayList<Long>(), true, null, null));
+             new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
      OrcSplit result = splitter.createSplit(0, 200, null);
      assertEquals(0, result.getStart());
      assertEquals(200, result.getLength());
@@@ -968,8 -980,8 +981,8 @@@
      OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
      OrcInputFormat.SplitGenerator splitter =
          new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
 -            fs.getFileStatus(new Path("/a/file")), null, true,
 +            AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true,
-             new ArrayList<Long>(), true, null, null));
+             new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
      List<OrcSplit> results = splitter.call();
      OrcSplit result = results.get(0);
      assertEquals(3, result.getStart());
@@@ -991,8 -1003,8 +1004,8 @@@
      conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0);
      context = new OrcInputFormat.Context(conf);
      splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
 -      fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<AcidInputFormat.DeltaMetaData>(),
 -        true, null, null));
 +      AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true,
-         new ArrayList<Long>(), true, null, null));
++        new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
      results = splitter.call();
      for(int i=0; i < stripeSizes.length; ++i) {
        assertEquals("checking stripe " + i + " size",

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/list_bucket_dml_4.q.java1.8.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/list_bucket_dml_6.q.java1.8.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/list_bucket_dml_9.q.java1.8.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/spark/vector_string_concat.q.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/spark/vectorization_17.q.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/spark/vectorization_div0.q.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/tez/dynpart_sort_optimization2.q.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/test/results/clientpositive/tez/vectorization_17.q.out
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
----------------------------------------------------------------------
diff --cc storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 0000000,02c52fa..46c25a2
mode 000000,100644..100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@@ -1,0 -1,322 +1,331 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.ql.exec.vector;
+ 
++
+ /**
+  * This class supports string and binary data by value reference -- i.e. each field is
+  * explicitly present, as opposed to provided by a dictionary reference.
+  * In some cases, all the values will be in the same byte array to begin with,
+  * but this need not be the case. If each value is in a separate byte
+  * array to start with, or not all of the values are in the same original
+  * byte array, you can still assign data by reference into this column vector.
+  * This gives flexibility to use this in multiple situations.
+  * <p>
+  * When setting data by reference, the caller
+  * is responsible for allocating the byte arrays used to hold the data.
+  * You can also set data by value, as long as you call the initBuffer() method first.
+  * You can mix "by value" and "by reference" in the same column vector,
+  * though that use is probably not typical.
+  */
+ public class BytesColumnVector extends ColumnVector {
+   public byte[][] vector;
+   public int[] start;          // start offset of each field
+ 
+   /*
+    * The length of each field. If the value repeats for every entry, then it is stored
+    * in vector[0] and isRepeating from the superclass is set to true.
+    */
+   public int[] length;
+   private byte[] buffer;   // optional buffer to use when actually copying in data
+   private int nextFree;    // next free position in buffer
+ 
+   // Estimate that there will be 16 bytes per entry
+   static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
+ 
+   // Proportion of extra space to provide when allocating more buffer space.
+   static final float EXTRA_SPACE_FACTOR = (float) 1.2;
+ 
+   /**
+    * Use this constructor for normal operation.
+    * All column vectors should be the default size normally.
+    */
+   public BytesColumnVector() {
+     this(VectorizedRowBatch.DEFAULT_SIZE);
+   }
+ 
+   /**
+    * Don't call this constructor except for testing purposes.
+    *
+    * @param size  number of elements in the column vector
+    */
+   public BytesColumnVector(int size) {
+     super(size);
+     vector = new byte[size][];
+     start = new int[size];
+     length = new int[size];
+   }
+ 
+   /**
+    * Additional reset work for BytesColumnVector (releasing scratch bytes for by value strings).
+    */
+   @Override
+   public void reset() {
+     super.reset();
+     initBuffer(0);
+   }
+ 
+   /** Set a field by reference.
+    *
+    * @param elementNum index within column vector to set
+    * @param sourceBuf container of source data
+    * @param start start byte position within source
+    * @param length  length of source byte sequence
+    */
+   public void setRef(int elementNum, byte[] sourceBuf, int start, int length) {
+     vector[elementNum] = sourceBuf;
+     this.start[elementNum] = start;
+     this.length[elementNum] = length;
+   }
+ 
+   /**
+    * You must call initBuffer first before using setVal().
+    * Provide the estimated number of bytes needed to hold
+    * a full column vector worth of byte string data.
+    *
+    * @param estimatedValueSize  Estimated size of buffer space needed
+    */
+   public void initBuffer(int estimatedValueSize) {
+     nextFree = 0;
+ 
+     // if buffer is already allocated, keep using it, don't re-allocate
+     if (buffer != null) {
+       return;
+     }
+ 
+     // allocate a little extra space to limit need to re-allocate
+     int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
+     if (bufferSize < DEFAULT_BUFFER_SIZE) {
+       bufferSize = DEFAULT_BUFFER_SIZE;
+     }
+     buffer = new byte[bufferSize];
+   }
+ 
+   /**
+    * Initialize buffer to default size.
+    */
+   public void initBuffer() {
+     initBuffer(0);
+   }
+ 
+   /**
+    * @return amount of buffer space currently allocated
+    */
+   public int bufferSize() {
+     if (buffer == null) {
+       return 0;
+     }
+     return buffer.length;
+   }
+ 
+   /**
+    * Set a field by actually copying in to a local buffer.
+    * If you must actually copy data in to the array, use this method.
+    * DO NOT USE this method unless it's not practical to set data by reference with setRef().
+    * Setting data by reference tends to run a lot faster than copying data in.
+    *
+    * @param elementNum index within column vector to set
+    * @param sourceBuf container of source data
+    * @param start start byte position within source
+    * @param length  length of source byte sequence
+    */
+   public void setVal(int elementNum, byte[] sourceBuf, int start, int length) {
+     if ((nextFree + length) > buffer.length) {
+       increaseBufferSpace(length);
+     }
+     System.arraycopy(sourceBuf, start, buffer, nextFree, length);
+     vector[elementNum] = buffer;
+     this.start[elementNum] = nextFree;
+     this.length[elementNum] = length;
+     nextFree += length;
+   }
+ 
+   /**
+    * Set a field to the concatenation of two string values. Result data is copied
+    * into the internal buffer.
+    *
+    * @param elementNum index within column vector to set
+    * @param leftSourceBuf container of left argument
+    * @param leftStart start of left argument
+    * @param leftLen length of left argument
+    * @param rightSourceBuf container of right argument
+    * @param rightStart start of right argument
+    * @param rightLen length of right argument
+    */
+   public void setConcat(int elementNum, byte[] leftSourceBuf, int leftStart, int leftLen,
+       byte[] rightSourceBuf, int rightStart, int rightLen) {
+     int newLen = leftLen + rightLen;
+     if ((nextFree + newLen) > buffer.length) {
+       increaseBufferSpace(newLen);
+     }
+     vector[elementNum] = buffer;
+     this.start[elementNum] = nextFree;
+     this.length[elementNum] = newLen;
+ 
+     System.arraycopy(leftSourceBuf, leftStart, buffer, nextFree, leftLen);
+     nextFree += leftLen;
+     System.arraycopy(rightSourceBuf, rightStart, buffer, nextFree, rightLen);
+     nextFree += rightLen;
+   }
+ 
+   /**
+    * Increase buffer space enough to accommodate next element.
+    * This uses an exponential increase mechanism to rapidly
+    * increase buffer size to enough to hold all data.
+    * As batches get re-loaded, buffer space allocated will quickly
+    * stabilize.
+    *
+    * @param nextElemLength size of next element to be added
+    */
+   public void increaseBufferSpace(int nextElemLength) {
+ 
+     // Keep doubling buffer size until there will be enough space for next element.
+     int newLength = 2 * buffer.length;
+     while((nextFree + nextElemLength) > newLength) {
+       newLength *= 2;
+     }
+ 
+     // Allocate new buffer, copy data to it, and set buffer to new buffer.
+     byte[] newBuffer = new byte[newLength];
+     System.arraycopy(buffer, 0, newBuffer, 0, nextFree);
+     buffer = newBuffer;
+   }
+ 
+   /** Copy the current object contents into the output. Only copy selected entries,
+     * as indicated by selectedInUse and the sel array.
+     */
+   public void copySelected(
+       boolean selectedInUse, int[] sel, int size, BytesColumnVector output) {
+ 
+     // Output has nulls if and only if input has nulls.
+     output.noNulls = noNulls;
+     output.isRepeating = false;
+ 
+     // Handle repeating case
+     if (isRepeating) {
+       output.setVal(0, vector[0], start[0], length[0]);
+       output.isNull[0] = isNull[0];
+       output.isRepeating = true;
+       return;
+     }
+ 
+     // Handle normal case
+ 
+     // Copy data values over
+     if (selectedInUse) {
+       for (int j = 0; j < size; j++) {
+         int i = sel[j];
+         output.setVal(i, vector[i], start[i], length[i]);
+       }
+     }
+     else {
+       for (int i = 0; i < size; i++) {
+         output.setVal(i, vector[i], start[i], length[i]);
+       }
+     }
+ 
+     // Copy nulls over if needed
+     if (!noNulls) {
+       if (selectedInUse) {
+         for (int j = 0; j < size; j++) {
+           int i = sel[j];
+           output.isNull[i] = isNull[i];
+         }
+       }
+       else {
+         System.arraycopy(isNull, 0, output.isNull, 0, size);
+       }
+     }
+   }
+ 
+   /** Simplify vector by brute-force flattening noNulls and isRepeating
+     * This can be used to reduce combinatorial explosion of code paths in VectorExpressions
+     * with many arguments, at the expense of loss of some performance.
+     */
+   public void flatten(boolean selectedInUse, int[] sel, int size) {
+     flattenPush();
+     if (isRepeating) {
+       isRepeating = false;
+ 
+       // setRef is used below and this is safe, because the reference
+       // is to data owned by this column vector. If this column vector
+       // gets re-used, the whole thing is re-used together so there
+       // is no danger of a dangling reference.
+ 
+       // Only copy data values if entry is not null. The string value
+       // at position 0 is undefined if the position 0 value is null.
+       if (noNulls || !isNull[0]) {
+ 
+         // loops start at position 1 because position 0 is already set
+         if (selectedInUse) {
+           for (int j = 1; j < size; j++) {
+             int i = sel[j];
+             this.setRef(i, vector[0], start[0], length[0]);
+           }
+         } else {
+           for (int i = 1; i < size; i++) {
+             this.setRef(i, vector[0], start[0], length[0]);
+           }
+         }
+       }
+       flattenRepeatingNulls(selectedInUse, sel, size);
+     }
+     flattenNoNulls(selectedInUse, sel, size);
+   }
+ 
+   // Fill all the vector entries with the provided value
+   public void fill(byte[] value) {
+     noNulls = true;
+     isRepeating = true;
+     setRef(0, value, 0, value.length);
+   }
+ 
++  // Fill the column vector with nulls
++  public void fillWithNulls() {
++    noNulls = false;
++    isRepeating = true;
++    vector[0] = null;
++    isNull[0] = true;
++  }
++
+   @Override
+   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
+     BytesColumnVector in = (BytesColumnVector) inputVector;
+     setVal(outElementNum, in.vector[inputElementNum], in.start[inputElementNum], in.length[inputElementNum]);
+   }
+ 
+   @Override
+   public void init() {
+     initBuffer(0);
+   }
+ 
+   @Override
+   public void stringifyValue(StringBuilder buffer, int row) {
+     if (isRepeating) {
+       row = 0;
+     }
+     if (noNulls || !isNull[row]) {
+       buffer.append('"');
+       buffer.append(new String(this.buffer, start[row], length[row]));
+       buffer.append('"');
+     } else {
+       buffer.append("null");
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
----------------------------------------------------------------------
diff --cc storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
index 0000000,cb75c2c..a623167
mode 000000,100644..100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
@@@ -1,0 -1,173 +1,174 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.ql.exec.vector;
+ 
++import java.io.IOException;
+ import java.util.Arrays;
+ 
+ /**
+  * ColumnVector contains the shared structure for the sub-types,
+  * including NULL information, and whether this vector
+  * repeats, i.e. has all values the same, so only the first
+  * one is set. This is used to accelerate query performance
+  * by handling a whole vector in O(1) time when applicable.
+  *
+  * The fields are public by design since this is a performance-critical
+  * structure that is used in the inner loop of query execution.
+  */
+ public abstract class ColumnVector {
+ 
+   /*
+    * The current kinds of column vectors.
+    */
+   public static enum Type {
+     LONG,
+     DOUBLE,
+     BYTES,
+     DECIMAL
+   }
+ 
+   /*
+    * If noNulls is false, then this array contains true if the value
+    * is null, otherwise false. The array is always allocated, so a batch can be re-used
+    * later and nulls added.
+    */
+   public boolean[] isNull;
+ 
+   // If the whole column vector has no nulls, this is true, otherwise false.
+   public boolean noNulls;
+ 
+   /*
+    * True if same value repeats for whole column vector.
+    * If so, vector[0] holds the repeating value.
+    */
+   public boolean isRepeating;
+ 
+   // Variables to hold state from before flattening so it can be easily restored.
+   private boolean preFlattenIsRepeating;
+   private boolean preFlattenNoNulls;
+ 
+   /**
+    * Constructor for super-class ColumnVector. This is not called directly,
+    * but used to initialize inherited fields.
+    *
+    * @param len Vector length
+    */
+   public ColumnVector(int len) {
+     isNull = new boolean[len];
+     noNulls = true;
+     isRepeating = false;
+   }
+ 
+   /**
+      * Resets the column to default state
+      *  - fills the isNull array with false
+      *  - sets noNulls to true
+      *  - sets isRepeating to false
+      */
+     public void reset() {
+       if (false == noNulls) {
+         Arrays.fill(isNull, false);
+       }
+       noNulls = true;
+       isRepeating = false;
+     }
+ 
+     abstract public void flatten(boolean selectedInUse, int[] sel, int size);
+ 
+     // Simplify vector by brute-force flattening the null information of a repeating
+     // vector: the null state of entry 0 (or "not null" when noNulls is set) is copied
+     // to every row in use, and noNulls is cleared so isNull can be read directly.
+     // This can be used to reduce combinatorial explosion of code paths in VectorExpressions
+     // with many arguments.
+     public void flattenRepeatingNulls(boolean selectedInUse, int[] sel, int size) {
+ 
+       boolean nullFillValue;
+ 
+       // With noNulls set, isNull[0] is not meaningful, so treat entry 0 as not null.
+       if (noNulls) {
+         nullFillValue = false;
+       } else {
+         nullFillValue = isNull[0];
+       }
+ 
+       if (selectedInUse) {
+         // Only the rows named by the first size entries of sel are updated.
+         for (int j = 0; j < size; j++) {
+           int i = sel[j];
+           isNull[i] = nullFillValue;
+         }
+       } else {
+         Arrays.fill(isNull, 0, size, nullFillValue);
+       }
+ 
+       // all nulls are now explicit
+       noNulls = false;
+     }
+ 
+     /**
+      * Make the null flags explicit for a vector currently marked as having no nulls.
+      * When noNulls is true it is switched to false and isNull is cleared for every
+      * row in use, so isNull[i] can be consulted directly. When noNulls is already
+      * false nothing is changed.
+      *
+      * @param selectedInUse if true, only the rows listed in sel are cleared
+      * @param sel row indices to clear when selectedInUse is true
+      * @param size number of entries of sel to visit, or, when selectedInUse is
+      *             false, the number of leading rows to clear
+      */
+     public void flattenNoNulls(boolean selectedInUse, int[] sel, int size) {
+       if (noNulls) {
+         noNulls = false;
+         if (selectedInUse) {
+           for (int j = 0; j < size; j++) {
+             int i = sel[j];
+             isNull[i] = false;
+           }
+         } else {
+           Arrays.fill(isNull, 0, size, false);
+         }
+       }
+     }
+ 
+     /**
+      * Restore the state of isRepeating and noNulls to what it was
+      * before flattening. This must only be called just after flattening
+      * and then evaluating a VectorExpression on the column vector.
+      * It is an optimization that allows other operations on the same
+      * column to continue to benefit from the isRepeating and noNulls
+      * indicators.
+      */
+     public void unFlatten() {
+       isRepeating = preFlattenIsRepeating;
+       noNulls = preFlattenNoNulls;
+     }
+ 
+     // Record repeating and no nulls state to be restored later.
+     protected void flattenPush() {
+       preFlattenIsRepeating = isRepeating;
+       preFlattenNoNulls = noNulls;
+     }
+ 
+     /**
+      * Set the element in this column vector from the given input vector.
+      */
+     public abstract void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector);
+ 
+     /**
+      * Initialize the column vector. This method can be overridden by specific column vector types.
+      * Use this method only if the individual type of the column vector is not known, otherwise it is
+      * preferable to call specific initialization methods.
+      */
+     public void init() {
+       // Do nothing by default
+     }
+ 
+     /**
+      * Print the value for this column into the given string builder.
+      * @param buffer the buffer to print into
+      * @param row the id of the row to print
+      */
+     public abstract void stringifyValue(StringBuilder buffer,
+                                         int row);
+   }

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
----------------------------------------------------------------------
diff --cc storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
index 0000000,74a9d5f..997ac5e
mode 000000,100644..100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
@@@ -1,0 -1,106 +1,126 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.ql.exec.vector;
 -
++import java.io.IOException;
+ import java.math.BigInteger;
+ 
++
+ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+ import org.apache.hadoop.hive.common.type.HiveDecimal;
+ 
+ public class DecimalColumnVector extends ColumnVector {
+ 
+   /**
+    * A vector of HiveDecimalWritable objects.
+    *
+    * For high performance and easy access to this low-level structure,
+    * the fields are public by design (as they are in other ColumnVector
+    * types).
+    */
+   public HiveDecimalWritable[] vector;
+   public short scale;
+   public short precision;
+ 
+   public DecimalColumnVector(int precision, int scale) {
+     this(VectorizedRowBatch.DEFAULT_SIZE, precision, scale);
+   }
+ 
+   public DecimalColumnVector(int size, int precision, int scale) {
+     super(size);
+     this.precision = (short) precision;
+     this.scale = (short) scale;
+     vector = new HiveDecimalWritable[size];
+     for (int i = 0; i < size; i++) {
+       vector[i] = new HiveDecimalWritable(HiveDecimal.ZERO);
+     }
+   }
+ 
++  // Fill all the vector entries with the provided value
++  public void fill(HiveDecimal value) {
++    noNulls = true;
++    isRepeating = true;
++    if (vector[0] == null) {
++      vector[0] = new HiveDecimalWritable(value);
++    } else {
++      vector[0].set(value);
++    }
++  }
++
++  // Fill the column vector with nulls
++  public void fillWithNulls() {
++    noNulls = false;
++    isRepeating = true;
++    vector[0] = null;
++    isNull[0] = true;
++  }
++
+   /**
+    * Simplify the vector by brute-force flattening noNulls and isRepeating, in the
+    * same way as the long and double column vectors: the pre-flatten state is saved
+    * with flattenPush() so unFlatten() can restore it, a repeating value is copied
+    * to every row in use, and the null flags are made explicit.
+    *
+    * @param selectedInUse if true, only the rows listed in sel are touched
+    * @param sel row indices in use when selectedInUse is true
+    * @param size number of entries of sel to visit, or, when selectedInUse is
+    *             false, the number of leading rows in use
+    */
+   @Override
+   public void flatten(boolean selectedInUse, int[] sel, int size) {
+     flattenPush();
+     if (isRepeating) {
+       isRepeating = false;
+       HiveDecimalWritable repeatVal = vector[0];
+       // fillWithNulls() leaves a null reference in entry 0, so only copy the
+       // repeating value when entry 0 actually holds one.
+       if ((noNulls || !isNull[0]) && repeatVal != null) {
+         if (selectedInUse) {
+           for (int j = 0; j < size; j++) {
+             int i = sel[j];
+             // Entry 0 already holds the value; skip the self-assignment.
+             if (i != 0) {
+               vector[i].set(repeatVal);
+             }
+           }
+         } else {
+           // Start at 1 because position 0 is already set.
+           for (int i = 1; i < size; i++) {
+             vector[i].set(repeatVal);
+           }
+         }
+       }
+       flattenRepeatingNulls(selectedInUse, sel, size);
+     }
+     flattenNoNulls(selectedInUse, sel, size);
+   }
+ 
+   @Override
+   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
+     HiveDecimal hiveDec = ((DecimalColumnVector) inputVector).vector[inputElementNum].getHiveDecimal(precision, scale);
+     if (hiveDec == null) {
+       noNulls = false;
+       isNull[outElementNum] = true;
+     } else {
+       vector[outElementNum].set(hiveDec);
+     }
+   }
+ 
+   @Override
+   public void stringifyValue(StringBuilder buffer, int row) {
+     if (isRepeating) {
+       row = 0;
+     }
+     if (noNulls || !isNull[row]) {
+       buffer.append(vector[row].toString());
+     } else {
+       buffer.append("null");
+     }
+   }
+ 
+   public void set(int elementNum, HiveDecimalWritable writeable) {
+     HiveDecimal hiveDec = writeable.getHiveDecimal(precision, scale);
+     if (hiveDec == null) {
+       noNulls = false;
+       isNull[elementNum] = true;
+     } else {
+       vector[elementNum].set(hiveDec);
+     }
+   }
+ 
+   public void set(int elementNum, HiveDecimal hiveDec) {
+     HiveDecimal checkedDec = HiveDecimal.enforcePrecisionScale(hiveDec, precision, scale);
+     if (checkedDec == null) {
+       noNulls = false;
+       isNull[elementNum] = true;
+     } else {
+       vector[elementNum].set(checkedDec);
+     }
+   }
+ 
+   public void setNullDataValue(int elementNum) {
+     // E.g. For scale 2 the minimum is "0.01"
+     HiveDecimal minimumNonZeroValue = HiveDecimal.create(BigInteger.ONE, scale);
+     vector[elementNum].set(minimumNonZeroValue);
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
----------------------------------------------------------------------
diff --cc storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
index 0000000,4a7811d..1453301
mode 000000,100644..100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
@@@ -1,0 -1,143 +1,152 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hive.ql.exec.vector;
+ 
++import java.io.IOException;
+ import java.util.Arrays;
+ 
+ /**
+  * This class represents a nullable double precision floating point column vector.
+  * This class will be used for operations on all floating point types (float, double)
+  * and as such will use a 64-bit double value to hold the biggest possible value.
+  * During copy-in/copy-out, smaller types (i.e. float) will be converted as needed. This will
+  * reduce the amount of code that needs to be generated and also will run fast since the
+  * machine operates with 64-bit words.
+  *
+  * The vector[] field is public by design for high-performance access in the inner
+  * loop of query execution.
+  */
+ public class DoubleColumnVector extends ColumnVector {
+   public double[] vector;
+   public static final double NULL_VALUE = Double.NaN;
+ 
+   /**
+    * Use this constructor by default. All column vectors
+    * should normally be the default size.
+    */
+   public DoubleColumnVector() {
+     this(VectorizedRowBatch.DEFAULT_SIZE);
+   }
+ 
+   /**
+    * Don't use this except for testing purposes.
+    *
+    * @param len
+    */
+   public DoubleColumnVector(int len) {
+     super(len);
+     vector = new double[len];
+   }
+ 
+   // Copy the current object contents into the output. Only copy selected entries,
+   // as indicated by selectedInUse and the sel array.
+   public void copySelected(
+       boolean selectedInUse, int[] sel, int size, DoubleColumnVector output) {
+ 
+     // Output has nulls if and only if input has nulls.
+     output.noNulls = noNulls;
+     output.isRepeating = false;
+ 
+     // Handle repeating case
+     if (isRepeating) {
+       output.vector[0] = vector[0];
+       output.isNull[0] = isNull[0];
+       output.isRepeating = true;
+       return;
+     }
+ 
+     // Handle normal case
+ 
+     // Copy data values over
+     if (selectedInUse) {
+       for (int j = 0; j < size; j++) {
+         int i = sel[j];
+         output.vector[i] = vector[i];
+       }
+     }
+     else {
+       System.arraycopy(vector, 0, output.vector, 0, size);
+     }
+ 
+     // Copy nulls over if needed
+     if (!noNulls) {
+       if (selectedInUse) {
+         for (int j = 0; j < size; j++) {
+           int i = sel[j];
+           output.isNull[i] = isNull[i];
+         }
+       }
+       else {
+         System.arraycopy(isNull, 0, output.isNull, 0, size);
+       }
+     }
+   }
+ 
+   // Fill the column vector with the provided value
+   public void fill(double value) {
+     noNulls = true;
+     isRepeating = true;
+     vector[0] = value;
+   }
+ 
++  // Fill the column vector with nulls
++  public void fillWithNulls() {
++    noNulls = false;
++    isRepeating = true;
++    vector[0] = NULL_VALUE;
++    isNull[0] = true;
++  }
++
+   // Simplify vector by brute-force flattening noNulls and isRepeating
+   // This can be used to reduce combinatorial explosion of code paths in VectorExpressions
+   // with many arguments.
+   public void flatten(boolean selectedInUse, int[] sel, int size) {
+     flattenPush();
+     if (isRepeating) {
+       isRepeating = false;
+       double repeatVal = vector[0];
+       if (selectedInUse) {
+         for (int j = 0; j < size; j++) {
+           int i = sel[j];
+           vector[i] = repeatVal;
+         }
+       } else {
+         Arrays.fill(vector, 0, size, repeatVal);
+       }
+       flattenRepeatingNulls(selectedInUse, sel, size);
+     }
+     flattenNoNulls(selectedInUse, sel, size);
+   }
+ 
+   @Override
+   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
+     vector[outElementNum] = ((DoubleColumnVector) inputVector).vector[inputElementNum];
+   }
+ 
+   @Override
+   public void stringifyValue(StringBuilder buffer, int row) {
+     if (isRepeating) {
+       row = 0;
+     }
+     if (noNulls || !isNull[row]) {
+       buffer.append(vector[row]);
+     } else {
+       buffer.append("null");
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
----------------------------------------------------------------------
diff --cc storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
index 0000000,5702584..e9183b2
mode 000000,100644..100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
@@@ -1,0 -1,189 +1,198 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hive.ql.exec.vector;
+ 
++import java.io.IOException;
+ import java.util.Arrays;
+ 
+ /**
+  * This class represents a nullable int column vector.
+  * This class will be used for operations on all integer types (tinyint, smallint, int, bigint)
+  * and as such will use a 64-bit long value to hold the biggest possible value.
+  * During copy-in/copy-out, smaller int types will be converted as needed. This will
+  * reduce the amount of code that needs to be generated and also will run fast since the
+  * machine operates with 64-bit words.
+  *
+  * The vector[] field is public by design for high-performance access in the inner
+  * loop of query execution.
+  */
+ public class LongColumnVector extends ColumnVector {
+   public long[] vector;
+   public static final long NULL_VALUE = 1;
+ 
+   /**
+    * Use this constructor by default. All column vectors
+    * should normally be the default size.
+    */
+   public LongColumnVector() {
+     this(VectorizedRowBatch.DEFAULT_SIZE);
+   }
+ 
+   /**
+    * Don't use this except for testing purposes.
+    *
+    * @param len the number of rows
+    */
+   public LongColumnVector(int len) {
+     super(len);
+     vector = new long[len];
+   }
+ 
+   // Copy the current object contents into the output. Only copy selected entries,
+   // as indicated by selectedInUse and the sel array.
+   public void copySelected(
+       boolean selectedInUse, int[] sel, int size, LongColumnVector output) {
+ 
+     // Output has nulls if and only if input has nulls.
+     output.noNulls = noNulls;
+     output.isRepeating = false;
+ 
+     // Handle repeating case
+     if (isRepeating) {
+       output.vector[0] = vector[0];
+       output.isNull[0] = isNull[0];
+       output.isRepeating = true;
+       return;
+     }
+ 
+     // Handle normal case
+ 
+     // Copy data values over
+     if (selectedInUse) {
+       for (int j = 0; j < size; j++) {
+         int i = sel[j];
+         output.vector[i] = vector[i];
+       }
+     }
+     else {
+       System.arraycopy(vector, 0, output.vector, 0, size);
+     }
+ 
+     // Copy nulls over if needed
+     if (!noNulls) {
+       if (selectedInUse) {
+         for (int j = 0; j < size; j++) {
+           int i = sel[j];
+           output.isNull[i] = isNull[i];
+         }
+       }
+       else {
+         System.arraycopy(isNull, 0, output.isNull, 0, size);
+       }
+     }
+   }
+ 
+   // Copy the current object contents into the output. Only copy selected entries,
+   // as indicated by selectedInUse and the sel array.
+   public void copySelected(
+       boolean selectedInUse, int[] sel, int size, DoubleColumnVector output) {
+ 
+     // Output has nulls if and only if input has nulls.
+     output.noNulls = noNulls;
+     output.isRepeating = false;
+ 
+     // Handle repeating case
+     if (isRepeating) {
+       output.vector[0] = vector[0];  // automatic conversion to double is done here
+       output.isNull[0] = isNull[0];
+       output.isRepeating = true;
+       return;
+     }
+ 
+     // Handle normal case
+ 
+     // Copy data values over
+     if (selectedInUse) {
+       for (int j = 0; j < size; j++) {
+         int i = sel[j];
+         output.vector[i] = vector[i];
+       }
+     }
+     else {
+       for(int i = 0; i < size; ++i) {
+         output.vector[i] = vector[i];
+       }
+     }
+ 
+     // Copy nulls over if needed
+     if (!noNulls) {
+       if (selectedInUse) {
+         for (int j = 0; j < size; j++) {
+           int i = sel[j];
+           output.isNull[i] = isNull[i];
+         }
+       }
+       else {
+         System.arraycopy(isNull, 0, output.isNull, 0, size);
+       }
+     }
+   }
+ 
+   // Fill the column vector with the provided value
+   public void fill(long value) {
+     noNulls = true;
+     isRepeating = true;
+     vector[0] = value;
+   }
+ 
++  // Fill the column vector with nulls
++  public void fillWithNulls() {
++    noNulls = false;
++    isRepeating = true;
++    vector[0] = NULL_VALUE;
++    isNull[0] = true;
++  }
++
+   // Simplify vector by brute-force flattening noNulls and isRepeating
+   // This can be used to reduce combinatorial explosion of code paths in VectorExpressions
+   // with many arguments.
+   public void flatten(boolean selectedInUse, int[] sel, int size) {
+     flattenPush();
+     if (isRepeating) {
+       isRepeating = false;
+       long repeatVal = vector[0];
+       if (selectedInUse) {
+         for (int j = 0; j < size; j++) {
+           int i = sel[j];
+           vector[i] = repeatVal;
+         }
+       } else {
+         Arrays.fill(vector, 0, size, repeatVal);
+       }
+       flattenRepeatingNulls(selectedInUse, sel, size);
+     }
+     flattenNoNulls(selectedInUse, sel, size);
+   }
+ 
+   @Override
+   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
+     vector[outElementNum] = ((LongColumnVector) inputVector).vector[inputElementNum];
+   }
+ 
+   @Override
+   public void stringifyValue(StringBuilder buffer, int row) {
+     if (isRepeating) {
+       row = 0;
+     }
+     if (noNulls || !isNull[row]) {
+       buffer.append(vector[row]);
+     } else {
+       buffer.append("null");
+     }
+   }
+ }