Posted to commits@drill.apache.org by pr...@apache.org on 2017/12/21 05:19:38 UTC
[11/15] drill git commit: DRILL-5657: Size-aware vector writer structure
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java
new file mode 100644
index 0000000..b23eb0d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java
@@ -0,0 +1,810 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleReader;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Test (non-array) map support in the result set loader and related classes.
+ */
+
+public class TestResultSetLoaderMaps extends SubOperatorTest {
+
+ @Test
+ public void testBasics() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m")
+ .add("c", MinorType.INT)
+ .add("d", MinorType.VARCHAR)
+ .buildMap()
+ .add("e", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Verify structure and schema
+
+ assertEquals(5, rsLoader.schemaVersion());
+ TupleMetadata actualSchema = rootWriter.schema();
+ assertEquals(3, actualSchema.size());
+ assertTrue(actualSchema.metadata(1).isMap());
+ assertEquals(2, actualSchema.metadata("m").mapSchema().size());
+ assertEquals(2, actualSchema.column("m").getChildren().size());
+
+ rsLoader.startBatch();
+
+ // Write a row the way that clients will do.
+
+ ScalarWriter aWriter = rootWriter.scalar("a");
+ TupleWriter mWriter = rootWriter.tuple("m");
+ ScalarWriter cWriter = mWriter.scalar("c");
+ ScalarWriter dWriter = mWriter.scalar("d");
+ ScalarWriter eWriter = rootWriter.scalar("e");
+
+ rootWriter.start();
+ aWriter.setInt(10);
+ cWriter.setInt(110);
+ dWriter.setString("fred");
+ eWriter.setString("pebbles");
+ rootWriter.save();
+
+ // Try adding a duplicate column.
+
+ try {
+ mWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL));
+ fail();
+ } catch (IllegalArgumentException e) {
+ // Expected
+ }
+
+ // Write another using the test-time conveniences
+
+ rootWriter.addRow(20, new Object[] {210, "barney"}, "bam-bam");
+
+ // Harvest the batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(5, rsLoader.schemaVersion());
+ assertEquals(2, actual.rowCount());
+
+ // Validate data
+
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, new Object[] {110, "fred"}, "pebbles")
+ .addRow(20, new Object[] {210, "barney"}, "bam-bam")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+ rsLoader.close();
+ }
+
+ /**
+ * Create schema with a map, then add columns to the map
+ * after delivering the first batch. The new columns should appear
+ * in the second-batch output.
+ */
+
+ @Test
+ public void testMapEvolution() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m")
+ .add("b", MinorType.VARCHAR)
+ .buildMap()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ assertEquals(3, rsLoader.schemaVersion());
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ rootWriter
+ .addRow(10, new Object[] {"fred"})
+ .addRow(20, new Object[] {"barney"});
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(3, rsLoader.schemaVersion());
+ assertEquals(2, actual.rowCount());
+
+ // Validate first batch
+
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, new Object[] {"fred"})
+ .addRow(20, new Object[] {"barney"})
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+
+ // Add three columns in the second batch. One before
+ // the batch starts, one before the first row, and one after
+ // the first row.
+
+ TupleWriter mapWriter = rootWriter.tuple("m");
+ mapWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.REQUIRED));
+
+ rsLoader.startBatch();
+ mapWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.BIGINT, DataMode.REQUIRED));
+
+ rootWriter.addRow(30, new Object[] {"wilma", 130, 130_000L});
+
+ mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REQUIRED));
+ rootWriter.addRow(40, new Object[] {"betty", 140, 140_000L, "bam-bam"});
+
+ actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(6, rsLoader.schemaVersion());
+ assertEquals(2, actual.rowCount());
+
+ // Validate second batch
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m")
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.INT)
+ .add("d", MinorType.BIGINT)
+ .add("e", MinorType.VARCHAR)
+ .buildMap()
+ .buildSchema();
+ expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(30, new Object[] {"wilma", 130, 130_000L, ""})
+ .addRow(40, new Object[] {"betty", 140, 140_000L, "bam-bam"})
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+
+ rsLoader.close();
+ }
+
+ /**
+ * Test adding a map to a loader after writing the first row.
+ */
+
+ @Test
+ public void testMapAddition() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ assertEquals(1, rsLoader.schemaVersion());
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Start without the map. Add a map after the first row.
+
+ rsLoader.startBatch();
+ rootWriter.addRow(10);
+
+ int mapIndex = rootWriter.addColumn(SchemaBuilder.columnSchema("m", MinorType.MAP, DataMode.REQUIRED));
+ TupleWriter mapWriter = rootWriter.tuple(mapIndex);
+
+ // Add a column to the map with the same name as the top-level column.
+ // Verifies that the name spaces are independent.
+
+ mapWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED));
+
+ rootWriter
+ .addRow(20, new Object[]{"fred"})
+ .addRow(30, new Object[]{"barney"});
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(3, rsLoader.schemaVersion());
+ assertEquals(3, actual.rowCount());
+
+ // Validate first batch
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m")
+ .add("a", MinorType.VARCHAR)
+ .buildMap()
+ .buildSchema();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, new Object[] {""})
+ .addRow(20, new Object[] {"fred"})
+ .addRow(30, new Object[] {"barney"})
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+
+ rsLoader.close();
+ }
+
+ /**
+ * Test adding an empty map to a loader after writing the first row.
+ * Then add columns in another batch. Yes, this is a bizarre condition,
+ * but we must check it anyway for robustness.
+ */
+
+ @Test
+ public void testEmptyMapAddition() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ assertEquals(1, rsLoader.schemaVersion());
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Start without the map. Add a map after the first row.
+
+ rsLoader.startBatch();
+ rootWriter.addRow(10);
+
+ int mapIndex = rootWriter.addColumn(SchemaBuilder.columnSchema("m", MinorType.MAP, DataMode.REQUIRED));
+ TupleWriter mapWriter = rootWriter.tuple(mapIndex);
+
+ rootWriter
+ .addRow(20, new Object[]{})
+ .addRow(30, new Object[]{});
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(2, rsLoader.schemaVersion());
+ assertEquals(3, actual.rowCount());
+
+ // Validate first batch
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m")
+ .buildMap()
+ .buildSchema();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, new Object[] {})
+ .addRow(20, new Object[] {})
+ .addRow(30, new Object[] {})
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+
+ // Now add another column to the map
+
+ rsLoader.startBatch();
+ mapWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED));
+
+ rootWriter
+ .addRow(40, new Object[]{"fred"})
+ .addRow(50, new Object[]{"barney"});
+
+ actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(3, rsLoader.schemaVersion());
+ assertEquals(2, actual.rowCount());
+
+ // Validate second batch
+
+ expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m")
+ .add("a", MinorType.VARCHAR)
+ .buildMap()
+ .buildSchema();
+ expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(40, new Object[] {"fred"})
+ .addRow(50, new Object[] {"barney"})
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+
+ rsLoader.close();
+ }
+
+ /**
+ * Create nested maps. Then, add columns to each map
+ * on the fly. Use required, variable-width columns since
+ * those require the most processing and are most likely to
+ * fail if anything is out of place.
+ */
+
+ @Test
+ public void testNestedMapsRequired() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m1")
+ .add("b", MinorType.VARCHAR)
+ .addMap("m2")
+ .add("c", MinorType.VARCHAR)
+ .buildMap()
+ .buildMap()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ assertEquals(5, rsLoader.schemaVersion());
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ rootWriter.addRow(10, new Object[] {"b1", new Object[] {"c1"}});
+
+ // Validate first batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(5, rsLoader.schemaVersion());
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, new Object[] {"b1", new Object[] {"c1"}})
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+
+ // Now add columns in the second batch.
+
+ rsLoader.startBatch();
+ rootWriter.addRow(20, new Object[] {"b2", new Object[] {"c2"}});
+
+ TupleWriter m1Writer = rootWriter.tuple("m1");
+ m1Writer.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED));
+ TupleWriter m2Writer = m1Writer.tuple("m2");
+ m2Writer.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REQUIRED));
+
+ rootWriter.addRow(30, new Object[] {"b3", new Object[] {"c3", "e3"}, "d3"});
+
+ // And another set while the write proceeds.
+
+ m1Writer.addColumn(SchemaBuilder.columnSchema("f", MinorType.VARCHAR, DataMode.REQUIRED));
+ m2Writer.addColumn(SchemaBuilder.columnSchema("g", MinorType.VARCHAR, DataMode.REQUIRED));
+
+ rootWriter.addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"});
+
+ // Validate second batch
+
+ actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(9, rsLoader.schemaVersion());
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m1")
+ .add("b", MinorType.VARCHAR)
+ .addMap("m2")
+ .add("c", MinorType.VARCHAR)
+ .add("e", MinorType.VARCHAR)
+ .add("g", MinorType.VARCHAR)
+ .buildMap()
+ .add("d", MinorType.VARCHAR)
+ .add("f", MinorType.VARCHAR)
+ .buildMap()
+ .buildSchema();
+ expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(20, new Object[] {"b2", new Object[] {"c2", "", "" }, "", "" })
+ .addRow(30, new Object[] {"b3", new Object[] {"c3", "e3", "" }, "d3", "" })
+ .addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"})
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+
+ rsLoader.close();
+ }
+
+ /**
+ * Create nested maps. Then, add columns to each map
+ * on the fly. This time, with nullable types.
+ */
+
+ @Test
+ public void testNestedMapsNullable() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m1")
+ .addNullable("b", MinorType.VARCHAR)
+ .addMap("m2")
+ .addNullable("c", MinorType.VARCHAR)
+ .buildMap()
+ .buildMap()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ rootWriter.addRow(10, new Object[] {"b1", new Object[] {"c1"}});
+
+ // Validate first batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, new Object[] {"b1", new Object[] {"c1"}})
+ .build();
+// actual.print();
+// expected.print();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+
+ // Now add columns in the second batch.
+
+ rsLoader.startBatch();
+ rootWriter.addRow(20, new Object[] {"b2", new Object[] {"c2"}});
+
+ TupleWriter m1Writer = rootWriter.tuple("m1");
+ m1Writer.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.OPTIONAL));
+ TupleWriter m2Writer = m1Writer.tuple("m2");
+ m2Writer.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.OPTIONAL));
+
+ rootWriter.addRow(30, new Object[] {"b3", new Object[] {"c3", "e3"}, "d3"});
+
+ // And another set while the write proceeds.
+
+ m1Writer.addColumn(SchemaBuilder.columnSchema("f", MinorType.VARCHAR, DataMode.OPTIONAL));
+ m2Writer.addColumn(SchemaBuilder.columnSchema("g", MinorType.VARCHAR, DataMode.OPTIONAL));
+
+ rootWriter.addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"});
+
+ // Validate second batch
+
+ actual = fixture.wrap(rsLoader.harvest());
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m1")
+ .addNullable("b", MinorType.VARCHAR)
+ .addMap("m2")
+ .addNullable("c", MinorType.VARCHAR)
+ .addNullable("e", MinorType.VARCHAR)
+ .addNullable("g", MinorType.VARCHAR)
+ .buildMap()
+ .addNullable("d", MinorType.VARCHAR)
+ .addNullable("f", MinorType.VARCHAR)
+ .buildMap()
+ .buildSchema();
+ expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(20, new Object[] {"b2", new Object[] {"c2", null, null}, null, null})
+ .addRow(30, new Object[] {"b3", new Object[] {"c3", "e3", null}, "d3", null})
+ .addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"})
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+
+ rsLoader.close();
+ }
+
+ /**
+ * Test a map that contains a scalar array. No reason to suspect that this
+ * will have problems, as the array writer is fully tested in the accessor
+ * subsystem. Still, need to test the cardinality methods of the loader
+ * layer.
+ */
+
+ @Test
+ public void testMapWithArray() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m")
+ .addArray("c", MinorType.INT)
+ .addArray("d", MinorType.VARCHAR)
+ .buildMap()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Write some rows
+
+ rsLoader.startBatch();
+ rootWriter
+ .addRow(10, new Object[] {new int[] {110, 120, 130},
+ new String[] {"d1.1", "d1.2", "d1.3", "d1.4"}})
+ .addRow(20, new Object[] {new int[] {210}, new String[] {}})
+ .addRow(30, new Object[] {new int[] {}, new String[] {"d3.1"}})
+ ;
+
+ // Validate first batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, new Object[] {new int[] {110, 120, 130},
+ new String[] {"d1.1", "d1.2", "d1.3", "d1.4"}})
+ .addRow(20, new Object[] {new int[] {210}, new String[] {}})
+ .addRow(30, new Object[] {new int[] {}, new String[] {"d3.1"}})
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+
+ // Add another array after the first row in the second batch.
+
+ rsLoader.startBatch();
+ rootWriter
+ .addRow(40, new Object[] {new int[] {410, 420}, new String[] {"d4.1", "d4.2"}})
+ .addRow(50, new Object[] {new int[] {510}, new String[] {"d5.1"}})
+ ;
+
+ TupleWriter mapWriter = rootWriter.tuple("m");
+ mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REPEATED));
+ rootWriter
+ .addRow(60, new Object[] {new int[] {610, 620}, new String[] {"d6.1", "d6.2"}, new String[] {"e6.1", "e6.2"}})
+ .addRow(70, new Object[] {new int[] {710}, new String[] {}, new String[] {"e7.1", "e7.2"}})
+ ;
+
+ // Validate second batch. The new array should have been back-filled with
+ // empty offsets for the missing rows.
+
+ actual = fixture.wrap(rsLoader.harvest());
+// System.out.println(actual.schema().toString());
+ expected = fixture.rowSetBuilder(actual.schema())
+ .addRow(40, new Object[] {new int[] {410, 420}, new String[] {"d4.1", "d4.2"}, new String[] {}})
+ .addRow(50, new Object[] {new int[] {510}, new String[] {"d5.1"}, new String[] {}})
+ .addRow(60, new Object[] {new int[] {610, 620}, new String[] {"d6.1", "d6.2"}, new String[] {"e6.1", "e6.2"}})
+ .addRow(70, new Object[] {new int[] {710}, new String[] {}, new String[] {"e7.1", "e7.2"}})
+ .build();
+// expected.print();
+
+ new RowSetComparison(expected).verifyAndClearAll(actual);
+
+ rsLoader.close();
+ }
+
+ /**
+ * Create a schema with a map, then trigger an overflow on one of the columns
+ * in the map. Proper overflow handling should occur regardless of nesting
+ * depth.
+ */
+
+ @Test
+ public void testMapWithOverflow() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m1")
+ .add("b", MinorType.INT)
+ .addMap("m2")
+ .add("c", MinorType.INT) // Before overflow, written
+ .add("d", MinorType.VARCHAR)
+ .add("e", MinorType.INT) // After overflow, not yet written
+ .buildMap()
+ .buildMap()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ byte value[] = new byte[512];
+ Arrays.fill(value, (byte) 'X');
+ int count = 0;
+ rsLoader.startBatch();
+ while (! rootWriter.isFull()) {
+ rootWriter.addRow(count, new Object[] {count * 10, new Object[] {count * 100, value, count * 1000}});
+ count++;
+ }
+
+ // Our row count should include the overflow row
+
+ int expectedCount = ValueVector.MAX_BUFFER_SIZE / value.length;
+ assertEquals(expectedCount + 1, count);
+
+ // Loader's row count should include only "visible" rows
+
+ assertEquals(expectedCount, rootWriter.rowCount());
+
+ // Total count should include invisible and look-ahead rows.
+
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+ // Result should exclude the overflow row
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(expectedCount, result.rowCount());
+ result.clear();
+
+ // Next batch should start with the overflow row
+
+ rsLoader.startBatch();
+ assertEquals(1, rootWriter.rowCount());
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+ result = fixture.wrap(rsLoader.harvest());
+ assertEquals(1, result.rowCount());
+ result.clear();
+
+ rsLoader.close();
+ }
+
+ /**
+ * Test the case in which a new column is added during the overflow row. Unlike
+ * the top-level schema case, internally we must create a copy of the map, and
+ * move vectors across only when the result is to include the schema version
+ * of the target column. For overflow, the new column is not part of the
+ * harvested first batch; it first appears in the second batch, which begins
+ * with the overflow row in which the column was added.
+ */
+
+ @Test
+ public void testMapOverflowWithNewColumn() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m")
+ .add("b", MinorType.INT)
+ .add("c", MinorType.VARCHAR)
+ .buildMap()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ assertEquals(4, rsLoader.schemaVersion());
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Can't use the shortcut to populate rows when doing a schema
+ // change.
+
+ ScalarWriter aWriter = rootWriter.scalar("a");
+ TupleWriter mWriter = rootWriter.tuple("m");
+ ScalarWriter bWriter = mWriter.scalar("b");
+ ScalarWriter cWriter = mWriter.scalar("c");
+
+ byte value[] = new byte[512];
+ Arrays.fill(value, (byte) 'X');
+ int count = 0;
+ rsLoader.startBatch();
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ aWriter.setInt(count);
+ bWriter.setInt(count * 10);
+ cWriter.setBytes(value, value.length);
+ if (rootWriter.isFull()) {
+
+ // Overflow just occurred. Add another column.
+
+ mWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.INT, DataMode.OPTIONAL));
+ mWriter.scalar("d").setInt(count * 100);
+ }
+ rootWriter.save();
+ count++;
+ }
+
+ // Result set should include the original columns, but not d.
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+
+ assertEquals(4, rsLoader.schemaVersion());
+ assertTrue(schema.isEquivalent(result.schema()));
+ BatchSchema expectedSchema = new BatchSchema(SelectionVectorMode.NONE, schema.toFieldList());
+ assertTrue(expectedSchema.isEquivalent(result.batchSchema()));
+
+ // Use a reader to validate row-by-row. Too large to create an expected
+ // result set.
+
+ RowSetReader reader = result.reader();
+ TupleReader mapReader = reader.tuple("m");
+ int rowId = 0;
+ while (reader.next()) {
+ assertEquals(rowId, reader.scalar("a").getInt());
+ assertEquals(rowId * 10, mapReader.scalar("b").getInt());
+ assertTrue(Arrays.equals(value, mapReader.scalar("c").getBytes()));
+ rowId++;
+ }
+ result.clear();
+
+ // Next batch should start with the overflow row
+
+ rsLoader.startBatch();
+ assertEquals(1, rootWriter.rowCount());
+ result = fixture.wrap(rsLoader.harvest());
+ assertEquals(1, result.rowCount());
+
+ reader = result.reader();
+ mapReader = reader.tuple("m");
+ while (reader.next()) {
+ assertEquals(rowId, reader.scalar("a").getInt());
+ assertEquals(rowId * 10, mapReader.scalar("b").getInt());
+ assertTrue(Arrays.equals(value, mapReader.scalar("c").getBytes()));
+ assertEquals(rowId * 100, mapReader.scalar("d").getInt());
+ }
+ result.clear();
+
+ rsLoader.close();
+ }
+
+ /**
+ * Version of the {@link TestResultSetLoaderProtocol#testOverwriteRow()} test
+ * that uses nested columns.
+ */
+
+ @Test
+ public void testOverwriteRow() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("m")
+ .add("b", MinorType.INT)
+ .add("c", MinorType.VARCHAR)
+ .buildMap()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Can't use the shortcut to populate rows when doing overwrites.
+
+ ScalarWriter aWriter = rootWriter.scalar("a");
+ TupleWriter mWriter = rootWriter.tuple("m");
+ ScalarWriter bWriter = mWriter.scalar("b");
+ ScalarWriter cWriter = mWriter.scalar("c");
+
+ // Write 100,000 rows, overwriting 99% of them. This will cause vector
+ // overflow and data corruption if overwrite does not work; but will happily
+ // produce the correct result if everything works as it should.
+
+ byte value[] = new byte[512];
+ Arrays.fill(value, (byte) 'X');
+ int count = 0;
+ rsLoader.startBatch();
+ while (count < 100_000) {
+ rootWriter.start();
+ count++;
+ aWriter.setInt(count);
+ bWriter.setInt(count * 10);
+ cWriter.setBytes(value, value.length);
+ if (count % 100 == 0) {
+ rootWriter.save();
+ }
+ }
+
+ // Verify using a reader.
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(count / 100, result.rowCount());
+ RowSetReader reader = result.reader();
+ TupleReader mReader = reader.tuple("m");
+ int rowId = 1;
+ while (reader.next()) {
+ assertEquals(rowId * 100, reader.scalar("a").getInt());
+ assertEquals(rowId * 1000, mReader.scalar("b").getInt());
+ assertTrue(Arrays.equals(value, mReader.scalar("c").getBytes()));
+ rowId++;
+ }
+
+ result.clear();
+ rsLoader.close();
+ }
+}
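
The tests above all follow the same write cycle against the result set loader. A minimal sketch of that cycle, assuming the same SubOperatorTest fixture used by these tests and hypothetical column names, looks roughly like this:

    // Declare the schema up front, including the map and its members.
    TupleMetadata schema = new SchemaBuilder()
        .add("id", MinorType.INT)
        .addMap("m")
          .add("name", MinorType.VARCHAR)
          .buildMap()
        .buildSchema();
    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
        .setSchema(schema)
        .build();
    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
    RowSetLoader rootWriter = rsLoader.writer();

    rsLoader.startBatch();                    // begin a batch
    rootWriter.start();                       // begin a row
    rootWriter.scalar("id").setInt(1);
    rootWriter.tuple("m").scalar("name").setString("fred");
    rootWriter.save();                        // commit the row
    RowSet batch = fixture.wrap(rsLoader.harvest());   // hand the batch downstream
    batch.clear();
    rsLoader.close();

The per-column writers can also be fetched once and reused across rows, as testBasics and testMapWithOverflow do above.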
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java
new file mode 100644
index 0000000..2c4c87b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+public class TestResultSetLoaderOmittedValues extends SubOperatorTest {
+
+ /**
+ * Test "holes" in the middle of a batch, and unset columns at
+ * the end. Ending the batch should fill in missing values.
+ */
+
+ @Test
+ public void testOmittedValuesAtEnd() {
+
+ // Create columns up front
+
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .addNullable("c", MinorType.VARCHAR)
+ .add("d", MinorType.INT)
+ .addNullable("e", MinorType.INT)
+ .addArray("f", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ int rowCount = 0;
+ ScalarWriter arrayWriter;
+ for (int i = 0; i < 2; i++) { // Row 0, 1
+ rootWriter.start();
+ rowCount++;
+ rootWriter.scalar(0).setInt(rowCount);
+ rootWriter.scalar(1).setString("b_" + rowCount);
+ rootWriter.scalar(2).setString("c_" + rowCount);
+ rootWriter.scalar(3).setInt(rowCount * 10);
+ rootWriter.scalar(4).setInt(rowCount * 100);
+ arrayWriter = rootWriter.column(5).array().scalar();
+ arrayWriter.setString("f_" + rowCount + "-1");
+ arrayWriter.setString("f_" + rowCount + "-2");
+ rootWriter.save();
+ }
+
+ // Holes in half the columns
+
+ for (int i = 0; i < 2; i++) { // Rows 2, 3
+ rootWriter.start();
+ rowCount++;
+ rootWriter.scalar(0).setInt(rowCount);
+ rootWriter.scalar(1).setString("b_" + rowCount);
+ rootWriter.scalar(3).setInt(rowCount * 10);
+ arrayWriter = rootWriter.column(5).array().scalar();
+ arrayWriter.setString("f_" + rowCount + "-1");
+ arrayWriter.setString("f_" + rowCount + "-2");
+ rootWriter.save();
+ }
+
+ // Holes in the other half
+
+ for (int i = 0; i < 2; i++) { // Rows 4, 5
+ rootWriter.start();
+ rowCount++;
+ rootWriter.scalar(0).setInt(rowCount);
+ rootWriter.scalar(2).setString("c_" + rowCount);
+ rootWriter.scalar(4).setInt(rowCount * 100);
+ rootWriter.save();
+ }
+
+ // All columns again.
+
+ for (int i = 0; i < 2; i++) { // Rows 6, 7
+ rootWriter.start();
+ rowCount++;
+ rootWriter.scalar(0).setInt(rowCount);
+ rootWriter.scalar(1).setString("b_" + rowCount);
+ rootWriter.scalar(2).setString("c_" + rowCount);
+ rootWriter.scalar(3).setInt(rowCount * 10);
+ rootWriter.scalar(4).setInt(rowCount * 100);
+ arrayWriter = rootWriter.column(5).array().scalar();
+ arrayWriter.setString("f_" + rowCount + "-1");
+ arrayWriter.setString("f_" + rowCount + "-2");
+ rootWriter.save();
+ }
+
+ // Omit all but the key column at end
+
+ for (int i = 0; i < 2; i++) { // Rows 8, 9
+ rootWriter.start();
+ rowCount++;
+ rootWriter.scalar(0).setInt(rowCount);
+ rootWriter.save();
+ }
+
+ // Harvest the row and verify.
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+// actual.print();
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .addNullable("c", MinorType.VARCHAR)
+ .add("3", MinorType.INT)
+ .addNullable("e", MinorType.INT)
+ .addArray("f", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow( 1, "b_1", "c_1", 10, 100, new String[] {"f_1-1", "f_1-2"})
+ .addRow( 2, "b_2", "c_2", 20, 200, new String[] {"f_2-1", "f_2-2"})
+ .addRow( 3, "b_3", null, 30, null, new String[] {"f_3-1", "f_3-2"})
+ .addRow( 4, "b_4", null, 40, null, new String[] {"f_4-1", "f_4-2"})
+ .addRow( 5, "", "c_5", 0, 500, new String[] {})
+ .addRow( 6, "", "c_6", 0, 600, new String[] {})
+ .addRow( 7, "b_7", "c_7", 70, 700, new String[] {"f_7-1", "f_7-2"})
+ .addRow( 8, "b_8", "c_8", 80, 800, new String[] {"f_8-1", "f_8-2"})
+ .addRow( 9, "", null, 0, null, new String[] {})
+ .addRow( 10, "", null, 0, null, new String[] {})
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ rsLoader.close();
+ }
+
+ /**
+ * Test "holes" at the end of a batch when batch overflows. Completed
+ * batch must be finalized correctly, and the new batch initialized correctly,
+ * for the missing values.
+ */
+
+ @Test
+ public void testOmittedValuesAtEndWithOverflow() {
+ TupleMetadata schema = new SchemaBuilder()
+ // Row index
+ .add("a", MinorType.INT)
+ // Column that forces overflow
+ .add("b", MinorType.VARCHAR)
+ // Column with all holes
+ .addNullable("c", MinorType.VARCHAR)
+ // Column with some holes
+ .addNullable("d", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Fill the batch. Column d has some values. Column c is worst case: no values.
+
+ rsLoader.startBatch();
+ byte value[] = new byte[533];
+ Arrays.fill(value, (byte) 'X');
+ int rowNumber = 0;
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ rowNumber++;
+ rootWriter.scalar(0).setInt(rowNumber);
+ rootWriter.scalar(1).setBytes(value, value.length);
+ if (rowNumber < 10_000) {
+ rootWriter.scalar(3).setString("d-" + rowNumber);
+ }
+ rootWriter.save();
+ assertEquals(rowNumber, rsLoader.totalRowCount());
+ }
+
+ // Harvest and verify
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(rowNumber - 1, result.rowCount());
+ RowSetReader reader = result.reader();
+ int rowIndex = 0;
+ while (reader.next()) {
+ int expectedRowNumber = 1 + rowIndex;
+ assertEquals(expectedRowNumber, reader.scalar(0).getInt());
+ assertTrue(reader.scalar(2).isNull());
+ if (expectedRowNumber < 10_000) {
+ assertEquals("d-" + expectedRowNumber, reader.scalar(3).getString());
+ } else {
+ assertTrue(reader.scalar(3).isNull());
+ }
+ rowIndex++;
+ }
+
+ // The overflow row carries over into this batch, so row numbering
+ // continues from the current count.
+
+ int startRowNumber = rowNumber;
+
+ // Write a few more rows to the next batch
+
+ rsLoader.startBatch();
+ for (int i = 0; i < 10; i++) {
+ rootWriter.start();
+ rowNumber++;
+ rootWriter.scalar(0).setInt(rowNumber);
+ rootWriter.scalar(1).setBytes(value, value.length);
+ if (i > 5) {
+ rootWriter.scalar(3).setString("d-" + rowNumber);
+ }
+ rootWriter.save();
+ assertEquals(rowNumber, rsLoader.totalRowCount());
+ }
+
+ // Verify that holes were preserved.
+
+ result = fixture.wrap(rsLoader.harvest());
+ assertEquals(rowNumber, rsLoader.totalRowCount());
+ assertEquals(rowNumber - startRowNumber + 1, result.rowCount());
+// result.print();
+ reader = result.reader();
+ rowIndex = 0;
+ while (reader.next()) {
+ int expectedRowNumber = startRowNumber + rowIndex;
+ assertEquals(expectedRowNumber, reader.scalar(0).getInt());
+ assertTrue(reader.scalar(2).isNull());
+ if (rowIndex > 6) {
+ assertEquals("d-" + expectedRowNumber, reader.scalar(3).getString());
+ } else {
+ assertTrue("Row " + rowIndex + " col d should be null", reader.scalar(3).isNull());
+ }
+ rowIndex++;
+ }
+ assertEquals(rowIndex, 11);
+
+ rsLoader.close();
+ }
+
+ /**
+ * Test that omitting the call to save() effectively discards
+ * the row. Note that the vectors still contain values in the
+ * discarded position; just the various pointers are unset. If
+ * the batch ends before the discarded values are overwritten, the
+ * discarded values just exist at the end of the vector. Since vectors
+ * start with garbage contents, the discarded values are simply a different
+ * kind of garbage. But, if the client writes a new row, then the new
+ * row overwrites the discarded row. This works because we only change
+ * the tail part of a vector; never the internals.
+ */
+
+ @Test
+ public void testSkipRows() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ int rowNumber = 0;
+ for (int i = 0; i < 14; i++) {
+ rootWriter.start();
+ rowNumber++;
+ rootWriter.scalar(0).setInt(rowNumber);
+ if (i % 3 == 0) {
+ rootWriter.scalar(1).setNull();
+ } else {
+ rootWriter.scalar(1).setString("b-" + rowNumber);
+ }
+ if (i % 2 == 0) {
+ rootWriter.save();
+ }
+ }
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+// result.print();
+ SingleRowSet expected = fixture.rowSetBuilder(result.batchSchema())
+ .addRow( 1, null)
+ .addRow( 3, "b-3")
+ .addRow( 5, "b-5")
+ .addRow( 7, null)
+ .addRow( 9, "b-9")
+ .addRow(11, "b-11")
+ .addRow(13, null)
+ .build();
+// expected.print();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+
+ rsLoader.close();
+ }
+
+ /**
+ * Test that discarding a row works even if that row happens to be an
+ * overflow row.
+ */
+
+ @Test
+ public void testSkipOverflowRow() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ byte value[] = new byte[512];
+ Arrays.fill(value, (byte) 'X');
+ int count = 0;
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ rootWriter.scalar(0).setInt(count);
+ rootWriter.scalar(1).setBytes(value, value.length);
+
+ // Relies on fact that isFull becomes true right after
+ // a vector overflows; don't have to wait for save().
+ // Keep all rows, but discard the overflow row.
+
+ if (! rootWriter.isFull()) {
+ rootWriter.save();
+ }
+ count++;
+ }
+
+ // Discard the results.
+
+ rsLoader.harvest().zeroVectors();
+
+ // Harvest the next batch. Will be empty (because overflow row
+ // was discarded.)
+
+ rsLoader.startBatch();
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(0, result.rowCount());
+ result.clear();
+
+ rsLoader.close();
+ }
+}
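
The omitted-values tests above rely on the loader back-filling any column that a row does not set: nullable columns read back as null, required columns as zero or an empty string, and arrays as empty. A minimal sketch of that behavior, again assuming the same fixture and hypothetical column names:

    TupleMetadata schema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .addNullable("b", MinorType.VARCHAR)
        .buildSchema();
    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(),
        new OptionBuilder().setSchema(schema).build());
    RowSetLoader rootWriter = rsLoader.writer();

    rsLoader.startBatch();
    rootWriter.start();
    rootWriter.scalar("a").setInt(1);   // column "b" deliberately left unset
    rootWriter.save();                  // the loader fills the hole in "b" with null

    RowSet result = fixture.wrap(rsLoader.harvest());
    // result holds a single row: (1, null)
    result.clear();
    rsLoader.close();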
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java
new file mode 100644
index 0000000..0146cfe
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarElementReader;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Exercise the vector overflow functionality for the result set loader.
+ */
+
+public class TestResultSetLoaderOverflow extends SubOperatorTest {
+
+ /**
+ * Test that the writer detects a vector overflow. The offending column
+ * value should be moved to the next batch.
+ */
+
+ @Test
+ public void testVectorSizeLimit() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("s", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ byte value[] = new byte[512];
+ Arrays.fill(value, (byte) 'X');
+ int count = 0;
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ rootWriter.scalar(0).setBytes(value, value.length);
+ rootWriter.save();
+ count++;
+ }
+
+ // Number of rows should be driven by vector size.
+ // Our row count should include the overflow row
+
+ int expectedCount = ValueVector.MAX_BUFFER_SIZE / value.length;
+ assertEquals(expectedCount + 1, count);
+
+ // Loader's row count should include only "visible" rows
+
+ assertEquals(expectedCount, rootWriter.rowCount());
+
+ // Total count should include invisible and look-ahead rows.
+
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+ // Result should exclude the overflow row
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(expectedCount, result.rowCount());
+ result.clear();
+
+ // Next batch should start with the overflow row
+
+ rsLoader.startBatch();
+ assertEquals(1, rootWriter.rowCount());
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+ result = fixture.wrap(rsLoader.harvest());
+ assertEquals(1, result.rowCount());
+ result.clear();
+
+ rsLoader.close();
+ }
+
+ /**
+ * Test that the writer detects when the configured batch size limit is
+ * reached. The offending column value should be moved to the next batch.
+ */
+
+ @Test
+ public void testBatchSizeLimit() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("s", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .setBatchSizeLimit(
+ 8 * 1024 * 1024 + // Data
+ 2 * ValueVector.MAX_ROW_COUNT * 4) // Offsets, doubled because of +1
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ byte value[] = new byte[512];
+ Arrays.fill(value, (byte) 'X');
+ int count = 0;
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ rootWriter.scalar(0).setBytes(value, value.length);
+ rootWriter.save();
+ count++;
+ }
+
+ // Our row count should include the overflow row
+
+ int expectedCount = 8 * 1024 * 1024 / value.length;
+ assertEquals(expectedCount + 1, count);
+
+ // Loader's row count should include only "visible" rows
+
+ assertEquals(expectedCount, rootWriter.rowCount());
+
+ // Total count should include invisible and look-ahead rows.
+
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+ // Result should exclude the overflow row
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(expectedCount, result.rowCount());
+ result.clear();
+
+ // Next batch should start with the overflow row
+
+ rsLoader.startBatch();
+ assertEquals(1, rootWriter.rowCount());
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+ result = fixture.wrap(rsLoader.harvest());
+ assertEquals(1, result.rowCount());
+ result.clear();
+
+ rsLoader.close();
+ }
+
+ /**
+ * Load a batch to overflow. Then, close the loader with the overflow
+ * batch unharvested. The Loader should release the memory allocated
+ * to the unused overflow vectors.
+ */
+
+ @Test
+ public void testCloseWithOverflow() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("s", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ byte value[] = new byte[512];
+ Arrays.fill(value, (byte) 'X');
+ int count = 0;
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ rootWriter.scalar(0).setBytes(value, value.length);
+ rootWriter.save();
+ count++;
+ }
+
+ assertTrue(count < ValueVector.MAX_ROW_COUNT);
+
+ // Harvest the full batch
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ result.clear();
+
+ // Close without harvesting the overflow batch.
+
+ rsLoader.close();
+ }
+
+ /**
+ * Case where a single array fills up the vector to the maximum size
+ * limit. Overflow won't work here; the attempt will fail with a user
+ * exception.
+ */
+
+ @Test
+ public void testOversizeArray() {
+ TupleMetadata schema = new SchemaBuilder()
+ .addArray("s", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Create a single array as the column value in the first row. When
+ // this overflows, an exception is thrown since overflow is not possible.
+
+ rsLoader.startBatch();
+ byte value[] = new byte[473];
+ Arrays.fill(value, (byte) 'X');
+ rootWriter.start();
+ ScalarWriter array = rootWriter.array(0).scalar();
+ try {
+ for (int i = 0; i < ValueVector.MAX_ROW_COUNT; i++) {
+ array.setBytes(value, value.length);
+ }
+ fail();
+ } catch (UserException e) {
+ assertTrue(e.getMessage().contains("column value is larger than the maximum"));
+ }
+ rsLoader.close();
+ }
+
+ /**
+ * Test a row with a single array column which overflows. Verifies
+ * that all the fiddly bits about offset vectors and so on work
+ * correctly. Run this test (the simplest case) if you change anything
+ * about the array handling code.
+ */
+
+ @Test
+ public void testSizeLimitOnArray() {
+ TupleMetadata schema = new SchemaBuilder()
+ .addArray("s", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Fill the batch with rows that each hold a single array of values. Tack on
+ // a suffix to each so we can be sure the proper data is written and moved
+ // to the overflow batch.
+
+ rsLoader.startBatch();
+ byte value[] = new byte[473];
+ Arrays.fill(value, (byte) 'X');
+ String strValue = new String(value, Charsets.UTF_8);
+ int count = 0;
+ int rowSize = 0;
+ int totalSize = 0;
+ int valuesPerArray = 13;
+ while (rootWriter.start()) {
+ totalSize += rowSize;
+ rowSize = 0;
+ ScalarWriter array = rootWriter.array(0).scalar();
+ for (int i = 0; i < valuesPerArray; i++) {
+ String cellValue = strValue + (count + 1) + "." + i;
+ array.setString(cellValue);
+ rowSize += cellValue.length();
+ }
+ rootWriter.save();
+ count++;
+ }
+
+ // The count of rows written includes the overflow row; the harvested count excludes it.
+
+ int expectedCount = count - 1;
+
+ // Size without overflow row should fit in the vector, size
+ // with overflow should not.
+
+ assertTrue(totalSize <= ValueVector.MAX_BUFFER_SIZE);
+ assertTrue(totalSize + rowSize > ValueVector.MAX_BUFFER_SIZE);
+
+ // Result should exclude the overflow row. Last row
+ // should hold the last full array.
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(expectedCount, result.rowCount());
+ RowSetReader reader = result.reader();
+ reader.set(expectedCount - 1);
+ ScalarElementReader arrayReader = reader.column(0).elements();
+ assertEquals(valuesPerArray, arrayReader.size());
+ for (int i = 0; i < valuesPerArray; i++) {
+ String cellValue = strValue + (count - 1) + "." + i;
+ assertEquals(cellValue, arrayReader.getString(i));
+ }
+ result.clear();
+
+ // Next batch should start with the overflow row.
+ // The only row in this next batch should be the whole
+ // array being written at the time of overflow.
+
+ rsLoader.startBatch();
+// VectorPrinter.printStrings((VarCharVector) ((VarCharColumnWriter) rootWriter.array(0).scalar()).vector(), 0, 5);
+// ((ResultSetLoaderImpl) rsLoader).dump(new HierarchicalPrinter());
+ assertEquals(1, rootWriter.rowCount());
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+ result = fixture.wrap(rsLoader.harvest());
+// VectorPrinter.printStrings((VarCharVector) ((VarCharColumnWriter) rootWriter.array(0).scalar()).vector(), 0, 5);
+ assertEquals(1, result.rowCount());
+ reader = result.reader();
+ reader.next();
+ arrayReader = reader.column(0).elements();
+ assertEquals(valuesPerArray, arrayReader.size());
+ for (int i = 0; i < valuesPerArray; i++) {
+ String cellValue = strValue + (count) + "." + i;
+ assertEquals(cellValue, arrayReader.getString(i));
+ }
+ result.clear();
+
+ rsLoader.close();
+ }
+
+ /**
+ * Test the complete set of array overflow cases:
+ * <ul>
+ * <li>Array a is written before the column that has overflow,
+ * and must be copied, in its entirety, to the overflow row.</li>
+ * <li>Column b causes the overflow.</li>
+ * <li>Column c is written after the overflow, and should go
+ * to the look-ahead row.</li>
+ * <li>Column d is written for a while, then has empties before
+ * the overflow row, but is written in the overflow row.</li>
+ * <li>Column e is like d, but is not written in the overflow
+ * row.</li>
+ * </ul>
+ */
+
+ @Test
+ public void testArrayOverflowWithOtherArrays() {
+ TupleMetadata schema = new SchemaBuilder()
+ .addArray("a", MinorType.INT)
+ .addArray("b", MinorType.VARCHAR)
+ .addArray("c", MinorType.INT)
+ .addArray("d", MinorType.INT)
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Fill the batch with rows containing the four array columns. Tack on
+ // a suffix to each so we can be sure the proper data is written and moved
+ // to the overflow batch.
+
+ byte value[] = new byte[512];
+ Arrays.fill(value, (byte) 'X');
+ String strValue = new String(value, Charsets.UTF_8);
+
+ int aCount = 3;
+ int bCount = 11;
+ int cCount = 5;
+ int dCount = 7;
+
+ int cCutoff = ValueVector.MAX_BUFFER_SIZE / value.length / bCount / 2;
+
+ ScalarWriter aWriter = rootWriter.array("a").scalar();
+ ScalarWriter bWriter = rootWriter.array("b").scalar();
+ ScalarWriter cWriter = rootWriter.array("c").scalar();
+ ScalarWriter dWriter = rootWriter.array("d").scalar();
+
+ int count = 0;
+ rsLoader.startBatch();
+ while (rootWriter.start()) {
+ // (No-op; appears to be a leftover debugging hook for the row at which
+ // overflow occurs.)
+ if (rootWriter.rowCount() == 2952) {
+ count = count + 0;
+ }
+ for (int i = 0; i < aCount; i++) {
+ aWriter.setInt(count * aCount + i);
+ }
+ for (int i = 0; i < bCount; i++) {
+ String cellValue = strValue + (count * bCount + i);
+ bWriter.setString(cellValue);
+ }
+ if (count < cCutoff) {
+ for (int i = 0; i < cCount; i++) {
+ cWriter.setInt(count * cCount + i);
+ }
+ }
+
+ // Relies on fact that isFull becomes true right after
+ // a vector overflows; don't have to wait for save().
+
+ if (count < cCutoff || rootWriter.isFull()) {
+ for (int i = 0; i < dCount; i++) {
+ dWriter.setInt(count * dCount + i);
+ }
+ }
+ rootWriter.save();
+ count++;
+ }
+
+ // Verify
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(count - 1, result.rowCount());
+
+ RowSetReader reader = result.reader();
+ ScalarElementReader aReader = reader.array("a").elements();
+ ScalarElementReader bReader = reader.array("b").elements();
+ ScalarElementReader cReader = reader.array("c").elements();
+ ScalarElementReader dReader = reader.array("d").elements();
+
+ while (reader.next()) {
+ int rowId = reader.rowIndex();
+ assertEquals(aCount, aReader.size());
+ for (int i = 0; i < aCount; i++) {
+ assertEquals(rowId * aCount + i, aReader.getInt(i));
+ }
+ assertEquals(bCount, bReader.size());
+ for (int i = 0; i < bCount; i++) {
+ String cellValue = strValue + (rowId * bCount + i);
+ assertEquals(cellValue, bReader.getString(i));
+ }
+ if (rowId < cCutoff) {
+ assertEquals(cCount, cReader.size());
+ for (int i = 0; i < cCount; i++) {
+ assertEquals(rowId * cCount + i, cReader.getInt(i));
+ }
+ assertEquals(dCount, dReader.size());
+ for (int i = 0; i < dCount; i++) {
+ assertEquals(rowId * dCount + i, dReader.getInt(i));
+ }
+ } else {
+ assertEquals(0, cReader.size());
+ assertEquals(0, dReader.size());
+ }
+ }
+ result.clear();
+ int firstCount = count - 1;
+
+ // One row is in the batch. Write more, skipping over the
+ // initial few values for columns c and d. Column d has a
+ // roll-over value, c has an empty roll-over.
+
+ rsLoader.startBatch();
+ for (int j = 0; j < 5; j++) {
+ rootWriter.start();
+ for (int i = 0; i < aCount; i++) {
+ aWriter.setInt(count * aCount + i);
+ }
+ for (int i = 0; i < bCount; i++) {
+ String cellValue = strValue + (count * bCount + i);
+ bWriter.setString(cellValue);
+ }
+ if (j > 3) {
+ for (int i = 0; i < cCount; i++) {
+ cWriter.setInt(count * cCount + i);
+ }
+ for (int i = 0; i < dCount; i++) {
+ dWriter.setInt(count * dCount + i);
+ }
+ }
+ rootWriter.save();
+ count++;
+ }
+
+ result = fixture.wrap(rsLoader.harvest());
+ assertEquals(6, result.rowCount());
+
+ reader = result.reader();
+ aReader = reader.array("a").elements();
+ bReader = reader.array("b").elements();
+ cReader = reader.array("c").elements();
+ dReader = reader.array("d").elements();
+
+ int j = 0;
+ while (reader.next()) {
+ int rowId = firstCount + reader.rowIndex();
+ assertEquals(aCount, aReader.size());
+ for (int i = 0; i < aCount; i++) {
+ assertEquals("Index " + i, rowId * aCount + i, aReader.getInt(i));
+ }
+ assertEquals(bCount, bReader.size());
+ for (int i = 0; i < bCount; i++) {
+ String cellValue = strValue + (rowId * bCount + i);
+ assertEquals(cellValue, bReader.getString(i));
+ }
+ if (j > 4) {
+ assertEquals(cCount, cReader.size());
+ for (int i = 0; i < cCount; i++) {
+ assertEquals(rowId * cCount + i, cReader.getInt(i));
+ }
+ } else {
+ assertEquals(0, cReader.size());
+ }
+ if (j == 0 || j > 4) {
+ assertEquals(dCount, dReader.size());
+ for (int i = 0; i < dCount; i++) {
+ assertEquals(rowId * dCount + i, dReader.getInt(i));
+ }
+ } else {
+ assertEquals(0, dReader.size());
+ }
+ j++;
+ }
+ result.clear();
+
+ rsLoader.close();
+ }
+
+ /**
+ * Create an array that contains more than 64K values. Drill imposes no
+ * practical limit on array lengths: the nominal limit is about 2 billion
+ * elements which, even at one byte each, is far too large to fit into a
+ * vector.
+ */
+
+ @Test
+ public void testLargeArray() {
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator());
+ RowSetLoader rootWriter = rsLoader.writer();
+ MaterializedField field = SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.REPEATED);
+ rootWriter.addColumn(field);
+
+ // Create a single array as the column value in the first row. When
+ // the vector fills, the row cannot be split across batches, so an
+ // exception is thrown instead of the usual overflow handling.
+
+ rsLoader.startBatch();
+ rootWriter.start();
+ ScalarWriter array = rootWriter.array(0).scalar();
+ try {
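+ // Expected to fail with a UserException long before Integer.MAX_VALUE
+ // values are written, once the underlying vector reaches its size limit.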
+ for (int i = 0; i < Integer.MAX_VALUE; i++) {
+ array.setInt(i+1);
+ }
+ fail();
+ } catch (UserException e) {
+ // Expected
+ }
+ rsLoader.close();
+ }
+
+ /**
+ * Test the case that an array has "missing values" before the overflow.
+ */
+
+ @Test
+ public void testMissingArrayValues() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .addArray("c", MinorType.INT)
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ byte value[] = new byte[512];
+ Arrays.fill(value, (byte) 'X');
+
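+ // Column b carries 512-byte values, so the batch overflows after roughly
+ // MAX_BUFFER_SIZE / 512 rows. Stop writing the array column c two-thirds
+ // of the way there so that c has been empty for many rows by the time
+ // overflow occurs.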
+ int blankAfter = ValueVector.MAX_BUFFER_SIZE / 512 * 2 / 3;
+ ScalarWriter cWriter = rootWriter.array("c").scalar();
+
+ rsLoader.startBatch();
+ int rowId = 0;
+ while (rootWriter.start()) {
+ rootWriter.scalar("a").setInt(rowId);
+ rootWriter.scalar("b").setBytes(value, value.length);
+ if (rowId < blankAfter) {
+ for (int i = 0; i < 3; i++) {
+ cWriter.setInt(rowId * 3 + i);
+ }
+ }
+ rootWriter.save();
+ rowId++;
+ }
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(rowId - 1, result.rowCount());
+ RowSetReader reader = result.reader();
+ ScalarElementReader cReader = reader.array("c").elements();
+ while (reader.next()) {
+ assertEquals(reader.rowIndex(), reader.scalar("a").getInt());
+ assertTrue(Arrays.equals(value, reader.scalar("b").getBytes()));
+ if (reader.rowIndex() < blankAfter) {
+ assertEquals(3, cReader.size());
+ for (int i = 0; i < 3; i++) {
+ assertEquals(reader.rowIndex() * 3 + i, cReader.getInt(i));
+ }
+ } else {
+ assertEquals(0, cReader.size());
+ }
+ }
+ result.clear();
+ rsLoader.close();
+ }
+
+ @Test
+ public void testOverflowWithNullables() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("n", MinorType.INT)
+ .addNullable("a", MinorType.VARCHAR)
+ .addNullable("b", MinorType.VARCHAR)
+ .addNullable("c", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ byte value[] = new byte[512];
+ Arrays.fill(value, (byte) 'X');
+ int count = 0;
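+ // Fill the batch until the writer reports it is full; the 512-byte
+ // values in column 2 drive the overflow while columns 1 and 3 stay null.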
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ rootWriter.scalar(0).setInt(count);
+ rootWriter.scalar(1).setNull();
+ rootWriter.scalar(2).setBytes(value, value.length);
+ rootWriter.scalar(3).setNull();
+ rootWriter.save();
+ count++;
+ }
+
+ // Result should exclude the overflow row
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(count - 1, result.rowCount());
+
+ RowSetReader reader = result.reader();
+ while (reader.next()) {
+ assertEquals(reader.rowIndex(), reader.scalar(0).getInt());
+ assertTrue(reader.scalar(1).isNull());
+ assertTrue(Arrays.equals(value, reader.scalar(2).getBytes()));
+ assertTrue(reader.scalar(3).isNull());
+ }
+ result.clear();
+
+ // Next batch should start with the overflow row
+
+ rsLoader.startBatch();
+ result = fixture.wrap(rsLoader.harvest());
+ reader = result.reader();
+ assertEquals(1, result.rowCount());
+ assertTrue(reader.next());
+ assertEquals(count - 1, reader.scalar(0).getInt());
+ assertTrue(reader.scalar(1).isNull());
+ assertTrue(Arrays.equals(value, reader.scalar(2).getBytes()));
+ assertTrue(reader.scalar(3).isNull());
+ result.clear();
+
+ rsLoader.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
new file mode 100644
index 0000000..5c6ff7b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test of the basics of the projection mechanism.
+ */
+
+public class TestResultSetLoaderProjection extends SubOperatorTest {
+
+ @Test
+ public void testProjectionMap() {
+
+ // Null map means everything is projected
+
+ {
+ ProjectionSet projSet = ProjectionSetImpl.parse(null);
+ assertTrue(projSet instanceof NullProjectionSet);
+ assertTrue(projSet.isProjected("foo"));
+ }
+
+ // Empty list means everything is projected
+
+ {
+ ProjectionSet projSet = ProjectionSetImpl.parse(new ArrayList<SchemaPath>());
+ assertTrue(projSet instanceof NullProjectionSet);
+ assertTrue(projSet.isProjected("foo"));
+ }
+
+ // Simple non-map columns
+
+ {
+ List<SchemaPath> projCols = new ArrayList<>();
+ projCols.add(SchemaPath.getSimplePath("foo"));
+ projCols.add(SchemaPath.getSimplePath("bar"));
+ ProjectionSet projSet = ProjectionSetImpl.parse(projCols);
+ assertTrue(projSet instanceof ProjectionSetImpl);
+ assertTrue(projSet.isProjected("foo"));
+ assertTrue(projSet.isProjected("bar"));
+ assertFalse(projSet.isProjected("mumble"));
+ }
+
+ // Whole-map projection (note, fully projected maps are
+ // identical to projected simple columns at this level of
+ // abstraction.)
+
+ {
+ List<SchemaPath> projCols = new ArrayList<>();
+ projCols.add(SchemaPath.getSimplePath("map"));
+ ProjectionSet projSet = ProjectionSetImpl.parse(projCols);
+ assertTrue(projSet instanceof ProjectionSetImpl);
+ assertTrue(projSet.isProjected("map"));
+ assertFalse(projSet.isProjected("another"));
+ ProjectionSet mapProj = projSet.mapProjection("map");
+ assertNotNull(mapProj);
+ assertTrue(mapProj instanceof NullProjectionSet);
+ assertTrue(mapProj.isProjected("foo"));
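+ // An unprojected map still yields a (non-null) projection set, but one
+ // that projects none of its members.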
+ assertNotNull(projSet.mapProjection("another"));
+ assertFalse(projSet.mapProjection("another").isProjected("anyCol"));
+ }
+
+ // Selected map projection, multiple levels, full projection
+ // at leaf level.
+
+ {
+ List<SchemaPath> projCols = new ArrayList<>();
+ projCols.add(SchemaPath.getCompoundPath("map", "a"));
+ projCols.add(SchemaPath.getCompoundPath("map", "b"));
+ projCols.add(SchemaPath.getCompoundPath("map", "map2", "x"));
+ ProjectionSet projSet = ProjectionSetImpl.parse(projCols);
+ assertTrue(projSet instanceof ProjectionSetImpl);
+ assertTrue(projSet.isProjected("map"));
+
+ // Map: an explicit map at top level
+
+ ProjectionSet mapProj = projSet.mapProjection("map");
+ assertTrue(mapProj instanceof ProjectionSetImpl);
+ assertTrue(mapProj.isProjected("a"));
+ assertTrue(mapProj.isProjected("b"));
+ assertTrue(mapProj.isProjected("map2"));
+ assertFalse(projSet.isProjected("bogus"));
+
+ // Map b: an implied nested map
+
+ ProjectionSet bMapProj = mapProj.mapProjection("b");
+ assertNotNull(bMapProj);
+ assertTrue(bMapProj instanceof NullProjectionSet);
+ assertTrue(bMapProj.isProjected("foo"));
+
+ // Map2, a nested map, has an explicit projection
+
+ ProjectionSet map2Proj = mapProj.mapProjection("map2");
+ assertNotNull(map2Proj);
+ assertTrue(map2Proj instanceof ProjectionSetImpl);
+ assertTrue(map2Proj.isProjected("x"));
+ assertFalse(map2Proj.isProjected("bogus"));
+ }
+ }
+
+ /**
+ * Test imposing a selection mask between the client and the underlying
+ * vector container.
+ */
+
+ @Test
+ public void testProjectionStatic() {
+ List<SchemaPath> selection = Lists.newArrayList(
+ SchemaPath.getSimplePath("c"),
+ SchemaPath.getSimplePath("b"),
+ SchemaPath.getSimplePath("e"));
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.INT)
+ .add("c", MinorType.INT)
+ .add("d", MinorType.INT)
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setProjection(selection)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+ doProjectionTest(rsLoader);
+ }
+
+ @Test
+ public void testProjectionDynamic() {
+ List<SchemaPath> selection = Lists.newArrayList(
+ SchemaPath.getSimplePath("c"),
+ SchemaPath.getSimplePath("b"),
+ SchemaPath.getSimplePath("e"));
+ ResultSetOptions options = new OptionBuilder()
+ .setProjection(selection)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+ rootWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.REQUIRED));
+ rootWriter.addColumn(SchemaBuilder.columnSchema("b", MinorType.INT, DataMode.REQUIRED));
+ rootWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.REQUIRED));
+ rootWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.INT, DataMode.REQUIRED));
+
+ doProjectionTest(rsLoader);
+ }
+
+ private void doProjectionTest(ResultSetLoader rsLoader) {
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // All columns appear, including non-projected ones.
+
+ TupleMetadata actualSchema = rootWriter.schema();
+ assertEquals(4, actualSchema.size());
+ assertEquals("a", actualSchema.column(0).getName());
+ assertEquals("b", actualSchema.column(1).getName());
+ assertEquals("c", actualSchema.column(2).getName());
+ assertEquals("d", actualSchema.column(3).getName());
+ assertEquals(0, actualSchema.index("A"));
+ assertEquals(3, actualSchema.index("d"));
+ assertEquals(-1, actualSchema.index("e"));
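+ // "e" is in the projection list but was never added by the loader, so
+ // it does not appear in the schema.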
+
+ // Non-projected columns identify themselves via metadata
+
+ assertFalse(actualSchema.metadata("a").isProjected());
+ assertTrue(actualSchema.metadata("b").isProjected());
+ assertTrue(actualSchema.metadata("c").isProjected());
+ assertFalse(actualSchema.metadata("d").isProjected());
+
+ // Write some data. Doesn't need much.
+
+ rsLoader.startBatch();
+ for (int i = 1; i < 3; i++) {
+ rootWriter.start();
+ rootWriter.scalar(0).setInt(i * 5);
+ rootWriter.scalar(1).setInt(i);
+ rootWriter.scalar(2).setInt(i * 10);
+ rootWriter.scalar(3).setInt(i * 20);
+ rootWriter.save();
+ }
+
+ // Verify. The result should contain only those projected columns that
+ // the loader actually defined, in definition order. ("e" was projected
+ // but never created, so it does not appear.)
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("b", MinorType.INT)
+ .add("c", MinorType.INT)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(1, 10)
+ .addRow(2, 20)
+ .build();
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ rsLoader.close();
+ }
+
+ @Test
+ public void testMapProjection() {
+ List<SchemaPath> selection = Lists.newArrayList(
+ SchemaPath.getSimplePath("m1"),
+ SchemaPath.getCompoundPath("m2", "d"));
+ TupleMetadata schema = new SchemaBuilder()
+ .addMap("m1")
+ .add("a", MinorType.INT)
+ .add("b", MinorType.INT)
+ .buildMap()
+ .addMap("m2")
+ .add("c", MinorType.INT)
+ .add("d", MinorType.INT)
+ .buildMap()
+ .addMap("m3")
+ .add("e", MinorType.INT)
+ .add("f", MinorType.INT)
+ .buildMap()
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setProjection(selection)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Verify the projected columns
+
+ TupleMetadata actualSchema = rootWriter.schema();
+ ColumnMetadata m1Md = actualSchema.metadata("m1");
+ assertTrue(m1Md.isMap());
+ assertTrue(m1Md.isProjected());
+ assertEquals(2, m1Md.mapSchema().size());
+ assertTrue(m1Md.mapSchema().metadata("a").isProjected());
+ assertTrue(m1Md.mapSchema().metadata("b").isProjected());
+
+ ColumnMetadata m2Md = actualSchema.metadata("m2");
+ assertTrue(m2Md.isMap());
+ assertTrue(m2Md.isProjected());
+ assertEquals(2, m2Md.mapSchema().size());
+ assertFalse(m2Md.mapSchema().metadata("c").isProjected());
+ assertTrue(m2Md.mapSchema().metadata("d").isProjected());
+
+ ColumnMetadata m3Md = actualSchema.metadata("m3");
+ assertTrue(m3Md.isMap());
+ assertFalse(m3Md.isProjected());
+ assertEquals(2, m3Md.mapSchema().size());
+ assertFalse(m3Md.mapSchema().metadata("e").isProjected());
+ assertFalse(m3Md.mapSchema().metadata("f").isProjected());
+
+ // Write a couple of rows.
+
+ rsLoader.startBatch();
+ rootWriter.start();
+ rootWriter.tuple("m1").scalar("a").setInt(1);
+ rootWriter.tuple("m1").scalar("b").setInt(2);
+ rootWriter.tuple("m2").scalar("c").setInt(3);
+ rootWriter.tuple("m2").scalar("d").setInt(4);
+ rootWriter.tuple("m3").scalar("e").setInt(5);
+ rootWriter.tuple("m3").scalar("f").setInt(6);
+ rootWriter.save();
+
+ rootWriter.start();
+ rootWriter.tuple("m1").scalar("a").setInt(11);
+ rootWriter.tuple("m1").scalar("b").setInt(12);
+ rootWriter.tuple("m2").scalar("c").setInt(13);
+ rootWriter.tuple("m2").scalar("d").setInt(14);
+ rootWriter.tuple("m3").scalar("e").setInt(15);
+ rootWriter.tuple("m3").scalar("f").setInt(16);
+ rootWriter.save();
+
+ // Verify. Only the projected columns appear in the result set.
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .addMap("m1")
+ .add("a", MinorType.INT)
+ .add("b", MinorType.INT)
+ .buildMap()
+ .addMap("m2")
+ .add("d", MinorType.INT)
+ .buildMap()
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(new Object[] {1, 2}, new Object[] {4})
+ .addRow(new Object[] {11, 12}, new Object[] {14})
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(rsLoader.harvest()));
+ rsLoader.close();
+ }
+
+ /**
+ * Test a map array. Use the convenience methods to set values.
+ * Only the projected members of each map array should appear in the
+ * harvested results.
+ */
+
+ @Test
+ public void testMapArrayProjection() {
+ List<SchemaPath> selection = Lists.newArrayList(
+ SchemaPath.getSimplePath("m1"),
+ SchemaPath.getCompoundPath("m2", "d"));
+ TupleMetadata schema = new SchemaBuilder()
+ .addMapArray("m1")
+ .add("a", MinorType.INT)
+ .add("b", MinorType.INT)
+ .buildMap()
+ .addMapArray("m2")
+ .add("c", MinorType.INT)
+ .add("d", MinorType.INT)
+ .buildMap()
+ .addMapArray("m3")
+ .add("e", MinorType.INT)
+ .add("f", MinorType.INT)
+ .buildMap()
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setProjection(selection)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Write a couple of rows.
+
+ rsLoader.startBatch();
+ rootWriter.addRow(
+ new Object[] { new Object[] {10, 20}, new Object[] {11, 21}},
+ new Object[] { new Object[] {30, 40}, new Object[] {31, 42}},
+ new Object[] { new Object[] {50, 60}, new Object[] {51, 62}});
+ rootWriter.addRow(
+ new Object[] { new Object[] {110, 120}, new Object[] {111, 121}},
+ new Object[] { new Object[] {130, 140}, new Object[] {131, 142}},
+ new Object[] { new Object[] {150, 160}, new Object[] {151, 162}});
+
+ // Verify. Only the projected columns appear in the result set.
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .addMapArray("m1")
+ .add("a", MinorType.INT)
+ .add("b", MinorType.INT)
+ .buildMap()
+ .addMapArray("m2")
+ .add("d", MinorType.INT)
+ .buildMap()
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(
+ new Object[] { new Object[] {10, 20}, new Object[] {11, 21}},
+ new Object[] { new Object[] {40}, new Object[] {42}})
+ .addRow(
+ new Object[] { new Object[] {110, 120}, new Object[] {111, 121}},
+ new Object[] { new Object[] {140}, new Object[] {142}})
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(rsLoader.harvest()));
+ rsLoader.close();
+ }
+
+ /**
+ * Verify that the projection code plays nice with vector overflow. Overflow
+ * is the most complex operation in this subsystem with many specialized
+ * methods that must work together flawlessly. This test ensures that
+ * non-projected columns stay in the background and don't interfere
+ * with overflow logic.
+ */
+
+ @Test
+ public void testProjectWithOverflow() {
+ List<SchemaPath> selection = Lists.newArrayList(
+ SchemaPath.getSimplePath("small"),
+ SchemaPath.getSimplePath("dummy"));
+ TupleMetadata schema = new SchemaBuilder()
+ .add("big", MinorType.VARCHAR)
+ .add("small", MinorType.VARCHAR)
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setProjection(selection)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ byte big[] = new byte[600];
+ Arrays.fill(big, (byte) 'X');
+ byte small[] = new byte[512];
+ Arrays.fill(small, (byte) 'X');
+
+ rsLoader.startBatch();
+ int count = 0;
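+ // Each row writes 600 bytes to the unprojected "big" column and 512
+ // bytes to the projected "small" column; only the projected column
+ // should limit the batch size.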
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ rootWriter.scalar(0).setBytes(big, big.length);
+ rootWriter.scalar(1).setBytes(small, small.length);
+ rootWriter.save();
+ count++;
+ }
+
+ // Number of rows should be driven by size of the
+ // projected vector ("small"), not by the larger, unprojected
+ // "big" vector.
+ // Our row count should include the overflow row
+
+ int expectedCount = ValueVector.MAX_BUFFER_SIZE / small.length;
+ assertEquals(expectedCount + 1, count);
+
+ // Loader's row count should include only "visible" rows
+
+ assertEquals(expectedCount, rootWriter.rowCount());
+
+ // Total count should include invisible and look-ahead rows.
+
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+ // Result should exclude the overflow row
+
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(expectedCount, result.rowCount());
+ result.clear();
+
+ // Next batch should start with the overflow row
+
+ rsLoader.startBatch();
+ assertEquals(1, rootWriter.rowCount());
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+ result = fixture.wrap(rsLoader.harvest());
+ assertEquals(1, result.rowCount());
+ result.clear();
+
+ rsLoader.close();
+ }
+}