You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by td...@apache.org on 2012/10/21 21:55:21 UTC
[3/3] git commit: DRILL-5 - First "working" version of Explode/Implode
DRILL-5 - First "working" version of Explode/Implode
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/be2ce8d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/be2ce8d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/be2ce8d2
Branch: refs/heads/master
Commit: be2ce8d2d404048304dc1c4392319380d3f00cb4
Parents: 62cebad
Author: tdunning <td...@apache.org>
Authored: Fri Oct 19 17:25:17 2012 -0700
Committer: tdunning <td...@apache.org>
Committed: Fri Oct 19 17:25:17 2012 -0700
----------------------------------------------------------------------
.../main/java/org/apache/drill/plan/ParsePlan.java | 43 +++++--
.../org/apache/drill/plan/PhysicalInterpreter.java | 85 ++++++++++++--
.../plan/physical/operators/ArithmeticOp.java | 7 +-
.../plan/physical/operators/BatchListener.java | 11 ++
.../apache/drill/plan/physical/operators/Bind.java | 6 +-
.../plan/physical/operators/EvalOperator.java | 13 ++
.../drill/plan/physical/operators/Explode.java | 61 +++++++++-
.../drill/plan/physical/operators/Implode.java | 63 +++++++++++
.../drill/plan/physical/operators/JsonSchema.java | 87 +++++++++++++++
.../drill/plan/physical/operators/Operator.java | 75 +++++++++----
.../drill/plan/physical/operators/ScanJson.java | 35 +-----
.../drill/plan/physical/operators/Schema.java | 10 ++-
.../java/org/apache/drill/plan/ParsePlanTest.java | 12 +-
.../apache/drill/plan/PhysicalInterpreterTest.java | 18 ++-
sandbox/plan-parser/src/test/resources/data1.json | 1 +
sandbox/plan-parser/src/test/resources/data2.json | 2 +
.../src/test/resources/physical-2.drillx | 7 +
17 files changed, 431 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java
index 781bc62..1d7230d 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ParsePlan.java
@@ -35,28 +35,37 @@ import java.util.Formatter;
/**
* Parses a plan from a resource or file.
- *
+ * <p/>
* The result is validated to ensure that symbols mentioned on the left-hand side of assignments are only mentioned
* once and all referenced symbols on the right hand side are defined somewhere.
*/
public class ParsePlan {
- public static Plan parseResource(File file) throws IOException, RecognitionException, ValidationException {
+ public static Plan parseResource(File file) throws ParseException {
return ParsePlan.parse(Files.newReaderSupplier(file, Charsets.UTF_8));
}
- public static Plan parseResource(String resourceName) throws IOException, RecognitionException, ValidationException {
+ public static Plan parseResource(String resourceName) throws ParseException {
return ParsePlan.parse(Resources.newReaderSupplier(Resources.getResource(resourceName), Charsets.UTF_8));
}
- public static Plan parse(InputSupplier<InputStreamReader> in) throws IOException, RecognitionException, ValidationException {
- InputStreamReader inStream = in.getInput();
- PlanLexer lex = new PlanLexer(new ANTLRReaderStream(inStream));
- PlanParser r = new PlanParser(new CommonTokenStream(lex));
- inStream.close();
+ public static Plan parse(InputSupplier<InputStreamReader> in) throws ParseException {
+ PlanParser r;
+ try {
+ InputStreamReader inStream = in.getInput();
+ PlanLexer lex = new PlanLexer(new ANTLRReaderStream(inStream));
+ r = new PlanParser(new CommonTokenStream(lex));
+ inStream.close();
+ } catch (IOException e) {
+ throw new ParseException(e);
+ }
- Plan plan = r.plan().r;
- validate(plan);
- return plan;
+ try {
+ Plan plan = r.plan().r;
+ validate(plan);
+ return plan;
+ } catch (RecognitionException e) {
+ throw new ParseException(e);
+ }
}
private static void validate(Plan r) throws ValidationException {
@@ -98,7 +107,17 @@ public class ParsePlan {
}
}
- public static class ValidationException extends Exception {
+ public static class ParseException extends Exception {
+ public ParseException(Throwable throwable) {
+ super(throwable);
+ }
+
+ public ParseException(String s) {
+ super(s);
+ }
+ }
+
+ public static class ValidationException extends ParseException {
public ValidationException(String s) {
super(s);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java
index 3058206..778ac90 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java
@@ -17,6 +17,9 @@
package org.apache.drill.plan;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.drill.plan.ast.Arg;
@@ -25,15 +28,14 @@ import org.apache.drill.plan.ast.Plan;
import org.apache.drill.plan.physical.operators.DataListener;
import org.apache.drill.plan.physical.operators.Operator;
+import javax.annotation.Nullable;
+import java.io.Closeable;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
/**
* Takes a physical plan and interprets in locally. The goal here is to provide a reference
@@ -42,12 +44,22 @@ import java.util.concurrent.Future;
public class PhysicalInterpreter implements DataListener {
private final List<Operator> ops;
- public PhysicalInterpreter(Plan prog) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
+ public PhysicalInterpreter(Plan prog) throws SetupException {
Map<Integer, Operator> bindings = Maps.newHashMap();
ops = Lists.newArrayList();
- for (Op op : prog.getStatements()) {
- ops.add(Operator.create(op, bindings));
+ try {
+ for (Op op : prog.getStatements()) {
+ ops.add(Operator.create(op, bindings));
+ }
+ } catch (NoSuchMethodException e) {
+ throw new SetupException(e);
+ } catch (InvocationTargetException e) {
+ throw new SetupException(e);
+ } catch (IllegalAccessException e) {
+ throw new SetupException(e);
+ } catch (InstantiationException e) {
+ throw new SetupException(e);
}
Iterator<Op> i = prog.getStatements().iterator();
@@ -61,13 +73,46 @@ public class PhysicalInterpreter implements DataListener {
}
}
- public void run() throws InterruptedException, ExecutionException {
+ public void run() throws QueryException {
ExecutorService pool = Executors.newFixedThreadPool(ops.size());
- List<Future<Object>> tasks = pool.invokeAll(ops);
+
+ // pick out the ops that are tasks
+ List<Callable<Object>> tasks = Lists.newArrayList(Iterables.transform(Iterables.filter(ops, new Predicate<Operator>() {
+ @Override
+ public boolean apply(@Nullable Operator operator) {
+ return operator instanceof Callable;
+ }
+ }), new Function<Operator, Callable<Object>>() {
+ @Override
+ public Callable<Object> apply(@Nullable Operator operator) {
+ // cast is safe due to previous filter
+ return (Callable<Object>) operator;
+ }
+ }));
+
+ List<Future<Object>> results;
+ try {
+ results = pool.invokeAll(tasks);
+ } catch (InterruptedException e) {
+ throw new QueryException(e);
+ }
pool.shutdown();
- for (Future<Object> task : tasks) {
- System.out.printf("%s\n", task.get());
+ for (Operator op : ops) {
+ if (op instanceof Closeable) {
+ op.close();
+ }
+ }
+
+ try {
+ Iterator<Callable<Object>> i = tasks.iterator();
+ for (Future<Object> result : results) {
+ System.out.printf("%s => %s\n", i.next(), result.get());
+ }
+ } catch (InterruptedException e) {
+ throw new QueryException(e);
+ } catch (ExecutionException e) {
+ throw new QueryException(e);
}
}
@@ -76,4 +121,22 @@ public class PhysicalInterpreter implements DataListener {
public void notify(Object r) {
System.out.printf("out = %s\n", r);
}
+
+ public static class InterpreterException extends Exception {
+ private InterpreterException(Throwable throwable) {
+ super(throwable);
+ }
+ }
+
+ public static class SetupException extends InterpreterException {
+ private SetupException(Throwable throwable) {
+ super(throwable);
+ }
+ }
+
+ public static class QueryException extends InterpreterException {
+ private QueryException(Throwable throwable) {
+ super(throwable);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java
index 516feff..fb65c10 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java
@@ -40,10 +40,7 @@ public abstract class ArithmeticOp extends EvalOperator {
public EvalOperator left, right;
public ArithmeticOp(Op op, Map<Integer, Operator> bindings) {
- checkArity(op, 2, 1);
-
- // bind our output
- bindings.put(op.getOutputs().get(0).asSymbol().getInt(), this);
+ super(op, bindings, 2, 1);
}
@Override
@@ -64,8 +61,6 @@ public abstract class ArithmeticOp extends EvalOperator {
@Override
public void link(Op op, Map<Integer, Operator> bindings) {
- checkArity(op, 2, 1);
-
List<Arg> in = op.getInputs();
left = extractOperand(in.get(0), bindings);
right = extractOperand(in.get(1), bindings);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java
new file mode 100644
index 0000000..a103ffc
--- /dev/null
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/BatchListener.java
@@ -0,0 +1,11 @@
+package org.apache.drill.plan.physical.operators;
+
+/**
+ * Aggregate and Implode operators need to know when batches of records are finished and thus
+ * they implement BatchListener. Note that the source of data is different from the source of
+ * batch boundaries. This avoids the need for every data processor to propagate boundaries but
+ * it also allows fancy structures to be constructed to do interesting things.
+ */
+public interface BatchListener {
+ public void endBatch(Object parent);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java
index 329272e..4bd2b73 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java
@@ -17,10 +17,8 @@
package org.apache.drill.plan.physical.operators;
-import org.apache.drill.plan.ast.Arg;
import org.apache.drill.plan.ast.Op;
-import java.util.List;
import java.util.Map;
/**
@@ -36,9 +34,7 @@ public class Bind extends EvalOperator {
private String name;
public Bind(Op op, Map<Integer, Operator> bindings) {
- checkArity(op, 2, 1);
- List<Arg> out = op.getOutputs();
- bindings.put(out.get(0).asSymbol().getInt(), this);
+ super(op, bindings, 2, 1);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java
index dfaea7b..d02cafe 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java
@@ -17,10 +17,23 @@
package org.apache.drill.plan.physical.operators;
+import org.apache.drill.plan.ast.Op;
+
+import java.util.Map;
+
/**
* Describes a scalar expression.
*/
public abstract class EvalOperator extends Operator {
+ public EvalOperator(Op op, Map<Integer, Operator> bindings, int inputArgs, int outputArgs) {
+ super(op, bindings, inputArgs, outputArgs);
+ }
+
+ // only for Constants
+ protected EvalOperator() {
+ super();
+ }
+
public abstract Object eval(Object data);
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java
index a083353..74bec49 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Explode.java
@@ -1,11 +1,60 @@
package org.apache.drill.plan.physical.operators;
+import org.apache.drill.plan.ast.Arg;
+import org.apache.drill.plan.ast.Op;
+
+import java.util.List;
+import java.util.Map;
+
/**
- * Created with IntelliJ IDEA.
- * User: tdunning
- * Date: 10/17/12
- * Time: 6:52 PM
- * To change this template use File | Settings | File Templates.
+ * Explode creates a secondary data flow that contains a batch of records for
+ * each input record. Typically, Explode is paired with Implode which gathers
+ * the results back together or with Aggregate which gathers results in a different
+ * way. Cogroup, Explode, Aggregate gives a group by aggregate and
+ * Explode, Filter, Implode gives sub-tree filtering.
+ *
+ * Explode has two outputs. One is a data source and one is a batch controller (source of batch
+ * boundaries). Any downstream operator that delays emitting records should
+ * listen to the batch boundaries to avoid having aggregate outputs that cross batch
+ * boundaries.
*/
-public class Explode {
+public class Explode extends Operator implements DataListener {
+ private String variableToExplode;
+ private Schema schema;
+ private Schema subSchema;
+
+ public static void define() {
+ Operator.defineOperator("explode", Explode.class);
+ }
+
+ public Explode(Op op, Map<Integer, Operator> bindings) {
+ // exploded-data-stream, batch-controller := explode data-in, variable-name-to-explode
+ super(op, bindings, 2, 1);
+ }
+
+ @Override
+ public void link(Op op, Map<Integer, Operator> bindings) {
+ List<Arg> in = op.getInputs();
+ Operator data = bindings.get(in.get(0).asSymbol().getInt());
+ data.addDataListener(this);
+ schema = data.getSchema();
+
+ variableToExplode = in.get(1).asString();
+ subSchema = schema.getSubSchema(variableToExplode);
+ }
+
+ @Override
+ public void notify(Object row) {
+ // for each input we get, we iterate through our exploding variable
+ for (Object value : schema.getIterable(variableToExplode, row)) {
+ emit(value);
+ }
+ finishBatch(row);
+ }
+
+ @Override
+ public Schema getSchema() {
+ return subSchema;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java
new file mode 100644
index 0000000..15f0f5b
--- /dev/null
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Implode.java
@@ -0,0 +1,63 @@
+package org.apache.drill.plan.physical.operators;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.plan.ast.Arg;
+import org.apache.drill.plan.ast.Op;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Aggregate records into a list that is inserted into the parent record for the batch.
+ */
+public class Implode extends Operator implements DataListener, BatchListener {
+ private Operator data;
+ private List<Object> accumulator = Lists.newArrayList();
+ private Schema schema ;
+ private String accumulatorVar;
+
+ public static void define() {
+ Operator.defineOperator("implode", Implode.class);
+ }
+
+ public Implode(Op op, Map<Integer, Operator> bindings) {
+ // data-out, implode-var := implode data-in, batch-master
+ super(op, bindings, 2, 2);
+ }
+
+ @Override
+ public void link(Op op, Map<Integer, Operator> bindings) {
+ List<Arg> in = op.getInputs();
+ data = bindings.get(in.get(0).asSymbol().getInt());
+ data.addDataListener(this);
+
+// schema = data.getSchema().overlay();
+ schema = new JsonSchema();
+ Operator batchController = bindings.get(in.get(1).asSymbol().getInt());
+ batchController.addBatchListener(this);
+
+ accumulatorVar = Operator.gensym();
+ }
+
+ @Override
+ public Object eval() {
+ return accumulatorVar;
+ }
+
+ @Override
+ public Schema getSchema() {
+ throw new UnsupportedOperationException("Default operation");
+ }
+
+ @Override
+ public void notify(Object r) {
+ accumulator.add(r);
+ }
+
+ @Override
+ public void endBatch(Object parent) {
+ schema.set(accumulatorVar, parent, accumulator);
+ accumulator = Lists.newArrayList();
+ emit(parent);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java
new file mode 100644
index 0000000..19b750a
--- /dev/null
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/JsonSchema.java
@@ -0,0 +1,87 @@
+package org.apache.drill.plan.physical.operators;
+
+import com.google.common.base.Function;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.gson.*;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+
+/**
+* Created with IntelliJ IDEA.
+* User: tdunning
+* Date: 10/19/12
+* Time: 5:01 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class JsonSchema extends Schema {
+ Splitter onDot = Splitter.on(".");
+
+ @Override
+ public Object get(String name, Object data) {
+ JsonElement r = (JsonElement) data;
+ Iterable<String> bits = onDot.split(name);
+ for (String bit : bits) {
+ r = ((JsonObject) data).get(bit);
+ }
+ return cleanupJsonisms(r);
+ }
+
+ @Override
+ public <T> Iterable<T> getIterable(String name, Object data) {
+ Object r = get(name, data);
+ if (r instanceof JsonArray) {
+ return Iterables.transform((JsonArray) r, new Function<JsonElement, T>() {
+ @Override
+ public T apply(@Nullable JsonElement jsonElement) {
+ return (T) cleanupJsonisms(jsonElement);
+ }
+ });
+ } else {
+ throw new IllegalArgumentException("Looked for array but value was " + r.getClass());
+ }
+ }
+
+ @Override
+ public Schema getSubSchema(String name) {
+ return new JsonSchema();
+ }
+
+ @Override
+ public Schema overlay() {
+ return this;
+ }
+
+ @Override
+ public void set(String name, Object parent, Object value) {
+ if (value instanceof Collection) {
+ // input is likely to have been collected by Implode
+ // TODO but what if something else built this? Do we need a general serialization framework?
+ JsonArray r = new JsonArray();
+ for (Object v : ((Collection) value)) {
+ r.add((JsonElement) v);
+ }
+ ((JsonObject) parent).add(name, r);
+ } else if (value instanceof Number) {
+ ((JsonObject) parent).add(name, new JsonPrimitive((Number) value));
+ } else {
+ throw new IllegalArgumentException(String.format("Can't convert a %s to JSON by magic", value.getClass()));
+ }
+ }
+
+ private Object cleanupJsonisms(JsonElement data) {
+ if (data instanceof JsonPrimitive) {
+ JsonPrimitive v = (JsonPrimitive) data;
+ if (v.isNumber()) {
+ return v.getAsDouble();
+ } else if (v.isString()) {
+ return v.getAsString();
+ } else {
+ return v.getAsBoolean();
+ }
+ } else {
+ return data;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java
index d34ff9c..9c6c0cb 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java
@@ -26,34 +26,48 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Implements a function for an operator on a single line of the physical plan.
- *
+ * <p/>
* The life cycle of an operator is
* <nl>
- * <li>The operator's constructor is defined using Operator.defineOperator</li>
- * <li>The operator is constructed via Operator.create. It is expected that
- * the operator will fill in references to it's own outputs into the DAG bindings</li>
- * <li>The operator is linked by a call to its link() method. At this point, the
- * operator can look at its arguments and resolve references to its inputs.
- * This is when it should add itself as a data listener and when it should request
- * any schema that it needs from upstream Operator's.</li>
- * <li>The operator's run() method is called. Most operators should simply return at this
- * point, but data sources should start calling emit with data records.</li>
- * <li>The operator will be notified of incoming data. It should process this data
- * and emit the result.</li>
+ * <li>The operator's constructor is defined using Operator.defineOperator</li>
+ * <li>The operator is constructed via Operator.create. It is expected that
+ * the operator will fill in references to it's own outputs into the DAG bindings</li>
+ * <li>The operator is linked by a call to its link() method. At this point, the
+ * operator can look at its arguments and resolve references to its inputs.
+ * This is when it should add itself as a data listener and when it should request
+ * any schema that it needs from upstream Operator's.</li>
+ * <li>The operator's run() method is called. Most operators should simply return at this
+ * point, but data sources should start calling emit with data records.</li>
+ * <li>The operator will be notified of incoming data. It should process this data
+ * and emit the result.</li>
* </nl>
*/
-public abstract class Operator implements Callable<Object> {
+public abstract class Operator {
+ private static AtomicInteger genCount = new AtomicInteger(0);
private static final Map<String, Class<? extends Operator>> operatorMap = Maps.newHashMap();
+ public Operator(Op op, Map<Integer, Operator> bindings, int inputArgs, int outputArgs) {
+ checkArity(op, inputArgs, outputArgs);
+ for (Arg arg : op.getOutputs()) {
+ bindings.put(arg.asSymbol().getInt(), this);
+ }
+ }
+
+ // only for testing and constants
+ protected Operator() {
+ }
+
static {
ArithmeticOp.define();
Bind.define();
Filter.define();
ScanJson.define();
+ Explode.define();
+ Implode.define();
}
@@ -67,34 +81,53 @@ public abstract class Operator implements Callable<Object> {
public static Operator create(Op op, Map<Integer, Operator> bindings) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException {
Class<? extends Operator> c = operatorMap.get(op.getOp());
if (c == null) {
- throw new IllegalArgumentException(String.format("No such operators as %s", op.getOp()));
+ throw new IllegalArgumentException(String.format("No such operator as %s", op.getOp()));
}
Constructor<? extends Operator> con = c.getConstructor(Op.class, Map.class);
return con.newInstance(op, bindings);
}
- protected final List<DataListener> dataOut = Lists.newArrayList();
+
+ public static String gensym() {
+ return String.format("__sym-%d", genCount.incrementAndGet());
+ }
+
+ private final List<DataListener> dataOut = Lists.newArrayList();
+ private List<BatchListener> batchOut = Lists.newArrayList();
public void addDataListener(DataListener listener) {
this.dataOut.add(listener);
}
+ public void addBatchListener(BatchListener listener) {
+ this.batchOut.add(listener);
+ }
+
protected void emit(Object r) {
for (DataListener listener : dataOut) {
listener.notify(r);
}
}
- public double eval() {
+ protected void finishBatch(Object parent) {
+ for (BatchListener listener : batchOut) {
+ listener.endBatch(parent);
+ }
+ }
+
+ public double evalAsDouble() {
throw new UnsupportedOperationException("default no can do"); //To change body of created methods use File | Settings | File Templates.
}
+ public Object eval() {
+ return null;
+ }
+
public abstract void link(Op op, Map<Integer, Operator> bindings);
- public Object call() throws Exception {
- // do nothing
- return null;
+ public void close() {
+ // do nothing by default... over-ride for clever behavior
}
public abstract Schema getSchema();
@@ -107,7 +140,7 @@ public abstract class Operator implements Callable<Object> {
List<Arg> out = op.getOutputs();
if (out.size() != outputArgs) {
- throw new IllegalArgumentException("bind should have exactly one output");
+ throw new IllegalArgumentException(String.format("Operator should have exactly %d outputs, not %d", outputArgs, out.size()));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java
index 2e7155d..0b699c3 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java
@@ -18,14 +18,10 @@
package org.apache.drill.plan.physical.operators;
import com.google.common.base.Charsets;
-import com.google.common.base.Splitter;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.common.io.Resources;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-import com.google.gson.JsonStreamParser;
+import com.google.gson.*;
import org.apache.drill.plan.ast.Arg;
import org.apache.drill.plan.ast.Op;
@@ -35,11 +31,12 @@ import java.io.InputStreamReader;
import java.io.Reader;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
/**
* Reads JSON formatted records from a file.
*/
-public class ScanJson extends Operator {
+public class ScanJson extends Operator implements Callable<Object> {
public static void define() {
Operator.defineOperator("scan-json", ScanJson.class);
@@ -90,6 +87,9 @@ public class ScanJson extends Operator {
count++;
}
in.close();
+ // TODO what should the parent record be at the top-level?
+ finishBatch(null);
+
return count;
}
@@ -98,27 +98,4 @@ public class ScanJson extends Operator {
return new JsonSchema();
}
- private class JsonSchema extends Schema {
- Splitter onDot = Splitter.on(".");
-
- @Override
- public Object get(String name, Object data) {
- Iterable<String> bits = onDot.split(name);
- for (String bit : bits) {
- data = ((JsonObject) data).get(bit);
- }
- if (data instanceof JsonPrimitive) {
- JsonPrimitive v = (JsonPrimitive) data;
- if (v.isNumber()) {
- return v.getAsDouble();
- } else if (v.isString()) {
- return v.getAsString();
- } else {
- return v.getAsBoolean();
- }
- } else {
- return data;
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java
index e38a792..424c715 100644
--- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java
+++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java
@@ -19,9 +19,17 @@ package org.apache.drill.plan.physical.operators;
/**
* Describes a schema. In this context a schema is what understands how to get data
- * out of a record. For JSON, the schema is pretty dumb, but for other data types it
+ * into and out of a record. For JSON, the schema is pretty dumb, but for other data types it
* could be quite clever.
*/
public abstract class Schema {
public abstract Object get(String name, Object data);
+
+ public abstract <T> Iterable<? extends T> getIterable(String name, Object data);
+
+ public abstract Schema getSubSchema(String name);
+
+ public abstract Schema overlay();
+
+ public abstract void set(String name, Object parent, Object value);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java
index 7bbbaf6..7c4d6c4 100644
--- a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java
+++ b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import org.antlr.runtime.ANTLRReaderStream;
import org.antlr.runtime.MissingTokenException;
-import org.antlr.runtime.RecognitionException;
import org.antlr.runtime.Token;
import org.apache.drill.plan.ast.LogicalPlanParseException;
import org.apache.drill.plan.ast.Plan;
@@ -38,25 +37,25 @@ import static junit.framework.Assert.*;
public class ParsePlanTest {
@Test
- public void testParse1() throws IOException, RecognitionException, ParsePlan.ValidationException {
+ public void testParse1() throws ParsePlan.ParseException {
Plan r = ParsePlan.parseResource("plan1.drillx");
assertEquals("Lines", 3, r.getStatements().size());
}
@Test
- public void testParse2() throws IOException, RecognitionException, ParsePlan.ValidationException {
+ public void testParse2() throws ParsePlan.ParseException {
Plan r = ParsePlan.parseResource("plan2.drillx");
assertEquals("Lines", 6, r.getStatements().size());
}
@Test
- public void testParse3() throws IOException, RecognitionException, ParsePlan.ValidationException {
+ public void testParse3() throws ParsePlan.ParseException {
Plan r = ParsePlan.parseResource("plan3.drillx");
assertEquals("Lines", 8, r.getStatements().size());
}
@Test
- public void testParseError1() throws IOException, RecognitionException, ParsePlan.ValidationException {
+ public void testParseError1() throws ParsePlan.ParseException {
try {
ParsePlan.parseResource("bad-plan1.drillx");
fail("Should have thrown exception");
@@ -67,7 +66,7 @@ public class ParsePlanTest {
}
@Test
- public void testParseError2() throws IOException, RecognitionException, ParsePlan.ValidationException {
+ public void testParseError2() throws ParsePlan.ParseException {
try {
ParsePlan.parseResource("bad-plan2.drillx");
fail("Should have thrown exception");
@@ -80,7 +79,6 @@ public class ParsePlanTest {
}
-
@Test
public void testLexer() throws IOException {
List<String> ref = Lists.newArrayList(
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java
index 6d401c1..77a9e57 100644
--- a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java
+++ b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/PhysicalInterpreterTest.java
@@ -17,18 +17,22 @@
package org.apache.drill.plan;
-import org.antlr.runtime.RecognitionException;
import org.apache.drill.plan.ast.Plan;
import org.junit.Test;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.concurrent.ExecutionException;
-
public class PhysicalInterpreterTest {
@Test
- public void testTrivialPlan() throws ParsePlan.ValidationException, RecognitionException, IOException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, InstantiationException, InterruptedException, ExecutionException {
- Plan p = ParsePlan.parseResource("physical-1.drillx");
+ public void testTrivialPlan() throws PhysicalInterpreter.InterpreterException, ParsePlan.ParseException {
+ run("physical-1.drillx");
+ }
+
+ @Test
+ public void testExplodeFilter() throws PhysicalInterpreter.InterpreterException, ParsePlan.ParseException {
+ run("physical-2.drillx");
+ }
+
+ private void run(String name) throws ParsePlan.ParseException, PhysicalInterpreter.SetupException, PhysicalInterpreter.QueryException {
+ Plan p = ParsePlan.parseResource(name);
PhysicalInterpreter pi = new PhysicalInterpreter(p);
pi.run();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/resources/data1.json
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/test/resources/data1.json b/sandbox/plan-parser/src/test/resources/data1.json
index ddfaa50..faff8fd 100644
--- a/sandbox/plan-parser/src/test/resources/data1.json
+++ b/sandbox/plan-parser/src/test/resources/data1.json
@@ -2,3 +2,4 @@
{"a":2, "b":2}
{"a":3, "b":2}
{"a":4, "b":2}
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/resources/data2.json
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/test/resources/data2.json b/sandbox/plan-parser/src/test/resources/data2.json
new file mode 100644
index 0000000..ba3f74d
--- /dev/null
+++ b/sandbox/plan-parser/src/test/resources/data2.json
@@ -0,0 +1,2 @@
+{"x":[{"a":1, "b":2}, {"a":2, "b":2}, {"a":3, "b":2}, {"a":4, "b":2}], "y":[{"a":4, "b":2}]}
+{"x":[{"a":5, "b":2}, {"a":4, "b":2}, {"a":3, "b":2}, {"a":2, "b":2}], "y":[{"a":5, "b":2}, {"a":4, "b":2}]}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/be2ce8d2/sandbox/plan-parser/src/test/resources/physical-2.drillx
----------------------------------------------------------------------
diff --git a/sandbox/plan-parser/src/test/resources/physical-2.drillx b/sandbox/plan-parser/src/test/resources/physical-2.drillx
new file mode 100644
index 0000000..14924c7
--- /dev/null
+++ b/sandbox/plan-parser/src/test/resources/physical-2.drillx
@@ -0,0 +1,7 @@
+# sub-tree filtering
+%1 := scan-json "resource:data2.json"
+%2 := explode %1, "x"
+%4 := bind "a", %2
+%5 := > %4, 3
+%6 := filter %5, %2
+%7,%8 := implode %6, %2