You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/02/25 04:40:53 UTC
[1/2] git commit: Implement Join ROP
Implement Join ROP
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/08bb3be6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/08bb3be6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/08bb3be6
Branch: refs/heads/master
Commit: 08bb3be62eb91fb8618bb7840a04db1b2608fbde
Parents: 6e4a138
Author: Timothy Chen <tn...@gmail.com>
Authored: Tue Feb 5 01:53:59 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Feb 24 19:36:24 2013 -0800
----------------------------------------------------------------------
.../org/apache/drill/common/logical/data/Join.java | 4 +-
.../drill/common/logical/data/LogicalOperator.java | 2 +-
.../org/apache/drill/exec/ref/UnbackedRecord.java | 144 ++++----
.../exec/ref/eval/fn/ComparisonEvaluators.java | 6 +-
.../org/apache/drill/exec/ref/rops/JoinROP.java | 309 +++++++++++++++
.../exec/ref/src/test/resources/departments.json | 16 +
.../exec/ref/src/test/resources/employees.json | 23 ++
.../exec/ref/src/test/resources/simple_join.json | 70 ++++
8 files changed, 501 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
index 1ce0306..9742cd7 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
@@ -8,7 +8,7 @@
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,7 +32,7 @@ public class Join extends LogicalOperatorBase {
private final JoinCondition[] conditions;
public static enum JoinType{
- LEFT, RIGHT, INNER, OUTER;
+ LEFT, INNER, OUTER;
public static JoinType resolve(String val){
for(JoinType jt : JoinType.values()){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
index 8644107..e2bda45 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
@@ -8,7 +8,7 @@
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
index ada191d..bc6ae0e 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,77 +28,83 @@ import org.apache.drill.exec.ref.values.DataValue;
import org.apache.drill.exec.ref.values.SimpleMapValue;
import org.apache.drill.exec.ref.values.ValueUtils;
-public class UnbackedRecord implements RecordPointer{
-
- private DataValue root = new SimpleMapValue();
-
- public DataValue getField(SchemaPath field) {
- return root.getValue(field.getRootSegment());
- }
-
- public void addField(SchemaPath field, DataValue value) {
- addField(field.getRootSegment(), value);
- }
-
- @Override
- public void addField(PathSegment segment, DataValue value) {
- root.addValue(segment, value);
- }
-
- @Override
- public void removeField(SchemaPath field) {
- root.removeValue(field.getRootSegment());
- }
-
- @Override
- public void write(DataWriter writer) throws IOException {
- writer.startRecord();
- root.write(writer);
- writer.endRecord();
- }
-
- public void merge(DataValue v){
- if(v instanceof ContainerValue){
- this.root = ValueUtils.getMergedDataValue(CollisionBehavior.MERGE_OVERRIDE, root, v);
- }else{
- this.root = v;
+public class UnbackedRecord implements RecordPointer {
+
+ private DataValue root = new SimpleMapValue();
+
+  /**
+   * Looks up the value stored under the given path.
+   *
+   * @param field path whose root segment is resolved against this record's root value
+   * @return whatever the root value returns for that segment
+   */
+  public DataValue getField(SchemaPath field) {
+    final PathSegment rootSegment = field.getRootSegment();
+    return this.root.getValue(rootSegment);
+  }
+
+  /**
+   * Stores a value under the given path by delegating to the segment-based overload.
+   *
+   * @param field path whose root segment identifies where the value is stored
+   * @param value value to store
+   */
+  public void addField(SchemaPath field, DataValue value) {
+    final PathSegment rootSegment = field.getRootSegment();
+    addField(rootSegment, value);
+  }
- }
-
- @Override
- public RecordPointer copy() {
- // TODO: Make a deep copy.
- UnbackedRecord r = new UnbackedRecord();
- r.root = this.root;
- return r;
- }
-
- public void clear(){
- root = new SimpleMapValue();
- }
-
- public void setClearAndSetRoot(SchemaPath path, DataValue v){
- root = new SimpleMapValue();
- root.addValue(path.getRootSegment(), v);
- }
-
- @Override
- public void copyFrom(RecordPointer r) {
- if(r instanceof UnbackedRecord){
- this.root = ((UnbackedRecord)r).root.copy();
- }else{
- throw new UnsupportedOperationException(String.format("Unable to copy from a record of type %s to an UnbackedRecord.", r.getClass().getCanonicalName()));
+
+  /**
+   * Stores a value under the given path segment of this record's root value.
+   *
+   * @param segment path segment handed to the root value
+   * @param value value to store
+   */
+  @Override
+  public void addField(PathSegment segment, DataValue value) {
+    this.root.addValue(segment, value);
+  }
- }
- @Override
- public String toString() {
- return "UnbackedRecord [root=" + root + "]";
- }
+  /**
+   * Removes the value stored under the given path from this record's root value.
+   *
+   * @param field path whose root segment identifies the value to remove
+   */
+  @Override
+  public void removeField(SchemaPath field) {
+    final PathSegment rootSegment = field.getRootSegment();
+    this.root.removeValue(rootSegment);
+  }
+  /**
+   * Writes this record to the given writer: a record start marker, the root value, then a
+   * record end marker.
+   *
+   * @param writer destination for the record
+   * @throws IOException declared by the signature; propagated from the writer calls
+   */
+  @Override
+  public void write(DataWriter writer) throws IOException {
+    writer.startRecord();
+    this.root.write(writer);
+    writer.endRecord();
+  }
+
+  /**
+   * Merges a value into this record.
+   *
+   * <p>A {@link ContainerValue} is merged into the current root using
+   * {@code CollisionBehavior.MERGE_OVERRIDE}; any other value simply replaces the root.
+   *
+   * @param v value to merge in
+   */
+  public void merge(DataValue v) {
+    if (!(v instanceof ContainerValue)) {
+      // Not a container: there is nothing to merge field-by-field, so it becomes the root.
+      this.root = v;
+      return;
+    }
+    this.root = ValueUtils.getMergedDataValue(CollisionBehavior.MERGE_OVERRIDE, this.root, v);
+  }
+
+  /**
+   * Merges the contents of another record into this one.
+   *
+   * @param pointer record to merge in; must be an {@code UnbackedRecord}
+   * @throws UnsupportedOperationException if {@code pointer} is any other record type
+   */
+  public void merge(RecordPointer pointer) {
+    if (!(pointer instanceof UnbackedRecord)) {
+      final String typeName = pointer.getClass().getCanonicalName();
+      throw new UnsupportedOperationException(
+          String.format("Unable to merge from a record of type %s to an UnbackedRecord.", typeName));
+    }
+    final UnbackedRecord other = (UnbackedRecord) pointer;
+    merge(other.root);
+  }
+
+  /**
+   * Creates an independent copy of this record.
+   *
+   * <p>The root value is duplicated with {@code DataValue.copy()}, the same call
+   * {@code copyFrom} uses, rather than shared. Sharing the root would let later in-place
+   * changes to this record (for example through {@code addField}) show up in copies that a
+   * caller has already stored away.
+   *
+   * @return a new {@code UnbackedRecord} holding a copy of this record's root value
+   */
+  @Override
+  public RecordPointer copy() {
+    UnbackedRecord r = new UnbackedRecord();
+    r.root = this.root.copy();
+    return r;
+  }
+
+  /** Discards the current contents by replacing the root with a new, empty {@code SimpleMapValue}. */
+  public void clear() {
+    this.root = new SimpleMapValue();
+  }
+
+  /**
+   * Replaces the record's contents with a single value stored under the given path.
+   *
+   * @param path path whose root segment identifies where the value is stored
+   * @param v value to store in the otherwise empty record
+   */
+  public void setClearAndSetRoot(SchemaPath path, DataValue v) {
+    final DataValue fresh = new SimpleMapValue();
+    // Install the empty root before populating it, so the record is reset even if addValue fails.
+    this.root = fresh;
+    fresh.addValue(path.getRootSegment(), v);
+  }
+
+  /**
+   * Overwrites this record's contents with a copy of another record's root value.
+   *
+   * @param r record to copy from; must be an {@code UnbackedRecord}
+   * @throws UnsupportedOperationException if {@code r} is any other record type
+   */
+  @Override
+  public void copyFrom(RecordPointer r) {
+    if (!(r instanceof UnbackedRecord)) {
+      final String typeName = r.getClass().getCanonicalName();
+      throw new UnsupportedOperationException(
+          String.format("Unable to copy from a record of type %s to an UnbackedRecord.", typeName));
+    }
+    final UnbackedRecord source = (UnbackedRecord) r;
+    this.root = source.root.copy();
+  }
+
+  /** Returns a debugging representation that embeds the root value's own string form. */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("UnbackedRecord [root=");
+    text.append(this.root);
+    text.append("]");
+    return text.toString();
+  }
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java
index f3b2fff..3e6d428 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java
@@ -45,6 +45,10 @@ public class ComparisonEvaluators {
}
}
+
+  /**
+   * Tells whether two values can be compared with each other.
+   *
+   * @param a first value; its {@code supportsCompare} decides the final answer
+   * @param b second value
+   * @return {@code true} only if both are {@link ComparableValue}s and {@code a} reports that
+   *         it supports comparison against {@code b}
+   */
+  public static boolean isComparable(DataValue a, DataValue b) {
+    if (!(a instanceof ComparableValue)) {
+      return false;
+    }
+    if (!(b instanceof ComparableValue)) {
+      return false;
+    }
+    return ((ComparableValue) a).supportsCompare(b);
+  }
private abstract static class ComparisonEvaluator extends BaseBasicEvaluator{
private final BasicEvaluator left;
@@ -63,7 +67,7 @@ public class ComparisonEvaluators {
DataValue a = left.eval();
DataValue b = right.eval();
- if(a instanceof ComparableValue && b instanceof ComparableValue && ((ComparableValue) a).supportsCompare(b)){
+ if(isComparable(a, b)){
int i = ((ComparableValue)a).compareTo(b);
return new BooleanScalar(valid( i));
}else{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java
new file mode 100644
index 0000000..37c5657
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java
@@ -0,0 +1,309 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.fn.ComparisonEvaluators;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+import org.apache.drill.exec.ref.values.ComparableValue;
+import org.apache.drill.exec.ref.values.DataValue;
+
+import java.util.List;
+
+/**
+ * Reference operator that joins the records of two inputs according to a {@link Join}
+ * configuration.
+ *
+ * <p>Every supported join type is a nested-loop join: one input is read completely into an
+ * in-memory buffer on the first call to {@code next()}, and the other input is streamed. Each
+ * streamed record is tested against every buffered record; a pair is emitted when at least one
+ * of the configured {@link JoinCondition}s holds for it.
+ *
+ * <p>All iterators created by one {@code JoinROP} publish their output through the same
+ * {@link UnbackedRecord}, which is overwritten for every emitted row.
+ */
+public class JoinROP extends ROPBase<Join> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinROP.class);
+
+  private RecordIterator left;
+  private RecordIterator right;
+  /** Output record shared by the iterators of this operator; rewritten for each emitted row. */
+  private final UnbackedRecord record;
+  private EvaluatorFactory factory;
+
+  public JoinROP(Join config) {
+    super(config);
+    record = new UnbackedRecord();
+  }
+
+  @Override
+  protected void setupIterators(IteratorRegistry builder) {
+    // Each side must resolve to exactly one iterator; getOnlyElement fails otherwise.
+    left = Iterables.getOnlyElement(builder.getOperator(config.getLeft()));
+    right = Iterables.getOnlyElement(builder.getOperator(config.getRight()));
+  }
+
+  @Override
+  protected void setupEvals(EvaluatorFactory builder) throws SetupException {
+    // Evaluators are created per record pair while joining, so only the factory is kept here.
+    factory = builder;
+  }
+
+  @Override
+  protected RecordIterator getIteratorInternal() {
+    return createIteratorFromJoin(config.getJointType());
+  }
+
+  /**
+   * Selects the iterator implementation for a join type.
+   *
+   * @param type join type taken from the operator configuration
+   * @return a new iterator implementing that join type
+   * @throws UnsupportedOperationException if the type has no implementation here
+   */
+  private RecordIterator createIteratorFromJoin(Join.JoinType type) {
+    switch (type) {
+      case LEFT:
+        return new LeftIterator();
+      case INNER:
+        return new InnerIterator();
+      case OUTER:
+        return new OuterIterator();
+      default:
+        throw new UnsupportedOperationException("Type not supported: " + type);
+    }
+  }
+
+  /**
+   * One buffered input record together with the bookkeeping the join needs for it.
+   *
+   * <p>Declared static because it never touches the enclosing operator.
+   */
+  private static class RecordBuffer {
+    /** Whether the source iterator reported a schema change when it produced this record. */
+    final boolean schemaChanged;
+    /** Copy of the buffered record. */
+    final RecordPointer pointer;
+    /** Set once this record has been emitted as part of a matching pair. */
+    boolean hasJoined = false;
+
+    private RecordBuffer(RecordPointer pointer, boolean schemaChanged) {
+      this.pointer = pointer;
+      this.schemaChanged = schemaChanged;
+    }
+
+    public void setHasJoined(boolean hasJoined) {
+      this.hasJoined = hasJoined;
+    }
+  }
+
+  /**
+   * Shared machinery of the join iterators: lazy buffering of one input, the cursor into that
+   * buffer, construction of the output record and evaluation of the join conditions.
+   */
+  abstract class JoinIterator implements RecordIterator {
+    /** Buffered side of the join; {@code null} until the first call to {@link #next()}. */
+    protected List<RecordBuffer> buffer;
+    /** Index of the next buffered record to test; survives across {@code getNext()} calls. */
+    protected int curIdx = 0;
+    /** Number of buffered records, fixed once the buffer has been filled. */
+    protected int bufferLength = 0;
+
+    /**
+     * Fills {@link #buffer} from whichever input this join type buffers.
+     *
+     * @return the number of records buffered
+     */
+    protected abstract int setupBuffer();
+
+    /**
+     * Drains an iterator into {@link #buffer}, storing a copy of every record along with
+     * whether it arrived with a schema change.
+     *
+     * @param iterator input to read until it reports {@code NONE_LEFT}
+     * @return the number of records added
+     */
+    protected int setupBufferForIterator(RecordIterator iterator) {
+      int count = 0;
+      NextOutcome outcome = iterator.next();
+      while (outcome != NextOutcome.NONE_LEFT) {
+        buffer.add(new RecordBuffer(
+            iterator.getRecordPointer().copy(),
+            outcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED));
+        ++count;
+        outcome = iterator.next();
+      }
+      return count;
+    }
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return record;
+    }
+
+    /** Buffers one input on first use, then delegates to {@link #getNext()}. */
+    @Override
+    public NextOutcome next() {
+      if (buffer == null) {
+        buffer = Lists.newArrayList();
+        setupBuffer();
+        bufferLength = buffer.size();
+      }
+      return getNext();
+    }
+
+    /**
+     * Produces the next joined row into the shared output record.
+     *
+     * @return the outcome for the emitted row, or {@code NONE_LEFT} when the join is exhausted
+     */
+    public abstract NextOutcome getNext();
+
+    /**
+     * Rebuilds the output record from the given inputs: the first non-null input is copied in,
+     * every later non-null input is merged on top of it.
+     *
+     * @param inputs records to combine, in order; {@code null} entries are skipped
+     */
+    protected void setOutputRecord(RecordPointer... inputs) {
+      boolean first = true;
+      for (RecordPointer input : inputs) {
+        if (input == null) {
+          continue;
+        }
+
+        if (first) {
+          first = false;
+          record.copyFrom(input);
+        } else {
+          record.merge(input);
+        }
+      }
+    }
+
+    /**
+     * Tests a left/right record pair against the configured join conditions.
+     *
+     * <p>Each condition's left expression is evaluated against {@code leftRecord} and its right
+     * expression against {@code rightRecord}; the values are then passed to {@link #eval} in
+     * that order so that ordering relationships keep their direction.
+     *
+     * @param leftRecord record coming from the left input
+     * @param rightRecord record coming from the right input
+     * @return {@code true} as soon as one condition holds, {@code false} if none does
+     */
+    protected boolean matchesAnyCondition(RecordPointer leftRecord, RecordPointer rightRecord) {
+      for (JoinCondition condition : config.getConditions()) {
+        DataValue leftVal = factory.getBasicEvaluator(leftRecord, condition.getLeft()).eval();
+        DataValue rightVal = factory.getBasicEvaluator(rightRecord, condition.getRight()).eval();
+        if (eval(leftVal, rightVal, condition.getRelationship())) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Evaluates {@code leftVal <relationship> rightVal}.
+     *
+     * @param leftVal value of the condition's left expression
+     * @param rightVal value of the condition's right expression
+     * @param relationship one of {@code !=}, {@code ==}, {@code <}, {@code <=}, {@code >},
+     *        {@code >=}
+     * @return the result of the comparison, or {@code false} when
+     *         {@link ComparisonEvaluators#isComparable} rejects the pair
+     * @throws DrillRuntimeException if the relationship is not one of the supported operators
+     */
+    public boolean eval(DataValue leftVal, DataValue rightVal, String relationship) {
+      // Skip join if no comparison can be made
+      if (!ComparisonEvaluators.isComparable(leftVal, rightVal)) {
+        return false;
+      }
+
+      // NOTE(review): this duplicates operator handling that may belong in ComparisonEvaluators.
+      switch (relationship) {
+        case "!=":
+          return !leftVal.equals(rightVal);
+        case "==":
+          return leftVal.equals(rightVal);
+        case "<":
+          return ((ComparableValue) leftVal).compareTo(rightVal) < 0;
+        case "<=":
+          return ((ComparableValue) leftVal).compareTo(rightVal) <= 0;
+        case ">":
+          return ((ComparableValue) leftVal).compareTo(rightVal) > 0;
+        case ">=":
+          return ((ComparableValue) leftVal).compareTo(rightVal) >= 0;
+        default:
+          throw new DrillRuntimeException("Relationship not supported: " + relationship);
+      }
+    }
+
+    @Override
+    public ROP getParent() {
+      return JoinROP.this;
+    }
+  }
+
+  /**
+   * Inner join: buffers the left input, streams the right input and emits only matching pairs.
+   */
+  class InnerIterator extends JoinIterator {
+    /** Outcome reported by the right input for the record currently being joined. */
+    NextOutcome rightOutcome;
+
+    @Override
+    protected int setupBuffer() {
+      return setupBufferForIterator(left);
+    }
+
+    @Override
+    public NextOutcome getNext() {
+      if (bufferLength == 0) {
+        // Nothing was buffered from the left input, so no pair can ever match.
+        return NextOutcome.NONE_LEFT;
+      }
+
+      final RecordPointer rightPointer = right.getRecordPointer();
+      while (true) {
+        // Wrap here rather than only after a failed match: a match on the last buffered record
+        // returns with curIdx == bufferLength and must not index past the buffer on re-entry.
+        if (curIdx >= bufferLength) {
+          curIdx = 0;
+        }
+
+        if (curIdx == 0) {
+          // Start of a scan: advance the streamed side.
+          rightOutcome = right.next();
+
+          if (rightOutcome == NextOutcome.NONE_LEFT) {
+            break;
+          }
+        }
+
+        final RecordBuffer bufferObj = buffer.get(curIdx++);
+        // The buffered record is the left side of every condition.
+        if (matchesAnyCondition(bufferObj.pointer, rightPointer)) {
+          setOutputRecord(rightPointer, bufferObj.pointer);
+          return (bufferObj.schemaChanged || rightOutcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED) ?
+              NextOutcome.INCREMENTED_SCHEMA_CHANGED :
+              NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
+        }
+      }
+
+      return NextOutcome.NONE_LEFT;
+    }
+  }
+
+  /**
+   * Left join: buffers the right input and streams the left input. Every left record is emitted
+   * at least once: merged with each matching right record, or on its own if nothing matched.
+   */
+  class LeftIterator extends JoinIterator {
+    /** Outcome reported by the left input for the record currently being joined. */
+    private NextOutcome leftOutcome;
+
+    @Override
+    protected int setupBuffer() {
+      return setupBufferForIterator(right);
+    }
+
+    @Override
+    public NextOutcome getNext() {
+      final RecordPointer leftPointer = left.getRecordPointer();
+      // A call only resumes in the middle of a scan after a match was returned for the current
+      // left record, so that record counts as matched until a new one is fetched.
+      boolean isFound = true;
+      while (true) {
+        // Wrap here rather than only after a failed match: a match on the last buffered record
+        // returns with curIdx == bufferLength and must not index past the buffer on re-entry.
+        if (curIdx >= bufferLength) {
+          curIdx = 0;
+        }
+
+        if (curIdx == 0) {
+          if (!isFound) {
+            // The scan finished without a match: emit the left record alone.
+            setOutputRecord(leftPointer);
+            return leftOutcome;
+          }
+
+          leftOutcome = left.next();
+
+          if (leftOutcome == NextOutcome.NONE_LEFT) {
+            break;
+          }
+
+          isFound = false;
+
+          if (bufferLength == 0) {
+            // Empty right side: nothing to scan, so loop back and emit the left record unmatched.
+            continue;
+          }
+        }
+
+        final RecordBuffer bufferObj = buffer.get(curIdx++);
+        if (matchesAnyCondition(leftPointer, bufferObj.pointer)) {
+          // Remember the match so an outer join does not emit this right record as unmatched.
+          bufferObj.setHasJoined(true);
+          setOutputRecord(leftPointer, bufferObj.pointer);
+          return (bufferObj.schemaChanged || leftOutcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED) ?
+              NextOutcome.INCREMENTED_SCHEMA_CHANGED :
+              NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
+        }
+      }
+
+      return NextOutcome.NONE_LEFT;
+    }
+  }
+
+  /**
+   * Outer join: runs the left join to completion, then emits every buffered right record that
+   * was never part of a matching pair.
+   */
+  class OuterIterator extends LeftIterator {
+    /** Becomes true once the left-join phase has reported {@code NONE_LEFT}. */
+    boolean innerJoinCompleted = false;
+
+    @Override
+    public NextOutcome getNext() {
+      if (!innerJoinCompleted) {
+        NextOutcome outcome = super.getNext();
+        if (outcome != NextOutcome.NONE_LEFT) {
+          return outcome;
+        }
+        // Switch phases and restart the cursor to sweep the buffer for unmatched records.
+        innerJoinCompleted = true;
+        curIdx = 0;
+      }
+
+      while (curIdx < bufferLength) {
+        RecordBuffer recordBuffer = buffer.get(curIdx++);
+        if (!recordBuffer.hasJoined) {
+          setOutputRecord(recordBuffer.pointer);
+          return recordBuffer.schemaChanged ? NextOutcome.INCREMENTED_SCHEMA_CHANGED : NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
+        }
+      }
+      return NextOutcome.NONE_LEFT;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/test/resources/departments.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/departments.json b/sandbox/prototype/exec/ref/src/test/resources/departments.json
new file mode 100644
index 0000000..3cf0a85
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/departments.json
@@ -0,0 +1,16 @@
+{
+ "deptId": 31,
+ "name": "Sales"
+}
+{
+ "deptId": 33,
+ "name": "Engineering"
+}
+{
+ "deptId": 34,
+ "name": "Clerical"
+}
+{
+ "deptId": 35,
+ "name": "Marketing"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/test/resources/employees.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/employees.json b/sandbox/prototype/exec/ref/src/test/resources/employees.json
new file mode 100644
index 0000000..83f68bd
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/employees.json
@@ -0,0 +1,23 @@
+{
+ "lastName": "Rafferty",
+ "deptId": 31
+}
+{
+ "lastName": "Jones",
+ "deptId": 33
+}
+{
+ "lastName": "Steinberg",
+ "deptId": 33
+}
+{
+ "lastName": "Robinson",
+ "deptId": 34
+}
+{
+ "lastName": "Smith",
+ "deptId": 34
+}
+{
+ "lastName": "John"
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/test/resources/simple_join.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/simple_join.json b/sandbox/prototype/exec/ref/src/test/resources/simple_join.json
new file mode 100644
index 0000000..37e2a61
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/simple_join.json
@@ -0,0 +1,70 @@
+{
+ head: {
+ type: "apache_drill_logical_plan",
+ version: "1",
+ generator: {
+ type: "manual",
+ info: "na"
+ }
+ },
+ storage:[
+ {
+ type:"console",
+ name:"console"
+ },
+ {
+ type:"fs",
+ name:"fs1",
+ root:"file:///"
+ },
+ {
+ type:"classpath",
+ name:"cp"
+ }
+ ],
+ query: [
+ {
+ @id: 1,
+ op: "scan",
+ memo: "initial_scan",
+ ref: "employees",
+ storageengine: "cp",
+ selection: {
+ path: "/employees.json",
+ type: "JSON"
+ }
+ },
+ {
+ @id: 2,
+ op: "scan",
+ memo: "second_scan",
+ ref: "departments",
+ storageengine: "cp",
+ selection: {
+ path: "/departments.json",
+ type: "JSON"
+ }
+ },
+ {
+ @id: 3,
+ op: "join",
+ left: 1,
+ right: 2,
+ type: "outer",
+ conditions: [
+ {
+ relationship: "==",
+ left: "employees.deptId",
+ right: "departments.deptId"
+ }
+ ]
+ },
+ {
+ input: 3,
+ op: "write",
+ memo: "output sink",
+ storageengine: "console",
+ target: {pipe: "STD_OUT"}
+ }
+ ]
+}
\ No newline at end of file