Posted to commits@drill.apache.org by jn...@apache.org on 2015/06/11 20:30:52 UTC

[2/2] drill git commit: DRILL-3200: Add Window functions: ROW_NUMBER, RANK, PERCENT_RANK, DENSE_RANK and CUME_DIST

DRILL-3200: Add Window functions: ROW_NUMBER, RANK, PERCENT_RANK, DENSE_RANK and CUME_DIST

- added enum WindowFrameRecordBatch.WindowFunction to describe the supported window functions and their corresponding output MajorType
- renamed WindowFrameTemplate -> DefaultFrameTemplate, cleaned up the template to handle the default frame efficiently:
 . a batch can be processed as soon as we find the last peer row of its last row
 . once a batch is processed it can be safely released => we can transfer its value vectors to the container instead of copying them
- the new Partition class (used by DefaultFrameTemplate) tracks the current window frame and computes the following window functions automatically: row_number, rank, dense_rank, percent_rank, cume_dist. It doesn't need to aggregate the value vectors to compute these window functions
- updated TestWindowFrame to check the results of row_number, rank, dense_rank, percent_rank and cume_dist in various cases
 . added a debug config option to MSorter to control the size of batches. This is needed by TestWindowFrame so it can use small test data files (20 rows per batch)
 . removed contrib/data/window-test-data
- WindowFrameRecordBatch properly releases saved batches if the query stops prematurely
- GenerateTestData can be used to generate test data for the window function unit tests [it's a work in progress and can either be improved to make it developer-friendly or removed from the final patch]
- using newly created WindowDataBatch in place of RecordBatchData, to expose FragmentContext and VectorAccessible (fixes DRILL-3218)
- window.enable is true by default
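
For illustration, a hypothetical query exercising all five newly supported functions (table and
column names are made up):

  SELECT ROW_NUMBER()   OVER (PARTITION BY dept ORDER BY salary) AS rn,
         RANK()         OVER (PARTITION BY dept ORDER BY salary) AS rnk,
         DENSE_RANK()   OVER (PARTITION BY dept ORDER BY salary) AS drnk,
         PERCENT_RANK() OVER (PARTITION BY dept ORDER BY salary) AS prnk,
         CUME_DIST()    OVER (PARTITION BY dept ORDER BY salary) AS cd
  FROM employees;

All five use the default frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), which is
the only frame handled by DefaultFrameTemplate.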


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3bccec91
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3bccec91
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3bccec91

Branch: refs/heads/master
Commit: 3bccec9110c7ff86fa3cf04baa81a1747e1f5b9e
Parents: 453f6f7
Author: adeneche <ad...@gmail.com>
Authored: Fri Mar 13 10:06:32 2015 -0700
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Thu Jun 11 10:59:35 2015 -0700

----------------------------------------------------------------------
 contrib/data/pom.xml                            |    1 -
 contrib/data/window-test-data/pom.xml           |   64 --
 exec/java-exec/pom.xml                          |    6 -
 .../org/apache/drill/exec/ExecConstants.java    |    3 +-
 .../impl/window/DefaultFrameTemplate.java       |  320 ++++++
 .../exec/physical/impl/window/Partition.java    |   82 ++
 .../physical/impl/window/WindowDataBatch.java   |   93 ++
 .../impl/window/WindowFrameRecordBatch.java     |  216 ++--
 .../impl/window/WindowFrameTemplate.java        |  379 -------
 .../exec/physical/impl/window/WindowFramer.java |   21 +-
 .../exec/physical/impl/xsort/MSortTemplate.java |   15 +-
 .../drill/exec/record/AbstractRecordBatch.java  |    6 +-
 .../apache/drill/TestDisabledFunctionality.java |   13 -
 .../physical/impl/window/GenerateTestData.java  |  286 +++++
 .../physical/impl/window/TestWindowFrame.java   |   91 +-
 .../src/test/resources/window/allData.csv       |    5 +
 .../src/test/resources/window/b1.p1.subs.tsv    |   20 +
 .../src/test/resources/window/b1.p1.tsv         |   20 +
 .../src/test/resources/window/b1.p1/0.data.json |   21 +
 .../src/test/resources/window/b1.p2.subs.tsv    |   20 +
 .../src/test/resources/window/b1.p2.tsv         |   20 +
 .../src/test/resources/window/b1.p2/0.data.json |   22 +
 .../src/test/resources/window/b2.p2.subs.tsv    |   40 +
 .../src/test/resources/window/b2.p2.tsv         |   40 +
 .../src/test/resources/window/b2.p2/0.data.json |   22 +
 .../src/test/resources/window/b2.p2/1.data.json |   20 +
 .../src/test/resources/window/b2.p4.subs.tsv    |   40 +
 .../src/test/resources/window/b2.p4.tsv         |   40 +
 .../src/test/resources/window/b2.p4/0.data.json |   24 +
 .../src/test/resources/window/b2.p4/1.data.json |   20 +
 .../src/test/resources/window/b3.p2.subs.tsv    |   60 ++
 .../src/test/resources/window/b3.p2.tsv         |   60 ++
 .../src/test/resources/window/b3.p2/0.data.json |   22 +
 .../src/test/resources/window/b3.p2/1.data.json |   20 +
 .../src/test/resources/window/b3.p2/2.data.json |   20 +
 .../src/test/resources/window/b4.p4.subs.tsv    |   80 ++
 .../src/test/resources/window/b4.p4.tsv         |   80 ++
 .../src/test/resources/window/b4.p4/0.data.json |   24 +
 .../src/test/resources/window/b4.p4/1.data.json |   20 +
 .../src/test/resources/window/b4.p4/2.data.json |   20 +
 .../src/test/resources/window/b4.p4/3.data.json |   20 +
 .../src/test/resources/window/mediumData.json   | 1000 ------------------
 .../src/test/resources/window/oneKeyCount.json  |   43 -
 .../test/resources/window/oneKeyCountData.json  |    4 -
 .../resources/window/oneKeyCountMultiBatch.json |   72 --
 .../src/test/resources/window/twoKeys.json      |   44 -
 .../src/test/resources/window/twoKeysData.json  |    8 -
 47 files changed, 1812 insertions(+), 1755 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/contrib/data/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/data/pom.xml b/contrib/data/pom.xml
index d1def76..450bc0d 100644
--- a/contrib/data/pom.xml
+++ b/contrib/data/pom.xml
@@ -33,6 +33,5 @@
 
   <modules>
     <module>tpch-sample-data</module>
-    <module>window-test-data</module>
   </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/contrib/data/window-test-data/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/data/window-test-data/pom.xml b/contrib/data/window-test-data/pom.xml
deleted file mode 100644
index 6d195da..0000000
--- a/contrib/data/window-test-data/pom.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
-  license agreements. See the NOTICE file distributed with this work for additional 
-  information regarding copyright ownership. The ASF licenses this file to 
-  You under the Apache License, Version 2.0 (the "License"); you may not use 
-  this file except in compliance with the License. You may obtain a copy of 
-  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-  by applicable law or agreed to in writing, software distributed under the 
-  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
-  OF ANY KIND, either express or implied. See the License for the specific 
-  language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>drill-contrib-data-parent</artifactId>
-    <groupId>org.apache.drill.contrib.data</groupId>
-    <version>1.1.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>window-test-data</artifactId>
-  <name>contrib/data/window-test-data</name>
-  <packaging>jar</packaging>
-
-  <dependencies>
-  </dependencies>
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>com.googlecode.maven-download-plugin</groupId>
-        <artifactId>download-maven-plugin</artifactId>
-        <version>1.2.0</version>
-        <executions>
-          <execution>
-            <id>install-tgz</id>
-            <phase>prepare-package</phase>
-            <goals>
-              <goal>wget</goal>
-            </goals>
-            <configuration>
-              <url>https://s3-us-west-2.amazonaws.com/denbucket/window_test_data_0.1.tgz</url>
-              <outputFileName>window.tgz</outputFileName>
-              <unpack>true</unpack>
-              <outputDirectory>${project.build.directory}/classes/window</outputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-  <pluginRepositories>
-    <pluginRepository>
-      <id>sonatype-public-repository</id>
-      <url>https://oss.sonatype.org/content/groups/public</url>
-      <snapshots>
-        <enabled>true</enabled>
-      </snapshots>
-      <releases>
-        <enabled>true</enabled>
-      </releases>
-    </pluginRepository>
-  </pluginRepositories>
-
-</project>

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index b5cd52b..5cc209d 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -38,12 +38,6 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.drill.contrib.data</groupId>
-      <artifactId>window-test-data</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
     <!-- <dependency> -->
     <!-- <groupId>org.ow2.asm</groupId> -->
     <!-- <artifactId>asm-commons</artifactId> -->

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 91793f5..8ea90e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -71,6 +71,7 @@ public interface ExecConstants {
   public static final String EXTERNAL_SORT_SPILL_THRESHOLD = "drill.exec.sort.external.spill.threshold";
   public static final String EXTERNAL_SORT_SPILL_DIRS = "drill.exec.sort.external.spill.directories";
   public static final String EXTERNAL_SORT_SPILL_FILESYSTEM = "drill.exec.sort.external.spill.fs";
+  public static final String EXTERNAL_SORT_MSORT_MAX_BATCHSIZE = "drill.exec.sort.external.msort.batch.maxsize";
   public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
   public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
   public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets";
@@ -238,7 +239,7 @@ public interface ExecConstants {
   public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill";
 
   public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
-  public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false);
+  public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, true);
 
   public static final String DRILLBIT_CONTROL_INJECTIONS = "drill.exec.testing.controls";
   public static final OptionValidator DRILLBIT_CONTROLS_VALIDATOR =
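
For reference, the two options touched above are exercised differently (illustrative snippets;
the option names come from this diff, everything else is assumed). The msort batch size is a
boot-time option, e.g. in drill-override.conf:

  drill.exec.sort.external.msort.batch.maxsize: 20

Window support is a session option that now defaults to true and can still be turned off per
session:

  ALTER SESSION SET `window.enable` = false;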

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
new file mode 100644
index 0000000..ada068b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import javax.inject.Named;
+import java.util.Iterator;
+import java.util.List;
+
+
+public abstract class DefaultFrameTemplate implements WindowFramer {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultFrameTemplate.class);
+
+  private VectorContainer container;
+  private List<WindowDataBatch> batches;
+  private int outputCount; // number of rows in currently/last processed batch
+
+  /**
+   * Current partition being processed. Can span multiple batches, so we may need to keep it
+   * between calls to doWork().
+   */
+  private Partition partition;
+
+  @Override
+  public void setup(List<WindowDataBatch> batches, final VectorContainer container) throws SchemaChangeException {
+    this.container = container;
+    this.batches = batches;
+
+    outputCount = 0;
+    partition = null;
+  }
+
+  private void allocateOutgoing() {
+    for (VectorWrapper<?> w : container) {
+      w.getValueVector().allocateNew();
+    }
+  }
+
+  /**
+   * processes all rows of current batch:
+   * <ul>
+   *   <li>compute aggregations</li>
+   *   <li>compute window functions</li>
+   *   <li>transfer remaining vectors from current batch to container</li>
+   * </ul>
+   */
+  @Override
+  public void doWork() throws DrillException {
+    int currentRow = 0;
+
+    logger.trace("WindowFramer.doWork() START, num batches {}, current batch has {} rows",
+      batches.size(), batches.get(0).getRecordCount());
+
+    allocateOutgoing();
+
+    final WindowDataBatch current = batches.get(0);
+
+    // we need to store the record count explicitly, because we release current batch at the end of this call
+    outputCount = current.getRecordCount();
+
+    while (currentRow < outputCount) {
+      if (partition != null) {
+        assert currentRow == 0 : "pending windows are only expected at the start of the batch";
+
+        // we have a pending window we need to handle from a previous call to doWork()
+        logger.trace("we have a pending partition {}", partition);
+      } else {
+        final int length = computePartitionSize(currentRow);
+        partition = new Partition(length);
+        setupWrite(current, container);
+      }
+
+      currentRow = processPartition(currentRow);
+      if (partition.isDone()) {
+        partition = null;
+        resetValues();
+      }
+    }
+
+    // transfer "non aggregated" vectors
+    for (VectorWrapper<?> vw : current) {
+      ValueVector v = container.addOrGet(vw.getField());
+      TransferPair tp = vw.getValueVector().makeTransferPair(v);
+      tp.transfer();
+    }
+
+    for (VectorWrapper<?> v : container) {
+      v.getValueVector().getMutator().setValueCount(outputCount);
+    }
+
+    // because we are using the default frame, and we keep the aggregated value until we start a new frame
+    // we can safely free the current batch
+    batches.remove(0).clear();
+
+    logger.trace("WindowFramer.doWork() END");
+  }
+
+  /**
+   * processes all rows of the current batch that belong to the current partition (computes and writes aggregation values).
+   * @param currentRow first unprocessed row
+   * @return index of next unprocessed row
+   * @throws DrillException if it can't write into the container
+   */
+  private int processPartition(final int currentRow) throws DrillException {
+    logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
+
+    int row = currentRow;
+    while (row < outputCount && !partition.isDone()) {
+      if (partition.isFrameDone()) {
+        // because all peer rows share the same frame, we only need to compute and aggregate the frame once
+        partition.newFrame(countPeers(row));
+        aggregatePeers(row);
+      }
+
+      outputAggregatedValues(row, partition);
+
+      partition.rowAggregated();
+      row++;
+    }
+
+    return row;
+  }
+
+  /**
+   * @return number of rows in the partition that starts at row "start" of the first batch
+   */
+  private int computePartitionSize(final int start) {
+    logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
+
+    // current partition always starts from first batch
+    final VectorAccessible first = getCurrent();
+
+    int length = 0;
+
+    // count all rows that are in the same partition as start
+    // keep increasing length until we find the first row of the next partition or we reach
+    // the very last batch
+    for (WindowDataBatch batch : batches) {
+      final int recordCount = batch.getRecordCount();
+
+      // check first container from start row, and subsequent containers from first row
+      for (int row = (batch == first) ? start : 0; row < recordCount; row++, length++) {
+        if (!isSamePartition(start, first, row, batch)) {
+          return length;
+        }
+      }
+    }
+
+    return length;
+  }
+
+  /**
+   * Counts how many rows are peers with the first row of the current frame
+   * @param start first row of current frame
+   * @return number of peer rows
+   */
+  private int countPeers(final int start) {
+    // current frame always starts from first batch
+    final VectorAccessible first = getCurrent();
+
+    int length = 0;
+
+    // count all rows that are peers with the starting row
+    // keep increasing length until we find the first non-peer row or we reach
+    // the very last batch
+    for (WindowDataBatch batch : batches) {
+      final int recordCount = batch.getRecordCount();
+
+      // for every remaining row in the partition, count it if it's a peer row
+      final int remaining = partition.getRemaining();
+      for (int row = (batch == first) ? start : 0; row < recordCount && length < remaining; row++, length++) {
+        if (!isPeer(start, first, row, batch)) {
+          return length;
+        }
+      }
+    }
+
+    return length;
+  }
+
+  /**
+   * aggregates all peer rows of current row
+   * @param currentRow starting row of the current frame
+   * @throws SchemaChangeException
+   */
+  private void aggregatePeers(final int currentRow) throws SchemaChangeException {
+    logger.trace("aggregating {} rows starting from {}", partition.getPeers(), currentRow);
+    assert !partition.isFrameDone() : "frame is empty!";
+
+    // a single frame can include rows from multiple batches
+    // start processing first batch and, if necessary, move to next batches
+    Iterator<WindowDataBatch> iterator = batches.iterator();
+    WindowDataBatch current = iterator.next();
+    setupRead(current, container);
+
+    final int peers = partition.getPeers();
+    for (int i = 0, row = currentRow; i < peers; i++, row++) {
+      if (row >= current.getRecordCount()) {
+        // we reached the end of the current batch, move to the next one
+        current = iterator.next();
+        setupRead(current, container);
+        row = 0;
+      }
+
+      aggregateRecord(row);
+    }
+  }
+
+  @Override
+  public boolean canDoWork() {
+    // check if we can process a saved batch
+    if (batches.size() < 2) {
+      logger.trace("we don't have enough batches to proceed, fetch next batch");
+      return false;
+    }
+
+    final VectorAccessible current = getCurrent();
+    final int currentSize = current.getRecordCount();
+    final VectorAccessible last = batches.get(batches.size() - 1);
+    final int lastSize = last.getRecordCount();
+
+    if (!isSamePartition(currentSize - 1, current, lastSize - 1, last)
+        || !isPeer(currentSize - 1, current, lastSize - 1, last)) {
+      logger.trace("frame changed, we are ready to process first saved batch");
+      return true;
+    } else {
+      logger.trace("frame didn't change, fetch next batch");
+      return false;
+    }
+  }
+
+  /**
+   * @return saved batch that will be processed in doWork()
+   */
+  private VectorAccessible getCurrent() {
+    return batches.get(0);
+  }
+
+  @Override
+  public int getOutputCount() {
+    return outputCount;
+  }
+
+  @Override
+  public void cleanup() {
+  }
+
+  /**
+   * setup incoming container for aggregateRecord()
+   */
+  public abstract void setupRead(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+
+  /**
+   * setup outgoing container for outputAggregatedValues. This will also reset the aggregations in most cases.
+   */
+  public abstract void setupWrite(@Named("incoming") WindowDataBatch incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+
+  /**
+   * aggregates a row from the incoming container
+   * @param index of row to aggregate
+   */
+  public abstract void aggregateRecord(@Named("index") int index);
+
+  /**
+   * writes aggregated values to row of outgoing container
+   * @param outIndex index of row
+   */
+  public abstract void outputAggregatedValues(@Named("outIndex") int outIndex, @Named("partition") Partition partition);
+
+  /**
+   * reset all window functions
+   */
+  public abstract boolean resetValues();
+
+  /**
+   * compares two rows from different batches (possibly the same batch) to check whether they have the same
+   * value for the partition by expressions
+   * @param b1Index index of first row
+   * @param b1 batch for first row
+   * @param b2Index index of second row
+   * @param b2 batch for second row
+   * @return true if the rows are in the same partition
+   */
+  public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+                                          @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
+
+  /**
+   * compares two rows from different batches (possibly the same batch) to check whether they have the same
+   * value for the order by expressions
+   * @param b1Index index of first row
+   * @param b1 batch for first row
+   * @param b2Index index of second row
+   * @param b2 batch for second row
+   * @return true if the rows are peers, i.e. have the same value for the order by expressions
+   */
+  public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+                                 @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
+}
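
To make the control flow above concrete, here is a minimal, self-contained sketch (plain Java,
not Drill's runtime-generated code) of processing one sorted partition with the default frame
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: all peer rows share one frame, so the
ranking values are computed once per group of peers, mirroring countPeers() above and
newFrame() in the Partition class that follows. The int array stands in for a partition's
order by keys.

  public class DefaultFrameSketch {
    public static void main(String[] args) {
      int[] orderKeys = {10, 20, 20, 30}; // one partition, already sorted on the order by key
      final int length = orderKeys.length;
      int row = 0, rowNumber = 1, denseRank = 0;
      while (row < length) {
        // countPeers(): how many consecutive rows share the current order by key
        int peers = 1;
        while (row + peers < length && orderKeys[row + peers] == orderKeys[row]) {
          peers++;
        }
        // newFrame(): all peers share the same rank-based values
        final int rank = rowNumber; // rank = row number of the frame's first peer
        denseRank++;
        final double percentRank = length > 1 ? (double) (rank - 1) / (length - 1) : 0;
        final double cumeDist = (double) (rank + peers - 1) / length;
        // outputAggregatedValues()/rowAggregated(): emit one output row per peer
        for (int i = 0; i < peers; i++, row++, rowNumber++) {
          System.out.printf("key=%d row_number=%d rank=%d dense_rank=%d percent_rank=%.2f cume_dist=%.2f%n",
              orderKeys[row], rowNumber, rank, denseRank, percentRank, cumeDist);
        }
      }
    }
  }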

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
new file mode 100644
index 0000000..8d6728e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+/**
+ * Used internally to keep track of partitions and frames
+ */
+public class Partition {
+  private final int length; // size of this partition
+  private int remaining;
+  private int peers;
+
+  // we keep these attributes public because the generated code needs to access them
+  public int row_number;
+  public int rank;
+  public int dense_rank;
+  public double percent_rank;
+  public double cume_dist;
+
+  /**
+   * @return number of rows not yet aggregated in this partition
+   */
+  public int getRemaining() {
+    return remaining;
+  }
+
+  /**
+   * @return number of peer rows not yet aggregated in the current frame
+   */
+  public int getPeers() {
+    return peers;
+  }
+
+  public Partition(int length) {
+    this.length = length;
+    remaining = length;
+    row_number = 1;
+  }
+
+  public void rowAggregated() {
+    remaining--;
+    peers--;
+
+    row_number++;
+  }
+
+  public void newFrame(int peers) {
+    this.peers = peers;
+    rank = row_number; // rank = row number of 1st peer
+    dense_rank++;
+    percent_rank = length > 1 ? (double) (rank - 1) / (length - 1) : 0;
+    cume_dist = (double)(rank + peers - 1) / length;
+  }
+
+  public boolean isDone() {
+    return remaining == 0;
+  }
+
+  public boolean isFrameDone() {
+    return peers == 0;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("{length: %d, remaining partition: %d, remaining peers: %d}", length, remaining, peers);
+  }
+}
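
To sanity-check the formulas in newFrame(), consider a partition of length 5 whose order by
keys are [a, b, b, b, c]. For the three peer rows with key b: rank = 2 (the row number of the
first peer), dense_rank = 2, percent_rank = (2 - 1) / (5 - 1) = 0.25, and
cume_dist = (2 + 3 - 1) / 5 = 0.8 (4 of the 5 rows sit at or before the last peer).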

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
new file mode 100644
index 0000000..5045cb3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
@@ -0,0 +1,93 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.window;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class WindowDataBatch implements VectorAccessible {
+
+  private final FragmentContext context;
+  private final VectorContainer container;
+  private final int recordCount;
+
+  public WindowDataBatch(final VectorAccessible batch, final FragmentContext context) {
+    this.context = context;
+    recordCount = batch.getRecordCount();
+
+    List<ValueVector> vectors = Lists.newArrayList();
+
+    for (VectorWrapper<?> v : batch) {
+      if (v.isHyper()) {
+        throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
+      }
+      TransferPair tp = v.getValueVector().getTransferPair();
+      tp.transfer();
+      vectors.add(tp.getTo());
+    }
+
+    container = new VectorContainer();
+    container.addCollection(vectors);
+    container.setRecordCount(recordCount);
+    container.buildSchema(batch.getSchema().getSelectionVectorMode());
+  }
+
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
+    return container.getValueAccessorById(clazz, fieldIds);
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVectorId(path);
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return container.getSchema();
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return container.iterator();
+  }
+
+  public void clear() {
+    container.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 428632f..da189eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -19,13 +19,21 @@ package org.apache.drill.exec.physical.impl.window;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.Maps;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JInvocation;
+import com.sun.codemodel.JVar;
 import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -33,13 +41,11 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.WindowPOP;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
@@ -61,12 +67,51 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WindowFrameRecordBatch.class);
 
   private final RecordBatch incoming;
-  private List<RecordBatchData> batches;
+  private List<WindowDataBatch> batches;
   private WindowFramer framer;
 
   private boolean noMoreBatches;
   private BatchSchema schema;
 
+  /**
+   * Describes the supported window functions and whether they output FLOAT8 or BIGINT
+   */
+  private enum WindowFunction {
+    ROW_NUMBER(false),
+    RANK(false),
+    DENSE_RANK(false),
+    PERCENT_RANK(true),
+    CUME_DIST(true);
+
+    private final boolean useDouble;
+
+    WindowFunction(boolean useDouble) {
+      this.useDouble = useDouble;
+    }
+
+    public TypeProtos.MajorType getMajorType() {
+      return useDouble ? Types.required(TypeProtos.MinorType.FLOAT8) : Types.required(TypeProtos.MinorType.BIGINT);
+    }
+
+    /**
+     * Extract the WindowFunction corresponding to the logical expression
+     * @param expr logical expression
+     * @return WindowFunction or null if the logical expression is not a window function
+     */
+    public static WindowFunction fromExpression(final LogicalExpression expr) {
+      if (!(expr instanceof FunctionCall)) {
+        return null;
+      }
+
+      final String name = ((FunctionCall) expr).getName();
+      try {
+        return WindowFunction.valueOf(name.toUpperCase());
+      } catch (IllegalArgumentException e) {
+        return null; // not a window function
+      }
+    }
+  }
+
   public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
@@ -90,7 +135,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
    *
    * <p><pre>
    * when innerNext() is called:
-   *   call next(incoming), we receive and save b0 in a list of RecordDataBatch
+   *   call next(incoming), we receive and save b0 in a list of WindowDataBatch
    *     we can't process b0 yet because we don't know if p1 has more rows upstream
    *   call next(incoming), we receive and save b1
    *     we can't process b0 yet for the same reason previously stated
@@ -107,7 +152,11 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
    * when innerNext() is called:
    *  we return NONE
    * </pre></p>
-   *
+   * The previous scenario applies when we don't have an ORDER BY clause; otherwise a batch can be processed
+   * as soon as we reach the final peer row of the batch's last row (i.e. we find the end of the batch's last frame).
+   * <p>
+   * Because we only support the default frame, we don't need to reset the aggregations until we reach the end of
+   * a partition. We can safely free a batch as soon as it has been processed.
    */
   @Override
   public IterOutcome innerNext() {
@@ -130,24 +179,21 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
         case OUT_OF_MEMORY:
         case NOT_YET:
         case STOP:
+          cleanup();
           return upstream;
         case OK_NEW_SCHEMA:
-          // when a partition of rows exceeds the current processed batch, it will be kept as "pending" and processed
-          // when innerNext() is called again. If the schema changes, the framer is "rebuilt" and the pending information
-          // will be lost which may lead to incorrect results.
-
-          // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
+          // We don't support schema changes
           if (!incoming.getSchema().equals(schema)) {
             if (schema != null) {
-              throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+              throw new UnsupportedOperationException("OVER clause doesn't currently support changing schemas.");
             }
             this.schema = incoming.getSchema();
           }
         case OK:
-          batches.add(new RecordBatchData(incoming));
+          batches.add(new WindowDataBatch(incoming, context));
           break;
         default:
-          throw new UnsupportedOperationException();
+          throw new UnsupportedOperationException("Unsupported upstream state " + upstream);
       }
     }
 
@@ -157,15 +203,12 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
       return IterOutcome.NONE;
     }
 
-    // process a saved batch
+    // process first saved batch, then release it
     try {
       framer.doWork();
     } catch (DrillException e) {
       context.fail(e);
-      if (framer != null) {
-        framer.cleanup();
-        framer = null;
-      }
+      cleanup();
       return IterOutcome.STOP;
     }
 
@@ -201,52 +244,44 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   }
 
   private WindowFramer createFramer(VectorAccessible batch) throws SchemaChangeException, IOException, ClassTransformationException {
-    logger.trace("creating framer");
+    assert framer == null : "createFramer should only be called once";
 
-    container.clear();
+    logger.trace("creating framer");
 
-    if (framer != null) {
-      framer.cleanup();
-      framer = null;
-    }
+    final List<LogicalExpression> aggExprs = Lists.newArrayList();
+    final Map<WindowFunction, TypedFieldId> winExprs = Maps.newHashMap();
+    final List<LogicalExpression> keyExprs = Lists.newArrayList();
+    final List<LogicalExpression> orderExprs = Lists.newArrayList();
+    final ErrorCollector collector = new ErrorCollectorImpl();
 
-    ErrorCollector collector = new ErrorCollectorImpl();
+    container.clear();
 
-    // setup code generation to copy all incoming vectors to the container
-    // we can't just transfer them because after we pass the container downstream, some values will be needed when
-    // processing the next batches
-    int j = 0;
-    LogicalExpression[] windowExprs = new LogicalExpression[batch.getSchema().getFieldCount()];
+    // all existing vectors will be transferred to the outgoing container in framer.doWork()
     for (VectorWrapper wrapper : batch) {
-      // read value from saved batch
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(
-        new ValueVectorReadExpression(new TypedFieldId(wrapper.getField().getType(), wrapper.isHyper(), j)),
-        batch, collector, context.getFunctionRegistry());
-
-      ValueVector vv = container.addOrGet(wrapper.getField());
-      vv.allocateNew();
-
-      // write value into container
-      TypedFieldId id = container.getValueVectorId(vv.getField().getPath());
-      windowExprs[j] = new ValueVectorWriteExpression(id, expr, true);
-      j++;
+      container.addOrGet(wrapper.getField());
     }
 
     // add aggregation vectors to the container, and materialize corresponding expressions
-    LogicalExpression[] aggExprs = new LogicalExpression[popConfig.getAggregations().length];
-    for (int i = 0; i < aggExprs.length; i++) {
-      // evaluate expression over saved batch
-      NamedExpression ne = popConfig.getAggregations()[i];
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry());
-
-      // add corresponding ValueVector to container
-      final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
-      ValueVector vv = container.addOrGet(outputField);
-      vv.allocateNew();
-
-      // write value into container
-      TypedFieldId id = container.getValueVectorId(ne.getRef());
-      aggExprs[i] = new ValueVectorWriteExpression(id, expr, true);
+    for (final NamedExpression ne : popConfig.getAggregations()) {
+      final WindowFunction wf = WindowFunction.fromExpression(ne.getExpr());
+
+      if (wf != null) {
+        // add corresponding ValueVector to container
+        final MaterializedField outputField = MaterializedField.create(ne.getRef(), wf.getMajorType());
+        ValueVector vv = container.addOrGet(outputField);
+        vv.allocateNew();
+        winExprs.put(wf, container.getValueVectorId(ne.getRef()));
+      } else {
+        // evaluate expression over saved batch
+        final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry());
+
+        // add corresponding ValueVector to container
+        final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
+        ValueVector vv = container.addOrGet(outputField);
+        vv.allocateNew();
+        TypedFieldId id = container.getValueVectorId(ne.getRef());
+        aggExprs.add(new ValueVectorWriteExpression(id, expr, true));
+      }
     }
 
     if (container.isSchemaChanged()) {
@@ -254,17 +289,15 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     }
 
     // materialize partition by expressions
-    LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getWithins().length];
-    for (int i = 0; i < keyExprs.length; i++) {
-      NamedExpression ne = popConfig.getWithins()[i];
-      keyExprs[i] = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry());
+    for (final NamedExpression ne : popConfig.getWithins()) {
+      keyExprs.add(
+        ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry()));
     }
 
     // materialize order by expressions
-    LogicalExpression[] orderExprs = new LogicalExpression[popConfig.getOrderings().length];
-    for (int i = 0; i < orderExprs.length; i++) {
-      Order.Ordering oe = popConfig.getOrderings()[i];
-      orderExprs[i] = ExpressionTreeMaterializer.materialize(oe.getExpr(), batch, collector, context.getFunctionRegistry());
+    for (final Order.Ordering oe : popConfig.getOrderings()) {
+      orderExprs.add(
+        ExpressionTreeMaterializer.materialize(oe.getExpr(), batch, collector, context.getFunctionRegistry()));
     }
 
     if (collector.hasErrors()) {
@@ -272,14 +305,11 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     }
 
     // generate framer code
-
     final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(WindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-    // setup for isSamePartition()
-    setupIsFunction(cg, keyExprs, isaB1, isaB2);
-    // setup for isPeer()
-    setupIsFunction(cg, orderExprs, isaP1, isaP2);
-    setupAddRecords(cg, aggExprs);
-    setupOutputWindowValues(cg, windowExprs);
+    setupIsFunction(cg, keyExprs, isaB1, isaB2); // setup for isSamePartition()
+    setupIsFunction(cg, orderExprs, isaP1, isaP2); // setup for isPeer()
+    setupOutputAggregatedValues(cg, aggExprs);
+    setupAddWindowValue(cg, winExprs);
 
     cg.getBlock("resetValues")._return(JExpr.TRUE);
 
@@ -300,7 +330,8 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   /**
    * setup comparison functions isSamePartition and isPeer
    */
-  private void setupIsFunction(ClassGenerator<WindowFramer> cg, LogicalExpression[] exprs, MappingSet leftMapping, MappingSet rightMapping) {
+  private void setupIsFunction(final ClassGenerator<WindowFramer> cg, final List<LogicalExpression> exprs,
+                               final MappingSet leftMapping, final MappingSet rightMapping) {
     cg.setMappingSet(leftMapping);
     for (LogicalExpression expr : exprs) {
       cg.setMappingSet(leftMapping);
@@ -317,36 +348,51 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     cg.getEvalBlock()._return(JExpr.TRUE);
   }
 
-  private static final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupIncoming", "addRecord", null, null);
-  private static final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupOutgoing", "outputRecordValues", "resetValues", "cleanup");
+  private static final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupRead", "aggregateRecord", null, null);
+  private static final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupWrite", "outputAggregatedValues", "resetValues", "cleanup");
   private final MappingSet eval = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
 
   /**
-   * setup for addRecords() and outputRecordValues()
+   * setup for aggregateRecord() and outputAggregatedValues()
    */
-  private void setupAddRecords(ClassGenerator<WindowFramer> cg, LogicalExpression[] valueExprs) {
+  private void setupOutputAggregatedValues(ClassGenerator<WindowFramer> cg, List<LogicalExpression> valueExprs) {
     cg.setMappingSet(eval);
     for (LogicalExpression ex : valueExprs) {
       cg.addExpr(ex);
     }
   }
 
-  private final static GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupCopy", "outputWindowValues", null, null);
-  private final MappingSet windowValues = new MappingSet("index", "index", OUTPUT_WINDOW_VALUES, OUTPUT_WINDOW_VALUES);
-
-  private void setupOutputWindowValues(ClassGenerator<WindowFramer> cg, LogicalExpression[] valueExprs) {
-    cg.setMappingSet(windowValues);
-    for (LogicalExpression valueExpr : valueExprs) {
-      cg.addExpr(valueExpr);
+  /**
+   * generate code to write "computed" window function values into their respective value vectors
+   */
+  private void setupAddWindowValue(final ClassGenerator<WindowFramer> cg, final Map<WindowFunction, TypedFieldId> functions) {
+    cg.setMappingSet(eval);
+    for (WindowFunction function : functions.keySet()) {
+      final JVar vv = cg.declareVectorValueSetupAndMember(cg.getMappingSet().getOutgoing(), functions.get(function));
+      final JExpression outIndex = cg.getMappingSet().getValueWriteIndex();
+      JInvocation setMethod = vv.invoke("getMutator").invoke("setSafe").arg(outIndex).arg(
+        JExpr.direct("partition." + function.name().toLowerCase()));
+      cg.getEvalBlock().add(setMethod);
     }
   }
 
-  @Override
-  public void close() {
+  private void cleanup() {
     if (framer != null) {
       framer.cleanup();
       framer = null;
     }
+
+    if (batches != null) {
+      for (final WindowDataBatch bd : batches) {
+        bd.clear();
+      }
+      batches = null;
+    }
+  }
+
+  @Override
+  public void close() {
+    cleanup();
     super.close();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java
deleted file mode 100644
index 78bab54..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.window;
-
-import org.apache.drill.common.exceptions.DrillException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorWrapper;
-
-import javax.inject.Named;
-import java.util.Iterator;
-import java.util.List;
-
-
-public abstract class WindowFrameTemplate implements WindowFramer {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WindowFrameTemplate.class);
-
-  private VectorAccessible container;
-  private List<RecordBatchData> batches;
-  private int outputCount; // number of rows in currently/last processed batch
-
-  /**
-   * current partition being processed. Can span over multiple batches, so we may need to keep it between calls to doWork()
-   */
-  private Interval partition;
-
-  private int currentBatch; // first unprocessed batch
-
-  @Override
-  public void setup(List<RecordBatchData> batches, VectorAccessible container) throws SchemaChangeException {
-    this.container = container;
-    this.batches = batches;
-
-    outputCount = 0;
-    partition = null;
-    currentBatch = 0;
-
-    setupOutgoing(container);
-  }
-
-  /**
-   * processes all rows of current batch:
-   * <ul>
-   *   <li>compute window aggregations</li>
-   *   <li>copy remaining vectors from current batch to container</li>
-   * </ul>
-   */
-  @Override
-  public void doWork() throws DrillException {
-    logger.trace("WindowFramer.doWork() START, num batches {}, currentBatch {}", batches.size(), currentBatch);
-
-    final VectorAccessible current = batches.get(currentBatch).getContainer();
-
-    // we need to store the record count explicitly, in case we release current batch at the end of this call
-    outputCount = current.getRecordCount();
-
-    // allocate vectors
-    for (VectorWrapper<?> w : container){
-      w.getValueVector().allocateNew();
-    }
-
-    setupCopy(current, container);
-
-    int currentRow = 0;
-
-    while (currentRow < outputCount) {
-      if (partition != null) {
-        assert currentRow == 0 : "pending windows are only expected at the start of the batch";
-
-        // we have a pending window we need to handle from a previous call to doWork()
-        logger.trace("we have a pending partition {}", partition);
-      } else {
-        // compute the size of the new partition
-        final int length = computePartitionSize(currentRow);
-        partition = new Interval(currentRow, length, 0);
-      }
-
-      currentRow = processPartition(currentRow);
-
-      if (partition == null) {
-        freeBatches(currentRow == outputCount);
-      }
-    }
-
-    if (partition != null) {
-      logger.trace("we have a pending partition {}", partition);
-      // current batch has been processed but it won't be released until this pending partition is processed
-      currentBatch++;
-    }
-
-    for (VectorWrapper<?> v : container) {
-      v.getValueVector().getMutator().setValueCount(outputCount);
-    }
-
-    logger.trace("WindowFramer.doWork() END");
-  }
-
-  /**
-   * releases saved batches that are no longer needed: all rows of their partitions have been processed
-   * @param freeCurrent do we free current batch too ?
-   */
-  private void freeBatches(boolean freeCurrent) {
-    // how many batches can be released
-    int numFree = currentBatch;
-    if (freeCurrent) { // current batch can also be released
-      numFree++;
-    }
-    if (numFree > 0) {
-      logger.trace("freeing {} batches", numFree);
-
-      // we are ready to free batches < currentBatch
-      for (int i = 0; i < numFree; i++) {
-        RecordBatchData bd = batches.remove(0);
-        bd.getContainer().clear();
-      }
-
-      currentBatch = 0;
-    }
-  }
-
-  /**
-   * process all rows (computes and writes aggregation values) of current batch that are part of current partition.
-   * @return index of next unprocessed row
-   * @throws DrillException if it can't write into the container
-   */
-  private int processPartition(int currentRow) throws DrillException {
-    logger.trace("process partition {}, currentBatch: {}, currentRow: {}", partition, currentBatch, currentRow);
-
-    // compute how many rows remain unprocessed in the current partition
-    int remaining = partition.length;
-    for (int b = 0; b < currentBatch; b++) {
-      remaining -= batches.get(b).getRecordCount();
-    }
-    remaining -= currentRow - partition.start;
-
-    // when computing the frame for the current row, keep track of how many peer rows need to be processed
-    // because all peer rows share the same frame, we only need to compute and aggregate the frame once
-    for (int peers = 0; currentRow < outputCount && remaining > 0; currentRow++, remaining--) {
-      if (peers == 0) {
-        Interval frame = computeFrame(currentBatch, currentRow);
-        resetValues();
-        aggregate(frame);
-        peers = frame.peers;
-      } else {
-        peers--;
-      }
-
-      outputRecordValues(currentRow);
-      outputWindowValues(currentRow);
-    }
-
-    if (remaining == 0) {
-      logger.trace("finished processing {}", partition);
-      partition = null;
-    }
-
-    return currentRow;
-  }
-
-  /**
-   * @return number of rows that are part of the partition starting at row start of first batch
-   */
-  private int computePartitionSize(int start) {
-    logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
-
-    // current partition always starts from first batch
-    final VectorAccessible first = batches.get(0).getContainer();
-
-    int length = 0;
-
-    // count all rows that are in the same partition of start
-    outer:
-    for (RecordBatchData batch : batches) {
-      final VectorAccessible cont = batch.getContainer();
-
-      // check first container from start row, and subsequent containers from first row
-      for (int row = (cont == first ? start : 0); row < cont.getRecordCount(); row++) {
-        if (isSamePartition(start, first, row, cont)) {
-          length++;
-        } else {
-          break outer;
-        }
-      }
-    }
-
-    return length;
-  }
-
-  /**
-   * find the limits of the window frame for a row
-   * @param batchId batch where the current row is
-   * @param row idx of row in the given batch
-   * @return frame interval
-   */
-  private Interval computeFrame(int batchId, int row) {
-
-    // using default frame for now RANGE BETWEEN UNBOUND PRECEDING AND CURRENT ROW
-    // frame contains all rows from start of partition to last peer of row
-
-    // count how many rows in the current partition precede the row
-    int length = row;
-    // include rows of all batches previous to batchId
-    Iterator<RecordBatchData> iterator = batches.iterator();
-    for (int b = 0; b < batchId; b++) {
-      length += iterator.next().getRecordCount();
-    }
-    length -= partition.start;
-
-    VectorAccessible batch = iterator.next().getContainer();
-    VectorAccessible current = batch;
-
-    // for every remaining row in the partition, count it if it's a peer row
-    int peers = 0;
-    for (int curRow = row; length < partition.length; length++, curRow++, peers++) {
-      if (curRow == current.getRecordCount()) {
-        current = iterator.next().getContainer();
-        curRow = 0;
-      }
-
-      if (!isPeer(row, batch, curRow, current)) {
-        break;
-      }
-    }
-
-    // do not count row as a peer
-    return new Interval(partition.start, length, peers-1);
-  }
-
-  private void aggregate(Interval frame) throws SchemaChangeException {
-    logger.trace("aggregating {}", frame);
-    assert frame.length > 0 : "processing empty frame!";
-
-    // a single frame can include rows from multiple batches
-    // start processing first batch and, if necessary, move to next batches
-    Iterator<RecordBatchData> iterator = batches.iterator();
-    VectorAccessible current = iterator.next().getContainer();
-    setupIncoming(current);
-
-    for (int i = 0, row = frame.start; i < frame.length; i++, row++) {
-      if (row >= current.getRecordCount()) {
-        // we reached the end of the current batch, move to the next one
-        current = iterator.next().getContainer();
-        setupIncoming(current);
-        row = 0;
-      }
-
-      addRecord(row);
-    }
-  }
-
-  @Override
-  public boolean canDoWork() {
-    // check if we can process a saved batch
-    if (batches.size() > 1) {
-      final VectorAccessible last = batches.get(batches.size()-1).getContainer();
-
-      if (!isSamePartition(getCurrent().getRecordCount() - 1, getCurrent(), last.getRecordCount() - 1, last)) {
-        logger.trace("partition changed, we are ready to process first saved batch");
-        return true;
-      } else {
-        logger.trace("partition didn't change, fetch next batch");
-      }
-    } else {
-      logger.trace("we don't have enough batches to proceed, fetch next batch");
-    }
-
-    return false;
-  }
-
-  @Override
-  public VectorAccessible getCurrent() {
-    return batches.get(currentBatch).getContainer();
-  }
-
-  @Override
-  public int getOutputCount() {
-    return outputCount;
-  }
-
-  @Override
-  public void cleanup() {
-  }
-
-  /**
-   * sets up the incoming container for addRecord()
-   */
-  public abstract void setupIncoming(@Named("incoming") VectorAccessible incoming) throws SchemaChangeException;
-
-  /**
-   * sets up the outgoing container for outputRecordValues()
-   */
-  public abstract void setupOutgoing(@Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
-
-  /**
-   * sets up for outputWindowValues()
-   */
-  public abstract void setupCopy(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
-
-  /**
-   * aggregates a row from the incoming container
-   * @param index index of the row to aggregate
-   */
-  public abstract void addRecord(@Named("index") int index);
-
-  /**
-   * writes the aggregated values to a row of the outgoing container
-   * @param outIndex index of the row to write to
-   */
-  public abstract void outputRecordValues(@Named("outIndex") int outIndex);
-
-  /**
-   * copies all value vectors for a specific row from the incoming batch to the container
-   * @param index index of the row to be copied
-   */
-  public abstract void outputWindowValues(@Named("index") int index);
-
-  /**
-   * resets all window functions
-   */
-  public abstract boolean resetValues();
-
-  /**
-   * compares two rows, which may belong to the same batch or to different batches, and checks whether they have
-   * the same value for the partition by expression
-   * @param b1Index index of first row
-   * @param b1 batch for first row
-   * @param b2Index index of second row
-   * @param b2 batch for second row
-   * @return true if the rows are in the same partition
-   */
-  public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
-
-  /**
-   * compares two rows, which may belong to the same batch or to different batches, and checks whether they have
-   * the same value for the order by expression
-   * @param b1Index index of first row
-   * @param b1 batch for first row
-   * @param b2Index index of second row
-   * @param b2 batch for second row
-   * @return true if the rows are peers, i.e. they have the same value for the order by expression
-   */
-  public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
-
-  /**
-   * Used internally to keep track of partitions and frames
-   */
-  private static class Interval {
-    public final int start;
-    public final int length;
-    public final int peers; // we only need this for frames
-
-    public Interval(int start, int length, int peers) {
-      this.start = start;
-      this.length = length;
-      this.peers = peers;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("{start: %d, length: %d, peers: %d}", start, length, peers);
-    }
-  }
-}

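The peer bookkeeping removed above is easiest to see with concrete numbers. Under the default frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), all rows with the same order by value share one frame, so the template computed and aggregated each frame once and then reused the result for every peer. In the b1.p1 test data further down, the three rows with sub = 3 all get the frame covering the first 6 rows of the partition, which is why each of them reports count = 6 and sum = 74 in b1.p1.subs.tsv.
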
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
index 23a2b53..69866af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
@@ -20,21 +20,20 @@ package org.apache.drill.exec.physical.impl.window;
 import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
 
 import java.util.List;
 
 public interface WindowFramer {
-  public static TemplateClassDefinition<WindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, WindowFrameTemplate.class);
+  TemplateClassDefinition<WindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, DefaultFrameTemplate.class);
 
-  public abstract void setup(List<RecordBatchData> batches, VectorAccessible container) throws SchemaChangeException;
+  void setup(List<WindowDataBatch> batches, final VectorContainer container) throws SchemaChangeException;
 
   /**
   * process the inner batch and write the aggregated values into the container
    * @throws DrillException
    */
-  public abstract void doWork() throws DrillException;
+  void doWork() throws DrillException;
 
   /**
    * check if current batch can be processed:
@@ -44,16 +43,12 @@ public interface WindowFramer {
    * </ol>
    * @return true if current batch can be processed, false otherwise
    */
-  public abstract boolean canDoWork();
+  boolean canDoWork();
+
   /**
   * @return number of rows processed in the last batch
    */
-  public abstract int getOutputCount();
-
-  public abstract void cleanup();
+  int getOutputCount();
 
-  /**
-   * @return saved batch that will be processed in doWork()
-   */
-  public abstract VectorAccessible getCurrent();
+  void cleanup();
 }

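The slimmed-down WindowFramer contract above is easiest to read alongside a sketch of its calling sequence. The following is a hypothetical outline, not the actual WindowFrameRecordBatch code; moreIncoming(), nextIncoming() and emitRows() are stand-in helpers for the real operator plumbing:

    // accumulate incoming batches until the framer has seen the end of a partition,
    // then let it process and emit the first saved batch
    final List<WindowDataBatch> batches = new ArrayList<>();
    framer.setup(batches, container);
    while (moreIncoming()) {               // stand-in for the real upstream iteration
      batches.add(nextIncoming());         // stand-in: wrap the incoming RecordBatch
      if (framer.canDoWork()) {
        framer.doWork();                   // writes the aggregated values into the container
        emitRows(framer.getOutputCount()); // stand-in: pass the processed rows downstream
      }
    }
    // (the real operator must also flush any batches still saved once input is exhausted)
    framer.cleanup();
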
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 9b21ae3..6686fbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -17,12 +17,14 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
+import com.typesafe.config.ConfigException;
 import io.netty.buffer.DrillBuf;
 
 import java.util.Queue;
 
 import javax.inject.Named;
 
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -36,7 +38,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.Queues;
 
 public abstract class MSortTemplate implements MSorter, IndexedSortable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
 
   private SelectionVector4 vector4;
   private SelectionVector4 aux;
@@ -68,7 +70,16 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
       }
     }
     final DrillBuf drillBuf = allocator.buffer(4 * totalCount);
-    aux = new SelectionVector4(drillBuf, totalCount, Character.MAX_VALUE);
+
+    // This is only useful for debugging: it changes the maximum size of the batches exposed to downstream
+    // operators when we don't spill to disk
+    int msortBatchMaxSize;
+    try {
+      msortBatchMaxSize = context.getConfig().getInt(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE);
+    } catch (ConfigException.Missing e) {
+      msortBatchMaxSize = Character.MAX_VALUE;
+    }
+    aux = new SelectionVector4(drillBuf, totalCount, msortBatchMaxSize);
   }
 
   /**

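Because this knob is read through context.getConfig() rather than the session options, it is a boot-time configuration entry. TestWindowFrame (further down in this patch) overrides it when building the test cluster; a minimal sketch of that pattern, using the same calls the test itself uses:

    // start a 1-drillbit test cluster whose in-memory sorter emits at most 20 rows per batch
    final Properties props = cloneDefaultTestConfigProperties();
    props.put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, Integer.toString(20));
    updateTestCluster(1, DrillConfig.create(props));
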
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 330ec79..ff53052 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -34,7 +34,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch {
   final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
 
-  protected final VectorContainer container; //= new VectorContainer();
+  protected final VectorContainer container;
   protected final T popConfig;
   protected final FragmentContext context;
   protected final OperatorContext oContext;
@@ -56,8 +56,8 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     this.context = context;
     this.popConfig = popConfig;
     this.oContext = oContext;
-    this.stats = oContext.getStats();
-    this.container = new VectorContainer(this.oContext);
+    stats = oContext.getStats();
+    container = new VectorContainer(this.oContext);
     if (buildSchema) {
       state = BatchState.BUILD_SCHEMA;
     } else {

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
index adbf653..5d8cd95 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
@@ -23,7 +23,6 @@ import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedDataTypeException;
 import org.apache.drill.exec.work.foreman.UnsupportedFunctionException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestDisabledFunctionality extends BaseTestQuery{
@@ -286,18 +285,6 @@ public class TestDisabledFunctionality extends BaseTestQuery{
     }
   }
 
-  @Test(expected = UnsupportedFunctionException.class) // see DRILL-2441
-  public void testDisabledWindowFunctions() throws Exception {
-    try {
-      test("SELECT employee_id,position_id, salary, avg(salary) " +
-          "OVER (PARTITION BY position_id order by position_id) " +
-          "FROM cp.`employee.json` " +
-          "order by employee_id;");
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-    }
-  }
-
   @Test(expected = UnsupportedFunctionException.class) // see DRILL-2181
   public void testFlattenWithinGroupBy() throws Exception {
     try {

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java
new file mode 100644
index 0000000..623c1e2
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.common.util.TestTools;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class GenerateTestData {
+  private static final int SUB_MUL = 1;
+  private static final int BATCH_SIZE = 20;
+
+  private static class Partition {
+    Partition previous;
+    final int length;
+    final int[] subs;
+
+    public Partition(int length, int[] subs) {
+      this.length = length;
+      this.subs = subs;
+    }
+
+    /**
+     * @return total number of rows from the first partition up to and including this partition
+     */
+    public int cumulLength() {
+      int prevLength = previous != null ? previous.cumulLength() : 0;
+      return length + prevLength;
+    }
+
+    public boolean isPartOf(int rowNumber) {
+      int prevLength = previous != null ? previous.cumulLength() : 0;
+      return rowNumber >= prevLength && rowNumber < cumulLength();
+    }
+
+    public int getSubIndex(final int sub) {
+      return Arrays.binarySearch(subs, sub);
+    }
+
+    public int getSubSize(int sub) {
+      if (sub != subs[subs.length-1]) {
+        return sub * SUB_MUL;
+      } else {
+        // the last sub absorbs the remaining rows so the partition reaches its full length
+        int size = length;
+        for (int i = 0; i < subs.length-1; i++) {
+          size -= subs[i] * SUB_MUL;
+        }
+        return size;
+      }
+    }
+
+    /**
+     * @return sub id of the sub that contains rowNumber
+     */
+    public int getSubId(int rowNumber) {
+      assert isPartOf(rowNumber) : "row "+rowNumber+" isn't part of this partition";
+
+      int prevLength = previous != null ? previous.cumulLength() : 0;
+      rowNumber -= prevLength; // row num from start of this partition
+
+      for (int s : subs) {
+        if (rowNumber < subRunningCount(s)) {
+          return s;
+        }
+      }
+
+      throw new RuntimeException("should never happen!");
+    }
+
+    /**
+     * @return running count of rows from first row of the partition to current sub, this sub included
+     */
+    public int subRunningCount(int sub) {
+      int count = 0;
+      for (int s : subs) {
+        count += getSubSize(s);
+        if (s == sub) {
+          break;
+        }
+      }
+      return count;
+    }
+
+    /**
+     * @return running sum of salaries from first row of the partition to current sub, this sub included
+     */
+    public int subRunningSum(int sub) {
+      int sum = 0;
+      for (int s : subs) {
+        sum += (s+10) * getSubSize(s);
+        if (s == sub) {
+          break;
+        }
+      }
+      return sum;
+    }
+
+    /**
+     * @return sum of salaries for all rows of the partition
+     */
+    public int totalSalary() {
+      return subRunningSum(subs[subs.length-1]);
+    }
+
+  }
+
+  private static Partition[] dataB1P1() {
+    // partition rows 20, subs [1, 2, 3, 4, 5, 6]
+    return new Partition[] {
+      new Partition(20, new int[]{1, 2, 3, 4, 5, 6})
+    };
+  }
+
+  private static Partition[] dataB1P2() {
+    // partition rows 10, subs [1, 2, 3, 4]
+    // partition rows 10, subs [4, 5, 6]
+    return new Partition[] {
+      new Partition(10, new int[]{1, 2, 3, 4}),
+      new Partition(10, new int[]{4, 5, 6}),
+    };
+  }
+
+  private static Partition[] dataB2P2() {
+    // partition rows 20, subs [3, 5, 9]
+    // partition rows 20, subs [9, 10]
+    return new Partition[] {
+      new Partition(20, new int[]{3, 5, 9}),
+      new Partition(20, new int[]{9, 10}),
+    };
+  }
+
+  private static Partition[] dataB2P4() {
+    // partition rows 5, subs [1, 2, 3]
+    // partition rows 10, subs [3, 4, 5]
+    // partition rows 15, subs [5, 6, 7]
+    // partition rows 10, subs [7, 8]
+    return new Partition[] {
+      new Partition(5, new int[]{1, 2, 3}),
+      new Partition(10, new int[]{3, 4, 5}),
+      new Partition(15, new int[]{5, 6, 7}),
+      new Partition(10, new int[]{7, 8}),
+    };
+  }
+
+  private static Partition[] dataB3P2() {
+    // partition rows 5, subs [1, 2, 3]
+    // partition rows 55, subs [4, 5, 7, 8, 9, 10, 11, 12]
+    return new Partition[] {
+      new Partition(5, new int[]{1, 2, 3}),
+      new Partition(55, new int[]{4, 5, 7, 8, 9, 10, 11, 12}),
+    };
+  }
+
+  private static Partition[] dataB4P4() {
+    // partition rows 10, subs [1, 2, 3]
+    // partition rows 30, subs [3, 4, 5, 6, 7, 8]
+    // partition rows 20, subs [8, 9, 10]
+    // partition rows 20, subs [10, 11]
+    return new Partition[] {
+      new Partition(10, new int[]{1, 2, 3}),
+      new Partition(30, new int[]{3, 4, 5, 6, 7, 8}),
+      new Partition(20, new int[]{8, 9, 10}),
+      new Partition(20, new int[]{10, 11}),
+    };
+  }
+
+  private static void generateData(final String tableName, final Partition[] partitions) throws FileNotFoundException {
+    final String WORKING_PATH = TestTools.getWorkingPath();
+    final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+    //TODO command line arguments contain file name
+    final String path = TEST_RES_PATH+"/window/" + tableName;
+
+    final File pathFolder = new File(path);
+    if (!pathFolder.exists()) {
+      if (!pathFolder.mkdirs()) {
+        System.err.printf("Couldn't create folder %s, exiting%n", path);
+        return;
+      }
+    }
+
+    // expected results for query without order by clause
+    final PrintStream resultStream = new PrintStream(path + ".tsv");
+    // expected results for query with order by clause
+    final PrintStream resultOrderStream = new PrintStream(path + ".subs.tsv");
+
+    // data file(s)
+    int fileId = 0;
+    PrintStream dataStream = new PrintStream(path + "/" + fileId + ".data.json");
+
+    for (Partition p : partitions) {
+      dataStream.printf("// partition rows %d, subs %s%n", p.length, Arrays.toString(p.subs));
+    }
+
+    // set previous partitions
+    for (int i = 1; i < partitions.length; i++) {
+      partitions[i].previous = partitions[i - 1];
+    }
+
+    // total number of rows
+    int total = partitions[partitions.length - 1].cumulLength();
+
+    // create data rows in random order
+    List<Integer> emp_ids = new ArrayList<>(total);
+    for (int i = 0; i < total; i++) {
+      emp_ids.add(i);
+    }
+    Collections.shuffle(emp_ids);
+
+    int emp_idx = 0;
+    for (int id : emp_ids) {
+      int p = 0;
+      while (!partitions[p].isPartOf(id)) { // find the partition containing this employee id (ids are 0-based row numbers)
+        p++;
+      }
+
+      int sub = partitions[p].getSubId(id);
+      int salary = 10 + sub;
+
+      dataStream.printf("{ \"employee_id\":%d, \"position_id\":%d, \"sub\":%d, \"salary\":%d }%n", id, p + 1, sub, salary);
+      emp_idx++;
+      if ((emp_idx % BATCH_SIZE)==0 && emp_idx < total) {
+        System.out.printf("total: %d, emp_idx: %d, fileID: %d%n", total, emp_idx, fileId);
+        dataStream.close();
+        fileId++;
+        dataStream = new PrintStream(path + "/" + fileId + ".data.json");
+      }
+    }
+
+    dataStream.close();
+
+    for (int p = 0, idx = 0; p < partitions.length; p++) {
+      for (int i = 0; i < partitions[p].length; i++, idx++) {
+        final Partition partition = partitions[p]; //TODO replace the indexed for-p loop with a for-each over partitions
+
+        final int sub = partition.getSubId(idx);
+        final int rowNumber = i + 1;
+        final int rank = 1 + partition.subRunningCount(sub) - partition.getSubSize(sub);
+        final int denseRank = partition.getSubIndex(sub) + 1;
+        final double cumeDist = (double) partition.subRunningCount(sub) / partition.length;
+        final double percentRank = partition.length == 1 ? 0 : (double)(rank - 1)/(partition.length - 1);
+
+        // each line has: count(*)  sum(salary)  row_number()  rank()  dense_rank()  cume_dist()  percent_rank()
+        resultOrderStream.printf("%d\t%d\t%d\t%d\t%d\t%s\t%s%n",
+          partition.subRunningCount(sub), partition.subRunningSum(sub),
+          rowNumber, rank, denseRank, Double.toString(cumeDist), Double.toString(percentRank));
+
+        // each line has: count(*)  sum(salary)
+        resultStream.printf("%d\t%d%n", partition.length, partition.totalSalary());
+      }
+    }
+
+    resultStream.close();
+    resultOrderStream.close();
+  }
+
+  public static void main(String[] args) throws FileNotFoundException {
+    generateData("b1.p1", dataB1P1());
+    generateData("b1.p2", dataB1P2());
+    generateData("b2.p2", dataB2P2());
+    generateData("b2.p4", dataB2P4());
+    generateData("b3.p2", dataB3P2());
+    generateData("b4.p4", dataB4P4());
+  }
+
+}
\ No newline at end of file

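To sanity-check the expected-results math above against the baseline files below, take b1.p1: a single partition of 20 rows with subs [1, 2, 3, 4, 5, 6] and SUB_MUL = 1, so the sub sizes are 1, 2, 3, 4, 5 and 5 (the last sub absorbs the remainder) and the salary for sub s is 10 + s. For the three rows with sub = 3:

    subRunningCount(3) = 1 + 2 + 3                 = 6
    subRunningSum(3)   = 1*11 + 2*12 + 3*13        = 74
    rank               = 1 + 6 - 3                 = 4
    dense_rank         = (index of 3 in subs) + 1  = 3
    cume_dist          = 6 / 20                    = 0.3
    percent_rank       = (4 - 1) / (20 - 1)        = 0.15789...

which is exactly what rows 4-6 of b1.p1.subs.tsv carry.
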
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index 2b8bd64..15fefa5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -18,19 +18,60 @@
 package org.apache.drill.exec.physical.impl.window;
 
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.DrillTestWrapper;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecConstants;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Properties;
+
 public class TestWindowFrame extends BaseTestQuery {
 
-  private void runTest(String data, String results, String window) throws Exception {
-    testNoResult("alter session set `%s`= true", ExecConstants.ENABLE_WINDOW_FUNCTIONS);
-    testBuilder()
-      .sqlQuery("select count(*) over pos_win `count`, sum(salary) over pos_win `sum` from cp.`window/%s.json` window pos_win as (%s)", data, window)
+  private static final String TEST_RES_PATH = TestTools.getWorkingPath() + "/src/test/resources";
+  private static final String QUERY_NO_ORDERBY =
+    "select count(*) over pos_win `count`, sum(salary) over pos_win `sum` from dfs_test.`%s/window/%s` window pos_win as (partition by position_id)";
+  private static final String QUERY_ORDERBY =
+    "select count(*) over pos_win `count`, sum(salary) over pos_win `sum`, row_number() over pos_win `row_number`, rank() over pos_win `rank`, dense_rank() over pos_win `dense_rank`, cume_dist() over pos_win `cume_dist`, percent_rank() over pos_win `percent_rank`  from dfs_test.`%s/window/%s` window pos_win as (partition by position_id order by sub)";
+
+  @BeforeClass
+  public static void setupMSortBatchSize() {
+    // make sure memory sorter outputs 20 rows per batch
+    final Properties props = cloneDefaultTestConfigProperties();
+    props.put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, Integer.toString(20));
+
+    updateTestCluster(1, DrillConfig.create(props));
+  }
+
+  private DrillTestWrapper buildWindowQuery(final String tableName) throws Exception {
+    return testBuilder()
+      .sqlQuery(String.format(QUERY_NO_ORDERBY, TEST_RES_PATH, tableName))
       .ordered()
-      .csvBaselineFile("window/" + results + ".tsv")
+      .csvBaselineFile("window/" + tableName + ".tsv")
       .baselineColumns("count", "sum")
-      .build().run();
+      .build();
+  }
+
+  private DrillTestWrapper buildWindowWithOrderByQuery(final String tableName) throws Exception {
+    return testBuilder()
+      .sqlQuery(String.format(QUERY_ORDERBY, TEST_RES_PATH, tableName))
+      .ordered()
+      .csvBaselineFile("window/" + tableName + ".subs.tsv")
+      .baselineColumns("count", "sum", "row_number", "rank", "dense_rank", "cume_dist", "percent_rank")
+      .build();
+  }
+
+  private void runTest(final String tableName, final boolean withOrderBy) throws Exception {
+    runSQL(String.format("alter session set `%s`= true", ExecConstants.ENABLE_WINDOW_FUNCTIONS));
+
+    try {
+      DrillTestWrapper testWrapper = withOrderBy ?
+        buildWindowWithOrderByQuery(tableName) : buildWindowQuery(tableName);
+      testWrapper.run();
+    } finally {
+      runSQL(String.format("alter session set `%s`= false", ExecConstants.ENABLE_WINDOW_FUNCTIONS));
+    }
   }
 
   /**
@@ -38,7 +79,7 @@ public class TestWindowFrame extends BaseTestQuery {
    */
   @Test
   public void testB1P1() throws Exception {
-    runTest("b1.p1.data", "b1.p1", "partition by position_id order by position_id");
+    runTest("b1.p1", false);
   }
 
   /**
@@ -46,7 +87,7 @@ public class TestWindowFrame extends BaseTestQuery {
    */
   @Test
   public void testB1P1OrderBy() throws Exception {
-    runTest("b1.p1.data", "b1.p1.subs", "partition by position_id order by sub");
+    runTest("b1.p1", true);
   }
 
   /**
@@ -54,7 +95,7 @@ public class TestWindowFrame extends BaseTestQuery {
    */
   @Test
   public void testB1P2() throws Exception {
-    runTest("b1.p2.data", "b1.p2", "partition by position_id order by position_id");
+    runTest("b1.p2", false);
   }
 
   /**
@@ -63,7 +104,7 @@ public class TestWindowFrame extends BaseTestQuery {
    */
   @Test
   public void testB1P2OrderBy() throws Exception {
-    runTest("b1.p2.data", "b1.p2.subs", "partition by position_id order by sub");
+    runTest("b1.p2", true);
   }
 
   /**
@@ -71,12 +112,12 @@ public class TestWindowFrame extends BaseTestQuery {
    */
   @Test
   public void testB2P2() throws Exception {
-    runTest("b2.p2.data", "b2.p2", "partition by position_id order by position_id");
+    runTest("b2.p2", false);
   }
 
   @Test
   public void testB2P2OrderBy() throws Exception {
-    runTest("b2.p2.data", "b2.p2.subs", "partition by position_id order by sub");
+    runTest("b2.p2", true);
   }
 
   /**
@@ -84,7 +125,7 @@ public class TestWindowFrame extends BaseTestQuery {
    */
   @Test
   public void testB2P4() throws Exception {
-    runTest("b2.p4.data", "b2.p4", "partition by position_id order by position_id");
+    runTest("b2.p4", false);
   }
 
   /**
@@ -93,7 +134,7 @@ public class TestWindowFrame extends BaseTestQuery {
    */
   @Test
   public void testB2P4OrderBy() throws Exception {
-    runTest("b2.p4.data", "b2.p4.subs", "partition by position_id order by sub");
+    runTest("b2.p4", true);
   }
 
   /**
@@ -101,7 +142,7 @@ public class TestWindowFrame extends BaseTestQuery {
    */
   @Test
   public void testB3P2() throws Exception {
-    runTest("b3.p2.data", "b3.p2", "partition by position_id order by position_id");
+    runTest("b3.p2", false);
   }
 
   /**
@@ -110,7 +151,7 @@ public class TestWindowFrame extends BaseTestQuery {
    */
   @Test
   public void testB3P2OrderBy() throws Exception {
-    runTest("b3.p2.data", "b3.p2.subs", "partition by position_id order by sub");
+    runTest("b3.p2", true);
   }
 
   /**
@@ -119,7 +160,23 @@ public class TestWindowFrame extends BaseTestQuery {
    */
   @Test
   public void testb4P4() throws Exception {
-    runTest("b4.p4.data", "b4.p4", "partition by position_id order by position_id");
+    runTest("b4.p4", false);
+  }
+
+  @Test
+  public void testb4P4OrderBy() throws Exception {
+    runTest("b4.p4", true);
   }
 
+  @Test // DRILL-3218
+  public void testMaxVarChar() throws Exception {
+    runSQL(String.format("alter session set `%s`= true", ExecConstants.ENABLE_WINDOW_FUNCTIONS));
+
+    try {
+      test("select max(cast(columns[2] as char(2))) over(partition by cast(columns[2] as char(2)) order by cast(columns[0] as int)) from dfs_test.`%s/window/allData.csv`", TEST_RES_PATH);
+    } finally {
+      runSQL(String.format("alter session set `%s`= false", ExecConstants.ENABLE_WINDOW_FUNCTIONS));
+    }
+
+  }
 }

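For anyone replaying these tests locally: assuming the usual Maven layout of the exec/java-exec module seen in the paths above, the suite can be run in isolation with surefire's test filter:

    mvn test -pl exec/java-exec -Dtest=TestWindowFrame

Both flags are standard Maven/surefire; note that the window tests toggle ExecConstants.ENABLE_WINDOW_FUNCTIONS themselves, as shown above.
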
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/resources/window/allData.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/allData.csv b/exec/java-exec/src/test/resources/window/allData.csv
new file mode 100644
index 0000000..29ab38f
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/allData.csv
@@ -0,0 +1,5 @@
+-337516559,39342852852629160,VT,AXXXXXXXXXXXXXXXXXXXXXXXXXCXXXXXXXXXXXXXXXXXXXXXXXXB,2014-06-02 00:28:02.418,1952-08-14,false,729363085.95,8:16:8.58
+406158122,81588677006971200,IN,AXXXXXXXXXXXXXXXXXXXXXXXXXCXXXXXXXXXXXXXXXXXXXXXXXXB,2014-06-02 00:28:02.418,2001-03-08,false,1292460500.48,9:11:49.17
+1221407024,30009558124347168,VT,DXXXXXXXXXXXXXXXXXXXXXXXXXEXXXXXXXXXXXXXXXXXXXXXXXXF,2014-06-02 00:28:02.419,2000-10-18,true,395110006.277,18:44:25.43
+-1609141704,47841997008600128,ND,GXXXXXXXXXXXXXXXXXXXXXXXXXHXXXXXXXXXXXXXXXXXXXXXXXXI,2014-06-02 00:28:02.420,1991-05-13,true,1293582041.37,20:52:8.56
+-1032159521,38891661529640288,SD,HXXXXXXXXXXXXXXXXXXXXXXXXXIXXXXXXXXXXXXXXXXXXXXXXXXJ,2014-06-02 00:28:02.420,1965-02-21,false,983657842.924,19:46:10.42

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv b/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv
new file mode 100644
index 0000000..8368d4a
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv
@@ -0,0 +1,20 @@
+1	11	1	1	1	0.05	0.0
+3	35	2	2	2	0.15	0.05263157894736842
+3	35	3	2	2	0.15	0.05263157894736842
+6	74	4	4	3	0.3	0.15789473684210525
+6	74	5	4	3	0.3	0.15789473684210525
+6	74	6	4	3	0.3	0.15789473684210525
+10	130	7	7	4	0.5	0.3157894736842105
+10	130	8	7	4	0.5	0.3157894736842105
+10	130	9	7	4	0.5	0.3157894736842105
+10	130	10	7	4	0.5	0.3157894736842105
+15	205	11	11	5	0.75	0.5263157894736842
+15	205	12	11	5	0.75	0.5263157894736842
+15	205	13	11	5	0.75	0.5263157894736842
+15	205	14	11	5	0.75	0.5263157894736842
+15	205	15	11	5	0.75	0.5263157894736842
+20	285	16	16	6	1.0	0.7894736842105263
+20	285	17	16	6	1.0	0.7894736842105263
+20	285	18	16	6	1.0	0.7894736842105263
+20	285	19	16	6	1.0	0.7894736842105263
+20	285	20	16	6	1.0	0.7894736842105263

http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/resources/window/b1.p1.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/b1.p1.tsv b/exec/java-exec/src/test/resources/window/b1.p1.tsv
new file mode 100644
index 0000000..32f6ab7
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/b1.p1.tsv
@@ -0,0 +1,20 @@
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285
+20	285