You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/06/21 18:29:10 UTC
[2/5] drill git commit: DRILL-5325: Unit tests for the managed sort
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
new file mode 100644
index 0000000..363c08c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBatchSerialization extends DrillTest { // Round-trips row sets through VectorSerializer via a temp file and compares them with an expected row set.
+
+ public static OperatorFixture fixture; // Shared by all tests: built in setUpBeforeClass(), closed in tearDownAfterClass().
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception { // Builds the shared fixture once for the class.
+ fixture = OperatorFixture.builder().build();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception { // Closes the shared fixture.
+ fixture.close();
+ }
+
+ public SingleRowSet makeRowSet(BatchSchema schema, int rowCount) { // Builds rowCount rows; row i is filled via RowSetUtilities.setFromInt(writer, 0, i).
+ ExtendableRowSet rowSet = fixture.rowSet(schema);
+ RowSetWriter writer = rowSet.writer(rowCount);
+ for (int i = 0; i < rowCount; i++) {
+ RowSetUtilities.setFromInt(writer, 0, i);
+ writer.save();
+ }
+ writer.done();
+ return rowSet;
+ }
+
+ public SingleRowSet makeNullableRowSet(BatchSchema schema, int rowCount) { // Like makeRowSet(), but odd-numbered rows get a null in column 0.
+ ExtendableRowSet rowSet = fixture.rowSet(schema);
+ RowSetWriter writer = rowSet.writer(rowCount);
+ for (int i = 0; i < rowCount; i++) {
+ if (i % 2 == 0) { // Even rows carry a value, odd rows are null.
+ RowSetUtilities.setFromInt(writer, 0, i);
+ } else {
+ writer.column(0).setNull();
+ }
+ writer.save();
+ }
+ writer.done();
+ return rowSet;
+ }
+
+ public void testType(MinorType type) throws IOException { // Runs both the non-nullable and the nullable round-trip check for the type.
+ testNonNullType(type);
+ testNullableType(type);
+ }
+
+ public void testNonNullType(MinorType type) throws IOException { // Round-trips 20 rows of a one-column schema ("col") built with add().
+ BatchSchema schema = new SchemaBuilder( )
+ .add("col", type)
+ .build();
+ int rowCount = 20;
+ verifySerialize(makeRowSet(schema, rowCount), // Two identical row sets: one to serialize, one to compare against.
+ makeRowSet(schema, rowCount));
+ }
+
+ public void testNullableType(MinorType type) throws IOException { // Round-trips 20 rows of a one-column schema ("col") built with addNullable().
+ BatchSchema schema = new SchemaBuilder( )
+ .addNullable("col", type)
+ .build();
+ int rowCount = 20;
+ verifySerialize(makeNullableRowSet(schema, rowCount), // Two identical row sets: one to serialize, one to compare against.
+ makeNullableRowSet(schema, rowCount));
+ }
+
+ /**
+ * Verify serialize and deserialize. Need to pass both the
+ * input and expected (even though the expected should be the same
+ * data as the input) because the act of serializing clears the
+ * input for obscure historical reasons.
+ * <p>
+ * Writes the row set's container and SV2 (as returned by getSv2())
+ * to a file in a temporary directory using VectorSerializer, reads
+ * the file back, asserts that the result's size() does not exceed
+ * the size() of the input captured before writing, and then compares
+ * the result with the expected row set. The file is deleted at the
+ * end, so it is left behind if one of the checks throws.
+ *
+ * @param rowSet the row set to serialize
+ * @param expected the row set the deserialized result is compared with
+ * @throws IOException declared for the file stream operations
+ */
+ private void verifySerialize(SingleRowSet rowSet, SingleRowSet expected) throws IOException {
+
+ long origSize = rowSet.size(); // Captured up front: serializing clears the input (see above).
+
+ File dir = OperatorFixture.getTempDir("serial");
+ File outFile = new File(dir, "serialze.dat");
+ try (OutputStream out = new BufferedOutputStream(new FileOutputStream(outFile))) {
+ VectorSerializer.writer(fixture.allocator(), out)
+ .write(rowSet.container(), rowSet.getSv2());
+ }
+
+ RowSet result;
+ try (InputStream in = new BufferedInputStream(new FileInputStream(outFile))) {
+ result = fixture.wrap(
+ VectorSerializer.reader(fixture.allocator(), in)
+ .read());
+ }
+
+ assertTrue(origSize >= result.size());
+ new RowSetComparison(expected)
+ .verifyAndClear(result);
+ outFile.delete(); // Reached only when the checks above pass.
+ }
+
+ @Test
+ public void testTypes() throws IOException { // Round-trips each listed type in non-nullable and nullable form.
+ testType(MinorType.TINYINT);
+ testType(MinorType.UINT1);
+ testType(MinorType.SMALLINT);
+ testType(MinorType.UINT2);
+ testType(MinorType.INT);
+ testType(MinorType.UINT4);
+ testType(MinorType.BIGINT);
+ testType(MinorType.UINT8);
+ testType(MinorType.FLOAT4);
+ testType(MinorType.FLOAT8);
+ testType(MinorType.DECIMAL9);
+ testType(MinorType.DECIMAL18);
+ testType(MinorType.DECIMAL28SPARSE);
+ testType(MinorType.DECIMAL38SPARSE);
+// testType(MinorType.DECIMAL28DENSE); No writer
+// testType(MinorType.DECIMAL38DENSE); No writer
+ testType(MinorType.DATE);
+ testType(MinorType.TIME);
+ testType(MinorType.TIMESTAMP);
+ testType(MinorType.INTERVAL);
+ testType(MinorType.INTERVALYEAR);
+ testType(MinorType.INTERVALDAY);
+ }
+
+ private SingleRowSet buildMapSet(BatchSchema schema) { // Three rows for the testMap() schema; values presumably map to top, map.key, map.value in order - confirm against the row set builder.
+ return fixture.rowSetBuilder(schema)
+ .add(1, 100, "first")
+ .add(2, 200, "second")
+ .add(3, 300, "third")
+ .build();
+ }
+
+ private SingleRowSet buildArraySet(BatchSchema schema) { // Three rows for the testArray() schema; the second row passes null for the array.
+ return fixture.rowSetBuilder(schema)
+ .add(1, new String[] { "first, second, third" } ) // NOTE(review): a one-element array holding a single comma-containing string, not three elements - confirm this is intended.
+ .add(2, null)
+ .add(3, new String[] { "third, fourth, fifth" } ) // NOTE(review): same as above - one element.
+ .build();
+ }
+
+ /**
+ * Tests a map type and an SV2. The input row set is converted with
+ * toIndirect() before it is serialized; the expected row set is
+ * built from the same data without that conversion.
+ *
+ * @throws IOException declared by verifySerialize()
+ */
+
+ @Test
+ public void testMap() throws IOException {
+ BatchSchema schema = new SchemaBuilder()
+ .add("top", MinorType.INT)
+ .addMap("map")
+ .add("key", MinorType.INT)
+ .add("value", MinorType.VARCHAR)
+ .buildMap()
+ .build();
+
+ verifySerialize(buildMapSet(schema).toIndirect(),
+ buildMapSet(schema));
+ }
+
+ @Test
+ public void testArray() throws IOException { // Same flow as testMap(), with a VARCHAR array column ("arr") instead of a map.
+ BatchSchema schema = new SchemaBuilder()
+ .add("top", MinorType.INT)
+ .addArray("arr", MinorType.VARCHAR)
+ .build();
+
+ verifySerialize(buildArraySet(schema).toIndirect(),
+ buildArraySet(schema));
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
index 96dae6a..2473dc5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
@@ -91,6 +91,7 @@ public class TestWriteToDisk extends ExecTest {
VectorContainer container = new VectorContainer();
container.addCollection(vectorList);
container.setRecordCount(4);
+ @SuppressWarnings("resource")
WritableBatch batch = WritableBatch.getBatchNoHVWrap(
container.getRecordCount(), container, false);
VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(
@@ -107,7 +108,6 @@ public class TestWriteToDisk extends ExecTest {
final Path path = new Path(tempDir.getAbsolutePath(), "drillSerializable");
try (final FSDataOutputStream out = fs.create(path)) {
wrap.writeToStream(out);
- out.close();
}
try (final FSDataInputStream in = fs.open(path)) {
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
index 52ebd57..6ead748 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
@@ -20,13 +20,16 @@ package org.apache.drill.exec.physical.impl.xsort;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.TestBuilder;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.SecondaryTest;
import org.junit.Ignore;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
+@Category(SecondaryTest.class)
public class TestExternalSort extends BaseTestQuery {
@Test
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index 50bf710..f643d5f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -35,7 +35,6 @@ import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.DrillTest;
import org.apache.drill.test.FixtureBuilder;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
@@ -46,6 +45,11 @@ public class TestSimpleExternalSort extends DrillTest {
@Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80000);
@Test
+ public void mergeSortWithSv2Managed() throws Exception {
+ mergeSortWithSv2(false);
+ }
+
+ @Test
public void mergeSortWithSv2Legacy() throws Exception {
mergeSortWithSv2(true);
}
@@ -62,26 +66,37 @@ public class TestSimpleExternalSort extends DrillTest {
*/
private void mergeSortWithSv2(boolean testLegacy) throws Exception {
- try (ClusterFixture cluster = ClusterFixture.standardCluster( );
+ FixtureBuilder builder = ClusterFixture.builder()
+ .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false)
+ ;
+ try (ClusterFixture cluster = builder.build();
ClientFixture client = cluster.clientFixture()) {
chooseImpl(client, testLegacy);
List<QueryDataBatch> results = client.queryBuilder().physicalResource("xsort/one_key_sort_descending_sv2.json").results();
- assertEquals(500000, client.countResults( results ));
+ assertEquals(500_000, client.countResults(results));
validateResults(client.allocator(), results);
}
}
private void chooseImpl(ClientFixture client, boolean testLegacy) throws Exception {
+ client.alterSession(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION.getOptionName(), testLegacy);
+ }
+
+ @Test
+ public void sortOneKeyDescendingMergeSortManaged() throws Throwable {
+ sortOneKeyDescendingMergeSort(false);
}
@Test
- @Ignore
public void sortOneKeyDescendingMergeSortLegacy() throws Throwable {
sortOneKeyDescendingMergeSort(true);
}
private void sortOneKeyDescendingMergeSort(boolean testLegacy) throws Throwable {
- try (ClusterFixture cluster = ClusterFixture.standardCluster( );
+ FixtureBuilder builder = ClusterFixture.builder()
+ .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false)
+ ;
+ try (ClusterFixture cluster = builder.build();
ClientFixture client = cluster.clientFixture()) {
chooseImpl(client, testLegacy);
List<QueryDataBatch> results = client.queryBuilder().physicalResource("xsort/one_key_sort_descending.json").results();
@@ -101,7 +116,7 @@ public class TestSimpleExternalSort extends DrillTest {
if (b.getHeader().getRowCount() > 0) {
batchCount++;
loader.load(b.getHeader().getDef(),b.getData());
- @SuppressWarnings("resource")
+ @SuppressWarnings({ "deprecation", "resource" })
BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
BigIntVector.Accessor a1 = c1.getAccessor();
@@ -118,44 +133,56 @@ public class TestSimpleExternalSort extends DrillTest {
System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
}
+ @Test
+ public void sortOneKeyDescendingExternalSortManaged() throws Throwable { // Runs the shared external-sort scenario with testLegacy = false.
+ sortOneKeyDescendingExternalSort(false); // false: chooseImpl() sets the "disable managed" session option to false.
+ }
@Test
- @Ignore
public void sortOneKeyDescendingExternalSortLegacy() throws Throwable {
sortOneKeyDescendingExternalSort(true);
}
private void sortOneKeyDescendingExternalSort(boolean testLegacy) throws Throwable {
- FixtureBuilder builder = ClusterFixture.builder( )
- .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 4 )
- .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 4);
+ FixtureBuilder builder = ClusterFixture.builder()
+ .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 4)
+ .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 4)
+ .configProperty(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 4)
+ .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false)
+ ;
try (ClusterFixture cluster = builder.build();
- ClientFixture client = cluster.clientFixture()) {
+ ClientFixture client = cluster.clientFixture()) {
chooseImpl(client,testLegacy);
List<QueryDataBatch> results = client.queryBuilder().physicalResource("/xsort/one_key_sort_descending.json").results();
- assertEquals(1000000, client.countResults( results ));
+ assertEquals(1_000_000, client.countResults(results));
validateResults(client.allocator(), results);
}
}
- @Ignore
+ @Test
+ public void outOfMemoryExternalSortManaged() throws Throwable{ // Runs the shared out-of-memory sort scenario with testLegacy = false.
+ outOfMemoryExternalSort(false); // false: the "disable managed" property/option is set to false.
+ }
+
@Test
public void outOfMemoryExternalSortLegacy() throws Throwable{
outOfMemoryExternalSort(true);
}
private void outOfMemoryExternalSort(boolean testLegacy) throws Throwable{
- FixtureBuilder builder = ClusterFixture.builder( )
+ FixtureBuilder builder = ClusterFixture.builder()
// Probably do nothing in modern Drill
- .configProperty( "drill.memory.fragment.max", 50000000 )
- .configProperty( "drill.memory.fragment.initial", 2000000 )
- .configProperty( "drill.memory.operator.max", 30000000 )
- .configProperty( "drill.memory.operator.initial", 2000000 );
+ .configProperty("drill.memory.fragment.max", 50_000_000)
+ .configProperty("drill.memory.fragment.initial", 2_000_000)
+ .configProperty("drill.memory.operator.max", 30_000_000)
+ .configProperty("drill.memory.operator.initial", 2_000_000)
+ .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, testLegacy)
+ ;
try (ClusterFixture cluster = builder.build();
- ClientFixture client = cluster.clientFixture()) {
+ ClientFixture client = cluster.clientFixture()) {
chooseImpl(client,testLegacy);
List<QueryDataBatch> results = client.queryBuilder().physicalResource("/xsort/oom_sort_test.json").results();
- assertEquals(10000000, client.countResults( results ));
+ assertEquals(10_000_000, client.countResults(results));
long previousBigInt = Long.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
index 788caf7..5a1bf6d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,22 +17,23 @@
*/
package org.apache.drill.exec.physical.impl.xsort;
-import org.apache.drill.BaseTestQuery;
-import org.apache.drill.common.config.DrillConfig;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
import org.apache.drill.exec.testing.Controls;
import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.FixtureBuilder;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Properties;
/**
* Testing External Sort's spilling to disk.
@@ -42,30 +43,33 @@ import java.util.Properties;
* <br>
* {@link ExecConstants#EXTERNAL_SORT_SPILL_GROUP_SIZE} = 1
*/
-public class TestSortSpillWithException extends BaseTestQuery {
+public class TestSortSpillWithException extends ClusterTest {
private static final String TEST_RES_PATH = TestTools.getWorkingPath() + "/src/test/resources";
@BeforeClass
- public static void initCluster() {
- // make sure memory sorter outputs 20 rows per batch
- final Properties props = cloneDefaultTestConfigProperties();
- props.put(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, "1");
- props.put(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, "1");
-
- updateTestCluster(1, DrillConfig.create(props));
+ public static void setup() throws Exception {
+ FixtureBuilder builder = ClusterFixture.builder()
+ .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 1) // Unmanaged
+ .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 1) // Unmanaged
+ .sessionOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 60 * 1024 * 1024) // Spill early
+ .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false)
+ .maxParallelization(1)
+ ;
+ startCluster(builder);
}
@Test
- public void testSpilLeak() throws Exception {
+ public void testSpillLeakLegacy() throws Exception {
+ client.alterSession(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION.getOptionName(), true);
// inject exception in sort while spilling
final String controls = Controls.newBuilder()
.addExceptionOnBit(
- ExternalSortBatch.class,
- ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+ org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class,
+ org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
IOException.class,
- bits[0].getContext().getEndpoint())
+ cluster.drillbit().getContext().getEndpoint())
.build();
- ControlsInjectionUtil.setControls(client, controls);
+ ControlsInjectionUtil.setControls(cluster.client(), controls);
// run a simple order by query
try {
test("select employee_id from dfs_test.`%s/xsort/2batches` order by employee_id", TEST_RES_PATH);
@@ -76,4 +80,28 @@ public class TestSortSpillWithException extends BaseTestQuery {
e.getMessage().contains("External Sort encountered an error while spilling to disk"));
}
}
+
+ @Test
+ public void testSpillLeakManaged() throws Exception {
+ client.alterSession(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION.getOptionName(), false); // false = do not disable the managed sort.
+ // Inject an IOException into the managed ExternalSortBatch at
+ // INTERRUPTION_WHILE_SPILLING, targeted at the cluster's drillbit endpoint.
+ final String controls = Controls.newBuilder()
+ .addExceptionOnBit(
+ org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.class,
+ org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+ IOException.class,
+ cluster.drillbit().getContext().getEndpoint())
+ .build();
+ ControlsInjectionUtil.setControls(cluster.client(), controls);
+ // Run an ORDER BY query against the mock employee_500K table; the test
+ // fails unless the query raises a UserRemoteException.
+ try {
+ test("SELECT id_i, name_s250 FROM `mock`.`employee_500K` ORDER BY id_i");
+// test("select employee_id from dfs_test.`%s/xsort/2batches` order by employee_id", TEST_RES_PATH);
+ fail("Query should have failed!");
+ } catch (UserRemoteException e) {
+ assertEquals(ErrorType.RESOURCE, e.getErrorType()); // The failure must be reported as a RESOURCE error with the spill message below.
+ assertTrue("Incorrect error message",
+ e.getMessage().contains("External Sort encountered an error while spilling to disk"));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
new file mode 100644
index 0000000..034da2c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrapper.BatchMerger;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetSchema;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+
+import com.google.common.collect.Lists;
+
+public class SortTestUtilities { // Static helpers for sort tests: test schemas, copier construction and a copier test harness.
+
+ private SortTestUtilities() { } // Not instantiable: the class only has static members.
+
+ public static BatchSchema makeSchema(MinorType type, boolean nullable) { // "key" column of the given type (OPTIONAL if nullable, else REQUIRED) plus a VARCHAR "value" column.
+ return new SchemaBuilder()
+ .add("key", type, nullable ? DataMode.OPTIONAL : DataMode.REQUIRED)
+ .add("value", MinorType.VARCHAR)
+ .build();
+ }
+
+ public static BatchSchema nonNullSchema() { // makeSchema() with a REQUIRED INT key.
+ return makeSchema(MinorType.INT, false);
+ }
+
+ public static BatchSchema nullableSchema() { // makeSchema() with an OPTIONAL INT key.
+ return makeSchema(MinorType.INT, true);
+ }
+
+ public static PriorityQueueCopierWrapper makeCopier(OperatorFixture fixture, String sortOrder, String nullOrder) { // Copier wrapper whose Sort config orders on the "key" column with the given sort and null ordering.
+ FieldReference expr = FieldReference.getWithQuotedRef("key");
+ Ordering ordering = new Ordering(sortOrder, expr, nullOrder);
+ Sort popConfig = new Sort(null, Lists.newArrayList(ordering), false);
+ OperExecContext opContext = fixture.newOperExecContext(popConfig);
+ return new PriorityQueueCopierWrapper(opContext);
+ }
+
+ public static class CopierTester { // Harness: collect inputs and expected outputs, then run() merges the inputs and verifies the outputs.
+ List<SingleRowSet> rowSets = new ArrayList<>(); // Inputs to merge, in the order added.
+ List<SingleRowSet> expected = new ArrayList<>(); // Expected output batches, in the order added.
+ String sortOrder = Ordering.ORDER_ASC; // Default; package-visible so a test can change it before run().
+ String nullOrder = Ordering.NULLS_UNSPECIFIED; // Default; package-visible so a test can change it before run().
+ private OperatorFixture fixture;
+
+ public CopierTester(OperatorFixture fixture) {
+ this.fixture = fixture;
+ }
+
+ public void addInput(SingleRowSet input) { // Appends one input row set.
+ rowSets.add(input);
+ }
+
+ public void addOutput(SingleRowSet output) { // Appends one expected output row set.
+ expected.add(output);
+ }
+
+ public void run() throws Exception { // Wraps the inputs as batch groups, starts a merge into a fresh container and verifies the results.
+ PriorityQueueCopierWrapper copier = makeCopier(fixture, sortOrder, nullOrder);
+ List<BatchGroup> batches = new ArrayList<>();
+ RowSetSchema schema = null;
+ for (SingleRowSet rowSet : rowSets) {
+ batches.add(new BatchGroup.InputBatch(rowSet.container(), rowSet.getSv2(),
+ fixture.allocator(), rowSet.size()));
+ if (schema == null) { // The schema is taken from the first input only.
+ schema = rowSet.schema();
+ }
+ }
+ int rowCount = outputRowCount(); // Passed to startMerge() as its last argument.
+ VectorContainer dest = new VectorContainer();
+ @SuppressWarnings("resource")
+ BatchMerger merger = copier.startMerge(schema.toBatchSchema(SelectionVectorMode.NONE), // NOTE(review): schema is still null here if no input was added.
+ batches, dest, rowCount);
+
+ verifyResults(merger, dest);
+ dest.clear(); // NOTE(review): this and merger.close() are skipped if verifyResults() throws.
+ merger.close();
+ }
+
+ public int outputRowCount() { // Row count of the first expected set, or 10 when no expected output was added.
+ if (! expected.isEmpty()) {
+ return expected.get(0).rowCount();
+ }
+ return 10;
+ }
+
+ protected void verifyResults(BatchMerger merger, VectorContainer dest) { // Expects one successful next() per expected set, each matching dest, then a final false.
+ for (RowSet expectedSet : expected) {
+ assertTrue(merger.next());
+ RowSet rowSet = new DirectRowSet(fixture.allocator(), dest); // Wraps the merge destination for comparison.
+ new RowSetComparison(expectedSet)
+ .verifyAndClear(rowSet);
+ }
+ assertFalse(merger.next()); // No batches may remain after the expected ones.
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
new file mode 100644
index 0000000..0050747
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrapper.BatchMerger;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortTestUtilities.CopierTester;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Light-weight sanity test of the copier class. The implementation has
+ * been used in production, so the tests here just check for the obvious
+ * cases.
+ * <p>
+ * Note, however, that if significant changes are made to the copier,
+ * then additional tests should be added to re-validate the code.
+ */
+
+public class TestCopier extends DrillTest {
+
+ public static OperatorFixture fixture;
+
+ @BeforeClass
+ public static void setup() {
+ fixture = OperatorFixture.builder().build();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ fixture.close();
+ }
+
+ @Test
+ public void testEmptyInput() throws Exception {
+ BatchSchema schema = SortTestUtilities.nonNullSchema();
+ List<BatchGroup> batches = new ArrayList<>();
+ PriorityQueueCopierWrapper copier = SortTestUtilities.makeCopier(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
+ VectorContainer dest = new VectorContainer();
+ try {
+ @SuppressWarnings({ "resource", "unused" })
+ BatchMerger merger = copier.startMerge(schema, batches, dest, 10);
+ fail();
+ } catch (AssertionError e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testEmptyBatch() throws Exception {
+ BatchSchema schema = SortTestUtilities.nonNullSchema();
+ CopierTester tester = new CopierTester(fixture);
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .withSv2()
+ .build());
+
+ tester.run();
+ }
+
+ @Test
+ public void testSingleRow() throws Exception {
+ BatchSchema schema = SortTestUtilities.nonNullSchema();
+ CopierTester tester = new CopierTester(fixture);
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(10, "10")
+ .withSv2()
+ .build());
+
+ tester.addOutput(fixture.rowSetBuilder(schema)
+ .add(10, "10")
+ .build());
+ tester.run();
+ }
+
+ @Test
+ public void testTwoBatchesSingleRow() throws Exception {
+ BatchSchema schema = SortTestUtilities.nonNullSchema();
+ CopierTester tester = new CopierTester(fixture);
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(10, "10")
+ .withSv2()
+ .build());
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(20, "20")
+ .withSv2()
+ .build());
+
+ tester.addOutput(fixture.rowSetBuilder(schema)
+ .add(10, "10")
+ .add(20, "20")
+ .build());
+ tester.run();
+ }
+
+ public static SingleRowSet makeDataSet(BatchSchema schema, int first, int step, int count) {
+ ExtendableRowSet rowSet = fixture.rowSet(schema);
+ RowSetWriter writer = rowSet.writer(count);
+ int value = first;
+ for (int i = 0; i < count; i++, value += step) {
+ RowSetUtilities.setFromInt(writer, 0, value);
+ writer.column(1).setString(Integer.toString(value));
+ writer.save();
+ }
+ writer.done();
+ return rowSet;
+ }
+
+ @Test
+ public void testMultipleOutput() throws Exception {
+ BatchSchema schema = SortTestUtilities.nonNullSchema();
+
+ CopierTester tester = new CopierTester(fixture);
+ tester.addInput(makeDataSet(schema, 0, 2, 10).toIndirect());
+ tester.addInput(makeDataSet(schema, 1, 2, 10).toIndirect());
+
+ tester.addOutput(makeDataSet(schema, 0, 1, 10));
+ tester.addOutput(makeDataSet(schema, 10, 1, 10));
+ tester.run();
+ }
+
+ // Also verifies that SV2s work
+
+ @Test
+ public void testMultipleOutputDesc() throws Exception {
+ BatchSchema schema = SortTestUtilities.nonNullSchema();
+
+ CopierTester tester = new CopierTester(fixture);
+ tester.sortOrder = Ordering.ORDER_DESC;
+ tester.nullOrder = Ordering.NULLS_UNSPECIFIED;
+ SingleRowSet input = makeDataSet(schema, 0, 2, 10).toIndirect();
+ RowSetUtilities.reverse(input.getSv2());
+ tester.addInput(input);
+
+ input = makeDataSet(schema, 1, 2, 10).toIndirect();
+ RowSetUtilities.reverse(input.getSv2());
+ tester.addInput(input);
+
+ tester.addOutput(makeDataSet(schema, 19, -1, 10));
+ tester.addOutput(makeDataSet(schema, 9, -1, 10));
+
+ tester.run();
+ }
+
+ @Test
+ public void testAscNullsLast() throws Exception {
+ BatchSchema schema = SortTestUtilities.nullableSchema();
+
+ CopierTester tester = new CopierTester(fixture);
+ tester.sortOrder = Ordering.ORDER_ASC;
+ tester.nullOrder = Ordering.NULLS_LAST;
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(1, "1")
+ .add(4, "4")
+ .add(null, "null")
+ .withSv2()
+ .build());
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(2, "2")
+ .add(3, "3")
+ .add(null, "null")
+ .withSv2()
+ .build());
+
+ tester.addOutput(fixture.rowSetBuilder(schema)
+ .add(1, "1")
+ .add(2, "2")
+ .add(3, "3")
+ .add(4, "4")
+ .add(null, "null")
+ .add(null, "null")
+ .build());
+
+ tester.run();
+ }
+
+ @Test
+ public void testAscNullsFirst() throws Exception {
+ BatchSchema schema = SortTestUtilities.nullableSchema();
+
+ CopierTester tester = new CopierTester(fixture);
+ tester.sortOrder = Ordering.ORDER_ASC;
+ tester.nullOrder = Ordering.NULLS_FIRST;
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(null, "null")
+ .add(1, "1")
+ .add(4, "4")
+ .withSv2()
+ .build());
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(null, "null")
+ .add(2, "2")
+ .add(3, "3")
+ .withSv2()
+ .build());
+
+ tester.addOutput(fixture.rowSetBuilder(schema)
+ .add(null, "null")
+ .add(null, "null")
+ .add(1, "1")
+ .add(2, "2")
+ .add(3, "3")
+ .add(4, "4")
+ .build());
+
+ tester.run();
+ }
+
+ @Test
+ public void testDescNullsLast() throws Exception {
+ BatchSchema schema = SortTestUtilities.nullableSchema();
+
+ CopierTester tester = new CopierTester(fixture);
+ tester.sortOrder = Ordering.ORDER_DESC;
+ tester.nullOrder = Ordering.NULLS_LAST;
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(4, "4")
+ .add(1, "1")
+ .add(null, "null")
+ .withSv2()
+ .build());
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(3, "3")
+ .add(2, "2")
+ .add(null, "null")
+ .withSv2()
+ .build());
+
+ tester.addOutput(fixture.rowSetBuilder(schema)
+ .add(4, "4")
+ .add(3, "3")
+ .add(2, "2")
+ .add(1, "1")
+ .add(null, "null")
+ .add(null, "null")
+ .build());
+
+ tester.run();
+ }
+
+ @Test
+ public void testDescNullsFirst() throws Exception {
+ BatchSchema schema = SortTestUtilities.nullableSchema();
+
+ CopierTester tester = new CopierTester(fixture);
+ tester.sortOrder = Ordering.ORDER_DESC;
+ tester.nullOrder = Ordering.NULLS_FIRST;
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(null, "null")
+ .add(4, "4")
+ .add(1, "1")
+ .withSv2()
+ .build());
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(null, "null")
+ .add(3, "3")
+ .add(2, "2")
+ .withSv2()
+ .build());
+
+ tester.addOutput(fixture.rowSetBuilder(schema)
+ .add(null, "null")
+ .add(null, "null")
+ .add(4, "4")
+ .add(3, "3")
+ .add(2, "2")
+ .add(1, "1")
+ .build());
+
+ tester.run();
+ }
+
+ public static void runTypeTest(OperatorFixture fixture, MinorType type) throws Exception {
+ BatchSchema schema = SortTestUtilities.makeSchema(type, false);
+
+ CopierTester tester = new CopierTester(fixture);
+ tester.addInput(makeDataSet(schema, 0, 2, 5).toIndirect());
+ tester.addInput(makeDataSet(schema, 1, 2, 5).toIndirect());
+
+ tester.addOutput(makeDataSet(schema, 0, 1, 10));
+
+ tester.run();
+ }
+
+ @Test
+ public void testTypes() throws Exception {
+ testAllTypes(fixture);
+ }
+
+ public static void testAllTypes(OperatorFixture fixture) throws Exception {
+ runTypeTest(fixture, MinorType.INT);
+ runTypeTest(fixture, MinorType.BIGINT);
+ runTypeTest(fixture, MinorType.FLOAT4);
+ runTypeTest(fixture, MinorType.FLOAT8);
+ runTypeTest(fixture, MinorType.DECIMAL9);
+ runTypeTest(fixture, MinorType.DECIMAL18);
+ runTypeTest(fixture, MinorType.VARCHAR);
+ runTypeTest(fixture, MinorType.VARBINARY);
+ runTypeTest(fixture, MinorType.DATE);
+ runTypeTest(fixture, MinorType.TIME);
+ runTypeTest(fixture, MinorType.TIMESTAMP);
+ runTypeTest(fixture, MinorType.INTERVAL);
+ runTypeTest(fixture, MinorType.INTERVALDAY);
+ runTypeTest(fixture, MinorType.INTERVALYEAR);
+
+ // Others not tested. See DRILL-5329
+ }
+
+ @Test
+ public void testMapType() throws Exception {
+ testMapType(fixture);
+ }
+
+ public void testMapType(OperatorFixture fixture) throws Exception {
+ BatchSchema schema = new SchemaBuilder()
+ .add("key", MinorType.INT)
+ .addMap("m1")
+ .add("b", MinorType.INT)
+ .addMap("m2")
+ .add("c", MinorType.INT)
+ .buildMap()
+ .buildMap()
+ .build();
+
+ CopierTester tester = new CopierTester(fixture);
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(1, 10, 100)
+ .add(5, 50, 500)
+ .withSv2()
+ .build());
+
+ tester.addInput(fixture.rowSetBuilder(schema)
+ .add(2, 20, 200)
+ .add(6, 60, 600)
+ .withSv2()
+ .build());
+
+ tester.addOutput(fixture.rowSetBuilder(schema)
+ .add(1, 10, 100)
+ .add(2, 20, 200)
+ .add(5, 50, 500)
+ .add(6, 60, 600)
+ .build());
+
+ tester.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortExec.java
new file mode 100644
index 0000000..f5858a3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortExec.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.test.DrillTest;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestExternalSortExec extends DrillTest {
+
+ @Test
+ public void testFieldReference() {
+ // Misnomer: the reference must be unquoted.
+ FieldReference expr = FieldReference.getWithQuotedRef("foo");
+ assertEquals(Types.LATE_BIND_TYPE, expr.getMajorType());
+ assertTrue(expr.isSimplePath());
+ assertEquals("foo", expr.getRootSegment().getPath());
+ assertEquals("`foo`", expr.toExpr());
+ }
+
+ @Test
+ public void testOrdering() {
+ assertEquals(Direction.ASCENDING, Ordering.getOrderingSpecFromString(null));
+ assertEquals(Direction.ASCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_ASC));
+ assertEquals(Direction.DESCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_DESC));
+ assertEquals(Direction.ASCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_ASCENDING));
+ assertEquals(Direction.DESCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_DESCENDING));
+ assertEquals(Direction.ASCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_ASC.toLowerCase()));
+ assertEquals(Direction.DESCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_DESC.toLowerCase()));
+ assertEquals(Direction.ASCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_ASCENDING.toLowerCase()));
+ assertEquals(Direction.DESCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_DESCENDING.toLowerCase()));
+ try {
+ Ordering.getOrderingSpecFromString("");
+ fail();
+ } catch(DrillRuntimeException e) { }
+ try {
+ Ordering.getOrderingSpecFromString("foo");
+ fail();
+ } catch(DrillRuntimeException e) { }
+
+ assertEquals(NullDirection.UNSPECIFIED, Ordering.getNullOrderingFromString(null));
+ assertEquals(NullDirection.FIRST, Ordering.getNullOrderingFromString(Ordering.NULLS_FIRST));
+ assertEquals(NullDirection.LAST, Ordering.getNullOrderingFromString(Ordering.NULLS_LAST));
+ assertEquals(NullDirection.UNSPECIFIED, Ordering.getNullOrderingFromString(Ordering.NULLS_UNSPECIFIED));
+ assertEquals(NullDirection.FIRST, Ordering.getNullOrderingFromString(Ordering.NULLS_FIRST.toLowerCase()));
+ assertEquals(NullDirection.LAST, Ordering.getNullOrderingFromString(Ordering.NULLS_LAST.toLowerCase()));
+ assertEquals(NullDirection.UNSPECIFIED, Ordering.getNullOrderingFromString(Ordering.NULLS_UNSPECIFIED.toLowerCase()));
+ try {
+ Ordering.getNullOrderingFromString("");
+ fail();
+ } catch(DrillRuntimeException e) { }
+ try {
+ Ordering.getNullOrderingFromString("foo");
+ fail();
+ } catch(DrillRuntimeException e) { }
+
+ FieldReference expr = FieldReference.getWithQuotedRef("foo");
+
+ // Test all getters
+
+ Ordering ordering = new Ordering((String) null, expr, (String) null);
+ assertEquals(Direction.ASCENDING, ordering.getDirection());
+ assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+ assertSame(expr, ordering.getExpr());
+ assertTrue(ordering.nullsSortHigh());
+
+ // Test all ordering strings
+
+ ordering = new Ordering((String) Ordering.ORDER_ASC, expr, (String) null);
+ assertEquals(Direction.ASCENDING, ordering.getDirection());
+ assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+
+ ordering = new Ordering((String) Ordering.ORDER_ASC.toLowerCase(), expr, (String) null);
+ assertEquals(Direction.ASCENDING, ordering.getDirection());
+ assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+
+ ordering = new Ordering((String) Ordering.ORDER_ASCENDING, expr, (String) null);
+ assertEquals(Direction.ASCENDING, ordering.getDirection());
+ assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+
+ ordering = new Ordering((String) Ordering.ORDER_DESC, expr, (String) null);
+ assertEquals(Direction.DESCENDING, ordering.getDirection());
+ assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+
+ ordering = new Ordering((String) Ordering.ORDER_DESCENDING, expr, (String) null);
+ assertEquals(Direction.DESCENDING, ordering.getDirection());
+ assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+
+ // Test all null ordering strings
+
+ ordering = new Ordering((String) null, expr, Ordering.NULLS_FIRST);
+ assertEquals(Direction.ASCENDING, ordering.getDirection());
+ assertEquals(NullDirection.FIRST, ordering.getNullDirection());
+ assertFalse(ordering.nullsSortHigh());
+
+ ordering = new Ordering((String) null, expr, Ordering.NULLS_FIRST);
+ assertEquals(Direction.ASCENDING, ordering.getDirection());
+ assertEquals(NullDirection.FIRST, ordering.getNullDirection());
+ assertFalse(ordering.nullsSortHigh());
+
+ ordering = new Ordering((String) null, expr, Ordering.NULLS_LAST);
+ assertEquals(Direction.ASCENDING, ordering.getDirection());
+ assertEquals(NullDirection.LAST, ordering.getNullDirection());
+ assertTrue(ordering.nullsSortHigh());
+
+ ordering = new Ordering((String) null, expr, Ordering.NULLS_UNSPECIFIED);
+ assertEquals(Direction.ASCENDING, ordering.getDirection());
+ assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+ assertTrue(ordering.nullsSortHigh());
+
+ // Unspecified order is always nulls high
+
+ ordering = new Ordering(Ordering.ORDER_DESC, expr, Ordering.NULLS_UNSPECIFIED);
+ assertEquals(Direction.DESCENDING, ordering.getDirection());
+ assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+ assertTrue(ordering.nullsSortHigh());
+
+ // Null sort direction reverses with a Desc sort.
+
+ ordering = new Ordering(Ordering.ORDER_DESC, expr, Ordering.NULLS_FIRST);
+ assertEquals(Direction.DESCENDING, ordering.getDirection());
+ assertEquals(NullDirection.FIRST, ordering.getNullDirection());
+ assertTrue(ordering.nullsSortHigh());
+
+ ordering = new Ordering(Ordering.ORDER_DESC, expr, Ordering.NULLS_LAST);
+ assertEquals(Direction.DESCENDING, ordering.getDirection());
+ assertEquals(NullDirection.LAST, ordering.getNullDirection());
+ assertFalse(ordering.nullsSortHigh());
+ }
+
+ @Test
+ public void testSortSpec() {
+ FieldReference expr = FieldReference.getWithQuotedRef("foo");
+ Ordering ordering = new Ordering(Ordering.ORDER_ASC, expr, Ordering.NULLS_FIRST);
+
+ // Basics
+
+ ExternalSort popConfig = new ExternalSort(null, Lists.newArrayList(ordering), false);
+ assertSame(ordering, popConfig.getOrderings().get(0));
+ assertFalse(popConfig.getReverse());
+ assertEquals(SelectionVectorMode.FOUR_BYTE, popConfig.getSVMode());
+ assertEquals(CoreOperatorType.EXTERNAL_SORT_VALUE, popConfig.getOperatorType());
+ assertEquals(ExternalSort.DEFAULT_SORT_ALLOCATION, popConfig.getInitialAllocation());
+ assertEquals(AbstractBase.MAX_ALLOCATION, popConfig.getMaxAllocation());
+ assertTrue(popConfig.isExecutable());
+
+ // Non-default settings
+
+ popConfig = new ExternalSort(null, Lists.newArrayList(ordering), true);
+ assertTrue(popConfig.getReverse());
+ long maxAlloc = 50_000_000;
+ popConfig.setMaxAllocation(maxAlloc);
+ assertEquals(ExternalSort.DEFAULT_SORT_ALLOCATION, popConfig.getInitialAllocation());
+ assertEquals(maxAlloc, popConfig.getMaxAllocation());
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
new file mode 100644
index 0000000..6bff088
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeAction;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
+import org.apache.drill.test.ConfigBuilder;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.OperatorFixture;
+import org.junit.Test;
+
+public class TestExternalSortInternals extends DrillTest {
+
+ private static final int ONE_MEG = 1024 * 1024;
+
+ /**
+  * Checks the sort settings obtained from a default-created
+  * {@link DrillConfig}.
+  */
+ @Test
+ public void testConfigDefaults() {
+   SortConfig defaults = new SortConfig(DrillConfig.create());
+
+   // Memory: zero means no artificial limit
+   assertEquals(0, defaults.maxMemory());
+
+   // Merge limit: configured zero is mapped to a large number
+   assertEquals(Integer.MAX_VALUE, defaults.mergeLimit());
+
+   // Spill file: 256 MiB
+   assertEquals(256 * ONE_MEG, defaults.spillFileSize());
+
+   // Spill batch: 8 MiB
+   assertEquals(8 * ONE_MEG, defaults.spillBatchSize());
+
+   // Merge batch: 16 MiB
+   assertEquals(16 * ONE_MEG, defaults.mergeBatchSize());
+
+   // Buffered batches: unlimited
+   assertEquals(Integer.MAX_VALUE, defaults.getBufferedBatchLimit());
+ }
+
+ /**
+  * Sets every sort property through its {@link ExecConstants} key and
+  * checks that {@link SortConfig} reports the overriding value, which
+  * also confirms that each constant maps to the expected property.
+  */
+ @Test
+ public void testConfigOverride() {
+   // Memory sizes are given both as HOCON size strings ("2000K", "10M")
+   // and as plain numbers.
+   SortConfig overridden = new SortConfig(new ConfigBuilder()
+       .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, "2000K")
+       .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, 10)
+       .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, "10M")
+       .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, 500_000)
+       .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, 600_000)
+       .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 50)
+       .build());
+
+   assertEquals(2000 * 1024, overridden.maxMemory());
+   assertEquals(10, overridden.mergeLimit());
+   assertEquals(10 * ONE_MEG, overridden.spillFileSize());
+   assertEquals(500_000, overridden.spillBatchSize());
+   assertEquals(600_000, overridden.mergeBatchSize());
+   assertEquals(50, overridden.getBufferedBatchLimit());
+ }
+
+ /**
+  * Configures each bounded property just below its minimum and checks
+  * that {@link SortConfig} reports the minimum instead.
+  */
+ @Test
+ public void testConfigLimits() {
+   SortConfig clamped = new SortConfig(new ConfigBuilder()
+       .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, SortConfig.MIN_MERGE_LIMIT - 1)
+       .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, SortConfig.MIN_SPILL_FILE_SIZE - 1)
+       .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, SortConfig.MIN_SPILL_BATCH_SIZE - 1)
+       .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, SortConfig.MIN_MERGE_BATCH_SIZE - 1)
+       .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 1)
+       .build());
+
+   assertEquals(SortConfig.MIN_MERGE_LIMIT, clamped.mergeLimit());
+   assertEquals(SortConfig.MIN_SPILL_FILE_SIZE, clamped.spillFileSize());
+   assertEquals(SortConfig.MIN_SPILL_BATCH_SIZE, clamped.spillBatchSize());
+   assertEquals(SortConfig.MIN_MERGE_BATCH_SIZE, clamped.mergeBatchSize());
+   // A batch limit of 1 is reported as 2.
+   assertEquals(2, clamped.getBufferedBatchLimit());
+ }
+
+ /**
+ * Feeds a {@link SortMemoryManager} (50 MiB limit, default sort config)
+ * a sequence of input-batch estimates and checks, after each one, whether
+ * the recorded row width and input batch size were updated or kept. The
+ * steps are order-dependent: each one builds on the estimates left behind
+ * by the previous step.
+ */
+ @Test
+ public void testMemoryManagerBasics() {
+ DrillConfig drillConfig = DrillConfig.create();
+ SortConfig sortConfig = new SortConfig(drillConfig);
+ long memoryLimit = 50 * ONE_MEG;
+ SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+ // Basic setup
+
+ assertEquals(sortConfig.spillBatchSize(), memManager.getPreferredSpillBatchSize());
+ assertEquals(sortConfig.mergeBatchSize(), memManager.getPreferredMergeBatchSize());
+ assertEquals(memoryLimit, memManager.getMemoryLimit());
+
+ // Nice simple batch: 6 MB in size, 300 byte rows, vectors half full
+ // so 10000 rows. Sizes chosen so that spill and merge batch record
+ // stay below the limit of 64K.
+
+ int rowWidth = 300;
+ int rowCount = 10000;
+ int batchSize = rowWidth * rowCount * 2;
+
+ memManager.updateEstimates(batchSize, rowWidth, rowCount);
+ verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
+
+ // Zero rows - no update
+
+ memManager.updateEstimates(batchSize, rowWidth, 0);
+ assertEquals(rowWidth, memManager.getRowWidth());
+ assertEquals(batchSize, memManager.getInputBatchSize());
+
+ // Larger batch size, update batch size
+
+ rowCount = 20000;
+ batchSize = rowWidth * rowCount * 2;
+ memManager.updateEstimates(batchSize, rowWidth, rowCount);
+ verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
+
+ // Smaller batch size: no change
+
+ rowCount = 5000;
+ int lowBatchSize = rowWidth * rowCount * 2;
+ memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
+ assertEquals(rowWidth, memManager.getRowWidth());
+ assertEquals(batchSize, memManager.getInputBatchSize());
+
+ // Different batch density, update batch size
+
+ rowCount = 10000;
+ batchSize = rowWidth * rowCount * 5;
+ memManager.updateEstimates(batchSize, rowWidth, rowCount);
+ verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
+
+ // Smaller row size, no update
+
+ int lowRowWidth = 200;
+ rowCount = 10000;
+ lowBatchSize = rowWidth * rowCount * 2;
+ memManager.updateEstimates(lowBatchSize, lowRowWidth, rowCount);
+ assertEquals(rowWidth, memManager.getRowWidth());
+ assertEquals(batchSize, memManager.getInputBatchSize());
+
+ // Larger row size, updates calcs
+ // (lowBatchSize here is 8,000,000, still below the 15,000,000 held in
+ // batchSize, so verifyCalcs is given the unchanged batchSize.)
+
+ rowWidth = 400;
+ rowCount = 10000;
+ lowBatchSize = rowWidth * rowCount * 2;
+ memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
+ verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
+
+ // EOF: very low density
+
+ memManager.updateEstimates(lowBatchSize, rowWidth, 5);
+ assertEquals(rowWidth, memManager.getRowWidth());
+ assertEquals(batchSize, memManager.getInputBatchSize());
+ }
+
+ /**
+  * Asserts that the memory manager's derived values agree with the given
+  * inputs: row width and input batch size must match exactly, while the
+  * spill and merge batch row counts and sizes must lie within a factor of
+  * two of the values computed from the configured batch sizes. Also checks
+  * that the buffer and merge memory limits equal the memory limit minus
+  * the spill and merge batch size respectively.
+  *
+  * @param sortConfig config supplying the preferred spill and merge batch sizes
+  * @param memoryLimit memory limit the manager was created with
+  * @param memManager manager under test
+  * @param batchSize expected input batch size
+  * @param rowWidth expected row width
+  * @param rowCount not used by the checks
+  */
+ private void verifyCalcs(SortConfig sortConfig, long memoryLimit, SortMemoryManager memManager, int batchSize,
+     int rowWidth, int rowCount) {
+
+   assertFalse(memManager.mayOverflow());
+
+   // Row width and input batch size are taken over unchanged.
+
+   assertEquals(rowWidth, memManager.getRowWidth());
+   assertEquals(batchSize, memManager.getInputBatchSize());
+
+   // Spill batch: rounding is allowed, but only within a factor of two.
+
+   int maxSpillRows = sortConfig.spillBatchSize() / rowWidth;
+   int spillRows = memManager.getSpillBatchRowCount();
+   assertTrue(maxSpillRows >= spillRows);
+   assertTrue(maxSpillRows/2 <= spillRows);
+   int spillPayload = spillRows * rowWidth;
+   int spillBytes = memManager.getSpillBatchSize();
+   assertTrue(spillPayload <= spillBytes);
+   assertTrue(spillPayload >= spillBytes/2);
+   assertEquals(memoryLimit - spillBytes, memManager.getBufferMemoryLimit());
+
+   // Merge batch: same tolerance as for the spill batch.
+
+   int maxMergeRows = sortConfig.mergeBatchSize() / rowWidth;
+   int mergeRows = memManager.getMergeBatchRowCount();
+   assertTrue(maxMergeRows >= mergeRows);
+   assertTrue(maxMergeRows/2 <= mergeRows);
+   int mergePayload = mergeRows * rowWidth;
+   int mergeBytes = memManager.getMergeBatchSize();
+   assertTrue(mergePayload <= mergeBytes);
+   assertTrue(mergePayload >= mergeBytes/2);
+   assertEquals(memoryLimit - mergeBytes, memManager.getMergeMemoryLimit());
+ }
+
+ /**
+  * Checks the estimates for very narrow rows: a zero row width is reported
+  * as 10, and for both the zero-width and the 20-byte case the spill and
+  * merge batch row counts equal {@code Character.MAX_VALUE} while the
+  * preferred batch sizes stay at their configured values.
+  */
+ @Test
+ public void testSmallRows() {
+   SortConfig sortConfig = new SortConfig(DrillConfig.create());
+   long memoryLimit = 100 * ONE_MEG;
+   SortMemoryManager manager = new SortMemoryManager(sortConfig, memoryLimit);
+
+   // Case 1: zero-length row, reported width is rounded to 10
+
+   int width = 0;
+   int rows = 10000;
+   int inputBytes = rows * 2;
+   manager.updateEstimates(inputBytes, width, rows);
+   assertEquals(10, manager.getRowWidth());
+   assertEquals(inputBytes, manager.getInputBatchSize());
+
+   // Row counts are truncated for spill and merge batches
+
+   assertEquals(Character.MAX_VALUE, manager.getSpillBatchRowCount());
+   assertEquals(Character.MAX_VALUE, manager.getMergeBatchRowCount());
+
+   // Preferred batch sizes remain at their defaults
+
+   assertEquals(sortConfig.spillBatchSize(), manager.getPreferredSpillBatchSize());
+   assertEquals(sortConfig.mergeBatchSize(), manager.getPreferredMergeBatchSize());
+
+   // Case 2: small, but non-zero, row
+
+   width = 20;
+   rows = 10000;
+   inputBytes = width * rows;
+   manager.updateEstimates(inputBytes, width, rows);
+   assertEquals(width, manager.getRowWidth());
+   assertEquals(inputBytes, manager.getInputBatchSize());
+
+   // Row counts are truncated for spill and merge batches
+
+   assertEquals(Character.MAX_VALUE, manager.getSpillBatchRowCount());
+   assertEquals(Character.MAX_VALUE, manager.getMergeBatchRowCount());
+
+   // Preferred batch sizes remain at their defaults
+
+   assertEquals(sortConfig.spillBatchSize(), manager.getPreferredSpillBatchSize());
+   assertEquals(sortConfig.mergeBatchSize(), manager.getPreferredMergeBatchSize());
+ }
+
+ /**
+ * Checks the memory manager's calculations under a 10 MiB limit with
+ * 1000-byte rows, first with input batches of a quarter of memory and
+ * then of three-eighths of memory. In both cases no overflow is expected,
+ * and the spill and merge batch sizes must come out below their preferred
+ * sizes while still holding at least one row and fitting within the
+ * memory limit alongside two batches of the feeding kind.
+ */
+ @Test
+ public void testLowMemory() {
+ DrillConfig drillConfig = DrillConfig.create();
+ SortConfig sortConfig = new SortConfig(drillConfig);
+ long memoryLimit = 10 * ONE_MEG;
+ SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+ // Tight squeeze, but can be made to work.
+ // Input batches are a quarter of memory.
+
+ int rowWidth = 1000;
+ int rowCount = (int) (memoryLimit / 4 / rowWidth);
+ int batchSize = rowCount * rowWidth;
+ memManager.updateEstimates(batchSize, rowWidth, rowCount);
+ assertEquals(rowWidth, memManager.getRowWidth());
+ assertEquals(batchSize, memManager.getInputBatchSize());
+ assertFalse(memManager.mayOverflow());
+
+ // Spill, merge batches should be constrained
+
+ int spillBatchSize = memManager.getSpillBatchSize();
+ assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
+ assertTrue(spillBatchSize >= rowWidth);
+ assertTrue(spillBatchSize <= memoryLimit / 3);
+ assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
+ assertTrue(spillBatchSize / rowWidth >= memManager.getSpillBatchRowCount());
+
+ int mergeBatchSize = memManager.getMergeBatchSize();
+ assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
+ assertTrue(mergeBatchSize >= rowWidth);
+ assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
+ assertTrue(mergeBatchSize / rowWidth >= memManager.getMergeBatchRowCount());
+
+ // Should spill after just two batches
+
+ assertFalse(memManager.isSpillNeeded(0, batchSize));
+ assertFalse(memManager.isSpillNeeded(batchSize, batchSize));
+ assertTrue(memManager.isSpillNeeded(2 * batchSize, batchSize));
+
+ // Tighter squeeze, but can be made to work.
+ // Input batches are 3/8 of memory; two fill 3/4,
+ // but small spill and merge batches allow progress.
+
+ rowWidth = 1000;
+ rowCount = (int) (memoryLimit * 3 / 8 / rowWidth);
+ batchSize = rowCount * rowWidth;
+ memManager.updateEstimates(batchSize, rowWidth, rowCount);
+ assertEquals(rowWidth, memManager.getRowWidth());
+ assertEquals(batchSize, memManager.getInputBatchSize());
+ assertFalse(memManager.mayOverflow());
+
+ // Spill, merge batches should be constrained
+
+ spillBatchSize = memManager.getSpillBatchSize();
+ assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
+ assertTrue(spillBatchSize >= rowWidth);
+ assertTrue(spillBatchSize <= memoryLimit / 3);
+ assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
+ // Unlike the first case, more than one row per batch is required here.
+ assertTrue(memManager.getSpillBatchRowCount() > 1);
+ assertTrue(spillBatchSize / rowWidth >= memManager.getSpillBatchRowCount());
+
+ mergeBatchSize = memManager.getMergeBatchSize();
+ assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
+ assertTrue(mergeBatchSize >= rowWidth);
+ assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
+ assertTrue(memManager.getMergeBatchRowCount() > 1);
+ assertTrue(mergeBatchSize / rowWidth >= memManager.getMergeBatchRowCount());
+ }
+
+ /**
+  * Exercises the memory manager with a 10 MB limit and one jumbo row per
+  * batch. A row sized at 75% of a third of memory is expected to work with
+  * one-row spill and merge batches; a row sized at half of memory is
+  * expected to make {@code mayOverflow()} return true.
+  */
+ @Test
+ public void testExtremeLowMemory() {
+   final long limit = 10 * ONE_MEG;
+   final SortConfig config = new SortConfig(DrillConfig.create());
+   final SortMemoryManager manager = new SortMemoryManager(config, limit);
+
+   // Jumbo row that works with a single row per batch. The minimum is two
+   // input rows plus a spill row, or two spill rows plus a merge row.
+   // The row is backed off from an exact third of memory to leave room
+   // for internal fragmentation in the merge and output batches.
+
+   int width = (int) (limit / 3 * 0.75);
+   int rows = 1;
+   int inputSize = width;
+   manager.updateEstimates(inputSize, width, rows);
+   assertEquals(width, manager.getRowWidth());
+   assertEquals(inputSize, manager.getInputBatchSize());
+   assertFalse(manager.mayOverflow());
+
+   // Spill batch holds exactly one row
+
+   int spillSize = manager.getSpillBatchSize();
+   assertTrue(spillSize >= width);
+   assertTrue(spillSize <= limit / 3);
+   assertTrue(spillSize + 2 * inputSize <= limit);
+   assertEquals(1, manager.getSpillBatchRowCount());
+
+   // Merge batch holds exactly one row
+
+   int mergeSize = manager.getMergeBatchSize();
+   assertTrue(mergeSize >= width);
+   assertTrue(mergeSize + 2 * spillSize <= limit);
+   assertEquals(1, manager.getMergeBatchRowCount());
+
+   // A spill is expected once two rows are already held
+
+   assertFalse(manager.isSpillNeeded(0, inputSize));
+   assertFalse(manager.isSpillNeeded(inputSize, inputSize));
+   assertTrue(manager.isSpillNeeded(2 * inputSize, inputSize));
+
+   // Now in trouble: not even three rows fit.
+
+   width = (int) (limit / 2);
+   rows = 1;
+   inputSize = width;
+   manager.updateEstimates(inputSize, width, rows);
+   assertTrue(manager.mayOverflow());
+ }
+
+ /**
+  * Builds a config that sets the sort's maximum memory, spill batch size
+  * and merge batch size, then checks that the memory manager reports those
+  * configured values even though it was constructed with a larger (50 MB)
+  * memory limit. Finishes by running {@code verifyCalcs()} against the
+  * configured memory value.
+  */
+ @Test
+ public void testConfigConstraints() {
+   int memConstraint = 40 * ONE_MEG;
+   int spillSizeConstraint = ONE_MEG / 2;
+   int mergeSizeConstraint = ONE_MEG;
+   DrillConfig constrainedConfig = new ConfigBuilder()
+       .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, memConstraint)
+       .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, spillSizeConstraint)
+       .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, mergeSizeConstraint)
+       .build();
+   SortConfig config = new SortConfig(constrainedConfig);
+
+   // Offer more memory than the configured maximum
+
+   long offeredMemory = 50 * ONE_MEG;
+   SortMemoryManager manager = new SortMemoryManager(config, offeredMemory);
+
+   // The configured values are the ones reported back
+
+   assertEquals(spillSizeConstraint, manager.getPreferredSpillBatchSize());
+   assertEquals(mergeSizeConstraint, manager.getPreferredMergeBatchSize());
+   assertEquals(memConstraint, manager.getMemoryLimit());
+
+   int width = 300;
+   int rows = 10000;
+   int inputSize = width * rows * 2;
+
+   manager.updateEstimates(inputSize, width, rows);
+   verifyCalcs(config, memConstraint, manager, inputSize, width, rows);
+ }
+
+ /**
+  * Checks the answers of {@code isSpillNeeded()} and
+  * {@code hasMemoryMergeCapacity()} at several memory fill levels under a
+  * 50 MB limit, after priming the manager with one set of estimates.
+  */
+ @Test
+ public void testMemoryDynamics() {
+   final long limit = 50 * ONE_MEG;
+   final SortConfig config = new SortConfig(DrillConfig.create());
+   final SortMemoryManager manager = new SortMemoryManager(config, limit);
+
+   final int width = 300;
+   final int rows = 10000;
+   final int inputSize = width * rows * 2;
+
+   manager.updateEstimates(inputSize, width, rows);
+
+   final int spillSize = manager.getSpillBatchSize();
+
+   // Spill decision at various memory fill levels
+
+   assertFalse(manager.isSpillNeeded(0, inputSize));
+   assertFalse(manager.isSpillNeeded(2 * inputSize, inputSize));
+   final long nearlyFull = limit - spillSize + 1;
+   assertTrue(manager.isSpillNeeded(nearlyFull, inputSize));
+
+   // The same idea for an in-memory merge: with one megabyte left,
+   // a request of up to exactly one megabyte is accepted.
+
+   final long allButOneMeg = limit - ONE_MEG;
+   assertTrue(manager.hasMemoryMergeCapacity(allButOneMeg, ONE_MEG - 1));
+   assertTrue(manager.hasMemoryMergeCapacity(allButOneMeg, ONE_MEG));
+   assertFalse(manager.hasMemoryMergeCapacity(allButOneMeg, ONE_MEG + 1));
+ }
+
+ /**
+  * Checks the action (none, spill or merge) and, for merges, the count
+  * returned by {@code consolidateBatches()} for several mixes of in-memory
+  * batches and spilled runs under a 50 MB memory limit.
+  */
+ @Test
+ public void testMergeCalcs() {
+
+   // Merge limit set to 100; the intent is that it acts as no artificial
+   // merge limit for the cases below.
+
+   int mergeLimitConstraint = 100;
+   DrillConfig drillConfig = new ConfigBuilder()
+       .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
+       .build();
+   SortConfig sortConfig = new SortConfig(drillConfig);
+   // Allow four spill batches, 8 MB each, plus one output of 16
+   long memoryLimit = 50 * ONE_MEG;
+   SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+   // Prime the estimates
+
+   int rowWidth = 300;
+   int rowCount = 10000;
+   int batchSize = rowWidth * rowCount * 2;
+
+   memManager.updateEstimates(batchSize, rowWidth, rowCount);
+   int spillBatchSize = memManager.getSpillBatchSize();
+   int mergeBatchSize = memManager.getMergeBatchSize();
+
+   // One in-mem batch, no merging.
+
+   long allocMemory = memoryLimit - mergeBatchSize;
+   MergeTask task = memManager.consolidateBatches(allocMemory, 1, 0);
+   assertEquals(MergeAction.NONE, task.action);
+
+   // Many in-mem batches, just enough to merge.
+   // allocMemory still holds memoryLimit - mergeBatchSize here.
+
+   int memBatches = (int) (allocMemory / batchSize);
+   // Widen to long before multiplying so the product cannot overflow int.
+   allocMemory = (long) memBatches * batchSize;
+   task = memManager.consolidateBatches(allocMemory, memBatches, 0);
+   assertEquals(MergeAction.NONE, task.action);
+
+   // Spills if no room for spill and in-memory batches
+
+   task = memManager.consolidateBatches(allocMemory, memBatches, 1);
+   assertEquals(MergeAction.SPILL, task.action);
+
+   // One more in-mem batch: now needs to spill
+
+   memBatches++;
+   allocMemory = (long) memBatches * batchSize;
+   task = memManager.consolidateBatches(allocMemory, memBatches, 0);
+   assertEquals(MergeAction.SPILL, task.action);
+
+   // No spill for various in-mem/spill run combinations
+
+   allocMemory = memoryLimit - spillBatchSize - mergeBatchSize;
+   memBatches = (int) (allocMemory / batchSize);
+   allocMemory = (long) memBatches * batchSize;
+   task = memManager.consolidateBatches(allocMemory, memBatches, 1);
+   assertEquals(MergeAction.NONE, task.action);
+
+   allocMemory = memoryLimit - 2 * spillBatchSize - mergeBatchSize;
+   memBatches = (int) (allocMemory / batchSize);
+   allocMemory = (long) memBatches * batchSize;
+   task = memManager.consolidateBatches(allocMemory, memBatches, 2);
+   assertEquals(MergeAction.NONE, task.action);
+
+   // No spill if no in-memory, only spill, and spill fits
+
+   long freeMem = memoryLimit - mergeBatchSize;
+   int spillBatches = (int) (freeMem / spillBatchSize);
+   task = memManager.consolidateBatches(0, 0, spillBatches);
+   assertEquals(MergeAction.NONE, task.action);
+
+   // One more and must merge
+
+   task = memManager.consolidateBatches(0, 0, spillBatches + 1);
+   assertEquals(MergeAction.MERGE, task.action);
+   assertEquals(2, task.count);
+
+   // Two more and will merge more
+
+   task = memManager.consolidateBatches(0, 0, spillBatches + 2);
+   assertEquals(MergeAction.MERGE, task.action);
+   assertEquals(3, task.count);
+ }
+
+ /**
+  * Checks how {@code consolidateBatches()} responds when the configured
+  * merge limit (5 here), rather than memory (400 MB here), is the
+  * constraint: no action at exactly the limit, and merges of 2, 3 or the
+  * limit itself as the number of spilled runs grows beyond it.
+  */
+ @Test
+ public void testMergeLimit() {
+   // Constrain merge width
+   int mergeLimitConstraint = 5;
+   DrillConfig drillConfig = new ConfigBuilder()
+       .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
+       .build();
+   SortConfig sortConfig = new SortConfig(drillConfig);
+   // Plenty of memory, memory will not be a limit
+   long memoryLimit = 400 * ONE_MEG;
+   SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+   // Prime the estimates
+
+   int rowWidth = 300;
+   int rowCount = 10000;
+   int batchSize = rowWidth * rowCount * 2;
+
+   memManager.updateEstimates(batchSize, rowWidth, rowCount);
+
+   // Pretend merge limit runs, additional in-memory batches
+
+   int memBatchCount = 10;
+   int spillRunCount = mergeLimitConstraint;
+   // Widen to long before multiplying so the product cannot overflow int.
+   long allocMemory = (long) batchSize * memBatchCount;
+   MergeTask task = memManager.consolidateBatches(allocMemory, memBatchCount, spillRunCount);
+   assertEquals(MergeAction.NONE, task.action);
+
+   // One more run than can merge in one go. But, we have plenty of
+   // memory to merge and hold the in-memory batches. So, just merge.
+
+   task = memManager.consolidateBatches(allocMemory, memBatchCount, spillRunCount + 1);
+   assertEquals(MergeAction.MERGE, task.action);
+   assertEquals(2, task.count);
+
+   // One more run than can merge in one go, intermediate merge
+
+   task = memManager.consolidateBatches(0, 0, spillRunCount + 1);
+   assertEquals(MergeAction.MERGE, task.action);
+   assertEquals(2, task.count);
+
+   // Two more spill runs, merge three
+
+   task = memManager.consolidateBatches(0, 0, spillRunCount + 2);
+   assertEquals(MergeAction.MERGE, task.action);
+   assertEquals(3, task.count);
+
+   // Way more than can merge, limit to the constraint
+
+   task = memManager.consolidateBatches(0, 0, spillRunCount * 3);
+   assertEquals(MergeAction.MERGE, task.action);
+   assertEquals(mergeLimitConstraint, task.count);
+ }
+
+ /**
+  * Drives {@code SortMetrics} against a mock stats object. Checks the input
+  * counters that the metrics object reports directly, and the MIN_BUFFER,
+  * PEAK_BATCHES_IN_MEMORY, MERGE_COUNT, SPILL_COUNT and SPILL_MB values
+  * read back from the mock after each update.
+  */
+ @Test
+ public void testMetrics() {
+   final OperatorFixture.MockStats mockStats = new OperatorFixture.MockStats();
+   final SortMetrics sortMetrics = new SortMetrics(mockStats);
+
+   // Tolerance used for most of the floating-point stat comparisons
+   final double delta = 0.01;
+
+   // Input stats: counts and bytes accumulate across calls
+
+   sortMetrics.updateInputMetrics(100, 10_000);
+   assertEquals(1, sortMetrics.getInputBatchCount());
+   assertEquals(100, sortMetrics.getInputRowCount());
+   assertEquals(10_000, sortMetrics.getInputBytes());
+
+   sortMetrics.updateInputMetrics(200, 20_000);
+   assertEquals(2, sortMetrics.getInputBatchCount());
+   assertEquals(300, sortMetrics.getInputRowCount());
+   assertEquals(30_000, sortMetrics.getInputBytes());
+
+   // Buffer memory: the stat tracks the smallest value seen
+
+   assertEquals(0D, mockStats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), delta);
+
+   sortMetrics.updateMemory(1_000_000);
+   assertEquals(1_000_000D, mockStats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), delta);
+
+   sortMetrics.updateMemory(2_000_000);
+   assertEquals(1_000_000D, mockStats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), delta);
+
+   sortMetrics.updateMemory(100_000);
+   assertEquals(100_000D, mockStats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), delta);
+
+   // Peak batches: the stat tracks the largest value seen
+
+   assertEquals(0D, mockStats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), delta);
+
+   sortMetrics.updatePeakBatches(10);
+   assertEquals(10D, mockStats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), delta);
+
+   sortMetrics.updatePeakBatches(1);
+   assertEquals(10D, mockStats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), delta);
+
+   sortMetrics.updatePeakBatches(20);
+   assertEquals(20D, mockStats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), delta);
+
+   // Merge count: goes up by one per call
+
+   assertEquals(0D, mockStats.getStat(ExternalSortBatch.Metric.MERGE_COUNT), delta);
+
+   sortMetrics.incrMergeCount();
+   assertEquals(1D, mockStats.getStat(ExternalSortBatch.Metric.MERGE_COUNT), delta);
+
+   sortMetrics.incrMergeCount();
+   assertEquals(2D, mockStats.getStat(ExternalSortBatch.Metric.MERGE_COUNT), delta);
+
+   // Spill count: goes up by one per call
+
+   assertEquals(0D, mockStats.getStat(ExternalSortBatch.Metric.SPILL_COUNT), delta);
+
+   sortMetrics.incrSpillCount();
+   assertEquals(1D, mockStats.getStat(ExternalSortBatch.Metric.SPILL_COUNT), delta);
+
+   sortMetrics.incrSpillCount();
+   assertEquals(2D, mockStats.getStat(ExternalSortBatch.Metric.SPILL_COUNT), delta);
+
+   // Write bytes: 17.75 MB written is expected to read back as 17.75
+
+   assertEquals(0D, mockStats.getStat(ExternalSortBatch.Metric.SPILL_MB), delta);
+
+   sortMetrics.updateWriteBytes(17 * ONE_MEG + ONE_MEG * 3 / 4);
+   assertEquals(17.75D, mockStats.getStat(ExternalSortBatch.Metric.SPILL_MB), 0.001);
+ }
+}