You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/06/19 10:50:43 UTC

[3/3] drill git commit: DRILL-5514: Enhance VectorContainer to merge two row sets

DRILL-5514: Enhance VectorContainer to merge two row sets

Adds the ability to merge two schemas and to merge two vector containers,
in each case producing a new, merged result. See DRILL-5514 for details.

Also provides a handy constructor to create a vector container given a
pre-defined schema.

closes #837


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/be43a9ed
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/be43a9ed
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/be43a9ed

Branch: refs/heads/master
Commit: be43a9edd148ef3af6f92c5ce7cda235c5ac1ad6
Parents: b714b2d
Author: Paul Rogers <pr...@maprtech.com>
Authored: Mon May 15 15:59:35 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Mon Jun 19 12:22:10 2017 +0300

----------------------------------------------------------------------
 .../apache/drill/exec/record/BatchSchema.java   |  28 +++++
 .../drill/exec/record/VectorContainer.java      |  54 +++++++-
 .../drill/exec/record/TestVectorContainer.java  | 126 +++++++++++++++++++
 .../apache/drill/test/rowSet/DirectRowSet.java  |   5 +
 .../drill/test/rowSet/HyperRowSetImpl.java      |   5 +
 .../drill/test/rowSet/IndirectRowSet.java       |   5 +
 .../org/apache/drill/test/rowSet/RowSet.java    |   2 +
 7 files changed, 221 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index e9dcd28..63dcdb45 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.record;
 
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -157,4 +158,31 @@ public class BatchSchema implements Iterable<MaterializedField> {
     return true;
   }
 
