You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/05/23 18:46:37 UTC

git commit: Fix for DRILL-65. Simple tests for distinct and non-distinct union. To make things simple, just treated all unions as blocking.

Updated Branches:
  refs/heads/master d162e1338 -> 673a96d13


Fix for DRILL-65.  Simple tests for distinct and non-distinct union.  To make things simple, just treated all unions as blocking.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/673a96d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/673a96d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/673a96d1

Branch: refs/heads/master
Commit: 673a96d136165dfc242e598fd501f5dfba8c5edf
Parents: d162e13
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 23 09:46:27 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 23 09:46:27 2013 -0700

----------------------------------------------------------------------
 .../apache/drill/common/logical/data/Union.java    |   10 +-
 .../org/apache/drill/exec/ref/ROPConverter.java    |    3 +-
 .../org/apache/drill/exec/ref/UnbackedRecord.java  |   22 ++++
 .../org/apache/drill/exec/ref/rops/UnionROP.java   |   79 ++++++++++-----
 .../apache/drill/exec/ref/values/DataValue.java    |    1 +
 .../drill/exec/ref/values/SimpleMapValue.java      |   21 ++++
 .../apache/drill/exec/ref/rops/UnionROPTest.java   |   46 +++++++++
 .../ref/src/test/resources/union/distinct.json     |   71 +++++++++++++
 .../ref/src/test/resources/union/nondistinct.json  |   71 +++++++++++++
 9 files changed, 291 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
