You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/12/02 05:23:54 UTC

[07/10] git commit: DRILL-303: RecordBatchLoader.load always creates new schema

DRILL-303: RecordBatchLoader.load always creates new schema


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b12c0b15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b12c0b15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b12c0b15

Branch: refs/heads/master
Commit: b12c0b155698a3d2ecd5cf3bfdf994fccd65f8d6
Parents: 2c811a8
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Dec 1 20:06:05 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:06:05 2013 -0800

----------------------------------------------------------------------
 .../drill/exec/record/RecordBatchLoader.java    | 17 +++---
 .../exec/physical/impl/TestUnionExchange.java   | 62 ++++++++++++++++++++
 .../test/resources/sender/union_exchange.json   | 47 +++++++++++++++
 3 files changed, 116 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 016f340..f19184f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -62,10 +62,10 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     this.valueCount = def.getRecordCount();
     boolean schemaChanged = schema == null;
 
-    Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
+    Map<FieldDef, ValueVector> oldFields = Maps.newHashMap();
     for(VectorWrapper<?> w : container){
       ValueVector v = w.getValueVector();
-      oldFields.put(v.getField(), v);
+      oldFields.put(v.getField().getDef(), v);
     }
     
     VectorContainer newVectors = new VectorContainer();
@@ -76,15 +76,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     for (FieldMetadata fmd : fields) {
       FieldDef fieldDef = fmd.getDef();
       ValueVector v = oldFields.remove(fieldDef);
-      if(v != null){
-        container.add(v);
-        continue;
+      if(v == null) {
+        // if we arrive here, we didn't have a matching vector.
+        schemaChanged = true;
+        MaterializedField m = new MaterializedField(fieldDef);
+        v = TypeHelper.getNewVector(m, allocator);
       }
-
-      // if we arrive here, we didn't have a matching vector.
-      schemaChanged = true;
-      MaterializedField m = new MaterializedField(fieldDef);
-      v = TypeHelper.getNewVector(m, allocator);
       if (fmd.getValueCount() == 0){
         v.clear();
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
new file mode 100644
index 0000000..2e16b47
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Runs the union-exchange physical plan on two local Drillbits and checks the total row count. */
+public class TestUnionExchange extends PopUnitTestBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionExchange.class);
+
+  @Test
+  public void twoBitTwoExchangeTwoEntryRun() throws Exception {
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+    // Both Drillbits use the same local service set; try-with-resources closes them and the client.
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+        Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) {
+      // Start both bits, connect, run the plan from /sender/union_exchange.json, then sum batch row counts.
+      bit1.run();
+      bit2.run();
+      client.connect();
+      List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+          Files.toString(FileUtils.getResourceAsFile("/sender/union_exchange.json"),
+              Charsets.UTF_8));
+      int count = 0;
+      for(QueryResultBatch b : results) {
+        if (b.getHeader().getRowCount() != 0)
+          count += b.getHeader().getRowCount();
+      }
+      assertEquals(150, count);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/test/resources/sender/union_exchange.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/sender/union_exchange.json b/exec/java-exec/src/test/resources/sender/union_exchange.json
new file mode 100644
index 0000000..76963bd
--- /dev/null
+++ b/exec/java-exec/src/test/resources/sender/union_exchange.json
@@ -0,0 +1,47 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+              {records: 100, types: [
+                {name: "blue", type: "INT", mode: "REQUIRED"},
+                {name: "red", type: "BIGINT", mode: "REQUIRED"}
+              ]},
+              {records: 200, types: [
+                {name: "blue", type: "INT", mode: "REQUIRED"},
+                {name: "red", type: "BIGINT", mode: "REQUIRED"}
+              ]}
+            ]
+        },
+        {
+            @id: 2,
+            child: 1,
+            pop: "union-exchange"
+        },
+        {
+             @id: 3,
+             child: 2,
+             pop: "filter",
+             expr: "alternate()"
+         },
+         {
+             @id: 4,
+             child: 3,
+             pop:"selection-vector-remover"
+         },
+        {
+            @id: 5,
+            child: 4,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file