You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mm...@apache.org on 2018/08/24 16:31:04 UTC

[2/3] hive git commit: HIVE-20367: Vectorization: Support streaming for PTF AVG, MAX, MIN, SUM (Matt McCline, reviewed by Teddy Choi)

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongAvg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongAvg.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongAvg.java
new file mode 100644
index 0000000..78d543a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongAvg.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates streaming long avg() for a PTF group.
+ *
+ * Non-null column values are summed as they arrive; every output row receives the running
+ * average (sum / non-null count) of the group rows seen so far.  Rows that precede the first
+ * non-null value of the group are NULL.
+ */
+public class VectorPTFEvaluatorStreamingLongAvg extends VectorPTFEvaluatorBase {
+
+  // True until the group has seen its first non-null value.
+  protected boolean isNull;
+
+  // Running sum of the group's non-null values.
+  protected long sum;
+
+  // Number of non-null values added to sum.
+  private int nonNullGroupCount;
+
+  // Most recent running average; repeated for NULL rows once the group has a value.
+  protected double avg;
+
+  public VectorPTFEvaluatorStreamingLongAvg(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr,
+      int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  /**
+   * Evaluates the input expression, folds the batch's non-null long values into the running
+   * sum and count, and writes the running AVG of each row into the output column.
+   *
+   * @param batch the next batch of the current group; must not have selectedInUse set
+   * @throws HiveException as declared by the overridden method
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    LongColumnVector longColVector = ((LongColumnVector) batch.cols[inputColumnNum]);
+
+    DoubleColumnVector outputColVector = (DoubleColumnVector) batch.cols[outputColumnNum];
+    double[] outputVector = outputColVector.vector;
+
+    if (longColVector.isRepeating) {
+
+      if (longColVector.noNulls || !longColVector.isNull[0]) {
+
+        // We have a repeated value.  Keep it as a long so that adding it to sum is exact.
+        isNull = false;
+        final long repeatedValue = longColVector.vector[0];
+
+        for (int i = 0; i < size; i++) {
+          // Output row i AVG.
+          outputVector[i] = addValue(repeatedValue);
+        }
+      } else {
+        if (isNull) {
+          outputColVector.isNull[0] = true;
+          outputColVector.noNulls = false;
+        } else {
+
+          // Continue previous AVG.
+          outputVector[0] = avg;
+        }
+        outputColVector.isRepeating = true;
+      }
+    } else if (longColVector.noNulls) {
+      isNull = false;
+      long[] vector = longColVector.vector;
+      for (int i = 0; i < size; i++) {
+        // Output row i AVG.
+        outputVector[i] = addValue(vector[i]);
+      }
+    } else {
+      boolean[] batchIsNull = longColVector.isNull;
+      long[] vector = longColVector.vector;
+      for (int i = 0; i < size; i++) {
+        if (!batchIsNull[i]) {
+          isNull = false;
+
+          // Output row i AVG.
+          outputVector[i] = addValue(vector[i]);
+        } else if (isNull) {
+
+          // No non-null value seen in the group yet (isNull is group state, so this also
+          // holds across batches): the running AVG is NULL.
+          outputColVector.isNull[i] = true;
+          outputColVector.noNulls = false;
+        } else {
+
+          // Continue previous AVG, including for NULL rows at the start of a later batch,
+          // the same way the streaming SUM/MAX/MIN evaluators do.
+          outputVector[i] = avg;
+        }
+      }
+    }
+  }
+
+  /**
+   * Adds one non-null value to the group state and returns the new running average.
+   */
+  private double addValue(long value) {
+    sum += value;
+    nonNullGroupCount++;
+
+    // Divide as double; long / int division would truncate the fractional part of the AVG.
+    avg = ((double) sum) / nonNullGroupCount;
+    return avg;
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // No group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DOUBLE;
+  }
+
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    sum = 0;
+    nonNullGroupCount = 0;
+    avg = 0;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongMax.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongMax.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongMax.java
new file mode 100644
index 0000000..94d19b3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongMax.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Streaming long max() evaluator for a PTF group.
+ *
+ * Every output row receives the largest non-null value seen so far in the group; rows that
+ * come before the group's first non-null value are NULL.
+ */
+public class VectorPTFEvaluatorStreamingLongMax extends VectorPTFEvaluatorBase {
+
+  // True while the group has not yet seen a non-null value.
+  protected boolean isNull;
+
+  // Running maximum; only meaningful once isNull is false.
+  protected long max;
+
+  /**
+   * Creates the evaluator and puts it in its initial (no value seen) state.
+   */
+  public VectorPTFEvaluatorStreamingLongMax(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr,
+      int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  /**
+   * Evaluates the input expression and writes the running MAX of every row of the batch into
+   * the output column, continuing from the isNull / max state left by earlier batches.
+   *
+   * @param batch the next batch of the current group; selectedInUse must be false
+   * @throws HiveException as declared by the overridden method
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int rowCount = batch.size;
+    if (rowCount == 0) {
+      return;
+    }
+
+    final LongColumnVector inputColVector = (LongColumnVector) batch.cols[inputColumnNum];
+    final LongColumnVector outputColVector = (LongColumnVector) batch.cols[outputColumnNum];
+    final long[] outputValues = outputColVector.vector;
+    final long[] inputValues = inputColVector.vector;
+
+    if (inputColVector.isRepeating) {
+
+      // Every input row equals entry 0, so the output is a single repeated entry as well.
+      final boolean repeatedIsNull = !inputColVector.noNulls && inputColVector.isNull[0];
+      if (!repeatedIsNull) {
+
+        // We only need to evaluate a repeated value once for MIN/MAX.
+        outputValues[0] = foldIn(inputValues[0]);
+      } else if (!isNull) {
+
+        // Continue previous MAX.
+        outputValues[0] = max;
+      } else {
+
+        // The group has produced no value yet.
+        outputColVector.isNull[0] = true;
+        outputColVector.noNulls = false;
+      }
+      outputColVector.isRepeating = true;
+      return;
+    }
+
+    if (inputColVector.noNulls) {
+
+      // No NULLs in this batch: every row updates the running MAX.
+      for (int row = 0; row < rowCount; row++) {
+        outputValues[row] = foldIn(inputValues[row]);
+      }
+      return;
+    }
+
+    final boolean[] inputIsNull = inputColVector.isNull;
+    for (int row = 0; row < rowCount; row++) {
+      if (!inputIsNull[row]) {
+        outputValues[row] = foldIn(inputValues[row]);
+      } else if (!isNull) {
+
+        // Continue previous MAX.
+        outputValues[row] = max;
+      } else {
+
+        // Only NULLs so far in the group, so the running MAX is NULL.
+        outputColVector.isNull[row] = true;
+        outputColVector.noNulls = false;
+      }
+    }
+  }
+
+  /**
+   * Folds one non-null value into the group state and returns the resulting running MAX.
+   */
+  private long foldIn(long value) {
+    if (isNull) {
+
+      // First non-null value of the group.
+      isNull = false;
+      max = value;
+    } else {
+      max = Math.max(max, value);
+    }
+    return max;
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // No group value.
+    return true;
+  }
+
+  /**
+   * Reports LONG as the result column vector type.
+   */
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.LONG;
+  }
+
+  /**
+   * Returns to the initial state: no value seen and max cleared.
+   */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    max = 0;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongMin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongMin.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongMin.java
new file mode 100644
index 0000000..2d7caf3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongMin.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Streaming long min() evaluator for a PTF group.
+ *
+ * Every output row receives the smallest non-null value seen so far in the group; rows that
+ * come before the group's first non-null value are NULL.
+ */
+public class VectorPTFEvaluatorStreamingLongMin extends VectorPTFEvaluatorBase {
+
+  // True while the group has not yet seen a non-null value.
+  protected boolean isNull;
+
+  // Running minimum; only meaningful once isNull is false.
+  protected long min;
+
+  /**
+   * Creates the evaluator and puts it in its initial (no value seen) state.
+   */
+  public VectorPTFEvaluatorStreamingLongMin(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr,
+      int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  /**
+   * Evaluates the input expression and writes the running MIN of every row of the batch into
+   * the output column, continuing from the isNull / min state left by earlier batches.
+   *
+   * @param batch the next batch of the current group; selectedInUse must be false
+   * @throws HiveException as declared by the overridden method
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int rowCount = batch.size;
+    if (rowCount == 0) {
+      return;
+    }
+
+    final LongColumnVector inputColVector = (LongColumnVector) batch.cols[inputColumnNum];
+    final LongColumnVector outputColVector = (LongColumnVector) batch.cols[outputColumnNum];
+    final long[] outputValues = outputColVector.vector;
+    final long[] inputValues = inputColVector.vector;
+
+    if (inputColVector.isRepeating) {
+
+      // Every input row equals entry 0, so the output is a single repeated entry as well.
+      final boolean repeatedIsNull = !inputColVector.noNulls && inputColVector.isNull[0];
+      if (!repeatedIsNull) {
+
+        // We only need to evaluate a repeated value once for MIN/MAX.
+        outputValues[0] = foldIn(inputValues[0]);
+      } else if (!isNull) {
+
+        // Continue previous MIN.
+        outputValues[0] = min;
+      } else {
+
+        // The group has produced no value yet.
+        outputColVector.isNull[0] = true;
+        outputColVector.noNulls = false;
+      }
+      outputColVector.isRepeating = true;
+      return;
+    }
+
+    if (inputColVector.noNulls) {
+
+      // No NULLs in this batch: every row updates the running MIN.
+      for (int row = 0; row < rowCount; row++) {
+        outputValues[row] = foldIn(inputValues[row]);
+      }
+      return;
+    }
+
+    final boolean[] inputIsNull = inputColVector.isNull;
+    for (int row = 0; row < rowCount; row++) {
+      if (!inputIsNull[row]) {
+
+        // Output this row's running MIN.
+        outputValues[row] = foldIn(inputValues[row]);
+      } else if (!isNull) {
+
+        // Continue previous MIN.
+        outputValues[row] = min;
+      } else {
+
+        // Only NULLs so far in the group, so the running MIN is NULL.
+        outputColVector.isNull[row] = true;
+        outputColVector.noNulls = false;
+      }
+    }
+  }
+
+  /**
+   * Folds one non-null value into the group state and returns the resulting running MIN.
+   */
+  private long foldIn(long value) {
+    if (isNull) {
+
+      // First non-null value of the group.
+      isNull = false;
+      min = value;
+    } else {
+      min = Math.min(min, value);
+    }
+    return min;
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // No group value.
+    return true;
+  }
+
+  /**
+   * Reports LONG as the result column vector type.
+   */
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.LONG;
+  }
+
+  /**
+   * Returns to the initial state: no value seen and min cleared.
+   */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    min = 0;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongSum.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongSum.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongSum.java
new file mode 100644
index 0000000..76bca6b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorStreamingLongSum.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Streaming long sum() evaluator for a PTF group.
+ *
+ * Every output row receives the sum of the non-null values seen so far in the group; rows
+ * that come before the group's first non-null value are NULL.
+ */
+public class VectorPTFEvaluatorStreamingLongSum extends VectorPTFEvaluatorBase {
+
+  // True while the group has not yet seen a non-null value.
+  protected boolean isNull;
+
+  // Running sum of the group's non-null values.
+  protected long sum;
+
+  /**
+   * Creates the evaluator and puts it in its initial (no value seen) state.
+   */
+  public VectorPTFEvaluatorStreamingLongSum(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr,
+      int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  /**
+   * Evaluates the input expression and writes the running SUM of every row of the batch into
+   * the output column, continuing from the isNull / sum state left by earlier batches.
+   *
+   * @param batch the next batch of the current group; selectedInUse must be false
+   * @throws HiveException as declared by the overridden method
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch)
+      throws HiveException {
+
+    evaluateInputExpr(batch);
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int rowCount = batch.size;
+    if (rowCount == 0) {
+      return;
+    }
+
+    final LongColumnVector inputColVector = (LongColumnVector) batch.cols[inputColumnNum];
+    final LongColumnVector outputColVector = (LongColumnVector) batch.cols[outputColumnNum];
+    final long[] outputValues = outputColVector.vector;
+    final long[] inputValues = inputColVector.vector;
+
+    if (inputColVector.isRepeating) {
+
+      final boolean repeatedIsNull = !inputColVector.noNulls && inputColVector.isNull[0];
+      if (!repeatedIsNull) {
+
+        // The repeated value is added once per row, so each row gets its own SUM.
+        isNull = false;
+        final long addend = inputValues[0];
+        for (int row = 0; row < rowCount; row++) {
+          sum += addend;
+          outputValues[row] = sum;
+        }
+        return;
+      }
+
+      // Repeated NULL: all rows share one output entry.
+      if (!isNull) {
+
+        // Continue previous SUM.
+        outputValues[0] = sum;
+      } else {
+        outputColVector.isNull[0] = true;
+        outputColVector.noNulls = false;
+      }
+      outputColVector.isRepeating = true;
+      return;
+    }
+
+    if (inputColVector.noNulls) {
+      isNull = false;
+      for (int row = 0; row < rowCount; row++) {
+        sum += inputValues[row];
+        outputValues[row] = sum;
+      }
+      return;
+    }
+
+    final boolean[] inputIsNull = inputColVector.isNull;
+    for (int row = 0; row < rowCount; row++) {
+      if (!inputIsNull[row]) {
+        isNull = false;
+        sum += inputValues[row];
+        outputValues[row] = sum;
+      } else if (!isNull) {
+
+        // Continue previous SUM.
+        outputValues[row] = sum;
+      } else {
+
+        // Only NULLs so far in the group, so the running SUM is NULL.
+        outputColVector.isNull[row] = true;
+        outputColVector.noNulls = false;
+      }
+    }
+  }
+
+  @Override
+  public boolean streamsResult() {
+    // No group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.LONG;
+  }
+
+  /**
+   * Returns to the initial state: no value seen and sum cleared.
+   */
+  @Override
+  public void resetEvaluator() {
+    isNull = true;
+    sum = 0;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFGroupBatches.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFGroupBatches.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFGroupBatches.java
index ff89775..b0340d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFGroupBatches.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFGroupBatches.java
@@ -162,7 +162,10 @@ public class VectorPTFGroupBatches {
 
     // Streaming evaluators fill in their results during the evaluate call.
     for (VectorPTFEvaluatorBase evaluator : evaluators) {
-      evaluator.evaluateGroupBatch(batch, isLastGroupBatch);
+      evaluator.evaluateGroupBatch(batch);
+      if (isLastGroupBatch) {
+        evaluator.doLastBatchWork();
+      }
     }
   }
 
@@ -170,7 +173,10 @@ public class VectorPTFGroupBatches {
       throws HiveException {
 
     for (VectorPTFEvaluatorBase evaluator : evaluators) {
-      evaluator.evaluateGroupBatch(batch, isLastGroupBatch);
+      evaluator.evaluateGroupBatch(batch);
+      if (isLastGroupBatch) {
+        evaluator.doLastBatchWork();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 9bb104d..1956125 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -228,6 +228,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.mapred.InputFormat;
@@ -2823,6 +2824,18 @@ public class Vectorizer implements PhysicalPlanResolver {
         setOperatorIssue(functionName + " only UNBOUNDED start frame is supported");
         return false;
       }
+      List<ExprNodeDesc> exprNodeDescList = evaluatorInputExprNodeDescLists[i];
+      final boolean isSingleParameter =
+          (exprNodeDescList != null &&
+          exprNodeDescList.size() == 1);
+      final ExprNodeDesc singleExprNodeDesc =
+          (isSingleParameter ? exprNodeDescList.get(0) : null);
+      final TypeInfo singleTypeInfo =
+          (isSingleParameter ? singleExprNodeDesc.getTypeInfo() : null);
+      final PrimitiveCategory singlePrimitiveCategory =
+          (singleTypeInfo instanceof PrimitiveTypeInfo ?
+              ((PrimitiveTypeInfo) singleTypeInfo).getPrimitiveCategory() : null);
+
       switch (windowFrameDef.getWindowType()) {
       case RANGE:
         if (!windowFrameDef.getEnd().isCurrentRow()) {
@@ -2831,15 +2844,25 @@ public class Vectorizer implements PhysicalPlanResolver {
         }
         break;
       case ROWS:
-        if (!windowFrameDef.isEndUnbounded()) {
-          setOperatorIssue(functionName + " UNBOUNDED end frame is not supported for ROWS window type");
-          return false;
+        {
+          boolean isRowEndCurrent =
+              (windowFrameDef.getEnd().isCurrentRow() &&
+              (supportedFunctionType == SupportedFunctionType.AVG ||
+               supportedFunctionType == SupportedFunctionType.MAX ||
+               supportedFunctionType == SupportedFunctionType.MIN ||
+               supportedFunctionType == SupportedFunctionType.SUM) &&
+              isSingleParameter &&
+              singlePrimitiveCategory != null);
+          if (!isRowEndCurrent && !windowFrameDef.isEndUnbounded()) {
+            setOperatorIssue(
+                functionName + " UNBOUNDED end frame is required for ROWS window type");
+            return false;
+          }
         }
         break;
       default:
         throw new RuntimeException("Unexpected window type " + windowFrameDef.getWindowType());
       }
-      List<ExprNodeDesc> exprNodeDescList = evaluatorInputExprNodeDescLists[i];
       if (exprNodeDescList != null && exprNodeDescList.size() > 1) {
         setOperatorIssue("More than 1 argument expression of aggregation function " + functionName);
         return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPTFDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPTFDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPTFDesc.java
index 830b8c8..53886fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPTFDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPTFDesc.java
@@ -50,6 +50,19 @@ import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorLongMin;
 import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorLongSum;
 import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorRank;
 import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorRowNumber;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingDecimalAvg;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingDecimalMax;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingDecimalMin;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingDecimalSum;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingDoubleAvg;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingDoubleMax;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingDoubleMin;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingDoubleSum;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingLongAvg;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingLongMax;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingLongMin;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFEvaluatorStreamingLongSum;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType;
 import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
@@ -138,27 +151,46 @@ public class VectorPTFDesc extends AbstractVectorDesc  {
       WindowFrameDef windowFrameDef, Type columnVectorType, VectorExpression inputVectorExpression,
       int outputColumnNum) {
 
+    final boolean isRowEndCurrent =
+        (windowFrameDef.getWindowType() == WindowType.ROWS &&
+         windowFrameDef.getEnd().isCurrentRow());
+
     VectorPTFEvaluatorBase evaluator;
     switch (functionType) {
     case ROW_NUMBER:
-      evaluator = new VectorPTFEvaluatorRowNumber(windowFrameDef, inputVectorExpression, outputColumnNum);
+      evaluator =
+          new VectorPTFEvaluatorRowNumber(windowFrameDef, inputVectorExpression, outputColumnNum);
       break;
     case RANK:
-      evaluator = new VectorPTFEvaluatorRank(windowFrameDef, inputVectorExpression, outputColumnNum);
+      evaluator =
+          new VectorPTFEvaluatorRank(windowFrameDef, inputVectorExpression, outputColumnNum);
       break;
     case DENSE_RANK:
-      evaluator = new VectorPTFEvaluatorDenseRank(windowFrameDef, inputVectorExpression, outputColumnNum);
+      evaluator =
+          new VectorPTFEvaluatorDenseRank(windowFrameDef, inputVectorExpression, outputColumnNum);
       break;
     case MIN:
       switch (columnVectorType) {
       case LONG:
-        evaluator = new VectorPTFEvaluatorLongMin(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorLongMin(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingLongMin(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       case DOUBLE:
-        evaluator = new VectorPTFEvaluatorDoubleMin(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorDoubleMin(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingDoubleMin(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       case DECIMAL:
-        evaluator = new VectorPTFEvaluatorDecimalMin(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorDecimalMin(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingDecimalMin(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       default:
         throw new RuntimeException("Unexpected column vector type " + columnVectorType + " for " + functionType);
@@ -167,13 +199,25 @@ public class VectorPTFDesc extends AbstractVectorDesc  {
     case MAX:
       switch (columnVectorType) {
       case LONG:
-        evaluator = new VectorPTFEvaluatorLongMax(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorLongMax(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingLongMax(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       case DOUBLE:
-        evaluator = new VectorPTFEvaluatorDoubleMax(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorDoubleMax(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingDoubleMax(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       case DECIMAL:
-        evaluator = new VectorPTFEvaluatorDecimalMax(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorDecimalMax(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingDecimalMax(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       default:
         throw new RuntimeException("Unexpected column vector type " + columnVectorType + " for " + functionType);
@@ -182,13 +226,25 @@ public class VectorPTFDesc extends AbstractVectorDesc  {
     case SUM:
       switch (columnVectorType) {
       case LONG:
-        evaluator = new VectorPTFEvaluatorLongSum(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorLongSum(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingLongSum(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       case DOUBLE:
-        evaluator = new VectorPTFEvaluatorDoubleSum(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorDoubleSum(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingDoubleSum(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       case DECIMAL:
-        evaluator = new VectorPTFEvaluatorDecimalSum(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorDecimalSum(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingDecimalSum(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       default:
         throw new RuntimeException("Unexpected column vector type " + columnVectorType + " for " + functionType);
@@ -197,13 +253,25 @@ public class VectorPTFDesc extends AbstractVectorDesc  {
     case AVG:
       switch (columnVectorType) {
       case LONG:
-        evaluator = new VectorPTFEvaluatorLongAvg(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorLongAvg(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingLongAvg(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       case DOUBLE:
-        evaluator = new VectorPTFEvaluatorDoubleAvg(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorDoubleAvg(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingDoubleAvg(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       case DECIMAL:
-        evaluator = new VectorPTFEvaluatorDecimalAvg(windowFrameDef, inputVectorExpression, outputColumnNum);
+        evaluator = !isRowEndCurrent ?
+            new VectorPTFEvaluatorDecimalAvg(
+                windowFrameDef, inputVectorExpression, outputColumnNum) :
+            new VectorPTFEvaluatorStreamingDecimalAvg(
+                windowFrameDef, inputVectorExpression, outputColumnNum);
         break;
       default:
         throw new RuntimeException("Unexpected column vector type " + columnVectorType + " for " + functionType);

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/test/results/clientpositive/llap/ptf.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/ptf.q.out b/ql/src/test/results/clientpositive/llap/ptf.q.out
index 808d8c8..3fa2655 100644
--- a/ql/src/test/results/clientpositive/llap/ptf.q.out
+++ b/ql/src/test/results/clientpositive/llap/ptf.q.out
@@ -72,7 +72,7 @@ STAGE PLANS:
                     Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                     value expressions: _col5 (type: int), _col7 (type: double)
         Reducer 3 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col3 (type: int), VALUE._col5 (type: double)
@@ -563,7 +563,7 @@ STAGE PLANS:
                     Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                     value expressions: _col5 (type: int), _col7 (type: double)
         Reducer 3 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col3 (type: int), VALUE._col5 (type: double)
@@ -1616,7 +1616,7 @@ STAGE PLANS:
                     Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                     value expressions: _col5 (type: int), _col7 (type: double)
         Reducer 3 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col3 (type: int), VALUE._col5 (type: double)
@@ -1792,7 +1792,7 @@ STAGE PLANS:
                     Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                     value expressions: _col5 (type: int), _col7 (type: double)
         Reducer 3 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col3 (type: int), VALUE._col5 (type: double)
@@ -2029,7 +2029,7 @@ STAGE PLANS:
                     Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                     value expressions: _col5 (type: int), _col7 (type: double)
         Reducer 4 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col3 (type: int), VALUE._col5 (type: double)
@@ -3812,7 +3812,7 @@ STAGE PLANS:
                     Statistics: Num rows: 26 Data size: 12766 Basic stats: COMPLETE Column stats: COMPLETE
                     value expressions: _col5 (type: int)
         Reducer 5 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col3 (type: int)

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/test/results/clientpositive/llap/vector_ptf_part_simple.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_ptf_part_simple.q.out b/ql/src/test/results/clientpositive/llap/vector_ptf_part_simple.q.out
index e16f843..9f49f2e 100644
--- a/ql/src/test/results/clientpositive/llap/vector_ptf_part_simple.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_ptf_part_simple.q.out
@@ -659,7 +659,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: first_value UNBOUNDED end frame is not supported for ROWS window type
+                notVectorizedReason: PTF operator: first_value UNBOUNDED end frame is required for ROWS window type
                 vectorized: false
             Reduce Operator Tree:
               Select Operator
@@ -1381,7 +1381,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: first_value UNBOUNDED end frame is not supported for ROWS window type
+                notVectorizedReason: PTF operator: first_value UNBOUNDED end frame is required for ROWS window type
                 vectorized: false
             Reduce Operator Tree:
               Select Operator
@@ -2106,7 +2106,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: first_value UNBOUNDED end frame is not supported for ROWS window type
+                notVectorizedReason: PTF operator: first_value UNBOUNDED end frame is required for ROWS window type
                 vectorized: false
             Reduce Operator Tree:
               Select Operator
@@ -2781,16 +2781,28 @@ STAGE PLANS:
                     partitionColumnCount: 0
                     scratchColumnTypeNames: []
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
-                vectorized: false
+                reduceColumnNullOrder: a
+                reduceColumnSortOrder: +
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 3
+                    dataColumns: KEY.reducesinkkey0:string, VALUE._col0:string, VALUE._col1:double
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [double, double, double, double]
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string), VALUE._col1 (type: double)
                 outputColumnNames: _col0, _col1, _col2
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [0, 1, 2]
                 Statistics: Num rows: 40 Data size: 19816 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
@@ -2829,13 +2841,32 @@ STAGE PLANS:
                               name: avg
                               window function: GenericUDAFAverageEvaluatorDouble
                               window frame: ROWS PRECEDING(MAX)~CURRENT
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorStreamingDoubleSum, VectorPTFEvaluatorStreamingDoubleMin, VectorPTFEvaluatorStreamingDoubleMax, VectorPTFEvaluatorStreamingDoubleAvg]
+                      functionInputExpressions: [col 2:double, col 2:double, col 2:double, col 2:double]
+                      functionNames: [sum, min, max, avg]
+                      keyInputColumns: [0]
+                      native: true
+                      nonKeyInputColumns: [1, 2]
+                      orderExpressions: [col 0:string]
+                      outputColumns: [3, 4, 5, 6, 0, 1, 2]
+                      outputTypes: [double, double, double, double, string, string, double]
+                      streamingColumns: [3, 4, 5, 6]
                   Statistics: Num rows: 40 Data size: 19816 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col0 (type: string), _col1 (type: string), _col2 (type: double), sum_window_0 (type: double), min_window_1 (type: double), max_window_2 (type: double), avg_window_3 (type: double)
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0, 1, 2, 3, 4, 5, 6]
                     Statistics: Num rows: 40 Data size: 10344 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
+                      File Sink Vectorization:
+                          className: VectorFileSinkOperator
+                          native: false
                       Statistics: Num rows: 40 Data size: 10344 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
@@ -3439,16 +3470,28 @@ STAGE PLANS:
                     partitionColumnCount: 0
                     scratchColumnTypeNames: []
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
-                vectorized: false
+                reduceColumnNullOrder: aa
+                reduceColumnSortOrder: ++
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 3
+                    dataColumns: KEY.reducesinkkey0:string, KEY.reducesinkkey1:string, VALUE._col0:double
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [double, double, double, double]
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string), VALUE._col0 (type: double)
                 outputColumnNames: _col0, _col1, _col2
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [0, 1, 2]
                 Statistics: Num rows: 40 Data size: 19816 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
@@ -3487,13 +3530,33 @@ STAGE PLANS:
                               name: avg
                               window function: GenericUDAFAverageEvaluatorDouble
                               window frame: ROWS PRECEDING(MAX)~CURRENT
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorStreamingDoubleSum, VectorPTFEvaluatorStreamingDoubleMin, VectorPTFEvaluatorStreamingDoubleMax, VectorPTFEvaluatorStreamingDoubleAvg]
+                      functionInputExpressions: [col 2:double, col 2:double, col 2:double, col 2:double]
+                      functionNames: [sum, min, max, avg]
+                      keyInputColumns: [0, 1]
+                      native: true
+                      nonKeyInputColumns: [2]
+                      orderExpressions: [col 1:string]
+                      outputColumns: [3, 4, 5, 6, 0, 1, 2]
+                      outputTypes: [double, double, double, double, string, string, double]
+                      partitionExpressions: [col 0:string]
+                      streamingColumns: [3, 4, 5, 6]
                   Statistics: Num rows: 40 Data size: 19816 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col0 (type: string), _col1 (type: string), _col2 (type: double), sum_window_0 (type: double), min_window_1 (type: double), max_window_2 (type: double), avg_window_3 (type: double)
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0, 1, 2, 3, 4, 5, 6]
                     Statistics: Num rows: 40 Data size: 10344 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
+                      File Sink Vectorization:
+                          className: VectorFileSinkOperator
+                          native: false
                       Statistics: Num rows: 40 Data size: 10344 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
@@ -4100,16 +4163,28 @@ STAGE PLANS:
                     partitionColumnCount: 0
                     scratchColumnTypeNames: [bigint, bigint]
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
-                vectorized: false
+                reduceColumnNullOrder: aa
+                reduceColumnSortOrder: ++
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 4
+                    dataColumns: KEY.reducesinkkey0:int, KEY.reducesinkkey1:string, VALUE._col0:string, VALUE._col1:double
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [double, double, double, double, bigint]
             Reduce Operator Tree:
               Select Operator
                 expressions: VALUE._col0 (type: string), KEY.reducesinkkey1 (type: string), VALUE._col1 (type: double)
                 outputColumnNames: _col0, _col1, _col2
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [2, 1, 3]
                 Statistics: Num rows: 40 Data size: 19816 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
@@ -4148,13 +4223,33 @@ STAGE PLANS:
                               name: avg
                               window function: GenericUDAFAverageEvaluatorDouble
                               window frame: ROWS PRECEDING(MAX)~CURRENT
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorStreamingDoubleSum, VectorPTFEvaluatorStreamingDoubleMin, VectorPTFEvaluatorStreamingDoubleMax, VectorPTFEvaluatorStreamingDoubleAvg]
+                      functionInputExpressions: [col 3:double, col 3:double, col 3:double, col 3:double]
+                      functionNames: [sum, min, max, avg]
+                      keyInputColumns: [1]
+                      native: true
+                      nonKeyInputColumns: [2, 3]
+                      orderExpressions: [col 1:string]
+                      outputColumns: [4, 5, 6, 7, 2, 1, 3]
+                      outputTypes: [double, double, double, double, string, string, double]
+                      partitionExpressions: [ConstantVectorExpression(val 0) -> 8:int]
+                      streamingColumns: [4, 5, 6, 7]
                   Statistics: Num rows: 40 Data size: 19816 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col0 (type: string), _col1 (type: string), _col2 (type: double), sum_window_0 (type: double), min_window_1 (type: double), max_window_2 (type: double), avg_window_3 (type: double)
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [2, 1, 3, 4, 5, 6, 7]
                     Statistics: Num rows: 40 Data size: 10344 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
+                      File Sink Vectorization:
+                          className: VectorFileSinkOperator
+                          native: false
                       Statistics: Num rows: 40 Data size: 10344 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/test/results/clientpositive/llap/vector_windowing.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_windowing.q.out b/ql/src/test/results/clientpositive/llap/vector_windowing.q.out
index 6a132b8..cf6af00 100644
--- a/ql/src/test/results/clientpositive/llap/vector_windowing.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_windowing.q.out
@@ -68,16 +68,28 @@ STAGE PLANS:
                     partitionColumnCount: 0
                     scratchColumnTypeNames: []
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
-                vectorized: false
+                reduceColumnNullOrder: aa
+                reduceColumnSortOrder: ++
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 4
+                    dataColumns: KEY.reducesinkkey0:string, KEY.reducesinkkey1:string, VALUE._col3:int, VALUE._col5:double
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [bigint, bigint, double, double]
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col3 (type: int), VALUE._col5 (type: double)
                 outputColumnNames: _col1, _col2, _col5, _col7
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [1, 0, 2, 3]
                 Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
@@ -112,13 +124,34 @@ STAGE PLANS:
                               name: sum
                               window function: GenericUDAFSumDouble
                               window frame: ROWS PRECEDING(MAX)~CURRENT
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorRank, VectorPTFEvaluatorDenseRank, VectorPTFEvaluatorStreamingDoubleSum]
+                      functionInputExpressions: [col 1:string, col 1:string, col 3:double]
+                      functionNames: [rank, dense_rank, sum]
+                      keyInputColumns: [1, 0]
+                      native: true
+                      nonKeyInputColumns: [2, 3]
+                      orderExpressions: [col 1:string]
+                      outputColumns: [4, 5, 6, 1, 0, 2, 3]
+                      outputTypes: [int, int, double, string, string, int, double]
+                      partitionExpressions: [col 0:string]
+                      streamingColumns: [4, 5, 6]
                   Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col2 (type: string), _col1 (type: string), _col5 (type: int), rank_window_0 (type: int), dense_rank_window_1 (type: int), round(sum_window_2, 2) (type: double)
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0, 1, 2, 4, 5, 7]
+                        selectExpressions: RoundWithNumDigitsDoubleToDouble(col 6, decimalPlaces 2) -> 7:double
                     Statistics: Num rows: 26 Data size: 6214 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
+                      File Sink Vectorization:
+                          className: VectorFileSinkOperator
+                          native: false
                       Statistics: Num rows: 26 Data size: 6214 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
@@ -938,7 +971,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
+                notVectorizedReason: PTF operator: lag not in supported functions [avg, count, dense_rank, first_value, last_value, max, min, rank, row_number, sum]
                 vectorized: false
             Reduce Operator Tree:
               Select Operator
@@ -1139,7 +1172,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
+                notVectorizedReason: PTF operator: lag not in supported functions [avg, count, dense_rank, first_value, last_value, max, min, rank, row_number, sum]
                 vectorized: false
             Reduce Operator Tree:
               Select Operator
@@ -1443,7 +1476,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
+                notVectorizedReason: PTF operator: lag not in supported functions [avg, count, dense_rank, first_value, last_value, max, min, rank, row_number, sum]
                 vectorized: false
             Reduce Operator Tree:
               Select Operator
@@ -1786,16 +1819,28 @@ STAGE PLANS:
                     partitionColumnCount: 0
                     scratchColumnTypeNames: []
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
-                vectorized: false
+                reduceColumnNullOrder: aa
+                reduceColumnSortOrder: ++
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 4
+                    dataColumns: KEY.reducesinkkey0:string, KEY.reducesinkkey1:string, VALUE._col3:int, VALUE._col5:double
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [bigint, bigint, double, double]
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col3 (type: int), VALUE._col5 (type: double)
                 outputColumnNames: _col1, _col2, _col5, _col7
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [1, 0, 2, 3]
                 Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
@@ -1830,13 +1875,34 @@ STAGE PLANS:
                               name: sum
                               window function: GenericUDAFSumDouble
                               window frame: ROWS PRECEDING(MAX)~CURRENT
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorRank, VectorPTFEvaluatorDenseRank, VectorPTFEvaluatorStreamingDoubleSum]
+                      functionInputExpressions: [col 1:string, col 1:string, col 3:double]
+                      functionNames: [rank, dense_rank, sum]
+                      keyInputColumns: [1, 0]
+                      native: true
+                      nonKeyInputColumns: [2, 3]
+                      orderExpressions: [col 1:string]
+                      outputColumns: [4, 5, 6, 1, 0, 2, 3]
+                      outputTypes: [int, int, double, string, string, int, double]
+                      partitionExpressions: [col 0:string]
+                      streamingColumns: [4, 5, 6]
                   Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col2 (type: string), _col1 (type: string), _col5 (type: int), rank_window_0 (type: int), dense_rank_window_1 (type: int), round(sum_window_2, 2) (type: double)
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0, 1, 2, 4, 5, 7]
+                        selectExpressions: RoundWithNumDigitsDoubleToDouble(col 6, decimalPlaces 2) -> 7:double
                     Statistics: Num rows: 26 Data size: 6214 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
+                      File Sink Vectorization:
+                          className: VectorFileSinkOperator
+                          native: false
                       Statistics: Num rows: 26 Data size: 6214 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
@@ -1962,16 +2028,28 @@ STAGE PLANS:
                     partitionColumnCount: 0
                     scratchColumnTypeNames: []
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
-                vectorized: false
+                reduceColumnNullOrder: aa
+                reduceColumnSortOrder: ++
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 4
+                    dataColumns: KEY.reducesinkkey0:string, KEY.reducesinkkey1:string, VALUE._col3:int, VALUE._col5:double
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [bigint, bigint, double, double]
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col3 (type: int), VALUE._col5 (type: double)
                 outputColumnNames: _col1, _col2, _col5, _col7
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [1, 0, 2, 3]
                 Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
@@ -2006,13 +2084,34 @@ STAGE PLANS:
                               name: sum
                               window function: GenericUDAFSumDouble
                               window frame: ROWS PRECEDING(MAX)~CURRENT
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorRank, VectorPTFEvaluatorDenseRank, VectorPTFEvaluatorStreamingDoubleSum]
+                      functionInputExpressions: [col 1:string, col 1:string, col 3:double]
+                      functionNames: [rank, dense_rank, sum]
+                      keyInputColumns: [1, 0]
+                      native: true
+                      nonKeyInputColumns: [2, 3]
+                      orderExpressions: [col 1:string]
+                      outputColumns: [4, 5, 6, 1, 0, 2, 3]
+                      outputTypes: [int, int, double, string, string, int, double]
+                      partitionExpressions: [col 0:string]
+                      streamingColumns: [4, 5, 6]
                   Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col2 (type: string), _col1 (type: string), _col5 (type: int), rank_window_0 (type: int), dense_rank_window_1 (type: int), round(sum_window_2, 2) (type: double)
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0, 1, 2, 4, 5, 7]
+                        selectExpressions: RoundWithNumDigitsDoubleToDouble(col 6, decimalPlaces 2) -> 7:double
                     Statistics: Num rows: 26 Data size: 6214 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
+                      File Sink Vectorization:
+                          className: VectorFileSinkOperator
+                          native: false
                       Statistics: Num rows: 26 Data size: 6214 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
@@ -5422,7 +5521,7 @@ STAGE PLANS:
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
+                notVectorizedReason: Aggregation Function expression for GROUPBY operator: UDF compute_stats not supported
                 vectorized: false
             Reduce Operator Tree:
               Select Operator
@@ -8349,16 +8448,28 @@ STAGE PLANS:
                     partitionColumnCount: 0
                     scratchColumnTypeNames: []
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
-                vectorized: false
+                reduceColumnNullOrder: aa
+                reduceColumnSortOrder: ++
+                allNative: true
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 4
+                    dataColumns: KEY.reducesinkkey0:string, KEY.reducesinkkey1:string, VALUE._col3:int, VALUE._col5:double
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [double, double]
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col3 (type: int), VALUE._col5 (type: double)
                 outputColumnNames: _col1, _col2, _col5, _col7
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [1, 0, 2, 3]
                 Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
@@ -8385,15 +8496,38 @@ STAGE PLANS:
                               name: min
                               window function: GenericUDAFMinEvaluator
                               window frame: ROWS PRECEDING(MAX)~CURRENT
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorStreamingDoubleSum, VectorPTFEvaluatorStreamingDoubleMin]
+                      functionInputExpressions: [col 3:double, col 3:double]
+                      functionNames: [sum, min]
+                      keyInputColumns: [1, 0]
+                      native: true
+                      nonKeyInputColumns: [2, 3]
+                      orderExpressions: [col 0:string, col 1:string]
+                      outputColumns: [4, 5, 1, 0, 2, 3]
+                      outputTypes: [double, double, string, string, int, double]
+                      streamingColumns: [4, 5]
                   Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: sum_window_0 (type: double), min_window_1 (type: double), _col1 (type: string), _col2 (type: string), _col5 (type: int), _col7 (type: double)
                     outputColumnNames: sum_window_0, min_window_1, _col1, _col2, _col5, _col7
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [4, 5, 1, 0, 2, 3]
                     Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                     Reduce Output Operator
                       key expressions: _col2 (type: string), _col1 (type: string)
                       sort order: ++
                       Map-reduce partition columns: _col2 (type: string), _col1 (type: string)
+                      Reduce Sink Vectorization:
+                          className: VectorReduceSinkObjectHashOperator
+                          keyColumnNums: [0, 1]
+                          native: true
+                          nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                          partitionColumnNums: [0, 1]
+                          valueColumnNums: [4, 5, 2, 3]
                       Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                       value expressions: sum_window_0 (type: double), min_window_1 (type: double), _col5 (type: int), _col7 (type: double)
         Reducer 3 
@@ -8777,16 +8911,28 @@ STAGE PLANS:
                     partitionColumnCount: 0
                     scratchColumnTypeNames: []
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
-                vectorized: false
+                reduceColumnNullOrder: aa
+                reduceColumnSortOrder: ++
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 4
+                    dataColumns: KEY.reducesinkkey0:string, KEY.reducesinkkey1:string, VALUE._col3:int, VALUE._col5:double
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [double, double]
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: string), KEY.reducesinkkey0 (type: string), VALUE._col3 (type: int), VALUE._col5 (type: double)
                 outputColumnNames: _col1, _col2, _col5, _col7
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [1, 0, 2, 3]
                 Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
@@ -8807,13 +8953,34 @@ STAGE PLANS:
                               name: sum
                               window function: GenericUDAFSumDouble
                               window frame: ROWS PRECEDING(MAX)~CURRENT
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorStreamingDoubleSum]
+                      functionInputExpressions: [col 3:double]
+                      functionNames: [sum]
+                      keyInputColumns: [1, 0]
+                      native: true
+                      nonKeyInputColumns: [2, 3]
+                      orderExpressions: [col 1:string]
+                      outputColumns: [4, 1, 0, 2, 3]
+                      outputTypes: [double, string, string, int, double]
+                      partitionExpressions: [col 0:string]
+                      streamingColumns: [4]
                   Statistics: Num rows: 26 Data size: 12974 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col2 (type: string), _col1 (type: string), _col5 (type: int), round(sum_window_0, 2) (type: double)
                     outputColumnNames: _col0, _col1, _col2, _col3
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0, 1, 2, 5]
+                        selectExpressions: RoundWithNumDigitsDoubleToDouble(col 4, decimalPlaces 2) -> 5:double
                     Statistics: Num rows: 26 Data size: 6006 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
+                      File Sink Vectorization:
+                          className: VectorFileSinkOperator
+                          native: false
                       Statistics: Num rows: 26 Data size: 6006 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
@@ -9663,16 +9830,28 @@ STAGE PLANS:
                     partitionColumnCount: 0
                     scratchColumnTypeNames: [string, string]
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
-                vectorized: false
+                reduceColumnNullOrder: aa
+                reduceColumnSortOrder: ++
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 2
+                    dataColumns: KEY.reducesinkkey0:string, KEY.reducesinkkey1:int
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [bigint, string, string]
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey1 (type: int)
                 outputColumnNames: _col5
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [1]
                 Statistics: Num rows: 5 Data size: 1360 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
@@ -9693,13 +9872,34 @@ STAGE PLANS:
                               name: sum
                               window function: GenericUDAFSumLong
                               window frame: ROWS PRECEDING(MAX)~CURRENT
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorStreamingLongSum]
+                      functionInputExpressions: [col 1:int]
+                      functionNames: [sum]
+                      keyInputColumns: [1]
+                      native: true
+                      nonKeyInputColumns: []
+                      orderExpressions: [col 1:int]
+                      outputColumns: [2, 1]
+                      outputTypes: [bigint, int]
+                      partitionExpressions: [ConstantVectorExpression(val Manufacturer#6) -> 3:string]
+                      streamingColumns: [2]
                   Statistics: Num rows: 5 Data size: 1360 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: 'Manufacturer#6' (type: string), sum_window_0 (type: bigint)
                     outputColumnNames: _col0, _col1
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [4, 2]
+                        selectExpressions: ConstantVectorExpression(val Manufacturer#6) -> 4:string
                     Statistics: Num rows: 5 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
+                      File Sink Vectorization:
+                          className: VectorFileSinkOperator
+                          native: false
                       Statistics: Num rows: 5 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat

http://git-wip-us.apache.org/repos/asf/hive/blob/cc38bcc5/ql/src/test/results/clientpositive/llap/vector_windowing_expressions.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_windowing_expressions.q.out b/ql/src/test/results/clientpositive/llap/vector_windowing_expressions.q.out
index 2bb7730..5ea866b 100644
--- a/ql/src/test/results/clientpositive/llap/vector_windowing_expressions.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_windowing_expressions.q.out
@@ -307,16 +307,28 @@ STAGE PLANS:
                     partitionColumnCount: 0
                     scratchColumnTypeNames: []
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: sum UNBOUNDED end frame is not supported for ROWS window type
-                vectorized: false
+                reduceColumnNullOrder: aa
+                reduceColumnSortOrder: ++
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 3
+                    dataColumns: KEY.reducesinkkey0:string, KEY.reducesinkkey1:double, VALUE._col4:int
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [bigint, double, double]
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey0 (type: string), VALUE._col4 (type: int), KEY.reducesinkkey1 (type: double)
                 outputColumnNames: _col2, _col5, _col7
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [0, 2, 1]
                 Statistics: Num rows: 26 Data size: 9828 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
@@ -344,13 +356,34 @@ STAGE PLANS:
                               name: sum
                               window function: GenericUDAFSumDouble
                               window frame: ROWS PRECEDING(MAX)~CURRENT
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorRank, VectorPTFEvaluatorStreamingDoubleSum]
+                      functionInputExpressions: [col 1:double, col 1:double]
+                      functionNames: [rank, sum]
+                      keyInputColumns: [0, 1]
+                      native: true
+                      nonKeyInputColumns: [2]
+                      orderExpressions: [col 1:double]
+                      outputColumns: [3, 4, 0, 2, 1]
+                      outputTypes: [int, double, string, int, double]
+                      partitionExpressions: [col 0:string]
+                      streamingColumns: [3, 4]
                   Statistics: Num rows: 26 Data size: 9828 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col2 (type: string), _col7 (type: double), _col5 (type: int), rank_window_0 (type: int), sum_window_1 (type: double), (sum_window_1 - 5.0D) (type: double)
                     outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0, 1, 2, 3, 4, 5]
+                        selectExpressions: DoubleColSubtractDoubleScalar(col 4:double, val 5.0) -> 5:double
                     Statistics: Num rows: 26 Data size: 3380 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
+                      File Sink Vectorization:
+                          className: VectorFileSinkOperator
+                          native: false
                       Statistics: Num rows: 26 Data size: 3380 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
@@ -1538,16 +1571,28 @@ STAGE PLANS:
                     partitionColumnCount: 0
                     scratchColumnTypeNames: []
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: PTF operator: avg UNBOUNDED end frame is not supported for ROWS window type
-                vectorized: false
+                reduceColumnNullOrder: aa
+                reduceColumnSortOrder: ++
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 3
+                    dataColumns: KEY.reducesinkkey0:string, KEY.reducesinkkey1:string, VALUE._col5:double
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [double]
             Reduce Operator Tree:
               Select Operator
                 expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string), VALUE._col5 (type: double)
                 outputColumnNames: _col2, _col4, _col7
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [0, 1, 2]
                 Statistics: Num rows: 26 Data size: 12428 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
@@ -1568,13 +1613,33 @@ STAGE PLANS:
                               name: avg
                               window function: GenericUDAFAverageEvaluatorDouble
                               window frame: ROWS PRECEDING(MAX)~CURRENT
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorStreamingDoubleAvg]
+                      functionInputExpressions: [col 2:double]
+                      functionNames: [avg]
+                      keyInputColumns: [0, 1]
+                      native: true
+                      nonKeyInputColumns: [2]
+                      orderExpressions: [col 1:string, col 0:string]
+                      outputColumns: [3, 0, 1, 2]
+                      outputTypes: [double, string, string, double]
+                      partitionExpressions: [col 0:string]
+                      streamingColumns: [3]
                   Statistics: Num rows: 26 Data size: 12428 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col2 (type: string), avg_window_0 (type: double)
                     outputColumnNames: _col0, _col1
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0, 3]
                     Statistics: Num rows: 26 Data size: 2756 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
+                      File Sink Vectorization:
+                          className: VectorFileSinkOperator
+                          native: false
                       Statistics: Num rows: 26 Data size: 2756 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat