You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/02/25 04:40:53 UTC

[1/2] git commit: Implement Join ROP

Implement Join ROP


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/08bb3be6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/08bb3be6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/08bb3be6

Branch: refs/heads/master
Commit: 08bb3be62eb91fb8618bb7840a04db1b2608fbde
Parents: 6e4a138
Author: Timothy Chen <tn...@gmail.com>
Authored: Tue Feb 5 01:53:59 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Feb 24 19:36:24 2013 -0800

----------------------------------------------------------------------
 .../org/apache/drill/common/logical/data/Join.java |    4 +-
 .../drill/common/logical/data/LogicalOperator.java |    2 +-
 .../org/apache/drill/exec/ref/UnbackedRecord.java  |  144 ++++----
 .../exec/ref/eval/fn/ComparisonEvaluators.java     |    6 +-
 .../org/apache/drill/exec/ref/rops/JoinROP.java    |  309 +++++++++++++++
 .../exec/ref/src/test/resources/departments.json   |   16 +
 .../exec/ref/src/test/resources/employees.json     |   23 ++
 .../exec/ref/src/test/resources/simple_join.json   |   70 ++++
 8 files changed, 501 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
index 1ce0306..9742cd7 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
@@ -8,7 +8,7 @@
  * with the License.  You may obtain a copy of the License at
  * 
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,7 +32,7 @@ public class Join extends LogicalOperatorBase {
   private final JoinCondition[] conditions;
 
   public static enum JoinType{
-    LEFT, RIGHT, INNER, OUTER;
+    LEFT, INNER, OUTER;
     
     public static JoinType resolve(String val){
       for(JoinType jt : JoinType.values()){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
index 8644107..e2bda45 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
@@ -8,7 +8,7 @@
  * with the License.  You may obtain a copy of the License at
  * 
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
index ada191d..bc6ae0e 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,77 +28,111 @@ import org.apache.drill.exec.ref.values.DataValue;
 import org.apache.drill.exec.ref.values.SimpleMapValue;
 import org.apache.drill.exec.ref.values.ValueUtils;
 
-public class UnbackedRecord implements RecordPointer{
-
-  private DataValue root = new SimpleMapValue();
-  
-  public DataValue getField(SchemaPath field) {
-    return root.getValue(field.getRootSegment());
-  }
-
-  public void addField(SchemaPath field, DataValue value) {
-    addField(field.getRootSegment(), value);
-  }
-
-  @Override
-  public void addField(PathSegment segment, DataValue value) {
-    root.addValue(segment, value);
-  }
-
-  @Override
-  public void removeField(SchemaPath field) {
-    root.removeValue(field.getRootSegment());
-  }
-  
-  @Override
-  public void write(DataWriter writer) throws IOException {
-    writer.startRecord();
-    root.write(writer);
-    writer.endRecord();
-  }
-
-  public void merge(DataValue v){
-    if(v instanceof ContainerValue){
-      this.root = ValueUtils.getMergedDataValue(CollisionBehavior.MERGE_OVERRIDE, root, v);
-    }else{
-      this.root = v;
+/**
+ * {@link RecordPointer} whose fields are held directly in an in-memory
+ * {@link DataValue} tree, starting out as an empty {@link SimpleMapValue}.
+ */
+public class UnbackedRecord implements RecordPointer {
+
+    private DataValue root = new SimpleMapValue();
+
+    /** Returns whatever the root value yields for the root segment of {@code field}. */
+    public DataValue getField(SchemaPath field) {
+        return root.getValue(field.getRootSegment());
+    }
+
+    /** Adds a value under the given path by delegating to the {@link PathSegment} overload. */
+    public void addField(SchemaPath field, DataValue value) {
+        addField(field.getRootSegment(), value);
     }
-  }
-
-  @Override
-  public RecordPointer copy() {
-    // TODO: Make a deep copy.
-    UnbackedRecord r = new UnbackedRecord();
-    r.root = this.root;
-    return r;
-  }
-  
-  public void clear(){
-    root = new SimpleMapValue();
-  }
- 
-  public void setClearAndSetRoot(SchemaPath path, DataValue v){
-    root = new SimpleMapValue();
-    root.addValue(path.getRootSegment(), v);
-  }
-
-  @Override
-  public void copyFrom(RecordPointer r) {
-    if(r instanceof UnbackedRecord){
-      this.root = ((UnbackedRecord)r).root.copy();
-    }else{
-      throw new UnsupportedOperationException(String.format("Unable to copy from a record of type %s to an UnbackedRecord.", r.getClass().getCanonicalName()));
+
+    @Override
+    public void addField(PathSegment segment, DataValue value) {
+        root.addValue(segment, value);
     }
-  }
 
-  @Override
-  public String toString() {
-    return "UnbackedRecord [root=" + root + "]";
-  }
+    @Override
+    public void removeField(SchemaPath field) {
+        root.removeValue(field.getRootSegment());
+    }
 
+    /** Writes the root value to {@code writer} between a record start and a record end. */
+    @Override
+    public void write(DataWriter writer) throws IOException {
+        writer.startRecord();
+        root.write(writer);
+        writer.endRecord();
+    }
+
+    /**
+     * Merges {@code v} into this record. A {@link ContainerValue} is merged into
+     * the current root with {@code MERGE_OVERRIDE} collision behavior; any other
+     * value replaces the root outright.
+     */
+    public void merge(DataValue v) {
+        if (v instanceof ContainerValue) {
+            this.root = ValueUtils.getMergedDataValue(CollisionBehavior.MERGE_OVERRIDE, root, v);
+        } else {
+            this.root = v;
+        }
+    }
+
+    /**
+     * Merges the root of another {@code UnbackedRecord} into this record.
+     *
+     * @throws UnsupportedOperationException if {@code pointer} is not an {@code UnbackedRecord}
+     */
+    public void merge(RecordPointer pointer) {
+        if (pointer instanceof UnbackedRecord) {
+            merge(((UnbackedRecord) pointer).root);
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unable to merge from a record of type %s to an UnbackedRecord.", pointer.getClass().getCanonicalName())
+            );
+        }
+    }
+
+    /**
+     * Returns a new record whose root is {@code root.copy()}, the same copy that
+     * {@link #copyFrom(RecordPointer)} takes, instead of a reference to this
+     * record's own root.
+     */
+    @Override
+    public RecordPointer copy() {
+        UnbackedRecord r = new UnbackedRecord();
+        r.root = this.root.copy();
+        return r;
+    }
+
+    /** Discards all fields by replacing the root with an empty {@link SimpleMapValue}. */
+    public void clear() {
+        root = new SimpleMapValue();
+    }
+
+    /** Resets the record to an empty root and then stores {@code v} under {@code path}. */
+    public void setClearAndSetRoot(SchemaPath path, DataValue v) {
+        root = new SimpleMapValue();
+        root.addValue(path.getRootSegment(), v);
+    }
+
+    /**
+     * Replaces this record's root with a copy of the root of {@code r}.
+     *
+     * @throws UnsupportedOperationException if {@code r} is not an {@code UnbackedRecord}
+     */
+    @Override
+    public void copyFrom(RecordPointer r) {
+        if (r instanceof UnbackedRecord) {
+            this.root = ((UnbackedRecord) r).root.copy();
+        } else {
+            throw new UnsupportedOperationException(String.format("Unable to copy from a record of type %s to an UnbackedRecord.", r.getClass().getCanonicalName()));
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "UnbackedRecord [root=" + root + "]";
+    }
 
-  
 
-  
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java
index f3b2fff..3e6d428 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java
@@ -45,6 +45,10 @@ public class ComparisonEvaluators {
     }
 
   }
+
+  public static boolean isComparable(DataValue a, DataValue b) {
+      return a instanceof ComparableValue && b instanceof ComparableValue && ((ComparableValue) a).supportsCompare(b);
+  }
   
   private abstract static class ComparisonEvaluator extends BaseBasicEvaluator{
     private final BasicEvaluator left;
@@ -63,7 +67,7 @@ public class ComparisonEvaluators {
       DataValue a = left.eval();
       DataValue b = right.eval();
       
-      if(a instanceof ComparableValue && b instanceof ComparableValue && ((ComparableValue) a).supportsCompare(b)){
+      if(isComparable(a, b)){
         int i = ((ComparableValue)a).compareTo(b);
         return new BooleanScalar(valid( i));
       }else{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java
new file mode 100644
index 0000000..37c5657
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java
@@ -0,0 +1,386 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.fn.ComparisonEvaluators;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+import org.apache.drill.exec.ref.values.ComparableValue;
+import org.apache.drill.exec.ref.values.DataValue;
+
+import java.util.List;
+
+/**
+ * Reference operator (ROP) that joins the records of two inputs as described
+ * by a logical {@link Join}.
+ *
+ * <p>One input is buffered completely in memory and the other one is streamed
+ * against that buffer, record by record. A pair of records joins when at least
+ * one of the configured {@link JoinCondition}s holds for it.
+ */
+public class JoinROP extends ROPBase<Join> {
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinROP.class);
+
+    private RecordIterator left;
+    private RecordIterator right;
+    /** Output record returned by every join iterator; overwritten for each emitted row. */
+    private UnbackedRecord record;
+    private EvaluatorFactory factory;
+
+    public JoinROP(Join config) {
+        super(config);
+        record = new UnbackedRecord();
+    }
+
+    @Override
+    protected void setupIterators(IteratorRegistry builder) {
+        left = Iterables.getOnlyElement(builder.getOperator(config.getLeft()));
+        right = Iterables.getOnlyElement(builder.getOperator(config.getRight()));
+    }
+
+    @Override
+    protected void setupEvals(EvaluatorFactory builder) throws SetupException {
+        // Evaluators are built per record pair while joining, so only the factory is kept.
+        factory = builder;
+    }
+
+    @Override
+    protected RecordIterator getIteratorInternal() {
+        return createIteratorFromJoin(config.getJointType());
+    }
+
+    /**
+     * Creates the iterator that implements the given join type.
+     *
+     * @throws UnsupportedOperationException if no iterator exists for the type
+     */
+    private RecordIterator createIteratorFromJoin(Join.JoinType type) {
+        switch (type) {
+            case LEFT:
+                return new LeftIterator();
+            case INNER:
+                return new InnerIterator();
+            case OUTER:
+                return new OuterIterator();
+            default:
+                throw new UnsupportedOperationException("Type not supported: " + type);
+        }
+    }
+
+    /** Copy of one buffered input record plus the join state tracked for it. */
+    private static class RecordBuffer {
+        /** True when the source iterator reported a schema change with this record. */
+        final boolean schemaChanged;
+        final RecordPointer pointer;
+        /** True once this record took part in a match; the outer join emits the rest on their own. */
+        boolean hasJoined = false;
+
+        private RecordBuffer(RecordPointer pointer, boolean schemaChanged) {
+            this.pointer = pointer;
+            this.schemaChanged = schemaChanged;
+        }
+
+        public void setHasJoined(boolean hasJoined) {
+            this.hasJoined = hasJoined;
+        }
+    }
+
+    /**
+     * Common machinery of the join iterators. The first call to {@link #next()}
+     * fills {@link #buffer} through {@link #setupBuffer()}; every call then
+     * delegates to {@link #getNext()}, which streams the other input against
+     * the buffer.
+     */
+    abstract class JoinIterator implements RecordIterator {
+        protected List<RecordBuffer> buffer;
+        /** Index of the next buffered record to probe; 0 means a new pass over the buffer starts. */
+        protected int curIdx = 0;
+        protected int bufferLength = 0;
+
+        /** Buffers the input chosen by the subclass and returns how many records were buffered. */
+        protected abstract int setupBuffer();
+
+        /**
+         * Drains {@code iterator} into {@link #buffer}, storing a copy of every record.
+         *
+         * @return the number of records added to the buffer
+         */
+        protected int setupBufferForIterator(RecordIterator iterator) {
+            int count = 0;
+            NextOutcome outcome = iterator.next();
+            while (outcome != NextOutcome.NONE_LEFT) {
+                buffer.add(new RecordBuffer(
+                        iterator.getRecordPointer().copy(),
+                        outcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED)
+                );
+                ++count;
+                outcome = iterator.next();
+            }
+            return count;
+        }
+
+        @Override
+        public RecordPointer getRecordPointer() {
+            return record;
+        }
+
+        @Override
+        public NextOutcome next() {
+            if (buffer == null) {
+                // Buffer one input lazily, on the first request for a record.
+                buffer = Lists.newArrayList();
+                setupBuffer();
+                bufferLength = buffer.size();
+            }
+            return getNext();
+        }
+
+        /** Writes the next joined row into the output record, or returns {@code NONE_LEFT} when done. */
+        public abstract NextOutcome getNext();
+
+        /**
+         * Returns the buffered record at {@link #curIdx} and advances the index,
+         * wrapping to 0 after the last record. Wrapping here, before the caller
+         * can return a match, keeps the index valid for the following call even
+         * when the last buffered record matches. The buffer must not be empty.
+         */
+        protected RecordBuffer nextBuffered() {
+            final RecordBuffer bufferObj = buffer.get(curIdx++);
+            if (curIdx >= bufferLength) {
+                curIdx = 0;
+            }
+            return bufferObj;
+        }
+
+        /**
+         * Tests whether a pair of records satisfies at least one join condition.
+         * The condition's left expression is always evaluated on {@code leftRecord}
+         * and its right expression on {@code rightRecord}, so ordering
+         * relationships such as {@code <} keep their direction no matter which
+         * side is buffered.
+         */
+        protected boolean matchesAnyCondition(final RecordPointer leftRecord, final RecordPointer rightRecord) {
+            Optional<JoinCondition> option = Iterables.tryFind(Lists.newArrayList(config.getConditions()), new Predicate<JoinCondition>() {
+                @Override
+                public boolean apply(JoinCondition condition) {
+                    return eval(factory.getBasicEvaluator(leftRecord, condition.getLeft()).eval(),
+                            factory.getBasicEvaluator(rightRecord, condition.getRight()).eval(), condition.getRelationship());
+                }
+            });
+            return option.isPresent();
+        }
+
+        /**
+         * Rebuilds the output record from the given inputs: the first non-null
+         * input is copied into it and every further non-null input is merged in.
+         */
+        protected void setOutputRecord(RecordPointer... inputs) {
+            boolean first = true;
+            for (RecordPointer input : inputs) {
+                if (input == null) {
+                    continue;
+                }
+
+                if (first) {
+                    first = false;
+                    record.copyFrom(input);
+                } else {
+                    record.merge(input);
+                }
+            }
+        }
+
+        /**
+         * Applies a single relationship to two values.
+         *
+         * @return false when the two values cannot be compared with each other,
+         *         otherwise the outcome of the relationship
+         * @throws DrillRuntimeException if the relationship is none of
+         *         {@code !=}, {@code ==}, {@code <}, {@code <=}, {@code >}, {@code >=}
+         */
+        public boolean eval(DataValue leftVal, DataValue rightVal, String relationship) {
+            // Skip join if no comparison can be made
+            if (!ComparisonEvaluators.isComparable(leftVal, rightVal)) {
+                return false;
+            }
+
+            //Somehow utilize ComparisonEvaluators?
+            switch (relationship) {
+                case "!=":
+                    return !leftVal.equals(rightVal);
+                case "==":
+                    return leftVal.equals(rightVal);
+                case "<":
+                    return ((ComparableValue) leftVal).compareTo(rightVal) < 0;
+                case "<=":
+                    return ((ComparableValue) leftVal).compareTo(rightVal) <= 0;
+                case ">":
+                    return ((ComparableValue) leftVal).compareTo(rightVal) > 0;
+                case ">=":
+                    return ((ComparableValue) leftVal).compareTo(rightVal) >= 0;
+                default:
+                    throw new DrillRuntimeException("Relationship not supported: " + relationship);
+            }
+        }
+
+        @Override
+        public ROP getParent() {
+            return JoinROP.this;
+        }
+    }
+
+    /**
+     * Inner join: buffers the left input and streams the right input against
+     * it, emitting one row for every matching pair.
+     */
+    class InnerIterator extends JoinIterator {
+        NextOutcome rightOutcome;
+
+        @Override
+        protected int setupBuffer() {
+            return setupBufferForIterator(left);
+        }
+
+        @Override
+        public NextOutcome getNext() {
+            if (bufferLength == 0) {
+                // With no left records buffered an inner join cannot produce any row.
+                return NextOutcome.NONE_LEFT;
+            }
+
+            final RecordPointer rightPointer = right.getRecordPointer();
+            while (true) {
+                if (curIdx == 0) {
+                    // A pass over the buffer is complete (or none has started): advance the right input.
+                    rightOutcome = right.next();
+
+                    if (rightOutcome == NextOutcome.NONE_LEFT) {
+                        break;
+                    }
+                }
+
+                final RecordBuffer bufferObj = nextBuffered();
+                if (matchesAnyCondition(bufferObj.pointer, rightPointer)) {
+                    setOutputRecord(rightPointer, bufferObj.pointer);
+                    return (bufferObj.schemaChanged || rightOutcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED) ?
+                            NextOutcome.INCREMENTED_SCHEMA_CHANGED :
+                            NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
+                }
+            }
+
+            return NextOutcome.NONE_LEFT;
+        }
+    }
+
+    /**
+     * Left outer join: buffers the right input and streams the left input
+     * against it. A left record is emitted once per matching right record, or
+     * once on its own when no buffered record matches.
+     */
+    class LeftIterator extends JoinIterator {
+        private NextOutcome leftOutcome;
+
+        @Override
+        protected int setupBuffer() {
+            return setupBufferForIterator(right);
+        }
+
+        @Override
+        public NextOutcome getNext() {
+            final RecordPointer leftPointer = left.getRecordPointer();
+            // When a call begins with curIdx == 0 the previous left record (if any)
+            // has already been emitted, so start in the "found" state and advance.
+            boolean isFound = true;
+            while (true) {
+                if (curIdx == 0) {
+                    if (!isFound) {
+                        // The whole buffer was scanned without a match: emit the left record alone.
+                        setOutputRecord(leftPointer);
+                        return leftOutcome;
+                    }
+
+                    leftOutcome = left.next();
+
+                    if (leftOutcome == NextOutcome.NONE_LEFT) {
+                        break;
+                    }
+
+                    isFound = false;
+
+                    if (bufferLength == 0) {
+                        // Empty right input: nothing to probe, loop around to emit the left record alone.
+                        continue;
+                    }
+                }
+
+                final RecordBuffer bufferObj = nextBuffered();
+                if (matchesAnyCondition(leftPointer, bufferObj.pointer)) {
+                    // Record the match so the outer join does not emit this right record again.
+                    bufferObj.setHasJoined(true);
+                    setOutputRecord(leftPointer, bufferObj.pointer);
+                    return (bufferObj.schemaChanged || leftOutcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED) ?
+                            NextOutcome.INCREMENTED_SCHEMA_CHANGED :
+                            NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
+                }
+            }
+
+            return NextOutcome.NONE_LEFT;
+        }
+    }
+
+    /**
+     * Full outer join: runs the left join first and afterwards emits, on their
+     * own, the buffered right records that never took part in a match.
+     */
+    class OuterIterator extends LeftIterator {
+        boolean innerJoinCompleted = false;
+
+        @Override
+        public NextOutcome getNext() {
+            if (!innerJoinCompleted) {
+                NextOutcome outcome = super.getNext();
+                if (outcome != NextOutcome.NONE_LEFT) {
+                    return outcome;
+                }
+                // Left input exhausted: restart at the top of the buffer for the unmatched pass.
+                innerJoinCompleted = true;
+                curIdx = 0;
+            }
+
+            while (curIdx < bufferLength) {
+                RecordBuffer recordBuffer = buffer.get(curIdx++);
+                if (!recordBuffer.hasJoined) {
+                    setOutputRecord(recordBuffer.pointer);
+                    return recordBuffer.schemaChanged ? NextOutcome.INCREMENTED_SCHEMA_CHANGED : NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
+                }
+            }
+            return NextOutcome.NONE_LEFT;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/test/resources/departments.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/departments.json b/sandbox/prototype/exec/ref/src/test/resources/departments.json
new file mode 100644
index 0000000..3cf0a85
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/departments.json
@@ -0,0 +1,16 @@
+{
+    "deptId": 31,
+    "name": "Sales"
+}
+{
+    "deptId": 33,
+    "name": "Engineering"
+}
+{
+    "deptId": 34,
+    "name": "Clerical"
+}
+{
+    "deptId": 35,
+    "name": "Marketing"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/test/resources/employees.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/employees.json b/sandbox/prototype/exec/ref/src/test/resources/employees.json
new file mode 100644
index 0000000..83f68bd
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/employees.json
@@ -0,0 +1,23 @@
+{
+    "lastName": "Rafferty",
+    "deptId": 31
+}
+{
+    "lastName": "Jones",
+    "deptId": 33
+}
+{
+    "lastName": "Steinberg",
+    "deptId": 33
+}
+{
+    "lastName": "Robinson",
+    "deptId": 34
+}
+{
+    "lastName": "Smith",
+    "deptId": 34
+}
+{
+    "lastName": "John"
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08bb3be6/sandbox/prototype/exec/ref/src/test/resources/simple_join.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/simple_join.json b/sandbox/prototype/exec/ref/src/test/resources/simple_join.json
new file mode 100644
index 0000000..37e2a61
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/simple_join.json
@@ -0,0 +1,70 @@
+{
+   head: {
+      type: "apache_drill_logical_plan",
+      version: "1",
+      generator: {
+         type: "manual",
+         info: "na"
+      }
+   },
+   storage:[
+       {
+         type:"console",
+         name:"console"
+       },
+       {
+         type:"fs",
+         name:"fs1",
+         root:"file:///"
+       },
+       {
+         type:"classpath",
+         name:"cp"
+       }
+   ],
+   query: [
+      {
+         @id: 1,
+         op: "scan",
+         memo: "initial_scan",
+         ref: "employees",
+         storageengine: "cp",
+         selection: {
+         	 path: "/employees.json",
+         	 type: "JSON"
+         }
+      },
+      {
+         @id: 2,
+         op: "scan",
+         memo: "second_scan",
+         ref: "departments",
+         storageengine: "cp",
+         selection: {
+             path: "/departments.json",
+             type: "JSON"
+         }
+      },
+      {
+         @id: 3,
+         op: "join",
+         left: 1,
+         right: 2,
+         type: "outer",
+         conditions: [
+            {
+               relationship: "==",
+               left: "employees.deptId",
+               right: "departments.deptId"
+            }
+         ]
+      },
+      {
+         input: 3,
+         op: "write",
+         memo: "output sink",
+         storageengine: "console",
+         target: {pipe: "STD_OUT"}
+      }
+   ]
+}
\ No newline at end of file