You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/05/23 18:46:37 UTC
git commit: Fix for DRILL-65. Adds simple tests for distinct and
non-distinct union. To keep things simple,
all unions are treated as blocking operators.
Updated Branches:
refs/heads/master d162e1338 -> 673a96d13
Fix for DRILL-65. Adds simple tests for distinct and non-distinct union. To keep things simple, all unions are treated as blocking operators.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/673a96d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/673a96d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/673a96d1
Branch: refs/heads/master
Commit: 673a96d136165dfc242e598fd501f5dfba8c5edf
Parents: d162e13
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 23 09:46:27 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 23 09:46:27 2013 -0700
----------------------------------------------------------------------
.../apache/drill/common/logical/data/Union.java | 10 +-
.../org/apache/drill/exec/ref/ROPConverter.java | 3 +-
.../org/apache/drill/exec/ref/UnbackedRecord.java | 22 ++++
.../org/apache/drill/exec/ref/rops/UnionROP.java | 79 ++++++++++-----
.../apache/drill/exec/ref/values/DataValue.java | 1 +
.../drill/exec/ref/values/SimpleMapValue.java | 21 ++++
.../apache/drill/exec/ref/rops/UnionROPTest.java | 46 +++++++++
.../ref/src/test/resources/union/distinct.json | 71 +++++++++++++
.../ref/src/test/resources/union/nondistinct.json | 71 +++++++++++++
9 files changed, 291 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
index 7c8b88e..487401b 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
@@ -26,10 +26,10 @@ public class Union extends LogicalOperatorBase {
private final LogicalOperator[] inputs;
private final boolean distinct;
- @JsonCreator
- public Union(@JsonProperty("inputs") LogicalOperator[] inputs){
- this(inputs, false);
- }
+// @JsonCreator
+// public Union(@JsonProperty("inputs") LogicalOperator[] inputs){
+// this(inputs, false);
+// }
@JsonCreator
public Union(@JsonProperty("inputs") LogicalOperator[] inputs, @JsonProperty("distinct") Boolean distinct){
@@ -37,7 +37,7 @@ public class Union extends LogicalOperatorBase {
for (LogicalOperator o : inputs) {
o.registerAsSubscriber(this);
}
- this.distinct = distinct;
+ this.distinct = distinct == null ? false : distinct;
}
public LogicalOperator[] getInputs() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java
index 06a8690..90f3374 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.ref.rops.ROP;
import org.apache.drill.exec.ref.rops.ScanROP;
import org.apache.drill.exec.ref.rops.StoreROP;
import org.apache.drill.exec.ref.rops.UnionROP;
+import org.apache.drill.exec.ref.rops.UnionROP;
import org.apache.drill.exec.ref.rse.RSERegistry;
import org.apache.drill.exec.ref.rse.ReferenceStorageEngine;
import org.apache.drill.exec.ref.rse.ReferenceStorageEngine.ReadEntry;
@@ -134,7 +135,7 @@ class ROPConverter {
scanner.init(registry, builder);
return;
default:
- Union logOp = new Union(null);
+ Union logOp = new Union(null, false);
ROP parentUnion = new UnionROP(logOp);
ScanROP[] scanners = new ScanROP[readEntries.size()];
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
index 6152a32..1fa4348 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
@@ -105,5 +105,27 @@ public class UnbackedRecord implements RecordPointer {
return "UnbackedRecord [root=" + root + "]";
}
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((root == null) ? 0 : root.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ UnbackedRecord other = (UnbackedRecord) obj;
+ if (root == null) {
+ if (other.root != null) return false;
+ } else if (!root.equals(other.root)) return false;
+ return true;
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
index aca0b5a..b823e68 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
@@ -17,66 +17,91 @@
******************************************************************************/
package org.apache.drill.exec.ref.rops;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.common.logical.data.Union;
import org.apache.drill.exec.ref.IteratorRegistry;
import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordIterator.NextOutcome;
import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.exceptions.SetupException;
-public class UnionROP extends ROPBase<LogicalOperator>{
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class UnionROP extends ROPBase<Union> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionROP.class);
-
- private List<RecordIterator> incoming;
- private ProxySimpleRecord record;
+
+ private Collection<UnbackedRecord> records;
+ private List<RecordIterator> incoming = Lists.newArrayList();
+ private Iterator<UnbackedRecord> iterator;
public UnionROP(Union config) {
super(config);
+ // to make things simple, we'll just always make this a blocking operator.
+ if(config.isDistinct()){
+ records = Sets.newHashSet();
+ }else{
+ records = Lists.newArrayList();
+ }
}
-
+
@Override
- protected void setupEvals(EvaluatorFactory builder) {
+ protected void setupIterators(IteratorRegistry registry) throws SetupException {
+ for(LogicalOperator op : config.getInputs()){
+ List<RecordIterator> more = registry.getOperator(op);
+ if(more.size() != 1) throw new SetupException("Iterator list was incorrect size.");
+ incoming.addAll(more);
+ }
}
- @Override
- protected void setupIterators(IteratorRegistry builder) {
- incoming = builder.getOperator(config);
- record.setRecord(incoming.get(0).getRecordPointer());
+
+ protected void doWork() {
+ for(RecordIterator ri : incoming){
+ RecordPointer rp = ri.getRecordPointer();
+ while(ri.next() != NextOutcome.NONE_LEFT){
+ UnbackedRecord r = new UnbackedRecord();
+ r.copyFrom(rp);
+ records.add(r);
+ }
+ }
+ this.iterator = records.iterator();
}
@Override
protected RecordIterator getIteratorInternal() {
- return new MultiIterator();
+ return new ProxyIterator();
}
- private class MultiIterator implements RecordIterator{
- private int current = 0;
+ private class ProxyIterator implements RecordIterator{
+ private ProxySimpleRecord proxyRecord = new ProxySimpleRecord();
+
+ @Override
+ public RecordPointer getRecordPointer() {
+ return proxyRecord;
+ }
@Override
public NextOutcome next() {
- for(; current < incoming.size(); current++, record.setRecord(incoming.get(current).getRecordPointer()))
- while(current < incoming.size()){
+ if(iterator == null) doWork();
- NextOutcome n = incoming.get(current).next();
- if(n != NextOutcome.NONE_LEFT) return n;
-
+ if(iterator.hasNext()){
+ proxyRecord.setRecord(iterator.next());
+ return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+ }else{
+ return NextOutcome.NONE_LEFT;
}
- return NextOutcome.NONE_LEFT;
+
}
@Override
public ROP getParent() {
return UnionROP.this;
}
-
- @Override
- public RecordPointer getRecordPointer() {
- return record;
- }
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
index 9e40014..c1a2980 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
@@ -22,6 +22,7 @@ public interface DataValue {
public BooleanValue getAsBooleanValue();
public BytesValue getAsBytesValue();
public boolean equals(DataValue v);
+ public boolean equals(Object v);
public int hashCode();
public DataValue copy();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
index e16e8c1..1c170f2 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.ref.values;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -106,4 +107,24 @@ public class SimpleMapValue extends BaseMapValue{
}
return out;
}
+
+ @Override
+ public String toString() {
+ final int maxLen = 10;
+ return "SimpleMapValue [map=" + (map != null ? toString(map.entrySet(), maxLen) : null) + "]";
+ }
+
+ private String toString(Collection<?> collection, int maxLen) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[");
+ int i = 0;
+ for (Iterator<?> iterator = collection.iterator(); iterator.hasNext() && i < maxLen; i++) {
+ if (i > 0) builder.append(", ");
+ builder.append(iterator.next());
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java
new file mode 100644
index 0000000..7168a55
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java
@@ -0,0 +1,46 @@
+package org.apache.drill.exec.ref.rops;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ref.TestUtils;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.ScalarValues.LongScalar;
+import org.junit.Test;
+
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+public class UnionROPTest {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionROPTest.class);
+
+
+ @Test
+ public void checkDistinct() throws Exception{
+ TestUtils.assertProduceCount("/union/distinct.json", 5);
+ }
+
+ @Test
+ public void checkNonDistinct() throws Exception{
+ TestUtils.assertProduceCount("/union/nondistinct.json", 10);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json b/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json
new file mode 100644
index 0000000..b975a77
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json
@@ -0,0 +1,71 @@
+{
+ "head" : {
+ "type" : "apache_drill_logical_plan",
+ "version" : 1,
+ "generator" : {
+ "type" : "manual",
+ "info" : "na"
+ }
+ },
+ "storage" : [ {
+ "type" : "queue",
+ "name" : "queue",
+ "encoding" : "RECORD"
+
+ }, {
+ "type" : "classpath",
+ "name" : "donuts-json"
+ } ],
+ "query" : [ {
+ "op" : "scan",
+ "@id" : 1,
+ "memo" : "initial_scan",
+ "storageengine" : "donuts-json",
+ "selection" : {
+ "path" : "/employees.json",
+ "type" : "JSON"
+ },
+ "ref" : "_MAP"
+ }, {
+ "op" : "project",
+ "input" : 1,
+ "@id" : 2,
+ "projections" : [ {
+ "ref" : "output.deptId",
+ "expr" : "_MAP.deptId"
+ } ]
+ }, {
+ "op" : "scan",
+ "@id" : 3,
+ "memo" : "initial_scan",
+ "storageengine" : "donuts-json",
+ "selection" : {
+ "path" : "/departments.json",
+ "type" : "JSON"
+ },
+ "ref" : "_MAP"
+ }, {
+ "op" : "project",
+ "input" : 3,
+ "@id" : 4,
+ "projections" : [ {
+ "ref" : "output.deptId",
+ "expr" : "_MAP.deptId"
+ } ]
+ }, {
+ "op": "union",
+ "@id" : 5,
+ "distinct": true,
+ "inputs": [2, 4]
+ }, {
+ "op" : "store",
+ "input" : 5,
+ "@id" : 6,
+ "memo" : "output sink",
+ "target" : {
+ "number" : 0
+ },
+ "partition" : null,
+ "storageEngine" : "queue"
+ } ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json b/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json
new file mode 100644
index 0000000..817ed48
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json
@@ -0,0 +1,71 @@
+{
+ "head" : {
+ "type" : "apache_drill_logical_plan",
+ "version" : 1,
+ "generator" : {
+ "type" : "manual",
+ "info" : "na"
+ }
+ },
+ "storage" : [ {
+ "type" : "queue",
+ "name" : "queue",
+ "encoding" : "RECORD"
+
+ }, {
+ "type" : "classpath",
+ "name" : "donuts-json"
+ } ],
+ "query" : [ {
+ "op" : "scan",
+ "@id" : 1,
+ "memo" : "initial_scan",
+ "storageengine" : "donuts-json",
+ "selection" : {
+ "path" : "/employees.json",
+ "type" : "JSON"
+ },
+ "ref" : "_MAP"
+ }, {
+ "op" : "project",
+ "input" : 1,
+ "@id" : 2,
+ "projections" : [ {
+ "ref" : "output.deptId",
+ "expr" : "_MAP.deptId"
+ } ]
+ }, {
+ "op" : "scan",
+ "@id" : 3,
+ "memo" : "initial_scan",
+ "storageengine" : "donuts-json",
+ "selection" : {
+ "path" : "/departments.json",
+ "type" : "JSON"
+ },
+ "ref" : "_MAP"
+ }, {
+ "op" : "project",
+ "input" : 3,
+ "@id" : 4,
+ "projections" : [ {
+ "ref" : "output.deptId",
+ "expr" : "_MAP.deptId"
+ } ]
+ }, {
+ "op": "union",
+ "@id" : 5,
+ "distinct": false,
+ "inputs": [2, 4]
+ }, {
+ "op" : "store",
+ "input" : 5,
+ "@id" : 6,
+ "memo" : "output sink",
+ "target" : {
+ "number" : 0
+ },
+ "partition" : null,
+ "storageEngine" : "queue"
+ } ]
+}