You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2015/06/11 20:30:52 UTC
[2/2] drill git commit: DRILL-3200: Add Window functions: ROW_NUMBER,
RANK, PERCENT_RANK, DENSE_RANK and CUME_DIST
DRILL-3200: Add Window functions: ROW_NUMBER, RANK, PERCENT_RANK, DENSE_RANK and CUME_DIST
- enum WindowFrameRecordBatch.WindowFunction to handle the supported window functions and their corresponding output MajorType
- renamed WindowFrameTemplate -> DefaultFrameTemplate, cleaned the template to handle the default frame efficiently:
. a batch can be processed as soon as we find the last peer row of its last row
. once a batch is processed it can be safely released => we can transfer its value vectors to the container instead of copying them
- DefaultFrameTemplate.Partition tracks the current window frame and computes the following window functions automatically: row_number, rank, dense_rank, percent_rank, cume_dist. It doesn't need to aggregate the value vectors to compute these window functions
- updated TestWindowFrame to check the results of row_number, rank, dense_rank, percent_rank and cume_dist in various cases
. added a debug config option to MSorter to control the size of batches. This is needed by TestWindowFrame so it can use small test data files (20 rows per batch)
. removed contrib/data/window-test-data
- WindowFrameRecordBatch properly releases saved batches if the query stops prematurely
- GenerateTestData can be used to generate test data for the window function unit tests [it's a work in progress and can be either improved to make it developer-friendly or removed from the final patch]
- using newly created WindowDataBatch in place of RecordBatchData, to expose FragmentContext and VectorAccessible (fixes DRILL-3218)
- window.enable is true by default
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3bccec91
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3bccec91
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3bccec91
Branch: refs/heads/master
Commit: 3bccec9110c7ff86fa3cf04baa81a1747e1f5b9e
Parents: 453f6f7
Author: adeneche <ad...@gmail.com>
Authored: Fri Mar 13 10:06:32 2015 -0700
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Thu Jun 11 10:59:35 2015 -0700
----------------------------------------------------------------------
contrib/data/pom.xml | 1 -
contrib/data/window-test-data/pom.xml | 64 --
exec/java-exec/pom.xml | 6 -
.../org/apache/drill/exec/ExecConstants.java | 3 +-
.../impl/window/DefaultFrameTemplate.java | 320 ++++++
.../exec/physical/impl/window/Partition.java | 82 ++
.../physical/impl/window/WindowDataBatch.java | 93 ++
.../impl/window/WindowFrameRecordBatch.java | 216 ++--
.../impl/window/WindowFrameTemplate.java | 379 -------
.../exec/physical/impl/window/WindowFramer.java | 21 +-
.../exec/physical/impl/xsort/MSortTemplate.java | 15 +-
.../drill/exec/record/AbstractRecordBatch.java | 6 +-
.../apache/drill/TestDisabledFunctionality.java | 13 -
.../physical/impl/window/GenerateTestData.java | 286 +++++
.../physical/impl/window/TestWindowFrame.java | 91 +-
.../src/test/resources/window/allData.csv | 5 +
.../src/test/resources/window/b1.p1.subs.tsv | 20 +
.../src/test/resources/window/b1.p1.tsv | 20 +
.../src/test/resources/window/b1.p1/0.data.json | 21 +
.../src/test/resources/window/b1.p2.subs.tsv | 20 +
.../src/test/resources/window/b1.p2.tsv | 20 +
.../src/test/resources/window/b1.p2/0.data.json | 22 +
.../src/test/resources/window/b2.p2.subs.tsv | 40 +
.../src/test/resources/window/b2.p2.tsv | 40 +
.../src/test/resources/window/b2.p2/0.data.json | 22 +
.../src/test/resources/window/b2.p2/1.data.json | 20 +
.../src/test/resources/window/b2.p4.subs.tsv | 40 +
.../src/test/resources/window/b2.p4.tsv | 40 +
.../src/test/resources/window/b2.p4/0.data.json | 24 +
.../src/test/resources/window/b2.p4/1.data.json | 20 +
.../src/test/resources/window/b3.p2.subs.tsv | 60 ++
.../src/test/resources/window/b3.p2.tsv | 60 ++
.../src/test/resources/window/b3.p2/0.data.json | 22 +
.../src/test/resources/window/b3.p2/1.data.json | 20 +
.../src/test/resources/window/b3.p2/2.data.json | 20 +
.../src/test/resources/window/b4.p4.subs.tsv | 80 ++
.../src/test/resources/window/b4.p4.tsv | 80 ++
.../src/test/resources/window/b4.p4/0.data.json | 24 +
.../src/test/resources/window/b4.p4/1.data.json | 20 +
.../src/test/resources/window/b4.p4/2.data.json | 20 +
.../src/test/resources/window/b4.p4/3.data.json | 20 +
.../src/test/resources/window/mediumData.json | 1000 ------------------
.../src/test/resources/window/oneKeyCount.json | 43 -
.../test/resources/window/oneKeyCountData.json | 4 -
.../resources/window/oneKeyCountMultiBatch.json | 72 --
.../src/test/resources/window/twoKeys.json | 44 -
.../src/test/resources/window/twoKeysData.json | 8 -
47 files changed, 1812 insertions(+), 1755 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/contrib/data/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/data/pom.xml b/contrib/data/pom.xml
index d1def76..450bc0d 100644
--- a/contrib/data/pom.xml
+++ b/contrib/data/pom.xml
@@ -33,6 +33,5 @@
<modules>
<module>tpch-sample-data</module>
- <module>window-test-data</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/contrib/data/window-test-data/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/data/window-test-data/pom.xml b/contrib/data/window-test-data/pom.xml
deleted file mode 100644
index 6d195da..0000000
--- a/contrib/data/window-test-data/pom.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- You under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>drill-contrib-data-parent</artifactId>
- <groupId>org.apache.drill.contrib.data</groupId>
- <version>1.1.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>window-test-data</artifactId>
- <name>contrib/data/window-test-data</name>
- <packaging>jar</packaging>
-
- <dependencies>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>com.googlecode.maven-download-plugin</groupId>
- <artifactId>download-maven-plugin</artifactId>
- <version>1.2.0</version>
- <executions>
- <execution>
- <id>install-tgz</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>wget</goal>
- </goals>
- <configuration>
- <url>https://s3-us-west-2.amazonaws.com/denbucket/window_test_data_0.1.tgz</url>
- <outputFileName>window.tgz</outputFileName>
- <unpack>true</unpack>
- <outputDirectory>${project.build.directory}/classes/window</outputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <pluginRepositories>
- <pluginRepository>
- <id>sonatype-public-repository</id>
- <url>https://oss.sonatype.org/content/groups/public</url>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- <releases>
- <enabled>true</enabled>
- </releases>
- </pluginRepository>
- </pluginRepositories>
-
-</project>
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index b5cd52b..5cc209d 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -38,12 +38,6 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.drill.contrib.data</groupId>
- <artifactId>window-test-data</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
<!-- <dependency> -->
<!-- <groupId>org.ow2.asm</groupId> -->
<!-- <artifactId>asm-commons</artifactId> -->
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 91793f5..8ea90e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -71,6 +71,7 @@ public interface ExecConstants {
public static final String EXTERNAL_SORT_SPILL_THRESHOLD = "drill.exec.sort.external.spill.threshold";
public static final String EXTERNAL_SORT_SPILL_DIRS = "drill.exec.sort.external.spill.directories";
public static final String EXTERNAL_SORT_SPILL_FILESYSTEM = "drill.exec.sort.external.spill.fs";
+ public static final String EXTERNAL_SORT_MSORT_MAX_BATCHSIZE = "drill.exec.sort.external.msort.batch.maxsize";
public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets";
@@ -238,7 +239,7 @@ public interface ExecConstants {
public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill";
public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
- public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false);
+ public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, true);
public static final String DRILLBIT_CONTROL_INJECTIONS = "drill.exec.testing.controls";
public static final OptionValidator DRILLBIT_CONTROLS_VALIDATOR =
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
new file mode 100644
index 0000000..ada068b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import javax.inject.Named;
+import java.util.Iterator;
+import java.util.List;
+
+
+public abstract class DefaultFrameTemplate implements WindowFramer { // framer for the default window frame; the abstract methods are presumably implemented by generated code
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultFrameTemplate.class);
+
+ private VectorContainer container; // outgoing container, filled by doWork()
+ private List<WindowDataBatch> batches; // saved incoming batches; batches.get(0) is the next one processed
+ private int outputCount; // number of rows in currently/last processed batch
+
+ /**
+ * Current partition being processed.<p>
+ * Can span over multiple batches, so we may need to keep it between calls to doWork()
+ */
+ private Partition partition;
+
+ @Override
+ public void setup(List<WindowDataBatch> batches, final VectorContainer container) throws SchemaChangeException {
+ this.container = container;
+ this.batches = batches;
+
+ outputCount = 0;
+ partition = null; // no pending partition yet
+ }
+
+ private void allocateOutgoing() { // allocates every outgoing value vector before a batch is processed
+ for (VectorWrapper<?> w : container) {
+ w.getValueVector().allocateNew();
+ }
+ }
+
+ /**
+ * Processes all rows of the current (first saved) batch, then removes that batch from the saved list and clears it:
+ * <ul>
+ * <li>compute aggregations</li>
+ * <li>compute window functions</li>
+ * <li>transfer remaining vectors from current batch to container</li>
+ * </ul>
+ */
+ @Override
+ public void doWork() throws DrillException {
+ int currentRow = 0;
+
+ logger.trace("WindowFramer.doWork() START, num batches {}, current batch has {} rows",
+ batches.size(), batches.get(0).getRecordCount());
+
+ allocateOutgoing();
+
+ final WindowDataBatch current = batches.get(0);
+
+ // we need to store the record count explicitly, because we release current batch at the end of this call
+ outputCount = current.getRecordCount();
+
+ while (currentRow < outputCount) {
+ if (partition != null) {
+ assert currentRow == 0 : "pending windows are only expected at the start of the batch";
+
+ // we have a pending window we need to handle from a previous call to doWork()
+ logger.trace("we have a pending partition {}", partition);
+ } else {
+ final int length = computePartitionSize(currentRow);
+ partition = new Partition(length);
+ setupWrite(current, container);
+ }
+
+ currentRow = processPartition(currentRow);
+ if (partition.isDone()) {
+ partition = null;
+ resetValues();
+ }
+ }
+
+ // transfer the current batch's own ("non aggregated") vectors into the outgoing container
+ for (VectorWrapper<?> vw : current) {
+ ValueVector v = container.addOrGet(vw.getField());
+ TransferPair tp = vw.getValueVector().makeTransferPair(v);
+ tp.transfer();
+ }
+
+ for (VectorWrapper<?> v : container) {
+ v.getValueVector().getMutator().setValueCount(outputCount);
+ }
+
+ // because we are using the default frame, and we keep the aggregated value until we start a new frame
+ // we can safely free the current batch
+ batches.remove(0).clear();
+
+ logger.trace("WindowFramer.doWork() END");
+ }
+
+ /**
+ * Processes the rows of the current batch that belong to the current partition, computing and writing their aggregated values.
+ * @param currentRow first unprocessed row
+ * @return index of next unprocessed row
+ * @throws DrillException if aggregating a frame fails (see aggregatePeers())
+ */
+ private int processPartition(final int currentRow) throws DrillException {
+ logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
+
+ int row = currentRow;
+ while (row < outputCount && !partition.isDone()) {
+ if (partition.isFrameDone()) {
+ // because all peer rows share the same frame, we only need to compute and aggregate the frame once
+ partition.newFrame(countPeers(row));
+ aggregatePeers(row);
+ }
+
+ outputAggregatedValues(row, partition);
+
+ partition.rowAggregated();
+ row++;
+ }
+
+ return row;
+ }
+
+ /**
+ * @return number of rows, across all saved batches, in the same partition as row {@code start} of the first batch
+ */
+ private int computePartitionSize(final int start) {
+ logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
+
+ // current partition always starts from first batch
+ final VectorAccessible first = getCurrent();
+
+ int length = 0;
+
+ // count all rows that are in the same partition as start
+ // keep increasing length until we find the first row of the next partition or we reach the end of the
+ // last saved batch
+ for (WindowDataBatch batch : batches) {
+ final int recordCount = batch.getRecordCount();
+
+ // check first container from start row, and subsequent containers from first row
+ for (int row = (batch == first) ? start : 0; row < recordCount; row++, length++) {
+ if (!isSamePartition(start, first, row, batch)) {
+ return length;
+ }
+ }
+ }
+
+ return length;
+ }
+
+ /**
+ * Counts how many rows, starting at row {@code start}, are peers of that row (never more than the partition's remaining rows)
+ * @param start first row of current frame
+ * @return number of peer rows
+ */
+ private int countPeers(final int start) {
+ // current frame always starts from first batch
+ final VectorAccessible first = getCurrent();
+
+ int length = 0;
+
+ // count all rows that are in the same frame as the starting row
+ // keep increasing length until we find the first non peer row, exhaust the partition, or reach the end of the
+ // last saved batch
+ for (WindowDataBatch batch : batches) {
+ final int recordCount = batch.getRecordCount();
+
+ // for every remaining row in the partition, count it if it's a peer row
+ final int remaining = partition.getRemaining();
+ for (int row = (batch == first) ? start : 0; row < recordCount && length < remaining; row++, length++) {
+ if (!isPeer(start, first, row, batch)) {
+ return length;
+ }
+ }
+ }
+
+ return length;
+ }
+
+ /**
+ * Aggregates all peer rows of the current frame; these rows may span several saved batches
+ * @param currentRow starting row of the current frame
+ * @throws SchemaChangeException if setupRead() fails for one of the batches
+ */
+ private void aggregatePeers(final int currentRow) throws SchemaChangeException {
+ logger.trace("aggregating {} rows starting from {}", partition.getPeers(), currentRow);
+ assert !partition.isFrameDone() : "frame is empty!";
+
+ // a single frame can include rows from multiple batches
+ // start processing first batch and, if necessary, move to next batches
+ Iterator<WindowDataBatch> iterator = batches.iterator();
+ WindowDataBatch current = iterator.next();
+ setupRead(current, container);
+
+ final int peers = partition.getPeers();
+ for (int i = 0, row = currentRow; i < peers; i++, row++) {
+ if (row >= current.getRecordCount()) {
+ // we reached the end of the current batch, move to the next one
+ current = iterator.next();
+ setupRead(current, container);
+ row = 0;
+ }
+
+ aggregateRecord(row);
+ }
+ }
+
+ @Override
+ public boolean canDoWork() {
+ // the first saved batch can be processed once the last saved batch ends in a different partition or frame than it does
+ if (batches.size() < 2) {
+ logger.trace("we don't have enough batches to proceed, fetch next batch");
+ return false;
+ }
+
+ final VectorAccessible current = getCurrent();
+ final int currentSize = current.getRecordCount();
+ final VectorAccessible last = batches.get(batches.size() - 1);
+ final int lastSize = last.getRecordCount();
+
+ if (!isSamePartition(currentSize - 1, current, lastSize - 1, last)
+ || !isPeer(currentSize - 1, current, lastSize - 1, last)) {
+ logger.trace("frame changed, we are ready to process first saved batch");
+ return true;
+ } else {
+ logger.trace("frame didn't change, fetch next batch");
+ return false;
+ }
+ }
+
+ /**
+ * @return saved batch that will be processed in doWork()
+ */
+ private VectorAccessible getCurrent() {
+ return batches.get(0);
+ }
+
+ @Override
+ public int getOutputCount() {
+ return outputCount;
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ /**
+ * Sets up the incoming batch read by aggregateRecord(); called again for each saved batch a frame spans
+ */
+ public abstract void setupRead(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+
+ /**
+ * Sets up the outgoing container for outputAggregatedValues(); called when a new partition starts. This will also reset the aggregations in most cases.
+ */
+ public abstract void setupWrite(@Named("incoming") WindowDataBatch incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
+
+ /**
+ * aggregates a row from the incoming container
+ * @param index index of the row to aggregate, in the batch passed to the last setupRead()
+ */
+ public abstract void aggregateRecord(@Named("index") int index);
+
+ /**
+ * writes aggregated values to a row of the outgoing container; {@code partition} is passed so the generated code can read its ranking fields
+ * @param outIndex index of row
+ */
+ public abstract void outputAggregatedValues(@Named("outIndex") int outIndex, @Named("partition") Partition partition);
+
+ /**
+ * reset all window functions; called each time a partition is done (doWork() ignores the returned value)
+ */
+ public abstract boolean resetValues();
+
+ /**
+ * compares two rows from different batches (can be the same), if they have the same value for the partition by
+ * expression
+ * @param b1Index index of first row
+ * @param b1 batch for first row
+ * @param b2Index index of second row
+ * @param b2 batch for second row
+ * @return true if the rows are in the same partition
+ */
+ public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+ @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
+
+ /**
+ * compares two rows from different batches (can be the same), if they have the same value for the order by
+ * expression
+ * @param b1Index index of first row
+ * @param b1 batch for first row
+ * @param b2Index index of second row
+ * @param b2 batch for second row
+ * @return true if the rows are peers (same value for the order by expression)
+ */
+ public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
+ @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
new file mode 100644
index 0000000..8d6728e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+/**
+ * Used internally to keep track of the current partition and frame, and of the ranking values computed from them
+ */
+public class Partition {
+ private final int length; // number of rows in this partition
+ private int remaining; // rows of this partition not yet aggregated
+ private int peers; // peer rows of the current frame not yet aggregated
+
+ // we keep these attributes public because the generated code needs to access them
+ public int row_number; // starts at 1, incremented after every aggregated row
+ public int rank; // row_number of the first row of the current frame
+ public int dense_rank; // number of frames started so far in this partition
+ public double percent_rank; // (rank - 1) / (length - 1), or 0 when the partition has a single row
+ public double cume_dist; // (rank + frame size - 1) / length
+
+ /**
+ * @return number of rows not yet aggregated in this partition
+ */
+ public int getRemaining() {
+ return remaining;
+ }
+
+ /**
+ * @return peer rows not yet aggregated in current frame
+ */
+ public int getPeers() {
+ return peers;
+ }
+
+ public Partition(int length) { // length: total number of rows in the partition
+ this.length = length;
+ remaining = length;
+ row_number = 1;
+ }
+
+ public void rowAggregated() { // called once per output row, after that row's values were written
+ remaining--;
+ peers--;
+
+ row_number++;
+ }
+
+ public void newFrame(int peers) { // starts a frame of 'peers' peer rows and updates the ranking values
+ this.peers = peers;
+ rank = row_number; // rank = row number of 1st peer
+ dense_rank++;
+ percent_rank = length > 1 ? (double) (rank - 1) / (length - 1) : 0;
+ cume_dist = (double)(rank + peers - 1) / length;
+ }
+
+ public boolean isDone() { // true once every row of the partition was aggregated
+ return remaining == 0;
+ }
+
+ public boolean isFrameDone() { // true when the current frame is exhausted, including before the first newFrame() call
+ return peers == 0;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("{length: %d, remaining partition: %d, remaining peers: %d}", length, remaining, peers);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
new file mode 100644
index 0000000..5045cb3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
@@ -0,0 +1,93 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.window;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class WindowDataBatch implements VectorAccessible { // holds a saved incoming batch's vectors together with its FragmentContext
+
+ private final FragmentContext context;
+ private final VectorContainer container; // holds the vectors taken from the source batch
+ private final int recordCount;
+
+ public WindowDataBatch(final VectorAccessible batch, final FragmentContext context) { // takes batch's vectors through TransferPairs instead of copying them
+ this.context = context;
+ recordCount = batch.getRecordCount(); // read before batch's vectors are transferred below
+
+ List<ValueVector> vectors = Lists.newArrayList();
+
+ for (VectorWrapper<?> v : batch) {
+ if (v.isHyper()) { // only plain (non-hyper) batches are supported
+ throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
+ }
+ TransferPair tp = v.getValueVector().getTransferPair();
+ tp.transfer();
+ vectors.add(tp.getTo());
+ }
+
+ container = new VectorContainer();
+ container.addCollection(vectors);
+ container.setRecordCount(recordCount);
+ container.buildSchema(batch.getSchema().getSelectionVectorMode()); // keep the source batch's selection vector mode
+ }
+
+ public FragmentContext getContext() {
+ return context;
+ }
+
+ @Override
+ public int getRecordCount() {
+ return recordCount;
+ }
+
+ @Override
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
+ return container.getValueAccessorById(clazz, fieldIds);
+ }
+
+ @Override
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return container.getValueVectorId(path);
+ }
+
+ @Override
+ public BatchSchema getSchema() {
+ return container.getSchema();
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ return container.iterator();
+ }
+
+ public void clear() { // clears the underlying container, freeing the transferred vectors
+ container.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 428632f..da189eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -19,13 +19,21 @@ package org.apache.drill.exec.physical.impl.window;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import com.google.common.collect.Maps;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JInvocation;
+import com.sun.codemodel.JVar;
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -33,13 +41,11 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.WindowPOP;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
@@ -61,12 +67,51 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WindowFrameRecordBatch.class);
private final RecordBatch incoming;
- private List<RecordBatchData> batches;
+ private List<WindowDataBatch> batches;
private WindowFramer framer;
private boolean noMoreBatches;
private BatchSchema schema;
+ /**
+ * Describes the supported window functions and whether they output a required FLOAT8 or a required BIGINT
+ */
+ private enum WindowFunction {
+ ROW_NUMBER(false),
+ RANK(false),
+ DENSE_RANK(false),
+ PERCENT_RANK(true),
+ CUME_DIST(true);
+
+ private final boolean useDouble; // true: FLOAT8 output, false: BIGINT output
+
+ WindowFunction(boolean useDouble) {
+ this.useDouble = useDouble;
+ }
+
+ public TypeProtos.MajorType getMajorType() { // required-mode output type of this function
+ return useDouble ? Types.required(TypeProtos.MinorType.FLOAT8) : Types.required(TypeProtos.MinorType.BIGINT);
+ }
+
+ /**
+ * Extract the WindowFunction whose constant name matches the function call's name, ignoring case
+ * @param expr logical expression
+ * @return WindowFunction or null if the logical expression is not a window function
+ */
+ public static WindowFunction fromExpression(final LogicalExpression expr) {
+ if (!(expr instanceof FunctionCall)) {
+ return null;
+ }
+
+ final String name = ((FunctionCall) expr).getName();
+ try {
+ return WindowFunction.valueOf(name.toUpperCase()); // NOTE(review): toUpperCase() uses the default locale (a Turkish locale upper-cases 'i' to a dotted capital I); consider toUpperCase(Locale.ROOT)
+ } catch (IllegalArgumentException e) {
+ return null; // not a window function
+ }
+ }
+ }
+
public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
@@ -90,7 +135,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
*
* <p><pre>
* when innerNext() is called:
- * call next(incoming), we receive and save b0 in a list of RecordDataBatch
+ * call next(incoming), we receive and save b0 in a list of WindowDataBatch
* we can't process b0 yet because we don't know if p1 has more rows upstream
* call next(incoming), we receive and save b1
* we can't process b0 yet for the same reason previously stated
@@ -107,7 +152,11 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
* when innerNext() is called:
* we return NONE
* </pre></p>
- *
+ * The previous scenario applies when there is no ORDER BY clause. Otherwise, a batch can be processed
+ * as soon as we reach the final peer row of the batch's last row (i.e. once we find the end of the batch's last frame).
+ * <p>
+ * Because we only support the default frame, we don't need to reset the aggregations until we reach the end of
+ * a partition. We can safely free a batch as soon as it has been processed.
*/
@Override
public IterOutcome innerNext() {
@@ -130,24 +179,23 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
case OUT_OF_MEMORY:
- case NOT_YET:
case STOP:
+ cleanup();
return upstream;
+ case NOT_YET:
+ // upstream has no data yet but is not done: keep the framer and the saved batches for the next call
+ return upstream;
case OK_NEW_SCHEMA:
- // when a partition of rows exceeds the current processed batch, it will be kept as "pending" and processed
- // when innerNext() is called again. If the schema changes, the framer is "rebuilt" and the pending information
- // will be lost which may lead to incorrect results.
-
- // only change in the case that the schema truly changes. Artificial schema changes are ignored.
+ // We don't support schema changes: remember the first schema, then fall through to OK to save the batch
if (!incoming.getSchema().equals(schema)) {
if (schema != null) {
- throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ throw new UnsupportedOperationException("OVER clause doesn't currently support changing schemas.");
}
this.schema = incoming.getSchema();
}
case OK:
- batches.add(new RecordBatchData(incoming));
+ batches.add(new WindowDataBatch(incoming, context));
break;
default:
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Unsupported upstream state " + upstream);
}
}
@@ -157,15 +203,12 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
return IterOutcome.NONE;
}
- // process a saved batch
+ // process first saved batch, then release it
try {
framer.doWork();
} catch (DrillException e) {
context.fail(e);
- if (framer != null) {
- framer.cleanup();
- framer = null;
- }
+ cleanup();
return IterOutcome.STOP;
}
@@ -201,52 +244,44 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
}
private WindowFramer createFramer(VectorAccessible batch) throws SchemaChangeException, IOException, ClassTransformationException {
- logger.trace("creating framer");
+ assert framer == null : "createFramer should only be called once";
- container.clear();
+ logger.trace("creating framer");
- if (framer != null) {
- framer.cleanup();
- framer = null;
- }
+ final List<LogicalExpression> aggExprs = Lists.newArrayList();
+ final Map<WindowFunction, TypedFieldId> winExprs = Maps.newHashMap();
+ final List<LogicalExpression> keyExprs = Lists.newArrayList();
+ final List<LogicalExpression> orderExprs = Lists.newArrayList();
+ final ErrorCollector collector = new ErrorCollectorImpl();
- ErrorCollector collector = new ErrorCollectorImpl();
+ container.clear();
- // setup code generation to copy all incoming vectors to the container
- // we can't just transfer them because after we pass the container downstream, some values will be needed when
- // processing the next batches
- int j = 0;
- LogicalExpression[] windowExprs = new LogicalExpression[batch.getSchema().getFieldCount()];
+ // all existing vectors will be transferred to the outgoing container in framer.doWork()
for (VectorWrapper wrapper : batch) {
- // read value from saved batch
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(
- new ValueVectorReadExpression(new TypedFieldId(wrapper.getField().getType(), wrapper.isHyper(), j)),
- batch, collector, context.getFunctionRegistry());
-
- ValueVector vv = container.addOrGet(wrapper.getField());
- vv.allocateNew();
-
- // write value into container
- TypedFieldId id = container.getValueVectorId(vv.getField().getPath());
- windowExprs[j] = new ValueVectorWriteExpression(id, expr, true);
- j++;
+ container.addOrGet(wrapper.getField());
}
// add aggregation vectors to the container, and materialize corresponding expressions
- LogicalExpression[] aggExprs = new LogicalExpression[popConfig.getAggregations().length];
- for (int i = 0; i < aggExprs.length; i++) {
- // evaluate expression over saved batch
- NamedExpression ne = popConfig.getAggregations()[i];
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry());
-
- // add corresponding ValueVector to container
- final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
- ValueVector vv = container.addOrGet(outputField);
- vv.allocateNew();
-
- // write value into container
- TypedFieldId id = container.getValueVectorId(ne.getRef());
- aggExprs[i] = new ValueVectorWriteExpression(id, expr, true);
+ for (final NamedExpression ne : popConfig.getAggregations()) {
+ final WindowFunction wf = WindowFunction.fromExpression(ne.getExpr());
+
+ if (wf != null) {
+ // add corresponding ValueVector to container
+ final MaterializedField outputField = MaterializedField.create(ne.getRef(), wf.getMajorType());
+ ValueVector vv = container.addOrGet(outputField);
+ vv.allocateNew();
+ winExprs.put(wf, container.getValueVectorId(ne.getRef()));
+ } else {
+ // evaluate expression over saved batch
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry());
+
+ // add corresponding ValueVector to container
+ final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
+ ValueVector vv = container.addOrGet(outputField);
+ vv.allocateNew();
+ TypedFieldId id = container.getValueVectorId(ne.getRef());
+ aggExprs.add(new ValueVectorWriteExpression(id, expr, true));
+ }
}
if (container.isSchemaChanged()) {
@@ -254,17 +289,15 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
}
// materialize partition by expressions
- LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getWithins().length];
- for (int i = 0; i < keyExprs.length; i++) {
- NamedExpression ne = popConfig.getWithins()[i];
- keyExprs[i] = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry());
+ for (final NamedExpression ne : popConfig.getWithins()) {
+ keyExprs.add(
+ ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry()));
}
// materialize order by expressions
- LogicalExpression[] orderExprs = new LogicalExpression[popConfig.getOrderings().length];
- for (int i = 0; i < orderExprs.length; i++) {
- Order.Ordering oe = popConfig.getOrderings()[i];
- orderExprs[i] = ExpressionTreeMaterializer.materialize(oe.getExpr(), batch, collector, context.getFunctionRegistry());
+ for (final Order.Ordering oe : popConfig.getOrderings()) {
+ orderExprs.add(
+ ExpressionTreeMaterializer.materialize(oe.getExpr(), batch, collector, context.getFunctionRegistry()));
}
if (collector.hasErrors()) {
@@ -272,14 +305,11 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
}
// generate framer code
-
final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(WindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- // setup for isSamePartition()
- setupIsFunction(cg, keyExprs, isaB1, isaB2);
- // setup for isPeer()
- setupIsFunction(cg, orderExprs, isaP1, isaP2);
- setupAddRecords(cg, aggExprs);
- setupOutputWindowValues(cg, windowExprs);
+ setupIsFunction(cg, keyExprs, isaB1, isaB2); // setup for isSamePartition()
+ setupIsFunction(cg, orderExprs, isaP1, isaP2); // setup for isPeer()
+ setupOutputAggregatedValues(cg, aggExprs);
+ setupAddWindowValue(cg, winExprs);
cg.getBlock("resetValues")._return(JExpr.TRUE);
@@ -300,7 +330,8 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
/**
* setup comparison functions isSamePartition and isPeer
*/
- private void setupIsFunction(ClassGenerator<WindowFramer> cg, LogicalExpression[] exprs, MappingSet leftMapping, MappingSet rightMapping) {
+ private void setupIsFunction(final ClassGenerator<WindowFramer> cg, final List<LogicalExpression> exprs,
+ final MappingSet leftMapping, final MappingSet rightMapping) {
cg.setMappingSet(leftMapping);
for (LogicalExpression expr : exprs) {
cg.setMappingSet(leftMapping);
@@ -317,36 +348,51 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
cg.getEvalBlock()._return(JExpr.TRUE);
}
- private static final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupIncoming", "addRecord", null, null);
- private static final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupOutgoing", "outputRecordValues", "resetValues", "cleanup");
+ private static final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupRead", "aggregateRecord", null, null);
+ private static final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupWrite", "outputAggregatedValues", "resetValues", "cleanup");
private final MappingSet eval = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
/**
- * setup for addRecords() and outputRecordValues()
+ * setup for aggregateRecord() and outputAggregatedValues()
*/
- private void setupAddRecords(ClassGenerator<WindowFramer> cg, LogicalExpression[] valueExprs) {
+ private void setupOutputAggregatedValues(ClassGenerator<WindowFramer> cg, List<LogicalExpression> valueExprs) {
cg.setMappingSet(eval);
for (LogicalExpression ex : valueExprs) {
cg.addExpr(ex);
}
}
- private final static GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupCopy", "outputWindowValues", null, null);
- private final MappingSet windowValues = new MappingSet("index", "index", OUTPUT_WINDOW_VALUES, OUTPUT_WINDOW_VALUES);
-
- private void setupOutputWindowValues(ClassGenerator<WindowFramer> cg, LogicalExpression[] valueExprs) {
- cg.setMappingSet(windowValues);
- for (LogicalExpression valueExpr : valueExprs) {
- cg.addExpr(valueExpr);
+ /**
+ * Generates one setSafe() call per window function, in the eval block of the "eval" mapping set. Each call writes the
+ * matching field of the generated code's {@code partition} object (e.g. partition.rank) into the function's output
+ * value vector at the current write index.
+ * <p>
+ * NOTE(review): functions are keyed by WindowFunction, so if the same function appears more than once in the
+ * aggregation list, only the last output vector registered in createFramer() is written. Confirm whether the
+ * planner can produce such duplicates.
+ */
+ private void setupAddWindowValue(final ClassGenerator<WindowFramer> cg, final Map<WindowFunction, TypedFieldId> functions) {
+ cg.setMappingSet(eval);
+ for (final Map.Entry<WindowFunction, TypedFieldId> entry : functions.entrySet()) {
+ final JVar vv = cg.declareVectorValueSetupAndMember(cg.getMappingSet().getOutgoing(), entry.getValue());
+ final JExpression outIndex = cg.getMappingSet().getValueWriteIndex();
+ // partition field names are the lower-cased enum names. Use a fixed locale so that CUME_DIST never becomes
+ // "cume_dist" with a dotless i under a Turkish default locale, which would reference a non-existent field.
+ final String fieldName = entry.getKey().name().toLowerCase(java.util.Locale.ROOT);
+ final JInvocation setMethod = vv.invoke("getMutator").invoke("setSafe").arg(outIndex).arg(
+ JExpr.direct("partition." + fieldName));
+ cg.getEvalBlock().add(setMethod);
}
}
- @Override
- public void close() {
+ /**
+ * Releases the framer and clears every saved batch. Used on early exits from innerNext() (including a doWork()
+ * failure) and from close(). Both fields are nulled after release, so calling it more than once is harmless.
+ * NOTE(review): innerNext() adds to batches without a null check, so no batch may be saved after this runs.
+ */
+ private void cleanup() {
if (framer != null) {
framer.cleanup();
framer = null;
}
+
+ if (batches != null) {
+ for (final WindowDataBatch bd : batches) {
+ bd.clear();
+ }
+ batches = null;
+ }
+ }
+
+ @Override
+ public void close() {
+ // release the framer and saved batches before the base class closes
+ cleanup();
super.close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java
deleted file mode 100644
index 78bab54..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.window;
-
-import org.apache.drill.common.exceptions.DrillException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorWrapper;
-
-import javax.inject.Named;
-import java.util.Iterator;
-import java.util.List;
-
-
-public abstract class WindowFrameTemplate implements WindowFramer {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WindowFrameTemplate.class);
-
- private VectorAccessible container;
- private List<RecordBatchData> batches;
- private int outputCount; // number of rows in currently/last processed batch
-
- /**
- * current partition being processed. Can span over multiple batches, so we may need to keep it between calls to doWork()
- */
- private Interval partition;
-
- private int currentBatch; // first unprocessed batch
-
- @Override
- public void setup(List<RecordBatchData> batches, VectorAccessible container) throws SchemaChangeException {
- this.container = container;
- this.batches = batches;
-
- outputCount = 0;
- partition = null;
- currentBatch = 0;
-
- setupOutgoing(container);
- }
-
- /**
- * processes all rows of current batch:
- * <ul>
- * <li>compute window aggregations</li>
- * <li>copy remaining vectors from current batch to container</li>
- * </ul>
- */
- @Override
- public void doWork() throws DrillException {
- logger.trace("WindowFramer.doWork() START, num batches {}, currentBatch {}", batches.size(), currentBatch);
-
- final VectorAccessible current = batches.get(currentBatch).getContainer();
-
- // we need to store the record count explicitly, in case we release current batch at the end of this call
- outputCount = current.getRecordCount();
-
- // allocate vectors
- for (VectorWrapper<?> w : container){
- w.getValueVector().allocateNew();
- }
-
- setupCopy(current, container);
-
- int currentRow = 0;
-
- while (currentRow < outputCount) {
- if (partition != null) {
- assert currentRow == 0 : "pending windows are only expected at the start of the batch";
-
- // we have a pending window we need to handle from a previous call to doWork()
- logger.trace("we have a pending partition {}", partition);
- } else {
- // compute the size of the new partition
- final int length = computePartitionSize(currentRow);
- partition = new Interval(currentRow, length, 0);
- }
-
- currentRow = processPartition(currentRow);
-
- if (partition == null) {
- freeBatches(currentRow == outputCount);
- }
- }
-
- if (partition != null) {
- logger.trace("we have a pending partition {}", partition);
- // current batch has been processed but it won't be released until this pending partition is processed
- currentBatch++;
- }
-
- for (VectorWrapper<?> v : container) {
- v.getValueVector().getMutator().setValueCount(outputCount);
- }
-
- logger.trace("WindowFramer.doWork() END");
- }
-
- /**
- * releases saved batches that are no longer needed: all rows of their partitions have been processed
- * @param freeCurrent do we free current batch too ?
- */
- private void freeBatches(boolean freeCurrent) {
- // how many batches can be released
- int numFree = currentBatch;
- if (freeCurrent) { // current batch can also be released
- numFree++;
- }
- if (numFree > 0) {
- logger.trace("freeing {} batches", numFree);
-
- // we are ready to free batches < currentBatch
- for (int i = 0; i < numFree; i++) {
- RecordBatchData bd = batches.remove(0);
- bd.getContainer().clear();
- }
-
- currentBatch = 0;
- }
- }
-
- /**
- * process all rows (computes and writes aggregation values) of current batch that are part of current partition.
- * @return index of next unprocessed row
- * @throws DrillException if it can't write into the container
- */
- private int processPartition(int currentRow) throws DrillException {
- logger.trace("process partition {}, currentBatch: {}, currentRow: {}", partition, currentBatch, currentRow);
-
- // compute how many rows remain unprocessed in the current partition
- int remaining = partition.length;
- for (int b = 0; b < currentBatch; b++) {
- remaining -= batches.get(b).getRecordCount();
- }
- remaining -= currentRow - partition.start;
-
- // when computing the frame for the current row, keep track of how many peer rows need to be processed
- // because all peer rows share the same frame, we only need to compute and aggregate the frame once
- for (int peers = 0; currentRow < outputCount && remaining > 0; currentRow++, remaining--) {
- if (peers == 0) {
- Interval frame = computeFrame(currentBatch, currentRow);
- resetValues();
- aggregate(frame);
- peers = frame.peers;
- } else {
- peers--;
- }
-
- outputRecordValues(currentRow);
- outputWindowValues(currentRow);
- }
-
- if (remaining == 0) {
- logger.trace("finished processing {}", partition);
- partition = null;
- }
-
- return currentRow;
- }
-
- /**
- * @return number of rows that are part of the partition starting at row start of first batch
- */
- private int computePartitionSize(int start) {
- logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
-
- // current partition always starts from first batch
- final VectorAccessible first = batches.get(0).getContainer();
-
- int length = 0;
-
- // count all rows that are in the same partition of start
- outer:
- for (RecordBatchData batch : batches) {
- final VectorAccessible cont = batch.getContainer();
-
- // check first container from start row, and subsequent containers from first row
- for (int row = (cont == first ? start : 0); row < cont.getRecordCount(); row++) {
- if (isSamePartition(start, first, row, cont)) {
- length++;
- } else {
- break outer;
- }
- }
- }
-
- return length;
- }
-
- /**
- * find the limits of the window frame for a row
- * @param batchId batch where the current row is
- * @param row idx of row in the given batch
- * @return frame interval
- */
- private Interval computeFrame(int batchId, int row) {
-
- // using default frame for now RANGE BETWEEN UNBOUND PRECEDING AND CURRENT ROW
- // frame contains all rows from start of partition to last peer of row
-
- // count how many rows in the current partition precede the row
- int length = row;
- // include rows of all batches previous to batchId
- Iterator<RecordBatchData> iterator = batches.iterator();
- for (int b = 0; b < batchId; b++) {
- length += iterator.next().getRecordCount();
- }
- length -= partition.start;
-
- VectorAccessible batch = iterator.next().getContainer();
- VectorAccessible current = batch;
-
- // for every remaining row in the partition, count it if it's a peer row
- int peers = 0;
- for (int curRow = row; length < partition.length; length++, curRow++, peers++) {
- if (curRow == current.getRecordCount()) {
- current = iterator.next().getContainer();
- curRow = 0;
- }
-
- if (!isPeer(row, batch, curRow, current)) {
- break;
- }
- }
-
- // do not count row as a peer
- return new Interval(partition.start, length, peers-1);
- }
-
- private void aggregate(Interval frame) throws SchemaChangeException {
- logger.trace("aggregating {}", frame);
- assert frame.length > 0 : "processing empty frame!";
-
- // a single frame can include rows from multiple batches
- // start processing first batch and, if necessary, move to next batches
- Iterator<RecordBatchData> iterator = batches.iterator();
- VectorAccessible current = iterator.next().getContainer();
- setupIncoming(current);
-
- for (int i = 0, row = frame.start; i < frame.length; i++, row++) {
- if (row >= current.getRecordCount()) {
- // we reached the end of the current batch, move to the next one
- current = iterator.next().getContainer();
- setupIncoming(current);
- row = 0;
- }
-
- addRecord(row);
- }
- }
-
- @Override
- public boolean canDoWork() {
- // check if we can process a saved batch
- if (batches.size() > 1) {
- final VectorAccessible last = batches.get(batches.size()-1).getContainer();
-
- if (!isSamePartition(getCurrent().getRecordCount() - 1, getCurrent(), last.getRecordCount() - 1, last)) {
- logger.trace("partition changed, we are ready to process first saved batch");
- return true;
- } else {
- logger.trace("partition didn't change, fetch next batch");
- }
- } else {
- logger.trace("we don't have enough batches to proceed, fetch next batch");
- }
-
- return false;
- }
-
- @Override
- public VectorAccessible getCurrent() {
- return batches.get(currentBatch).getContainer();
- }
-
- @Override
- public int getOutputCount() {
- return outputCount;
- }
-
- @Override
- public void cleanup() {
- }
-
- /**
- * setup incoming container for addRecord()
- */
- public abstract void setupIncoming(@Named("incoming") VectorAccessible incoming) throws SchemaChangeException;
-
- /**
- * setup outgoing container for outputRecordValues
- */
- public abstract void setupOutgoing(@Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
-
- /**
- * setup for outputWindowValues
- */
- public abstract void setupCopy(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
-
- /**
- * aggregates a row from the incoming container
- * @param index of row to aggregate
- */
- public abstract void addRecord(@Named("index") int index);
-
- /**
- * writes aggregated values to row of outgoing container
- * @param outIndex index of row
- */
- public abstract void outputRecordValues(@Named("outIndex") int outIndex);
-
- /**
- * copies all value vectors from incoming to container, for a specific row
- * @param index of row to be copied
- */
- public abstract void outputWindowValues(@Named("index") int index);
-
- /**
- * reset all window functions
- */
- public abstract boolean resetValues();
-
- /**
- * compares two rows from different batches (can be the same), if they have the same value for the partition by
- * expression
- * @param b1Index index of first row
- * @param b1 batch for first row
- * @param b2Index index of second row
- * @param b2 batch for second row
- * @return true if the rows are in the same partition
- */
- public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
-
- /**
- * compares two rows from different batches (can be the same), if they have the same value for the order by
- * expression
- * @param b1Index index of first row
- * @param b1 batch for first row
- * @param b2Index index of second row
- * @param b2 batch for second row
- * @return true if the rows are in the same partition
- */
- public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
-
- /**
- * Used internally to keep track of partitions and frames
- */
- private static class Interval {
- public final int start;
- public final int length;
- public final int peers; // we only need this for frames
-
- public Interval(int start, int length, int peers) {
- this.start = start;
- this.length = length;
- this.peers = peers;
- }
-
- @Override
- public String toString() {
- return String.format("{start: %d, length: %d, peers: %d}", start, length, peers);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
index 23a2b53..69866af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
@@ -20,21 +20,20 @@ package org.apache.drill.exec.physical.impl.window;
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
import java.util.List;
public interface WindowFramer {
- public static TemplateClassDefinition<WindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, WindowFrameTemplate.class);
+ TemplateClassDefinition<WindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, DefaultFrameTemplate.class);
- public abstract void setup(List<RecordBatchData> batches, VectorAccessible container) throws SchemaChangeException;
+ void setup(List<WindowDataBatch> batches, final VectorContainer container) throws SchemaChangeException;
/**
* process the inner batch and write the aggregated values in the container
* @throws DrillException
*/
- public abstract void doWork() throws DrillException;
+ void doWork() throws DrillException;
/**
* check if current batch can be processed:
@@ -44,16 +43,12 @@ public interface WindowFramer {
* </ol>
* @return true if current batch can be processed, false otherwise
*/
- public abstract boolean canDoWork();
+ boolean canDoWork();
+
/**
* @return number rows processed in last batch
*/
- public abstract int getOutputCount();
-
- public abstract void cleanup();
+ int getOutputCount();
- /**
- * @return saved batch that will be processed in doWork()
- */
- public abstract VectorAccessible getCurrent();
+ void cleanup();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 9b21ae3..6686fbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -17,12 +17,14 @@
*/
package org.apache.drill.exec.physical.impl.xsort;
+import com.typesafe.config.ConfigException;
import io.netty.buffer.DrillBuf;
import java.util.Queue;
import javax.inject.Named;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -36,7 +38,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Queues;
public abstract class MSortTemplate implements MSorter, IndexedSortable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
private SelectionVector4 vector4;
private SelectionVector4 aux;
@@ -68,7 +70,16 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
}
}
final DrillBuf drillBuf = allocator.buffer(4 * totalCount);
- aux = new SelectionVector4(drillBuf, totalCount, Character.MAX_VALUE);
+
+ // Debugging aid only: overrides the maximum size of the batches exposed downstream when we don't spill to disk.
+ // Character.MAX_VALUE is used when the option is not configured.
+ int maxBatchSize = Character.MAX_VALUE;
+ try {
+ maxBatchSize = context.getConfig().getInt(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE);
+ } catch (ConfigException.Missing e) {
+ // option not set: keep the default
+ }
+ aux = new SelectionVector4(drillBuf, totalCount, maxBatchSize);
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 330ec79..ff53052 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -34,7 +34,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch {
final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
- protected final VectorContainer container; //= new VectorContainer();
+ protected final VectorContainer container;
protected final T popConfig;
protected final FragmentContext context;
protected final OperatorContext oContext;
@@ -56,8 +56,8 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
this.context = context;
this.popConfig = popConfig;
this.oContext = oContext;
- this.stats = oContext.getStats();
- this.container = new VectorContainer(this.oContext);
+ stats = oContext.getStats();
+ container = new VectorContainer(this.oContext);
if (buildSchema) {
state = BatchState.BUILD_SCHEMA;
} else {
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
index adbf653..5d8cd95 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
@@ -23,7 +23,6 @@ import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
import org.apache.drill.exec.work.foreman.UnsupportedDataTypeException;
import org.apache.drill.exec.work.foreman.UnsupportedFunctionException;
import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
-import org.junit.Ignore;
import org.junit.Test;
public class TestDisabledFunctionality extends BaseTestQuery{
@@ -286,18 +285,6 @@ public class TestDisabledFunctionality extends BaseTestQuery{
}
}
- @Test(expected = UnsupportedFunctionException.class) // see DRILL-2441
- public void testDisabledWindowFunctions() throws Exception {
- try {
- test("SELECT employee_id,position_id, salary, avg(salary) " +
- "OVER (PARTITION BY position_id order by position_id) " +
- "FROM cp.`employee.json` " +
- "order by employee_id;");
- } catch(UserException ex) {
- throwAsUnsupportedException(ex);
- }
- }
-
@Test(expected = UnsupportedFunctionException.class) // see DRILL-2181
public void testFlattenWithinGroupBy() throws Exception {
try {
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java
new file mode 100644
index 0000000..623c1e2
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.common.util.TestTools;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class GenerateTestData {
+ private static final int SUB_MUL = 1;
+ private static final int BATCH_SIZE = 20;
+
+ private static class Partition {
+ Partition previous;
+ final int length;
+ final int[] subs;
+
+ public Partition(int length, int[] subs) {
+ this.length = length;
+ this.subs = subs;
+ }
+
+ /**
+ * @return total number of rows since first partition, this partition included
+ */
+ public int cumulLength() {
+ int prevLength = previous != null ? previous.cumulLength() : 0;
+ return length + prevLength;
+ }
+
+ public boolean isPartOf(int rowNumber) {
+ int prevLength = previous != null ? previous.cumulLength() : 0;
+ return rowNumber >= prevLength && rowNumber < cumulLength();
+ }
+
+ public int getSubIndex(final int sub) {
+ return Arrays.binarySearch(subs, sub);
+ }
+
+ public int getSubSize(int sub) {
+ if (sub != subs[subs.length-1]) {
+ return sub * SUB_MUL;
+ } else {
+ //last sub has enough rows to reach partition length
+ int size = length;
+ for (int i = 0; i < subs.length-1; i++) {
+ size -= subs[i] * SUB_MUL;
+ }
+ return size;
+ }
+ }
+
+ /**
+ * @return sub id of the sub that contains rowNumber
+ */
+ public int getSubId(int rowNumber) {
+ assert isPartOf(rowNumber) : "row "+rowNumber+" isn't part of this partition";
+
+ int prevLength = previous != null ? previous.cumulLength() : 0;
+ rowNumber -= prevLength; // row num from start of this partition
+
+ for (int s : subs) {
+ if (rowNumber < subRunningCount(s)) {
+ return s;
+ }
+ }
+
+ throw new RuntimeException("should never happen!");
+ }
+
+ /**
+ * @return running count of rows from first row of the partition to current sub, this sub included
+ */
+ public int subRunningCount(int sub) {
+ int count = 0;
+ for (int s : subs) {
+ count += getSubSize(s);
+ if (s == sub) {
+ break;
+ }
+ }
+ return count;
+ }
+
+ /**
+ * @return running sum of salaries from first row of the partition to current sub, this sub included
+ */
+ public int subRunningSum(int sub) {
+ int sum = 0;
+ for (int s : subs) {
+ sum += (s+10) * getSubSize(s);
+ if (s == sub) {
+ break;
+ }
+ }
+ return sum;
+ }
+
+ /**
+ * @return sum of salaries for all rows of the partition
+ */
+ public int totalSalary() {
+ return subRunningSum(subs[subs.length-1]);
+ }
+
+ }
+
+ private static Partition[] dataB1P1() {
+ // partition rows 20, subs [1, 2, 3, 4, 5, 6]
+ return new Partition[] {
+ new Partition(20, new int[]{1, 2, 3, 4, 5, 6})
+ };
+ }
+
+ private static Partition[] dataB1P2() {
+ // partition rows 10, subs [1, 2, 3, 4]
+ // partition rows 10, subs [4, 5, 6]
+ return new Partition[] {
+ new Partition(10, new int[]{1, 2, 3, 4}),
+ new Partition(10, new int[]{4, 5, 6}),
+ };
+ }
+
+ private static Partition[] dataB2P2() {
+ // partition rows 20, subs [3, 5, 9]
+ // partition rows 20, subs [9, 10]
+ return new Partition[] {
+ new Partition(20, new int[]{3, 5, 9}),
+ new Partition(20, new int[]{9, 10}),
+ };
+ }
+
+ private static Partition[] dataB2P4() {
+ // partition rows 5, subs [1, 2, 3]
+ // partition rows 10, subs [3, 4, 5]
+ // partition rows 15, subs [5, 6, 7]
+ // partition rows 10, subs [7, 8]
+ return new Partition[] {
+ new Partition(5, new int[]{1, 2, 3}),
+ new Partition(10, new int[]{3, 4, 5}),
+ new Partition(15, new int[]{5, 6, 7}),
+ new Partition(10, new int[]{7, 8}),
+ };
+ }
+
+ private static Partition[] dataB3P2() {
+ // partition rows 5, subs [1, 2, 3]
+ // partition rows 55, subs [4, 5, 7, 8, 9, 10, 11, 12]
+ return new Partition[] {
+ new Partition(5, new int[]{1, 2, 3}),
+ new Partition(55, new int[]{4, 5, 7, 8, 9, 10, 11, 12}),
+ };
+ }
+
+ private static Partition[] dataB4P4() {
+ // partition rows 10, subs [1, 2, 3]
+ // partition rows 30, subs [3, 4, 5, 6, 7, 8]
+ // partition rows 20, subs [8, 9, 10]
+ // partition rows 20, subs [10, 11]
+ return new Partition[] {
+ new Partition(10, new int[]{1, 2, 3}),
+ new Partition(30, new int[]{3, 4, 5, 6, 7, 8}),
+ new Partition(20, new int[]{8, 9, 10}),
+ new Partition(20, new int[]{10, 11}),
+ };
+ }
+
+ private static void generateData(final String tableName, final Partition[] partitions) throws FileNotFoundException {
+ final String WORKING_PATH = TestTools.getWorkingPath();
+ final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+ //TODO command line arguments contain file name
+ final String path = TEST_RES_PATH+"/window/" + tableName;
+
+ final File pathFolder = new File(path);
+ if (!pathFolder.exists()) {
+ if (!pathFolder.mkdirs()) {
+ System.err.printf("Couldn't create folder %s, exiting%n", path);
+ }
+ }
+
+ // expected results for query without order by clause
+ final PrintStream resultStream = new PrintStream(path + ".tsv");
+ // expected results for query with order by clause
+ final PrintStream resultOrderStream = new PrintStream(path + ".subs.tsv");
+
+ // data file(s)
+ int fileId = 0;
+ PrintStream dataStream = new PrintStream(path + "/" + fileId + ".data.json");
+
+ for (Partition p : partitions) {
+ dataStream.printf("// partition rows %d, subs %s%n", p.length, Arrays.toString(p.subs));
+ }
+
+ // set previous partitions
+ for (int i = 1; i < partitions.length; i++) {
+ partitions[i].previous = partitions[i - 1];
+ }
+
+ // total number of rows
+ int total = partitions[partitions.length - 1].cumulLength();
+
+ // create data rows in randome order
+ List<Integer> emp_ids = new ArrayList<>(total);
+ for (int i = 0; i < total; i++) {
+ emp_ids.add(i);
+ }
+ Collections.shuffle(emp_ids);
+
+ int emp_idx = 0;
+ for (int id : emp_ids) {
+ int p = 0;
+ while (!partitions[p].isPartOf(id)) { // emp x is @ row x-1
+ p++;
+ }
+
+ int sub = partitions[p].getSubId(id);
+ int salary = 10 + sub;
+
+ dataStream.printf("{ \"employee_id\":%d, \"position_id\":%d, \"sub\":%d, \"salary\":%d }%n", id, p + 1, sub, salary);
+ emp_idx++;
+ if ((emp_idx % BATCH_SIZE)==0 && emp_idx < total) {
+ System.out.printf("total: %d, emp_idx: %d, fileID: %d%n", total, emp_idx, fileId);
+ dataStream.close();
+ fileId++;
+ dataStream = new PrintStream(path + "/" + fileId + ".data.json");
+ }
+ }
+
+ dataStream.close();
+
+ for (int p = 0, idx = 0; p < partitions.length; p++) {
+ for (int i = 0; i < partitions[p].length; i++, idx++) {
+ final Partition partition = partitions[p]; //TODO change for p loop to for over partitions
+
+ final int sub = partition.getSubId(idx);
+ final int rowNumber = i + 1;
+ final int rank = 1 + partition.subRunningCount(sub) - partition.getSubSize(sub);
+ final int denseRank = partition.getSubIndex(sub) + 1;
+ final double cumeDist = (double) partition.subRunningCount(sub) / partition.length;
+ final double percentRank = partition.length == 1 ? 0 : (double)(rank - 1)/(partition.length - 1);
+
+ // each line has: count(*) sum(salary) row_number() rank() dense_rank() cume_dist() percent_rank()
+ resultOrderStream.printf("%d\t%d\t%d\t%d\t%d\t%s\t%s%n",
+ partition.subRunningCount(sub), partition.subRunningSum(sub),
+ rowNumber, rank, denseRank, Double.toString(cumeDist), Double.toString(percentRank));
+
+ // each line has: count(*) sum(salary)
+ resultStream.printf("%d\t%d%n", partition.length, partition.totalSalary());
+ }
+ }
+
+ resultStream.close();
+ resultOrderStream.close();
+ }
+
+ public static void main(String[] args) throws FileNotFoundException {
+ generateData("b1.p1", dataB1P1());
+ generateData("b1.p2", dataB1P2());
+ generateData("b2.p2", dataB2P2());
+ generateData("b2.p4", dataB2P4());
+ generateData("b3.p2", dataB3P2());
+ generateData("b4.p4", dataB4P4());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index 2b8bd64..15fefa5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -18,19 +18,60 @@
package org.apache.drill.exec.physical.impl.window;
import org.apache.drill.BaseTestQuery;
+import org.apache.drill.DrillTestWrapper;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.ExecConstants;
+import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.Properties;
+
public class TestWindowFrame extends BaseTestQuery {
- private void runTest(String data, String results, String window) throws Exception {
- testNoResult("alter session set `%s`= true", ExecConstants.ENABLE_WINDOW_FUNCTIONS);
- testBuilder()
- .sqlQuery("select count(*) over pos_win `count`, sum(salary) over pos_win `sum` from cp.`window/%s.json` window pos_win as (%s)", data, window)
+ private static final String TEST_RES_PATH = TestTools.getWorkingPath() + "/src/test/resources";
+ private static final String QUERY_NO_ORDERBY =
+ "select count(*) over pos_win `count`, sum(salary) over pos_win `sum` from dfs_test.`%s/window/%s` window pos_win as (partition by position_id)";
+ private static final String QUERY_ORDERBY =
+ "select count(*) over pos_win `count`, sum(salary) over pos_win `sum`, row_number() over pos_win `row_number`, rank() over pos_win `rank`, dense_rank() over pos_win `dense_rank`, cume_dist() over pos_win `cume_dist`, percent_rank() over pos_win `percent_rank` from dfs_test.`%s/window/%s` window pos_win as (partition by position_id order by sub)";
+
+  /**
+   * Reconfigures the test cluster through updateTestCluster so that the memory sorter outputs 20 rows per batch,
+   * which lets the tests exercise multi-batch window processing with small data files.
+   */
+  @BeforeClass
+  public static void setupMSortBatchSize() {
+    final Properties overrides = cloneDefaultTestConfigProperties();
+    final String maxBatchSize = String.valueOf(20);
+    overrides.put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, maxBatchSize);
+
+    final DrillConfig config = DrillConfig.create(overrides);
+    updateTestCluster(1, config);
+  }
+
+ private DrillTestWrapper buildWindowQuery(final String tableName) throws Exception {
+ return testBuilder()
+ .sqlQuery(String.format(QUERY_NO_ORDERBY, TEST_RES_PATH, tableName))
.ordered()
- .csvBaselineFile("window/" + results + ".tsv")
+ .csvBaselineFile("window/" + tableName + ".tsv")
.baselineColumns("count", "sum")
- .build().run();
+ .build();
+ }
+
+ private DrillTestWrapper buildWindowWithOrderByQuery(final String tableName) throws Exception {
+ return testBuilder()
+ .sqlQuery(String.format(QUERY_ORDERBY, TEST_RES_PATH, tableName))
+ .ordered()
+ .csvBaselineFile("window/" + tableName + ".subs.tsv")
+ .baselineColumns("count", "sum", "row_number", "rank", "dense_rank", "cume_dist", "percent_rank")
+ .build();
+ }
+
+ private void runTest(final String tableName, final boolean withOrderBy) throws Exception {
+ runSQL(String.format("alter session set `%s`= true", ExecConstants.ENABLE_WINDOW_FUNCTIONS));
+
+ try {
+ DrillTestWrapper testWrapper = withOrderBy ?
+ buildWindowWithOrderByQuery(tableName) : buildWindowQuery(tableName);
+ testWrapper.run();
+ } finally {
+ runSQL(String.format("alter session set `%s`= false", ExecConstants.ENABLE_WINDOW_FUNCTIONS));
+ }
}
/**
@@ -38,7 +79,7 @@ public class TestWindowFrame extends BaseTestQuery {
*/
@Test
public void testB1P1() throws Exception {
- runTest("b1.p1.data", "b1.p1", "partition by position_id order by position_id");
+ runTest("b1.p1", false);
}
/**
@@ -46,7 +87,7 @@ public class TestWindowFrame extends BaseTestQuery {
*/
@Test
public void testB1P1OrderBy() throws Exception {
- runTest("b1.p1.data", "b1.p1.subs", "partition by position_id order by sub");
+ runTest("b1.p1", true);
}
/**
@@ -54,7 +95,7 @@ public class TestWindowFrame extends BaseTestQuery {
*/
@Test
public void testB1P2() throws Exception {
- runTest("b1.p2.data", "b1.p2", "partition by position_id order by position_id");
+ runTest("b1.p2", false);
}
/**
@@ -63,7 +104,7 @@ public class TestWindowFrame extends BaseTestQuery {
*/
@Test
public void testB1P2OrderBy() throws Exception {
- runTest("b1.p2.data", "b1.p2.subs", "partition by position_id order by sub");
+ runTest("b1.p2", true);
}
/**
@@ -71,12 +112,12 @@ public class TestWindowFrame extends BaseTestQuery {
*/
@Test
public void testB2P2() throws Exception {
- runTest("b2.p2.data", "b2.p2", "partition by position_id order by position_id");
+ runTest("b2.p2", false);
}
@Test
public void testB2P2OrderBy() throws Exception {
- runTest("b2.p2.data", "b2.p2.subs", "partition by position_id order by sub");
+ runTest("b2.p2", true);
}
/**
@@ -84,7 +125,7 @@ public class TestWindowFrame extends BaseTestQuery {
*/
@Test
public void testB2P4() throws Exception {
- runTest("b2.p4.data", "b2.p4", "partition by position_id order by position_id");
+ runTest("b2.p4", false);
}
/**
@@ -93,7 +134,7 @@ public class TestWindowFrame extends BaseTestQuery {
*/
@Test
public void testB2P4OrderBy() throws Exception {
- runTest("b2.p4.data", "b2.p4.subs", "partition by position_id order by sub");
+ runTest("b2.p4", true);
}
/**
@@ -101,7 +142,7 @@ public class TestWindowFrame extends BaseTestQuery {
*/
@Test
public void testB3P2() throws Exception {
- runTest("b3.p2.data", "b3.p2", "partition by position_id order by position_id");
+ runTest("b3.p2", false);
}
/**
@@ -110,7 +151,7 @@ public class TestWindowFrame extends BaseTestQuery {
*/
@Test
public void testB3P2OrderBy() throws Exception {
- runTest("b3.p2.data", "b3.p2.subs", "partition by position_id order by sub");
+ runTest("b3.p2", true);
}
/**
@@ -119,7 +160,23 @@ public class TestWindowFrame extends BaseTestQuery {
*/
@Test
public void testb4P4() throws Exception {
- runTest("b4.p4.data", "b4.p4", "partition by position_id order by position_id");
+ runTest("b4.p4", false);
+ }
+
+ @Test
+ public void testb4P4OrderBy() throws Exception {
+ runTest("b4.p4", true);
}
+  /**
+   * Regression test for DRILL-3218: a max() window aggregate over a char column must run.
+   * Only checks that the query executes, its results are not verified.
+   */
+  @Test // DRILL-3218
+  public void testMaxVarChar() throws Exception {
+    final String windowOption = ExecConstants.ENABLE_WINDOW_FUNCTIONS;
+    runSQL(String.format("alter session set `%s`= true", windowOption));
+    try {
+      test("select max(cast(columns[2] as char(2))) over(partition by cast(columns[2] as char(2)) order by cast(columns[0] as int)) from dfs_test.`%s/window/allData.csv`", TEST_RES_PATH);
+    } finally {
+      // restore the option even if the query failed
+      runSQL(String.format("alter session set `%s`= false", windowOption));
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/resources/window/allData.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/allData.csv b/exec/java-exec/src/test/resources/window/allData.csv
new file mode 100644
index 0000000..29ab38f
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/allData.csv
@@ -0,0 +1,5 @@
+-337516559,39342852852629160,VT,AXXXXXXXXXXXXXXXXXXXXXXXXXCXXXXXXXXXXXXXXXXXXXXXXXXB,2014-06-02 00:28:02.418,1952-08-14,false,729363085.95,8:16:8.58
+406158122,81588677006971200,IN,AXXXXXXXXXXXXXXXXXXXXXXXXXCXXXXXXXXXXXXXXXXXXXXXXXXB,2014-06-02 00:28:02.418,2001-03-08,false,1292460500.48,9:11:49.17
+1221407024,30009558124347168,VT,DXXXXXXXXXXXXXXXXXXXXXXXXXEXXXXXXXXXXXXXXXXXXXXXXXXF,2014-06-02 00:28:02.419,2000-10-18,true,395110006.277,18:44:25.43
+-1609141704,47841997008600128,ND,GXXXXXXXXXXXXXXXXXXXXXXXXXHXXXXXXXXXXXXXXXXXXXXXXXXI,2014-06-02 00:28:02.420,1991-05-13,true,1293582041.37,20:52:8.56
+-1032159521,38891661529640288,SD,HXXXXXXXXXXXXXXXXXXXXXXXXXIXXXXXXXXXXXXXXXXXXXXXXXXJ,2014-06-02 00:28:02.420,1965-02-21,false,983657842.924,19:46:10.42
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv b/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv
new file mode 100644
index 0000000..8368d4a
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv
@@ -0,0 +1,20 @@
+1 11 1 1 1 0.05 0.0
+3 35 2 2 2 0.15 0.05263157894736842
+3 35 3 2 2 0.15 0.05263157894736842
+6 74 4 4 3 0.3 0.15789473684210525
+6 74 5 4 3 0.3 0.15789473684210525
+6 74 6 4 3 0.3 0.15789473684210525
+10 130 7 7 4 0.5 0.3157894736842105
+10 130 8 7 4 0.5 0.3157894736842105
+10 130 9 7 4 0.5 0.3157894736842105
+10 130 10 7 4 0.5 0.3157894736842105
+15 205 11 11 5 0.75 0.5263157894736842
+15 205 12 11 5 0.75 0.5263157894736842
+15 205 13 11 5 0.75 0.5263157894736842
+15 205 14 11 5 0.75 0.5263157894736842
+15 205 15 11 5 0.75 0.5263157894736842
+20 285 16 16 6 1.0 0.7894736842105263
+20 285 17 16 6 1.0 0.7894736842105263
+20 285 18 16 6 1.0 0.7894736842105263
+20 285 19 16 6 1.0 0.7894736842105263
+20 285 20 16 6 1.0 0.7894736842105263
http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/resources/window/b1.p1.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/window/b1.p1.tsv b/exec/java-exec/src/test/resources/window/b1.p1.tsv
new file mode 100644
index 0000000..32f6ab7
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/b1.p1.tsv
@@ -0,0 +1,20 @@
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285
+20 285