----------------------------------------------------------------------
NOTE(review): Union.java -- the one-argument Union(LogicalOperator[]) @JsonCreator is
  commented out rather than deleted. That leaves a single @JsonCreator taking "inputs"
  and a boxed Boolean "distinct"; a plan that omits "distinct" (null) now yields
  distinct = false. Presumably the second creator was disabled because Jackson does not
  accept two property-based @JsonCreator constructors -- confirm. The commented-out
  constructor is dead code and could be dropped in a follow-up.
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
index 7c8b88e..487401b 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
@@ -26,10 +26,10 @@ public class Union extends LogicalOperatorBase {
   private final LogicalOperator[] inputs;
   private final boolean distinct;
 
-  @JsonCreator
-  public Union(@JsonProperty("inputs") LogicalOperator[] inputs){
-    this(inputs, false);
-  }
+//  @JsonCreator
+//  public Union(@JsonProperty("inputs") LogicalOperator[] inputs){
+//    this(inputs, false);
+//  }
   
   @JsonCreator
   public Union(@JsonProperty("inputs") LogicalOperator[] inputs, @JsonProperty("distinct") Boolean distinct){
@@ -37,7 +37,7 @@ public class Union extends LogicalOperatorBase {
     for (LogicalOperator o : inputs) {
       o.registerAsSubscriber(this);
     }
-    this.distinct = distinct;
+    this.distinct = distinct == null ? false : distinct;
   }
 
   public LogicalOperator[] getInputs() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java
----------------------------------------------------------------------
NOTE(review): ROPConverter.java -- the added import duplicates the identical
  "import org.apache.drill.exec.ref.rops.UnionROP;" context line directly above it.
  Java ignores a duplicate single-type import, so this is harmless but redundant.
  The default: branch now calls the two-argument Union(null, false) because the
  one-argument constructor was commented out in Union.java. Passing null for inputs
  looks unsafe: the Union constructor iterates "for (LogicalOperator o : inputs)",
  which throws NullPointerException on a null array unless the constructor line not
  shown in the Union.java hunks guards against it -- TODO confirm. This predates the
  commit: the old Union(null) delegated to the same constructor.
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java
index 06a8690..90f3374 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.ref.rops.ROP;
 import org.apache.drill.exec.ref.rops.ScanROP;
 import org.apache.drill.exec.ref.rops.StoreROP;
 import org.apache.drill.exec.ref.rops.UnionROP;
+import org.apache.drill.exec.ref.rops.UnionROP;
 import org.apache.drill.exec.ref.rse.RSERegistry;
 import org.apache.drill.exec.ref.rse.ReferenceStorageEngine;
 import org.apache.drill.exec.ref.rse.ReferenceStorageEngine.ReadEntry;
@@ -134,7 +135,7 @@ class ROPConverter {
       scanner.init(registry, builder);
       return;
     default:
-      Union logOp = new Union(null);
+      Union logOp = new Union(null, false);
 
       ROP parentUnion = new UnionROP(logOp);
       ScanROP[] scanners = new ScanROP[readEntries.size()];

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
----------------------------------------------------------------------
NOTE(review): UnbackedRecord.java -- adds equals/hashCode based solely on root
  (null-safe, exact-class match). UnionROP relies on these when "distinct" is true,
  because it collects record copies into a HashSet. Dedup therefore depends on root's
  own equals(Object)/hashCode, presumably the DataValue implementations -- confirm.
  A record must not be mutated (e.g. via copyFrom) after it has been added to a
  hash-based collection. The three whitespace-only lines added at the end are noise.
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
index 6152a32..1fa4348 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
@@ -105,5 +105,27 @@ public class UnbackedRecord implements RecordPointer {
         return "UnbackedRecord [root=" + root + "]";
     }
 
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((root == null) ? 0 : root.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      UnbackedRecord other = (UnbackedRecord) obj;
+      if (root == null) {
+        if (other.root != null) return false;
+      } else if (!root.equals(other.root)) return false;
+      return true;
+    }
+    
+    
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
----------------------------------------------------------------------
NOTE(review): UnionROP.java --
  * Blocking: the first next() on a ProxyIterator calls doWork(). doWork() drains every
    incoming RecordIterator into records (a HashSet when distinct, otherwise an
    ArrayList) before anything is emitted, so all input is held in memory.
  * iterator is a field of UnionROP, not of ProxyIterator. Iterators returned by
    repeated getIteratorInternal() calls would therefore share one cursor.
  * Every emitted record reports INCREMENTED_SCHEMA_CHANGED, presumably because
    consecutive records may come from inputs with different schemas. Confirm that
    downstream operators tolerate this on every record.
  * With distinct = true, output order is HashSet iteration order (unspecified).
    With distinct = false, records are emitted input by input, in arrival order.
  * setupIterators() throws SetupException unless each input yields exactly one
    RecordIterator.
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
index aca0b5a..b823e68 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
@@ -17,66 +17,91 @@
  ******************************************************************************/
 package org.apache.drill.exec.ref.rops;
 
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.common.logical.data.Union;
 import org.apache.drill.exec.ref.IteratorRegistry;
 import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordIterator.NextOutcome;
 import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.exceptions.SetupException;
 
-public class UnionROP extends ROPBase<LogicalOperator>{
-  
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class UnionROP extends ROPBase<Union> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionROP.class);
-  
-  private List<RecordIterator> incoming;
-  private ProxySimpleRecord record;
+
+  private Collection<UnbackedRecord> records;
+  private List<RecordIterator> incoming = Lists.newArrayList();
+  private Iterator<UnbackedRecord> iterator;
   
   public UnionROP(Union config) {
     super(config);
+    // to make things simple, we'll just always make this a blocking operator.
+    if(config.isDistinct()){
+      records  = Sets.newHashSet();
+    }else{
+      records = Lists.newArrayList();
+    }
   }
-
+  
   @Override
-  protected void setupEvals(EvaluatorFactory builder) {
+  protected void setupIterators(IteratorRegistry registry) throws SetupException {
+    for(LogicalOperator op : config.getInputs()){
+      List<RecordIterator> more = registry.getOperator(op);
+      if(more.size() != 1) throw new SetupException("Iterator list was incorrect size.");
+      incoming.addAll(more);
+    }
   }
 
-  @Override
-  protected void setupIterators(IteratorRegistry builder) {
-    incoming = builder.getOperator(config);
-    record.setRecord(incoming.get(0).getRecordPointer());
+
+  protected void doWork() {
+    for(RecordIterator ri : incoming){
+      RecordPointer rp = ri.getRecordPointer();
+      while(ri.next() != NextOutcome.NONE_LEFT){
+        UnbackedRecord r = new UnbackedRecord();
+        r.copyFrom(rp);
+        records.add(r);
+      }
+    }
+    this.iterator = records.iterator();
   }
 
   @Override
   protected RecordIterator getIteratorInternal() {
-    return new MultiIterator();
+    return new ProxyIterator();
   }
   
-  private class MultiIterator implements RecordIterator{
-    private int current = 0;
+  private class ProxyIterator implements RecordIterator{
+    private ProxySimpleRecord proxyRecord = new ProxySimpleRecord();
+    
+    @Override
+    public RecordPointer getRecordPointer() {
+      return proxyRecord;
+    }
 
     @Override
     public NextOutcome next() {
-      for(; current < incoming.size(); current++, record.setRecord(incoming.get(current).getRecordPointer()))
-      while(current < incoming.size()){
+      if(iterator == null) doWork();
       
-        NextOutcome n = incoming.get(current).next();
-        if(n != NextOutcome.NONE_LEFT) return n;
-        
+      if(iterator.hasNext()){
+        proxyRecord.setRecord(iterator.next());
+        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+      }else{
+        return NextOutcome.NONE_LEFT;
       }
-      return NextOutcome.NONE_LEFT;
+      
     }
 
     @Override
     public ROP getParent() {
       return UnionROP.this;
     }
-
-    @Override
-    public RecordPointer getRecordPointer() {
-      return record;
-    }
     
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
----------------------------------------------------------------------
NOTE(review): DataValue.java -- redeclaring "boolean equals(Object v)" in the interface
  documents intent but cannot force implementors to override it, because
  java.lang.Object already supplies an implementation. Hash-based collections call
  equals(Object), never the existing equals(DataValue) overload. Each implementation
  must therefore override equals(Object) itself, together with hashCode, for
  value-based dedup to work.
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
index 9e40014..c1a2980 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
@@ -22,6 +22,7 @@ public interface DataValue {
   public BooleanValue getAsBooleanValue();
   public BytesValue getAsBytesValue();
   public boolean equals(DataValue v);
+  public boolean equals(Object v);
   public int hashCode();
   public DataValue copy();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
----------------------------------------------------------------------
NOTE(review): SimpleMapValue.java -- the new toString() prints at most the first 10
  (maxLen) entries of map.entrySet(), in the map's iteration order. It gives no marker
  when entries were omitted. The added java.util.Collection import is used only by the
  private toString(Collection<?>, int) helper. The two whitespace-only lines added at
  the end are noise.
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
index e16e8c1..1c170f2 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.ref.values;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -106,4 +107,24 @@ public class SimpleMapValue extends BaseMapValue{
       }
       return out;
   }
+
+  @Override
+  public String toString() {
+    final int maxLen = 10;
+    return "SimpleMapValue [map=" + (map != null ? toString(map.entrySet(), maxLen) : null) + "]";
+  }
+
+  private String toString(Collection<?> collection, int maxLen) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("[");
+    int i = 0;
+    for (Iterator<?> iterator = collection.iterator(); iterator.hasNext() && i < maxLen; i++) {
+      if (i > 0) builder.append(", ");
+      builder.append(iterator.next());
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java
----------------------------------------------------------------------
NOTE(review): UnionROPTest.java -- the class only calls TestUtils.assertProduceCount.
  The imports of assertEquals, List, SchemaPath, UnbackedRecord, DataValue and
  LongScalar, and the logger field, are therefore unused. The ASF license header sits
  after the package/imports rather than at the top of the file, as it does in
  UnionROP.java. The expected counts (5 distinct, 10 total) depend on employees.json
  and departments.json, which are not part of this commit.
diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java
new file mode 100644
index 0000000..7168a55
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java
@@ -0,0 +1,46 @@
+package org.apache.drill.exec.ref.rops;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ref.TestUtils;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.ScalarValues.LongScalar;
+import org.junit.Test;
+
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+public class UnionROPTest {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionROPTest.class);
+  
+  
+  @Test
+  public void checkDistinct() throws Exception{
+    TestUtils.assertProduceCount("/union/distinct.json", 5);
+  }
+
+  @Test
+  public void checkNonDistinct() throws Exception{
+    TestUtils.assertProduceCount("/union/nondistinct.json", 10);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json
----------------------------------------------------------------------
NOTE(review): union/distinct.json -- scans /employees.json and /departments.json and
  projects each to output.deptId. It unions them with "distinct": true and stores the
  result to the "queue" storage engine. Apart from the "distinct" flag it appears
  identical to union/nondistinct.json. The scan ops spell the key "storageengine"
  while the store op uses "storageEngine"; presumably the plan reader accepts both
  spellings -- confirm.
diff --git a/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json b/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json
new file mode 100644
index 0000000..b975a77
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json
@@ -0,0 +1,71 @@
+{
+  "head" : {
+    "type" : "apache_drill_logical_plan",
+    "version" : 1,
+    "generator" : {
+      "type" : "manual",
+      "info" : "na"
+    }
+  },
+  "storage" : [ {
+    "type" : "queue",
+    "name" : "queue",
+    "encoding" : "RECORD"
+    
+  }, {
+    "type" : "classpath",
+    "name" : "donuts-json"
+  } ],
+  "query" : [ {
+    "op" : "scan",
+    "@id" : 1,
+    "memo" : "initial_scan",
+    "storageengine" : "donuts-json",
+    "selection" : {
+      "path" : "/employees.json",
+      "type" : "JSON"
+    },
+    "ref" : "_MAP"
+  }, {
+    "op" : "project",
+    "input" : 1,
+    "@id" : 2,
+    "projections" : [ {
+      "ref" : "output.deptId",
+      "expr" : "_MAP.deptId"
+    } ]
+  }, {
+    "op" : "scan",
+    "@id" : 3,
+    "memo" : "initial_scan",
+    "storageengine" : "donuts-json",
+    "selection" : {
+      "path" : "/departments.json",
+      "type" : "JSON"
+    },
+    "ref" : "_MAP"
+  }, {
+    "op" : "project",
+    "input" : 3,
+    "@id" : 4,
+    "projections" : [ {
+      "ref" : "output.deptId",
+      "expr" : "_MAP.deptId"
+    } ]
+  },  {
+    "op": "union",
+    "@id" : 5,
+    "distinct": true,
+    "inputs": [2, 4]
+  }, {
+    "op" : "store",
+    "input" : 5,
+    "@id" : 6,
+    "memo" : "output sink",
+    "target" : {
+      "number" : 0
+    },
+    "partition" : null,
+    "storageEngine" : "queue"
+  } ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json
----------------------------------------------------------------------
NOTE(review): union/nondistinct.json -- this appears to be the same plan as
  union/distinct.json except for "distinct": false, so the union keeps duplicate
  rows. The same "storageengine" (scan) / "storageEngine" (store) key-spelling mix
  appears here.
diff --git a/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json b/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json
new file mode 100644
index 0000000..817ed48
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json
@@ -0,0 +1,71 @@
+{
+  "head" : {
+    "type" : "apache_drill_logical_plan",
+    "version" : 1,
+    "generator" : {
+      "type" : "manual",
+      "info" : "na"
+    }
+  },
+  "storage" : [ {
+    "type" : "queue",
+    "name" : "queue",
+    "encoding" : "RECORD"
+    
+  }, {
+    "type" : "classpath",
+    "name" : "donuts-json"
+  } ],
+  "query" : [ {
+    "op" : "scan",
+    "@id" : 1,
+    "memo" : "initial_scan",
+    "storageengine" : "donuts-json",
+    "selection" : {
+      "path" : "/employees.json",
+      "type" : "JSON"
+    },
+    "ref" : "_MAP"
+  }, {
+    "op" : "project",
+    "input" : 1,
+    "@id" : 2,
+    "projections" : [ {
+      "ref" : "output.deptId",
+      "expr" : "_MAP.deptId"
+    } ]
+  }, {
+    "op" : "scan",
+    "@id" : 3,
+    "memo" : "initial_scan",
+    "storageengine" : "donuts-json",
+    "selection" : {
+      "path" : "/departments.json",
+      "type" : "JSON"
+    },
+    "ref" : "_MAP"
+  }, {
+    "op" : "project",
+    "input" : 3,
+    "@id" : 4,
+    "projections" : [ {
+      "ref" : "output.deptId",
+      "expr" : "_MAP.deptId"
+    } ]
+  },  {
+    "op": "union",
+    "@id" : 5,
+    "distinct": false,
+    "inputs": [2, 4]
+  }, {
+    "op" : "store",
+    "input" : 5,
+    "@id" : 6,
+    "memo" : "output sink",
+    "target" : {
+      "number" : 0
+    },
+    "partition" : null,
+    "storageEngine" : "queue"
+  } ]
+}