+  /**
+   * Merge two schemas to produce a new, merged schema. The caller is responsible
+   * for ensuring that column names are unique. The order of the fields in the
+   * new schema is the same as that of this schema, with the other schema's fields
+   * appended in the order defined in the other schema.
+   * <p>
+   * Merging data with selection vectors is unlikely to be useful, or work well.
+   * With a selection vector, the two record batches would have to be correlated
+   * both in their selection vectors AND in the underlying vectors. Such a use case
+   * is hard to imagine. So, for now, this method forbids merging schemas if either
+   * of them carries a selection vector. If we discover a meaningful use case, we can
+   * revisit the issue.
+   *
+   * @param otherSchema the schema to merge with this one
+   * @return the new, merged, schema
+   * @throws IllegalArgumentException if either schema has a selection vector
+   *         mode other than {@link SelectionVectorMode#NONE}
+   */
+  public BatchSchema merge(BatchSchema otherSchema) {
+    if (selectionVectorMode != SelectionVectorMode.NONE ||
+        otherSchema.selectionVectorMode != SelectionVectorMode.NONE) {
+      // Report both modes so the caller can tell which side carried the vector.
+      throw new IllegalArgumentException(
+          "Cannot merge schemas with selection vectors: this schema has mode " +
+          selectionVectorMode + ", other schema has mode " + otherSchema.selectionVectorMode);
+    }
+    // This schema's fields first, then the other schema's; presized to avoid regrowth.
+    List<MaterializedField> mergedFields =
+        new ArrayList<>(fields.size() + otherSchema.fields.size());
+    mergedFields.addAll(this.fields);
+    mergedFields.addAll(otherSchema.fields);
+    // Both inputs are NONE at this point, so the merged schema is NONE as well.
+    return new BatchSchema(selectionVectorMode, mergedFields);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 69e04ac..54a04bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -60,6 +60,28 @@ public class VectorContainer implements VectorAccessible {
     this.allocator = allocator;
   }
 
+  /**
+   * Constructs a container whose vectors mirror the fields of the given schema.
+   * Each field is passed to {@code addOrGet} to obtain its vector, but no
+   * buffers are allocated here; call {@link #allocateNew()} or
+   * {@link #allocateNewSafe()} before writing data.
+   * <p>
+   * The outcome is equivalent to adding the fields and then calling
+   * {@link #buildSchema(SelectionVectorMode)}, except that the supplied schema
+   * object itself is adopted as this container's schema.
+   *
+   * @param allocator allocator that will later supply memory for the vectors
+   * @param schema the schema that defines the vectors to create
+   */
+  public VectorContainer(BufferAllocator allocator, BatchSchema schema) {
+    this.allocator = allocator;
+    for (MaterializedField column : schema) {
+      addOrGet(column, null);
+    }
+    // Adopt the caller's schema only after all vectors exist, and mark it current.
+    this.schema = schema;
+    this.schemaChanged = false;
+  }
+
   @Override
   public String toString() {
     return super.toString()
@@ -304,7 +326,6 @@ public class VectorContainer implements VectorAccessible {
     }
 
     return va.getChildWrapper(fieldIds);
-
   }
 
   private VectorWrapper<?> getValueAccessorById(int... fieldIds) {
@@ -375,9 +396,7 @@ public class VectorContainer implements VectorAccessible {
    * Clears the contained vectors.  (See {@link ValueVector#clear}).
    */
   public void zeroVectors() {
-    for (VectorWrapper<?> w : wrappers) {
-      w.clear();
-    }
+    // NOTE(review): replaces the loop that called clear() on every wrapper;
+    // assumed equivalent -- confirm VectorAccessibleUtilities.clear visits all wrappers.
+    VectorAccessibleUtilities.clear(this);
   }
 
   public int getNumberOfColumns() {
@@ -398,4 +417,31 @@ public class VectorContainer implements VectorAccessible {
     }
     return true;
   }
+
+  /**
+   * Merge two batches to create a single, combined, batch. Vectors
+   * appear in the order defined by {@link BatchSchema#merge(BatchSchema)}.
+   * The two batches must have identical row counts. The pattern is that
+   * this container is the main part of the record batch, the other
+   * represents new columns to merge.
+   * <p>
+   * Reference counts on the underlying buffers are <b>unchanged</b>: the
+   * merged container holds the very same vector wrappers as the two inputs.
+   * The client code is assumed to abandon the two input containers in
+   * favor of the merged container.
+   *
+   * @param otherContainer the container to merge with this one
+   * @return a new, merged, container
+   * @throws IllegalArgumentException if the two containers have different
+   *         record counts, or if either schema carries a selection vector
+   * @throws IllegalStateException if either container has no built schema
+   */
+  public VectorContainer merge(VectorContainer otherContainer) {
+    if (recordCount != otherContainer.recordCount) {
+      throw new IllegalArgumentException(
+          "Cannot merge containers with different record counts: " +
+          recordCount + " and " + otherContainer.recordCount);
+    }
+    if (schema == null || otherContainer.schema == null) {
+      // Fail with a clear message instead of an NPE inside schema.merge().
+      throw new IllegalStateException(
+          "Both containers must have a built schema before merging");
+    }
+    VectorContainer merged = new VectorContainer(allocator);
+    merged.schema = schema.merge(otherContainer.schema);
+    merged.recordCount = recordCount;
+    // Share (neither transfer nor copy) the wrappers, so buffer ownership is unchanged.
+    merged.wrappers.addAll(wrappers);
+    merged.wrappers.addAll(otherContainer.wrappers);
+    // The schema was assigned explicitly above, so it is already current.
+    merged.schemaChanged = false;
+    return merged;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
new file mode 100644
index 0000000..d7a59bf
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import static org.junit.Assert.*;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for the merge support in {@link VectorContainer} and
+ * {@link BatchSchema}.
+ */
+public class TestVectorContainer extends DrillTest {
+
+  // TODO: Replace the following with an extension of SubOperatorTest class
+  // once that is available.
+
+  protected static OperatorFixture fixture;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    fixture = OperatorFixture.standardFixture();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    fixture.close();
+  }
+
+  /**
+   * Test of the ability to merge two schemas and to merge
+   * two vector containers. The merge is "horizontal", like
+   * a row-by-row join. Since each container is a list of
+   * vectors, we just combine the two lists to create the
+   * merged result.
+   */
+  @Test
+  public void testContainerMerge() {
+
+    // Simulated data from a reader
+
+    BatchSchema leftSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR)
+        .build();
+    SingleRowSet left = fixture.rowSetBuilder(leftSchema)
+        .add(10, "fred")
+        .add(20, "barney")
+        .add(30, "wilma")
+        .build();
+
+    // Simulated "implicit" columns: row number and file name
+
+    BatchSchema rightSchema = new SchemaBuilder()
+        .add("x", MinorType.SMALLINT)
+        .add("y", MinorType.VARCHAR)
+        .build();
+    SingleRowSet right = fixture.rowSetBuilder(rightSchema)
+        .add(1, "foo.txt")
+        .add(2, "bar.txt")
+        .add(3, "dino.txt")
+        .build();
+
+    // The merge batch we expect to see
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("x", MinorType.SMALLINT)
+        .add("y", MinorType.VARCHAR)
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .add(10, "fred", 1, "foo.txt")
+        .add(20, "barney", 2, "bar.txt")
+        .add(30, "wilma", 3, "dino.txt")
+        .build();
+
+    // Merging the schemas alone appends the right-hand fields after
+    // the left-hand ones.
+
+    assertEquals(expectedSchema, leftSchema.merge(rightSchema));
+
+    // Containers with different row counts cannot be merged.
+
+    SingleRowSet shortRight = fixture.rowSetBuilder(rightSchema)
+        .add(1, "foo.txt")
+        .build();
+    try {
+      left.container().merge(shortRight.container());
+      fail("Merging containers with different row counts should fail");
+    } catch (IllegalArgumentException e) {
+      // Expected
+    } finally {
+      shortRight.clear();
+    }
+
+    // Merge containers without selection vector
+
+    RowSet merged = fixture.wrap(
+        left.container().merge(right.container()));
+
+    RowSetComparison comparison = new RowSetComparison(expected);
+    comparison.verify(merged);
+
+    // Merge containers via row set facade. Merged row sets share their
+    // vectors with left and right, so clearing mergedRs presumably also
+    // releases the vectors seen through left, right and merged.
+
+    RowSet mergedRs = left.merge(right);
+    comparison.verifyAndClear(mergedRs);
+
+    // Add a selection vector. Merging is forbidden, in the present code,
+    // for batches that have a selection vector.
+
+    SingleRowSet leftIndirect = left.toIndirect();
+    try {
+      leftIndirect.merge(right);
+      fail("Merging a row set that has a selection vector should fail");
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+    leftIndirect.clear();
+    right.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
index 706db27..29a1702 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
@@ -233,4 +233,9 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
 
   @Override
   public SelectionVector2 getSv2() { return null; }
+
+  @Override
+  public RowSet merge(RowSet other) {
+    return new DirectRowSet(allocator, container().merge(other.container()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
index c7cb1b2..afc2e6e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
@@ -292,4 +292,9 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
 
   @Override
   public int rowCount() { return sv4.getCount(); }
+
+  @Override
+  public RowSet merge(RowSet other) {
+    return new HyperRowSetImpl(allocator, container().merge(other.container()), sv4);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
index f90fbb7..17a0ac8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -122,4 +122,9 @@ public class IndirectRowSet extends AbstractSingleRowSet {
     RecordBatchSizer sizer = new RecordBatchSizer(container, sv2);
     return sizer.actualSize();
   }
+
+  @Override
+  public RowSet merge(RowSet other) {
+    return new IndirectRowSet(allocator, container().merge(other.container()), sv2);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
index d22139c..b6bbd4f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
@@ -162,6 +162,8 @@ public interface RowSet {
 
   int size();
 
+  /**
+   * Merge the columns of this row set with those of another, row by row,
+   * producing a new row set: this row set's columns first, followed by those
+   * of {@code other}. The DirectRowSet, IndirectRowSet and HyperRowSetImpl
+   * implementations delegate to {@code VectorContainer.merge()}, so the two
+   * row sets must have the same row count, neither may carry a selection
+   * vector, and the result shares the vectors of both inputs.
+   *
+   * @param other the row set whose columns are appended
+   * @return a new row set containing the columns of both inputs
+   */
+  RowSet merge(RowSet other);
+
   BatchSchema batchSchema();
 
   /**