You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by td...@apache.org on 2012/10/21 21:55:21 UTC

[3/3] git commit: DRILL-5 - First "working" version of Explode/Implode

DRILL-5 - First "working" version of Explode/Implode


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/be2ce8d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/be2ce8d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/be2ce8d2

Branch: refs/heads/master
Commit: be2ce8d2d404048304dc1c4392319380d3f00cb4
Parents: 62cebad
Author: tdunning <td...@apache.org>
Authored: Fri Oct 19 17:25:17 2012 -0700
Committer: tdunning <td...@apache.org>
Committed: Fri Oct 19 17:25:17 2012 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/drill/plan/ParsePlan.java |   43 +++++--
 .../org/apache/drill/plan/PhysicalInterpreter.java |   85 ++++++++++++--
 .../plan/physical/operators/ArithmeticOp.java      |    7 +-
 .../plan/physical/operators/BatchListener.java     |   11 ++
 .../apache/drill/plan/physical/operators/Bind.java |    6 +-
 .../plan/physical/operators/EvalOperator.java      |   13 ++
 .../drill/plan/physical/operators/Explode.java     |   61 +++++++++-
 .../drill/plan/physical/operators/Implode.java     |   63 +++++++++++
 .../drill/plan/physical/operators/JsonSchema.java  |   87 +++++++++++++++
 .../drill/plan/physical/operators/Operator.java    |   75 +++++++++----
 .../drill/plan/physical/operators/ScanJson.java    |   35 +-----
 .../drill/plan/physical/operators/Schema.java      |   10 ++-
 .../java/org/apache/drill/plan/ParsePlanTest.java  |   12 +-
 .../apache/drill/plan/PhysicalInterpreterTest.java |   18 ++-
 sandbox/plan-parser/src/test/resources/data1.json  |    1 +
 sandbox/plan-parser/src/test/resources/data2.json  |    2 +
 .../src/test/resources/physical-2.drillx           |    7 +
 17 files changed, 431 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java
index 781bc62..1d7230d 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java
@@ -35,28 +35,37 @@ import java.util.Formatter;
 
 /**
  * Parses a plan from a resource or file.
- *
+ * <p/>
  * The result is validated to ensure that symbols mentioned on the left-hand side of assignments are only mentioned
  * once and all referenced symbols on the right hand side are defined somewhere.
  */
 public class ParsePlan {
-    public static Plan parseResource(File file) throws IOException, RecognitionException, ValidationException {
+    public static Plan parseResource(File file) throws ParseException {
         return ParsePlan.parse(Files.newReaderSupplier(file, Charsets.UTF_8));
     }
 
-    public static Plan parseResource(String resourceName) throws IOException, RecognitionException, ValidationException {
+    public static Plan parseResource(String resourceName) throws ParseException {
         return ParsePlan.parse(Resources.newReaderSupplier(Resources.getResource(resourceName), Charsets.UTF_8));
     }
 
-    public static Plan parse(InputSupplier<InputStreamReader> in) throws IOException, RecognitionException, ValidationException {
-        InputStreamReader inStream = in.getInput();
-        PlanLexer lex = new PlanLexer(new ANTLRReaderStream(inStream));
-        PlanParser r = new PlanParser(new CommonTokenStream(lex));
-        inStream.close();
+    public static Plan parse(InputSupplier<InputStreamReader> in) throws ParseException {
+        PlanParser r;
+        try {
+            InputStreamReader inStream = in.getInput();
+            PlanLexer lex = new PlanLexer(new ANTLRReaderStream(inStream));
+            r = new PlanParser(new CommonTokenStream(lex));
+            inStream.close();
+        } catch (IOException e) {
+            throw new ParseException(e);
+        }
 
-        Plan plan = r.plan().r;
-        validate(plan);
-        return plan;
+        try {
+            Plan plan = r.plan().r;
+            validate(plan);
+            return plan;
+        } catch (RecognitionException e) {
+            throw new ParseException(e);
+        }
     }
 
     private static void validate(Plan r) throws ValidationException {
@@ -98,7 +107,17 @@ public class ParsePlan {
         }
     }
 
-    public static class ValidationException extends Exception {
+    public static class ParseException extends Exception {
+        public ParseException(Throwable throwable) {
+            super(throwable);
+        }
+
+        public ParseException(String s) {
+            super(s);
+        }
+    }
+
+    public static class ValidationException extends ParseException {
         public ValidationException(String s) {
             super(s);
         }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java
index 3058206..778ac90 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java
@@ -17,6 +17,9 @@
 
 package org.apache.drill.plan;
 
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.drill.plan.ast.Arg;
@@ -25,15 +28,14 @@ import org.apache.drill.plan.ast.Plan;
 import org.apache.drill.plan.physical.operators.DataListener;
 import org.apache.drill.plan.physical.operators.Operator;
 
+import javax.annotation.Nullable;
+import java.io.Closeable;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 
 /**
  * Takes a physical plan and interprets in locally.  The goal here is to provide a reference
@@ -42,12 +44,22 @@ import java.util.concurrent.Future;
 public class PhysicalInterpreter implements DataListener {
     private final List<Operator> ops;
 
-    public PhysicalInterpreter(Plan prog) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
+    public PhysicalInterpreter(Plan prog) throws SetupException {
         Map<Integer, Operator> bindings = Maps.newHashMap();
         ops = Lists.newArrayList();
 
-        for (Op op : prog.getStatements()) {
-            ops.add(Operator.create(op, bindings));
+        try {
+            for (Op op : prog.getStatements()) {
+                ops.add(Operator.create(op, bindings));
+            }
+        } catch (NoSuchMethodException e) {
+            throw new SetupException(e);
+        } catch (InvocationTargetException e) {
+            throw new SetupException(e);
+        } catch (IllegalAccessException e) {
+            throw new SetupException(e);
+        } catch (InstantiationException e) {
+            throw new SetupException(e);
         }
 
         Iterator<Op> i = prog.getStatements().iterator();
@@ -61,13 +73,46 @@ public class PhysicalInterpreter implements DataListener {
         }
     }
 
-    public void run() throws InterruptedException, ExecutionException {
+    public void run() throws QueryException {
         ExecutorService pool = Executors.newFixedThreadPool(ops.size());
-        List<Future<Object>> tasks = pool.invokeAll(ops);
+
+        // pick out the ops that are tasks
+        List<Callable<Object>> tasks = Lists.newArrayList(Iterables.transform(Iterables.filter(ops, new Predicate<Operator>() {
+            @Override
+            public boolean apply(@Nullable Operator operator) {
+                return operator instanceof Callable;
+            }
+        }), new Function<Operator, Callable<Object>>() {
+            @Override
+            public Callable<Object> apply(@Nullable Operator operator) {
+                // cast is safe due to previous filter
+                return (Callable<Object>) operator;
+            }
+        }));
+
+        List<Future<Object>> results;
+        try {
+            results = pool.invokeAll(tasks);
+        } catch (InterruptedException e) {
+            throw new QueryException(e);
+        }
         pool.shutdown();
 
-        for (Future<Object> task : tasks) {
-            System.out.printf("%s\n", task.get());
+        for (Operator op : ops) {
+            if (op instanceof Closeable) {
+                op.close();
+            }
+        }
+
+        try {
+            Iterator<Callable<Object>> i = tasks.iterator();
+            for (Future<Object> result : results) {
+                System.out.printf("%s => %s\n", i.next(), result.get());
+            }
+        } catch (InterruptedException e) {
+            throw new QueryException(e);
+        } catch (ExecutionException e) {
+            throw new QueryException(e);
         }
     }
 
@@ -76,4 +121,22 @@ public class PhysicalInterpreter implements DataListener {
     public void notify(Object r) {
         System.out.printf("out = %s\n", r);
     }
+
+    public static class InterpreterException extends Exception {
+        private InterpreterException(Throwable throwable) {
+            super(throwable);
+        }
+    }
+
+    public static class SetupException extends InterpreterException {
+        private SetupException(Throwable throwable) {
+            super(throwable);
+        }
+    }
+
+    public static class QueryException extends InterpreterException {
+        private QueryException(Throwable throwable) {
+            super(throwable);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java
index 516feff..fb65c10 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java
@@ -40,10 +40,7 @@ public abstract class ArithmeticOp extends EvalOperator {
     public EvalOperator left, right;
 
     public ArithmeticOp(Op op, Map<Integer, Operator> bindings) {
-        checkArity(op, 2, 1);
-
-        // bind our output
-        bindings.put(op.getOutputs().get(0).asSymbol().getInt(), this);
+        super(op, bindings, 2, 1);
     }
 
     @Override
@@ -64,8 +61,6 @@ public abstract class ArithmeticOp extends EvalOperator {
 
     @Override
     public void link(Op op, Map<Integer, Operator> bindings) {
-        checkArity(op, 2, 1);
-
         List<Arg> in = op.getInputs();
         left = extractOperand(in.get(0), bindings);
         right = extractOperand(in.get(1), bindings);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java
new file mode 100644
index 0000000..a103ffc
--- /dev/null
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java
@@ -0,0 +1,11 @@
+package org.apache.drill.plan.physical.operators;
+
+/**
+ * Aggregate and Implode operators need to know when batches of records are finished and thus
+ * they implement BatchListener.  Note that the source of data is different from the source of
+ * batch boundaries. This avoids the need for every data processor to propagate boundaries but
+ * it also allows fancy structures to be constructed to do interesting things.
+ */
+public interface BatchListener {
+    public void endBatch(Object parent);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java
index 329272e..4bd2b73 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java
@@ -17,10 +17,8 @@
 
 package org.apache.drill.plan.physical.operators;
 
-import org.apache.drill.plan.ast.Arg;
 import org.apache.drill.plan.ast.Op;
 
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -36,9 +34,7 @@ public class Bind extends EvalOperator {
     private String name;
 
     public Bind(Op op, Map<Integer, Operator> bindings) {
-        checkArity(op, 2, 1);
-        List<Arg> out = op.getOutputs();
-        bindings.put(out.get(0).asSymbol().getInt(), this);
+        super(op, bindings, 2, 1);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java
index dfaea7b..d02cafe 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java
@@ -17,10 +17,23 @@
 
 package org.apache.drill.plan.physical.operators;
 
+import org.apache.drill.plan.ast.Op;
+
+import java.util.Map;
+
 /**
  * Describes a scalar expression.
  */
 public abstract class EvalOperator extends Operator {
+    public EvalOperator(Op op, Map<Integer, Operator> bindings, int inputArgs, int outputArgs) {
+        super(op, bindings, inputArgs, outputArgs);
+    }
+
+    // only for Constants
+    protected EvalOperator() {
+        super();
+    }
+
     public abstract Object eval(Object data);
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java
index a083353..74bec49 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java
@@ -1,11 +1,60 @@
 package org.apache.drill.plan.physical.operators;
 
+import org.apache.drill.plan.ast.Arg;
+import org.apache.drill.plan.ast.Op;
+
+import java.util.List;
+import java.util.Map;
+
 /**
- * Created with IntelliJ IDEA.
- * User: tdunning
- * Date: 10/17/12
- * Time: 6:52 PM
- * To change this template use File | Settings | File Templates.
+ * Explode creates a secondary data flow that contains a batch of records for
+ * each input record.  Typically, Explode is paired with Implode which gathers
+ * the results back together or with Aggregate which gathers results in a different
+ * way.  Cogroup, Explode, Aggregate gives a group by aggregate and
+ * Explode, Filter, Implode gives sub-tree filtering.
+ *
+ * Explode has two outputs.  One is a data source and one is a batch controller (source of batch
+ * boundaries).  Any downstream operator that delays emitting records should
+ * listen to the batch boundaries to avoid having aggregate outputs that cross batch
+ * boundaries.
  */
-public class Explode {
+public class Explode extends Operator implements DataListener {
+    private String variableToExplode;
+    private Schema schema;
+    private Schema subSchema;
+
+    public static void define() {
+        Operator.defineOperator("explode", Explode.class);
+    }
+
+    public Explode(Op op, Map<Integer, Operator> bindings) {
+        // exploded-data-stream, batch-controller := explode data-in, variable-name-to-explode
+        super(op, bindings, 2, 1);
+    }
+
+    @Override
+    public void link(Op op, Map<Integer, Operator> bindings) {
+        List<Arg> in = op.getInputs();
+        Operator data = bindings.get(in.get(0).asSymbol().getInt());
+        data.addDataListener(this);
+        schema = data.getSchema();
+
+        variableToExplode = in.get(1).asString();
+        subSchema = schema.getSubSchema(variableToExplode);
+    }
+
+    @Override
+    public void notify(Object row) {
+        // for each input we get, we iterate through our exploding variable
+        for (Object value : schema.getIterable(variableToExplode, row)) {
+            emit(value);
+        }
+        finishBatch(row);
+    }
+
+    @Override
+    public Schema getSchema() {
+        return subSchema;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java
new file mode 100644
index 0000000..15f0f5b
--- /dev/null
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java
@@ -0,0 +1,63 @@
+package org.apache.drill.plan.physical.operators;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.plan.ast.Arg;
+import org.apache.drill.plan.ast.Op;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Aggregate records into a list that is inserted into the parent record for the batch.
+ */
+public class Implode extends Operator implements DataListener, BatchListener {
+    private Operator data;
+    private List<Object> accumulator = Lists.newArrayList();
+    private Schema schema   ;
+    private String accumulatorVar;
+
+    public static void define() {
+        Operator.defineOperator("implode", Implode.class);
+    }
+
+    public Implode(Op op, Map<Integer, Operator> bindings) {
+        // data-out, implode-var := implode data-in, batch-master
+        super(op, bindings, 2, 2);
+    }
+
+    @Override
+    public void link(Op op, Map<Integer, Operator> bindings) {
+        List<Arg> in = op.getInputs();
+        data = bindings.get(in.get(0).asSymbol().getInt());
+        data.addDataListener(this);
+
+//        schema = data.getSchema().overlay();
+        schema = new JsonSchema();
+        Operator batchController = bindings.get(in.get(1).asSymbol().getInt());
+        batchController.addBatchListener(this);
+
+        accumulatorVar = Operator.gensym();
+    }
+
+    @Override
+    public Object eval() {
+        return accumulatorVar;
+    }
+
+    @Override
+    public Schema getSchema() {
+        throw new UnsupportedOperationException("Default operation");
+    }
+
+    @Override
+    public void notify(Object r) {
+        accumulator.add(r);
+    }
+
+    @Override
+    public void endBatch(Object parent) {
+        schema.set(accumulatorVar, parent, accumulator);
+        accumulator = Lists.newArrayList();
+        emit(parent);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java
new file mode 100644
index 0000000..19b750a
--- /dev/null
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java
@@ -0,0 +1,87 @@
+package org.apache.drill.plan.physical.operators;
+
+import com.google.common.base.Function;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.gson.*;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+
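+/**
+* Schema implementation for records held as Gson JSON elements.
+* Field names are split on "." before lookup; JSON primitives are
+* handed back as Double, String or Boolean and any other element is
+* returned unchanged.  Array values can be iterated element by element,
+* and set() stores a Collection or a Number into a parent JsonObject.
+*/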
+public class JsonSchema extends Schema {
+    Splitter onDot = Splitter.on(".");
+
+    @Override
+    public Object get(String name, Object data) {
+        JsonElement r = (JsonElement) data;
+        Iterable<String> bits = onDot.split(name);
+        for (String bit : bits) {
+            r = ((JsonObject) data).get(bit);
+        }
+        return cleanupJsonisms(r);
+    }
+
+    @Override
+    public <T> Iterable<T> getIterable(String name, Object data) {
+        Object r = get(name, data);
+        if (r instanceof JsonArray) {
+            return Iterables.transform((JsonArray) r, new Function<JsonElement, T>() {
+                @Override
+                public T apply(@Nullable JsonElement jsonElement) {
+                    return (T) cleanupJsonisms(jsonElement);
+                }
+            });
+        } else {
+            throw new IllegalArgumentException("Looked for array but value was " + r.getClass());
+        }
+    }
+
+    @Override
+    public Schema getSubSchema(String name) {
+        return new JsonSchema();
+    }
+
+    @Override
+    public Schema overlay() {
+        return this;
+    }
+
+    @Override
+    public void set(String name, Object parent, Object value) {
+        if (value instanceof Collection) {
+            // input is likely to have been collected by Implode
+            // TODO but what if something else built this?  Do we need a general serialization framework?
+            JsonArray r = new JsonArray();
+            for (Object v : ((Collection) value)) {
+                r.add((JsonElement) v);
+            }
+            ((JsonObject) parent).add(name, r);
+        } else if (value instanceof Number) {
+            ((JsonObject) parent).add(name, new JsonPrimitive((Number) value));
+        } else {
+            throw new IllegalArgumentException(String.format("Can't convert a %s to JSON by magic", value.getClass()));
+        }
+    }
+
+    private Object cleanupJsonisms(JsonElement data) {
+        if (data instanceof JsonPrimitive) {
+            JsonPrimitive v = (JsonPrimitive) data;
+            if (v.isNumber()) {
+                return v.getAsDouble();
+            } else if (v.isString()) {
+                return v.getAsString();
+            } else {
+                return v.getAsBoolean();
+            }
+        } else {
+            return data;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java
index d34ff9c..9c6c0cb 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java
@@ -26,34 +26,48 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Implements a function for an operator on a single line of the physical plan.
- *
+ * <p/>
  * The life cycle of an operator is
  * <nl>
- *     <li>The operator's constructor is defined using Operator.defineOperator</li>
- *     <li>The operator is constructed via Operator.create.  It is expected that
- *     the operator will fill in references to it's own outputs into the DAG bindings</li>
- *     <li>The operator is linked by a call to its link() method.  At this point, the
- *     operator can look at its arguments and resolve references to its inputs.
- *     This is when it should add itself as a data listener and when it should request
- *     any schema that it needs from upstream Operator's.</li>
- *     <li>The operator's run() method is called.  Most operators should simply return at this
- *     point, but data sources should start calling emit with data records.</li>
- *     <li>The operator will be notified of incoming data.  It should process this data
- *     and emit the result.</li>
+ * <li>The operator's constructor is defined using Operator.defineOperator</li>
+ * <li>The operator is constructed via Operator.create.  It is expected that
+ * the operator will fill in references to its own outputs into the DAG bindings</li>
+ * <li>The operator is linked by a call to its link() method.  At this point, the
+ * operator can look at its arguments and resolve references to its inputs.
+ * This is when it should add itself as a data listener and when it should request
+ * any schema that it needs from upstream Operators.</li>
+ * <li>The operator's run() method is called.  Most operators should simply return at this
+ * point, but data sources should start calling emit with data records.</li>
+ * <li>The operator will be notified of incoming data.  It should process this data
+ * and emit the result.</li>
  * </nl>
  */
-public abstract class Operator implements Callable<Object> {
+public abstract class Operator {
+    private static AtomicInteger genCount = new AtomicInteger(0);
     private static final Map<String, Class<? extends Operator>> operatorMap = Maps.newHashMap();
 
+    public Operator(Op op, Map<Integer, Operator> bindings, int inputArgs, int outputArgs) {
+        checkArity(op, inputArgs, outputArgs);
+        for (Arg arg : op.getOutputs()) {
+            bindings.put(arg.asSymbol().getInt(), this);
+        }
+    }
+
+    // only for testing and constants
+    protected Operator() {
+    }
+
     static {
         ArithmeticOp.define();
         Bind.define();
         Filter.define();
         ScanJson.define();
+        Explode.define();
+        Implode.define();
     }
 
 
@@ -67,34 +81,53 @@ public abstract class Operator implements Callable<Object> {
     public static Operator create(Op op, Map<Integer, Operator> bindings) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException {
         Class<? extends Operator> c = operatorMap.get(op.getOp());
         if (c == null) {
-            throw new IllegalArgumentException(String.format("No such operators as %s", op.getOp()));
+            throw new IllegalArgumentException(String.format("No such operator as %s", op.getOp()));
         }
 
         Constructor<? extends Operator> con = c.getConstructor(Op.class, Map.class);
         return con.newInstance(op, bindings);
     }
 
-    protected final List<DataListener> dataOut = Lists.newArrayList();
+
+    public static String gensym() {
+        return String.format("__sym-%d", genCount.incrementAndGet());
+    }
+
+    private final List<DataListener> dataOut = Lists.newArrayList();
+    private List<BatchListener> batchOut = Lists.newArrayList();
 
     public void addDataListener(DataListener listener) {
         this.dataOut.add(listener);
     }
 
+    public void addBatchListener(BatchListener listener) {
+        this.batchOut.add(listener);
+    }
+
     protected void emit(Object r) {
         for (DataListener listener : dataOut) {
             listener.notify(r);
         }
     }
 
-    public double eval() {
+    protected void finishBatch(Object parent) {
+        for (BatchListener listener : batchOut) {
+            listener.endBatch(parent);
+        }
+    }
+
+    public double evalAsDouble() {
         throw new UnsupportedOperationException("default no can do");  //To change body of created methods use File | Settings | File Templates.
     }
 
+    public Object eval() {
+        return null;
+    }
+
     public abstract void link(Op op, Map<Integer, Operator> bindings);
 
-    public Object call() throws Exception {
-        // do nothing
-        return null;
+    public void close() {
+        // do nothing by default... over-ride for clever behavior
     }
 
     public abstract Schema getSchema();
@@ -107,7 +140,7 @@ public abstract class Operator implements Callable<Object> {
 
         List<Arg> out = op.getOutputs();
         if (out.size() != outputArgs) {
-            throw new IllegalArgumentException("bind should have exactly one output");
+            throw new IllegalArgumentException(String.format("Operator should have exactly %d outputs, not %d", outputArgs, out.size()));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java
index 2e7155d..0b699c3 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java
@@ -18,14 +18,10 @@
 package org.apache.drill.plan.physical.operators;
 
 import com.google.common.base.Charsets;
-import com.google.common.base.Splitter;
 import com.google.common.io.Files;
 import com.google.common.io.InputSupplier;
 import com.google.common.io.Resources;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-import com.google.gson.JsonStreamParser;
+import com.google.gson.*;
 import org.apache.drill.plan.ast.Arg;
 import org.apache.drill.plan.ast.Op;
 
@@ -35,11 +31,12 @@ import java.io.InputStreamReader;
 import java.io.Reader;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 /**
  * Reads JSON formatted records from a file.
  */
-public class ScanJson extends Operator {
+public class ScanJson extends Operator implements Callable<Object> {
 
     public static void define() {
         Operator.defineOperator("scan-json", ScanJson.class);
@@ -90,6 +87,9 @@ public class ScanJson extends Operator {
             count++;
         }
         in.close();
+        // TODO: decide what the parent record should be at the top level; null is passed for now.
+        finishBatch(null);
+
         return count;
     }
 
@@ -98,27 +98,4 @@ public class ScanJson extends Operator {
         return new JsonSchema();
     }
 
-    private class JsonSchema extends Schema {
-        Splitter onDot = Splitter.on(".");
-
-        @Override
-        public Object get(String name, Object data) {
-            Iterable<String> bits = onDot.split(name);
-            for (String bit : bits) {
-                data = ((JsonObject) data).get(bit);
-            }
-            if (data instanceof JsonPrimitive) {
-                JsonPrimitive v = (JsonPrimitive) data;
-                if (v.isNumber()) {
-                    return v.getAsDouble();
-                } else if (v.isString()) {
-                    return v.getAsString();
-                } else {
-                    return v.getAsBoolean();
-                }
-            } else {
-                return data;
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java
index e38a792..424c715 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java
@@ -19,9 +19,17 @@ package org.apache.drill.plan.physical.operators;
 
 /**
  * Describes a schema.  In this context a schema is what understands how to get data
- * out of a record.  For JSON, the schema is pretty dumb, but for other data types it
+ * into and out of a record.  For JSON, the schema is pretty dumb, but for other data types it
  * could be quite clever.
  */
 public abstract class Schema {
     public abstract Object get(String name, Object data);
+
+    public abstract <T> Iterable<? extends T> getIterable(String name, Object data);
+
+    public abstract Schema getSubSchema(String name);
+
+    public abstract Schema overlay();
+
+    public abstract void set(String name, Object parent, Object value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java
index 7bbbaf6..7c4d6c4 100644
--- a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java
+++ b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
 import org.antlr.runtime.ANTLRReaderStream;
 import org.antlr.runtime.MissingTokenException;
-import org.antlr.runtime.RecognitionException;
 import org.antlr.runtime.Token;
 import org.apache.drill.plan.ast.LogicalPlanParseException;
 import org.apache.drill.plan.ast.Plan;
@@ -38,25 +37,25 @@ import static junit.framework.Assert.*;
 
 public class ParsePlanTest {
     @Test
-    public void testParse1() throws IOException, RecognitionException, ParsePlan.ValidationException {
+    public void testParse1() throws ParsePlan.ParseException {
         Plan r = ParsePlan.parseResource("plan1.drillx");
         assertEquals("Lines", 3, r.getStatements().size());
     }
 
     @Test
-    public void testParse2() throws IOException, RecognitionException, ParsePlan.ValidationException {
+    public void testParse2() throws ParsePlan.ParseException {
         Plan r = ParsePlan.parseResource("plan2.drillx");
         assertEquals("Lines", 6, r.getStatements().size());
     }
 
     @Test
-    public void testParse3() throws IOException, RecognitionException, ParsePlan.ValidationException {
+    public void testParse3() throws ParsePlan.ParseException {
         Plan r = ParsePlan.parseResource("plan3.drillx");
         assertEquals("Lines", 8, r.getStatements().size());
     }
 
     @Test
-    public void testParseError1() throws IOException, RecognitionException, ParsePlan.ValidationException {
+    public void testParseError1() throws ParsePlan.ParseException {
         try {
             ParsePlan.parseResource("bad-plan1.drillx");
             fail("Should have thrown exception");
@@ -67,7 +66,7 @@ public class ParsePlanTest {
     }
 
     @Test
-    public void testParseError2() throws IOException, RecognitionException, ParsePlan.ValidationException {
+    public void testParseError2() throws ParsePlan.ParseException {
         try {
             ParsePlan.parseResource("bad-plan2.drillx");
             fail("Should have thrown exception");
@@ -80,7 +79,6 @@ public class ParsePlanTest {
     }
 
 
-
     @Test
     public void testLexer() throws IOException {
         List<String> ref = Lists.newArrayList(

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java
index 6d401c1..77a9e57 100644
--- a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java
+++ b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java
@@ -17,18 +17,22 @@
 
 package org.apache.drill.plan;
 
-import org.antlr.runtime.RecognitionException;
 import org.apache.drill.plan.ast.Plan;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.concurrent.ExecutionException;
-
 public class PhysicalInterpreterTest {
     @Test
-    public void testTrivialPlan() throws ParsePlan.ValidationException, RecognitionException, IOException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, InstantiationException, InterruptedException, ExecutionException {
-        Plan p = ParsePlan.parseResource("physical-1.drillx");
+    public void testTrivialPlan() throws PhysicalInterpreter.InterpreterException, ParsePlan.ParseException {
+        run("physical-1.drillx");
+    }
+
+    @Test
+    public void testExplodeFilter() throws PhysicalInterpreter.InterpreterException, ParsePlan.ParseException {
+        run("physical-2.drillx");
+    }
+
+    private void run(String name) throws ParsePlan.ParseException, PhysicalInterpreter.SetupException, PhysicalInterpreter.QueryException {
+        Plan p = ParsePlan.parseResource(name);
         PhysicalInterpreter pi = new PhysicalInterpreter(p);
         pi.run();
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/resources/data1.json
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/test/resources/data1.json b/sandbox/plan-parser/src/test/resources/data1.json
index ddfaa50..faff8fd 100644
--- a/sandbox/plan-parser/src/test/resources/data1.json
+++ b/sandbox/plan-parser/src/test/resources/data1.json
@@ -2,3 +2,4 @@
 {"a":2, "b":2}
 {"a":3, "b":2}
 {"a":4, "b":2}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/resources/data2.json
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/test/resources/data2.json b/sandbox/plan-parser/src/test/resources/data2.json
new file mode 100644
index 0000000..ba3f74d
--- /dev/null
+++ b/sandbox/plan-parser/src/test/resources/data2.json
@@ -0,0 +1,2 @@
+{"x":[{"a":1, "b":2}, {"a":2, "b":2}, {"a":3, "b":2}, {"a":4, "b":2}], "y":[{"a":4, "b":2}]}
+{"x":[{"a":5, "b":2}, {"a":4, "b":2}, {"a":3, "b":2}, {"a":2, "b":2}], "y":[{"a":5, "b":2}, {"a":4, "b":2}]}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/resources/physical-2.drillx
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/test/resources/physical-2.drillx b/sandbox/plan-parser/src/test/resources/physical-2.drillx
new file mode 100644
index 0000000..14924c7
--- /dev/null
+++ b/sandbox/plan-parser/src/test/resources/physical-2.drillx
@@ -0,0 +1,7 @@
+# sub-tree filtering: scan data2.json, explode "x", filter on a > 3, then implode
+%1 := scan-json "resource:data2.json"
+%2 := explode %1, "x"
+%4 := bind "a", %2
+%5 := > %4, 3
+%6 := filter %5, %2
+%7,%8 := implode %6, %2