You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/06/19 10:50:43 UTC
[3/3] drill git commit: DRILL-5514: Enhance VectorContainer to merge
two row sets
DRILL-5514: Enhance VectorContainer to merge two row sets
Adds the ability to merge two schemas and to merge two vector containers,
in each case producing a new, merged result. See DRILL-5514 for details.
Also provides a handy constructor to create a vector container given a
pre-defined schema.
closes #837
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/be43a9ed
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/be43a9ed
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/be43a9ed
Branch: refs/heads/master
Commit: be43a9edd148ef3af6f92c5ce7cda235c5ac1ad6
Parents: b714b2d
Author: Paul Rogers <pr...@maprtech.com>
Authored: Mon May 15 15:59:35 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Mon Jun 19 12:22:10 2017 +0300
----------------------------------------------------------------------
.../apache/drill/exec/record/BatchSchema.java | 28 +++++
.../drill/exec/record/VectorContainer.java | 54 +++++++-
.../drill/exec/record/TestVectorContainer.java | 126 +++++++++++++++++++
.../apache/drill/test/rowSet/DirectRowSet.java | 5 +
.../drill/test/rowSet/HyperRowSetImpl.java | 5 +
.../drill/test/rowSet/IndirectRowSet.java | 5 +
.../org/apache/drill/test/rowSet/RowSet.java | 2 +
7 files changed, 221 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index e9dcd28..63dcdb45 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.record;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -157,4 +158,31 @@ public class BatchSchema implements Iterable<MaterializedField> {
return true;
}
+ /**
+ * Merges this schema with another, producing a new schema that holds the
+ * fields of both. Column names must be unique across the two schemas; the
+ * caller is responsible for that, since no check is made here. Fields from
+ * this schema come first, in their existing order, followed by the fields of
+ * {@code otherSchema} in that schema's order.
+ * <p>
+ * Neither schema may carry a selection vector. Merging such batches would
+ * require the two batches to be correlated both in their selection vectors
+ * and in their underlying vectors, which is unlikely to be useful or to work
+ * well. The restriction can be revisited if a meaningful use case appears.
+ *
+ * @param otherSchema the schema whose fields are appended to this one's
+ * @return a new schema containing the fields of both schemas
+ * @throws IllegalArgumentException if either schema's selection vector mode is not NONE
+ */
+ public BatchSchema merge(BatchSchema otherSchema) {
+ final boolean anySelectionVector = selectionVectorMode != SelectionVectorMode.NONE
+ || otherSchema.selectionVectorMode != SelectionVectorMode.NONE;
+ if (anySelectionVector) {
+ throw new IllegalArgumentException("Cannot merge schemas with selection vectors");
+ }
+ final List<MaterializedField> combined = new ArrayList<>(fields.size() + otherSchema.fields.size());
+ combined.addAll(fields);
+ combined.addAll(otherSchema.fields);
+ return new BatchSchema(selectionVectorMode, combined);
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 69e04ac..54a04bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -60,6 +60,28 @@ public class VectorContainer implements VectorAccessible {
this.allocator = allocator;
}
+ /**
+ * Builds a vector container whose columns are described by an existing
+ * schema. One vector is requested per field of {@code schema}, in schema
+ * order, but no memory is allocated for them yet; call {@link #allocateNew()}
+ * or {@link #allocateNewSafe()} before writing data.
+ * <p>
+ * Like {@link #buildSchema(SelectionVectorMode)}, this leaves the container
+ * with a current (unchanged) schema, but it adopts the given schema object
+ * directly rather than deriving a new one.
+ * @param allocator the allocator used later when memory is allocated
+ * @param schema the schema whose fields define the vectors to create
+ */
+ public VectorContainer(BufferAllocator allocator, BatchSchema schema) {
+ this.allocator = allocator;
+ for (MaterializedField column : schema) {
+ this.addOrGet(column, null);
+ }
+ this.schema = schema;
+ this.schemaChanged = false;
+ }
+
+
@Override
public String toString() {
return super.toString()
@@ -304,7 +326,6 @@ public class VectorContainer implements VectorAccessible {
}
return va.getChildWrapper(fieldIds);
-
}
private VectorWrapper<?> getValueAccessorById(int... fieldIds) {
@@ -375,9 +396,7 @@ public class VectorContainer implements VectorAccessible {
* Clears the contained vectors. (See {@link ValueVector#clear}).
*/
public void zeroVectors() {
- for (VectorWrapper<?> w : wrappers) {
- w.clear();
- }
+ VectorAccessibleUtilities.clear(this); // presumably clears every wrapper, as the removed loop did; TODO confirm
}
public int getNumberOfColumns() {
@@ -398,4 +417,31 @@ public class VectorContainer implements VectorAccessible {
}
return true;
}
+
+ /**
+ * Merge two batches to create a single, combined, batch. Vectors
+ * appear in the order defined by {@link BatchSchema#merge(BatchSchema)}.
+ * The two batches must have identical row counts. Typically this container is
+ * the main part of the record batch and the other supplies new columns to merge.
+ * <p>
+ * Reference counts on the underlying buffers are <b>unchanged</b>: the merged
+ * container holds the same vector wrappers as the two inputs. The client code
+ * is assumed to abandon the two input containers in favor of the merged one.
+ * @param otherContainer the container to merge with this one
+ * @return a new, merged, container
+ * @throws IllegalArgumentException if the row counts differ or either schema has a selection vector
+ */
+ public VectorContainer merge(VectorContainer otherContainer) {
+ if (recordCount != otherContainer.recordCount) {
+ throw new IllegalArgumentException("Cannot merge containers with different row counts: "
+ + recordCount + " vs. " + otherContainer.recordCount);
+ }
+ VectorContainer merged = new VectorContainer(allocator);
+ merged.schema = schema.merge(otherContainer.schema); // rejects selection vectors
+ merged.recordCount = recordCount;
+ merged.wrappers.addAll(wrappers); // wrappers are shared, not copied
+ merged.wrappers.addAll(otherContainer.wrappers);
+ merged.schemaChanged = false;
+ return merged;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
new file mode 100644
index 0000000..d7a59bf
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import static org.junit.Assert.*;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestVectorContainer extends DrillTest {
+
+ // TODO: Replace the following with an extension of SubOperatorTest class
+ // once that is available.
+
+ protected static OperatorFixture fixture;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ fixture = OperatorFixture.standardFixture();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ fixture.close();
+ }
+
+ /**
+ * Test of the ability to merge two schemas and to merge
+ * two vector containers. The merge is "horizontal", like
+ * a row-by-row join. Since each container is a list of
+ * vectors, we just combine the two lists to create the
+ * merged result.
+ */
+ @Test
+ public void testContainerMerge() {
+
+ // Simulated data from a reader
+
+ BatchSchema leftSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR)
+ .build();
+ SingleRowSet left = fixture.rowSetBuilder(leftSchema)
+ .add(10, "fred")
+ .add(20, "barney")
+ .add(30, "wilma")
+ .build();
+
+ // Simulated "implicit" columns: row number and file name
+
+ BatchSchema rightSchema = new SchemaBuilder()
+ .add("x", MinorType.SMALLINT)
+ .add("y", MinorType.VARCHAR)
+ .build();
+ SingleRowSet right = fixture.rowSetBuilder(rightSchema)
+ .add(1, "foo.txt")
+ .add(2, "bar.txt")
+ .add(3, "dino.txt")
+ .build();
+
+ // The merged batch we expect to see
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR)
+ .add("x", MinorType.SMALLINT)
+ .add("y", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .add(10, "fred", 1, "foo.txt")
+ .add(20, "barney", 2, "bar.txt")
+ .add(30, "wilma", 3, "dino.txt")
+ .build();
+
+ // Merge containers without selection vector
+
+ RowSet merged = fixture.wrap(
+ left.container().merge(right.container()));
+
+ RowSetComparison comparison = new RowSetComparison(expected);
+ comparison.verify(merged);
+
+ // Merge containers via row set facade
+
+ RowSet mergedRs = left.merge(right);
+ comparison.verifyAndClear(mergedRs);
+
+ // Add a selection vector. Merging is forbidden, in the present code,
+ // for batches that have a selection vector.
+
+ SingleRowSet leftIndirect = left.toIndirect();
+ try {
+ leftIndirect.merge(right);
+ fail("Expected IllegalArgumentException when merging a row set that has a selection vector");
+ } catch (IllegalArgumentException e) {
+ // Expected
+ }
+ leftIndirect.clear();
+ right.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
index 706db27..29a1702 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
@@ -233,4 +233,9 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
@Override
public SelectionVector2 getSv2() { return null; }
+
+ @Override
+ public RowSet merge(final RowSet other) {
+ return new DirectRowSet(this.allocator, this.container().merge(other.container())); // vectors are shared with both inputs
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
index c7cb1b2..afc2e6e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
@@ -292,4 +292,9 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
@Override
public int rowCount() { return sv4.getCount(); }
+
+ @Override
+ public RowSet merge(final RowSet other) {
+ return new HyperRowSetImpl(this.allocator, this.container().merge(other.container()), this.sv4); // reuses this row set's SV4
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
index f90fbb7..17a0ac8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -122,4 +122,9 @@ public class IndirectRowSet extends AbstractSingleRowSet {
RecordBatchSizer sizer = new RecordBatchSizer(container, sv2);
return sizer.actualSize();
}
+
+ @Override
+ public RowSet merge(final RowSet other) {
+ return new IndirectRowSet(this.allocator, this.container().merge(other.container()), this.sv2); // reuses this row set's SV2
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
index d22139c..b6bbd4f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
@@ -162,6 +162,8 @@ public interface RowSet {
int size();
+ RowSet merge(RowSet other); // returns a new row set with this row set's columns followed by other's; implementations delegate to VectorContainer.merge
+
BatchSchema batchSchema();
/**