You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by td...@apache.org on 2012/12/02 00:47:45 UTC

[5/7] git commit: Update operator graph to be graph-shaped. Automatically get source and sink operators and confirm their location.

Update operator graph to be graph-shaped. Automatically determine the source and sink operators and verify that they are the start and terminal nodes of the graph, respectively.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/eaa84d65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/eaa84d65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/eaa84d65

Branch: refs/heads/DRILL-3
Commit: eaa84d65369491c02d68cf7fff4aba3cf76d3294
Parents: 78940e7
Author: Jacques Nadeau <ja...@gmail.com>
Authored: Mon Nov 26 18:27:41 2012 -0800
Committer: Jacques Nadeau <ja...@gmail.com>
Committed: Mon Nov 26 18:27:41 2012 -0800

----------------------------------------------------------------------
 .../drill/common/expression/BooleanFunctions.java  |   15 +--
 .../common/expression/ExpressionFunction.java      |  117 ++++++++-------
 .../drill/common/expression/FieldReference.java    |    8 +-
 .../drill/common/expression/LogicalExpression.java |  104 ++++++-------
 .../common/expression/visitors/OpVisitor.java      |    7 +
 .../apache/drill/common/logical/LogicalPlan.java   |   55 ++++++-
 .../apache/drill/common/logical/OperatorGraph.java |  106 +++++++++++++
 .../common/logical/UnexpectedOperatorType.java     |   19 +++
 .../drill/common/logical/ValidationError.java      |   22 +++-
 .../apache/drill/common/logical/data/Combine.java  |   32 ++++-
 .../apache/drill/common/logical/data/Explode.java  |   20 ++-
 .../apache/drill/common/logical/data/Filter.java   |    3 +-
 .../apache/drill/common/logical/data/Flatten.java  |    8 +-
 .../apache/drill/common/logical/data/Group.java    |   10 +-
 .../org/apache/drill/common/logical/data/Join.java |   36 ++++-
 .../drill/common/logical/data/LogicalOperator.java |    7 +-
 .../common/logical/data/LogicalOperatorBase.java   |   29 ++++-
 .../org/apache/drill/common/logical/data/Nest.java |   14 +-
 .../logical/data/OperatorListDeserializer.java     |    5 +-
 .../apache/drill/common/logical/data/Order.java    |    6 +-
 .../apache/drill/common/logical/data/Project.java  |   14 ++-
 .../org/apache/drill/common/logical/data/Scan.java |   17 ++-
 .../apache/drill/common/logical/data/Sequence.java |  102 ++++++++----
 .../common/logical/data/SingleInputOperator.java   |   23 +++-
 .../drill/common/logical/data/SinkOperator.java    |    9 +
 .../drill/common/logical/data/SourceOperator.java  |    9 +
 .../drill/common/logical/data/Transform.java       |    4 +-
 .../apache/drill/common/logical/data/Union.java    |   26 +++-
 .../common/logical/data/ZeroInputOperator.java     |    5 -
 .../drill/common/logical/graph/AdjacencyList.java  |   98 ++++++++++++
 .../apache/drill/common/logical/graph/Edge.java    |   25 +++
 .../drill/common/logical/graph/GraphAlgos.java     |  122 +++++++++++++++
 .../apache/drill/common/logical/graph/Node.java    |   35 ++++
 .../drill/common/logical/sources/DataSource.java   |    2 +
 .../common/src/test/resources/simple_plan.json     |   57 +++++--
 35 files changed, 945 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/BooleanFunctions.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/BooleanFunctions.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/BooleanFunctions.java
index 4a33148..847c714 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/BooleanFunctions.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/BooleanFunctions.java
@@ -12,12 +12,10 @@ public class BooleanFunctions {
   public static final Class<?>[] SUB_TYPES = { Or.class, And.class };
 
   private static abstract class BooleanFunctionBase extends FunctionBase {
-    
+
     protected BooleanFunctionBase(List<LogicalExpression> expressions) {
       super(expressions);
     }
-    
-
 
     @Override
     public DataType getDataType() {
@@ -35,15 +33,13 @@ public class BooleanFunctions {
       this.opToString(sb, "||");
     }
 
-    
-    
   }
 
   public static class And extends BooleanFunctionBase {
     public And(List<LogicalExpression> expressions) {
       super(expressions);
     }
-    
+
     @Override
     public void addToString(StringBuilder sb) {
       this.opToString(sb, "&&");
@@ -56,7 +52,7 @@ public class BooleanFunctions {
     public IsNull(List<LogicalExpression> expressions) {
       super(expressions);
     }
-    
+
     @Override
     public void addToString(StringBuilder sb) {
       this.funcToString(sb, "isNull");
@@ -79,7 +75,6 @@ public class BooleanFunctions {
       }
     }
 
-    
     public Comparison(String s, LogicalExpression left, LogicalExpression right) {
       this.left = left;
       this.right = right;
@@ -95,7 +90,7 @@ public class BooleanFunctions {
       type = temp;
 
     }
-    
+
     @Override
     public void addToString(StringBuilder sb) {
       sb.append(" ( ");
@@ -108,7 +103,7 @@ public class BooleanFunctions {
     }
 
     public static LogicalExpression create(List<LogicalExpression> expressions, List<String> comparisonTypes) {
-      logger.debug("Generating new comparison expressions.");
+      // logger.debug("Generating new comparison expressions.");
       if (expressions.size() == 1) {
         return expressions.get(0);
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/ExpressionFunction.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/ExpressionFunction.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/ExpressionFunction.java
index dee571a..70d855a 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/ExpressionFunction.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/ExpressionFunction.java
@@ -11,60 +11,69 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.reflect.TypeToken;
 
-
-@SuppressWarnings("unchecked") 
+@SuppressWarnings("unchecked")
 public class ExpressionFunction {
-	final static Logger logger = LoggerFactory.getLogger(ExpressionFunction.class);
-	
-	@SuppressWarnings("serial")
-	final static Type EXPR_LIST = (new TypeToken<List<LogicalExpression>>() {}).getType();
-	
-	private static final Class<?>[] FUNCTIONS = {
-			BooleanFunctions.And.class, BooleanFunctions.Or.class, StringFunctions.Regex.class, StringFunctions.StartsWith.class, IsNull.class
-	};
-	
-	private static final ImmutableMap<String, Constructor<LogicalExpression>> FUNCTION_MAP;
-	
-	static {
-		ImmutableMap.Builder<String, Constructor<LogicalExpression>> builder =  ImmutableMap.builder();
-		for(Class<?> c : FUNCTIONS){
-			
-			logger.debug("Adding {} and function", c);
-			if(!LogicalExpression.class.isAssignableFrom(c)){
-				logger.error("The provided Class [{}] does not derive from LogicalExpression.  Skipping inclusion in registry.", c);
-				continue;
-			}
-			FunctionName fn = c.getAnnotation(FunctionName.class);
-			if(fn == null){
-				logger.error("The provided Class [{}] did not have a FunctionName annotation.  Skipping inclusion in registry.", c);
-				continue;
-			}
-			String name = fn.value();
-			try{
-				Constructor<LogicalExpression> m = (Constructor<LogicalExpression>) c.getConstructor(List.class);
-				if(!EXPR_LIST.equals(m.getGenericParameterTypes()[0])){
-					logger.error("The constructor for each function must have a argument list that only contains a List<LogicalExpression>.  The class[{}] has an inccorect List<{}> argument.", c, m.getGenericParameterTypes()[0]);
-					continue;
-				}
-						
-				builder.put(name, m);
-			}catch(Exception e){
-				logger.error("Failure while attempting to retrieve Logical Expression list constructor on class [{}].  Functions must have one of these.", c, e);
-			}
-			
-		}
-		
-		FUNCTION_MAP = builder.build();
-	}
-	
+  final static Logger logger = LoggerFactory.getLogger(ExpressionFunction.class);
+
+  @SuppressWarnings("serial")
+  final static Type EXPR_LIST = (new TypeToken<List<LogicalExpression>>() {
+  }).getType();
+
+  private static final Class<?>[] FUNCTIONS = { BooleanFunctions.And.class, BooleanFunctions.Or.class,
+      StringFunctions.Regex.class, StringFunctions.StartsWith.class, IsNull.class };
+
+  private static final ImmutableMap<String, Constructor<LogicalExpression>> FUNCTION_MAP;
+
+  static {
+    ImmutableMap.Builder<String, Constructor<LogicalExpression>> builder = ImmutableMap.builder();
+    for (Class<?> c : FUNCTIONS) {
+
+      // logger.debug("Adding {} and function", c);
+      if (!LogicalExpression.class.isAssignableFrom(c)) {
+        logger.error(
+            "The provided Class [{}] does not derive from LogicalExpression.  Skipping inclusion in registry.", c);
+        continue;
+      }
+      FunctionName fn = c.getAnnotation(FunctionName.class);
+      if (fn == null) {
+        logger.error(
+            "The provided Class [{}] did not have a FunctionName annotation.  Skipping inclusion in registry.", c);
+        continue;
+      }
+      String name = fn.value();
+      try {
+        Constructor<LogicalExpression> m = (Constructor<LogicalExpression>) c.getConstructor(List.class);
+        if (!EXPR_LIST.equals(m.getGenericParameterTypes()[0])) {
+          logger
+              .error(
+                  "The constructor for each function must have a argument list that only contains a List<LogicalExpression>.  The class[{}] has an inccorect List<{}> argument.",
+                  c, m.getGenericParameterTypes()[0]);
+          continue;
+        }
+
+        builder.put(name, m);
+      } catch (Exception e) {
+        logger
+            .error(
+                "Failure while attempting to retrieve Logical Expression list constructor on class [{}].  Functions must have one of these.",
+                c, e);
+      }
+
+    }
+
+    FUNCTION_MAP = builder.build();
+  }
 
-	public static LogicalExpression create(String functionName, List<LogicalExpression> expressions) throws ExpressionValidationError{
-		logger.debug("Requesting generation of new function with name {}.", functionName);
-		if(!FUNCTION_MAP.containsKey(functionName)) throw new ExpressionValidationError(String.format("Unknown function with name '%s'", functionName));
-		try {
-			return FUNCTION_MAP.get(functionName).newInstance(expressions);
-		} catch (Exception e) {
-			throw new ExpressionValidationError("Failure while attempting to build type of " + functionName, e);
-		}
-	}
+  public static LogicalExpression create(String functionName, List<LogicalExpression> expressions)
+      throws ExpressionValidationError {
+    // logger.debug("Requesting generation of new function with name {}.",
+    // functionName);
+    if (!FUNCTION_MAP.containsKey(functionName))
+      throw new ExpressionValidationError(String.format("Unknown function with name '%s'", functionName));
+    try {
+      return FUNCTION_MAP.get(functionName).newInstance(expressions);
+    } catch (Exception e) {
+      throw new ExpressionValidationError("Failure while attempting to build type of " + functionName, e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FieldReference.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FieldReference.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FieldReference.java
index 5cf6d00..ec92110 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FieldReference.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FieldReference.java
@@ -19,7 +19,8 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 @JsonSerialize(using=Se.class)
 @JsonDeserialize(using=De.class)
 public class FieldReference extends ValueExpressions.Identifier {
-  private String refName;
+  
+
 
   public FieldReference(String value) {
     super(value);
@@ -34,8 +35,7 @@ public class FieldReference extends ValueExpressions.Identifier {
     @Override
     public FieldReference deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
         JsonProcessingException {
-      String s = jp.getText();
-      return new FieldReference(s);
+      return new FieldReference(this._parseString(jp, ctxt));
     }
 
   }
@@ -49,7 +49,7 @@ public class FieldReference extends ValueExpressions.Identifier {
     @Override
     public void serialize(FieldReference value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
         JsonGenerationException {
-      jgen.writeString(value.refName);
+      jgen.writeString(value.value);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/LogicalExpression.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/LogicalExpression.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/LogicalExpression.java
index bfdd6cc..3b0686b 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/LogicalExpression.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/LogicalExpression.java
@@ -27,58 +27,56 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 
-@JsonDeserialize(using=LogicalExpression.De.class)
-@JsonSerialize(using=LogicalExpression.Se.class)
-//@JsonTypeInfo(use = JsonTypeInfo.Id.MINIMAL_CLASS, include = JsonTypeInfo.As.PROPERTY, property = "fn")
+@JsonDeserialize(using = LogicalExpression.De.class)
+@JsonSerialize(using = LogicalExpression.Se.class)
+// @JsonTypeInfo(use = JsonTypeInfo.Id.MINIMAL_CLASS, include =
+// JsonTypeInfo.As.PROPERTY, property = "fn")
 public interface LogicalExpression {
-	static final Logger logger = LoggerFactory.getLogger(LogicalExpression.class);
-	
-	public static final Class<?>[] SUB_TYPES = {};
-
-	LogicalExpression wrapWithCastIfNecessary(DataType dt)
-			throws ExpressionValidationError;
-
-	@JsonIgnore
-	public abstract DataType getDataType();
-
-	
-	public void addToString(StringBuilder sb);
-	public void resolveAndValidate(List<LogicalExpression> expressions,
-			Collection<ValidationError> errors);
-
-	public Object accept(FunctionVisitor visitor);
-
-	public static class De extends StdDeserializer<LogicalExpression> {
-		
-
-		public De() {
-			super(LogicalExpression.class);
-		}
-		
-		@Override
-		public LogicalExpression deserialize(JsonParser jp,
-				DeserializationContext ctxt) throws IOException,
-				JsonProcessingException {
-			String expr = jp.getText();
-
-			if(expr == null || expr.isEmpty()) return null;
-			try {
-				logger.debug("Parsing expression string '{}'", expr);
-				ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
-
-				CommonTokenStream tokens = new CommonTokenStream(lexer);
-				ExprParser parser = new ExprParser(tokens);
-				parse_return ret = parser.parse();
-				logger.debug("Found expression '{}'", ret.e);
-				return ret.e;
-			} catch (RecognitionException e) {
-				throw new RuntimeException(e);
-			}
-		}
-
-	}
-	
-	public static class Se extends StdSerializer<LogicalExpression> {
+  static final Logger logger = LoggerFactory.getLogger(LogicalExpression.class);
+
+  public static final Class<?>[] SUB_TYPES = {};
+
+  LogicalExpression wrapWithCastIfNecessary(DataType dt) throws ExpressionValidationError;
+
+  @JsonIgnore
+  public abstract DataType getDataType();
+
+  public void addToString(StringBuilder sb);
+
+  public void resolveAndValidate(List<LogicalExpression> expressions, Collection<ValidationError> errors);
+
+  public Object accept(FunctionVisitor visitor);
+
+  public static class De extends StdDeserializer<LogicalExpression> {
+
+    public De() {
+      super(LogicalExpression.class);
+    }
+
+    @Override
+    public LogicalExpression deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
+        JsonProcessingException {
+      String expr = jp.getText();
+
+      if (expr == null || expr.isEmpty())
+        return null;
+      try {
+        // logger.debug("Parsing expression string '{}'", expr);
+        ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
+
+        CommonTokenStream tokens = new CommonTokenStream(lexer);
+        ExprParser parser = new ExprParser(tokens);
+        parse_return ret = parser.parse();
+        // logger.debug("Found expression '{}'", ret.e);
+        return ret.e;
+      } catch (RecognitionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+  }
+
+  public static class Se extends StdSerializer<LogicalExpression> {
 
     protected Se() {
       super(LogicalExpression.class);
@@ -91,7 +89,7 @@ public interface LogicalExpression {
       value.addToString(sb);
       jgen.writeString(sb.toString());
     }
-	  
-	}
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/visitors/OpVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/visitors/OpVisitor.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/visitors/OpVisitor.java
new file mode 100644
index 0000000..203b041
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/visitors/OpVisitor.java
@@ -0,0 +1,7 @@
+package org.apache.drill.common.expression.visitors;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+
+public interface OpVisitor {
+  public void visit(LogicalOperator o);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
index 28624cf..cfdc19c 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
@@ -2,26 +2,33 @@ package org.apache.drill.common.logical;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.drill.common.logical.OperatorGraph.OpNode;
 import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.data.Transform;
+import org.apache.drill.common.logical.graph.GraphAlgos;
 import org.apache.drill.common.logical.sources.DataSource;
 import org.apache.drill.common.logical.sources.record.RecordMaker;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import com.fasterxml.jackson.core.JsonParser.Feature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 @JsonPropertyOrder({"head", "sources", "query"})
 public class LogicalPlan {
-	@JsonProperty("head") public PlanProperties properties = new PlanProperties();
-	@JsonProperty("sources") public List<DataSource> dataSources = new ArrayList<DataSource>();
-	@JsonProperty("query") public List<LogicalOperator> operators = new ArrayList<LogicalOperator>();
+	private final PlanProperties properties;
+	private final Map<String, DataSource> dataSources;
+	private final OperatorGraph graph;
 
 	public static void main(String[] args) throws Exception {
 		
@@ -38,10 +45,46 @@ public class LogicalPlan {
 
     String externalPlan = Files.toString(new File("src/test/resources/simple_plan.json"), Charsets.UTF_8);
     LogicalPlan plan = mapper.readValue(externalPlan, LogicalPlan.class);
-    System.out.println(mapper.writeValueAsString(((Transform) plan.operators.get(1)).input));
-		System.out.println(mapper.writeValueAsString(plan));	
+    System.out.println(mapper.writeValueAsString(plan));	
 	}
+
 	
+	@JsonCreator
+	public LogicalPlan(@JsonProperty("head") PlanProperties head, @JsonProperty("sources") List<DataSource> sources, @JsonProperty("query") List<LogicalOperator> operators){
+	  this.properties = head;
+	  this.dataSources = new HashMap<String, DataSource>(sources.size());
+    for(DataSource ds: sources){
+      DataSource old = dataSources.put(ds.getName(), ds);
+      if(old != null) throw new IllegalArgumentException("Each data source must have a unique name.  You provided more than one data source with the same name of '" + ds.getName() + "'");
+    }
+    
+    this.graph = new OperatorGraph(operators);
+	}
 	
+	@JsonProperty("query")
+	public List<LogicalOperator> getOperators(){
+	  List<OpNode> nodes = GraphAlgos.TopoSorter.sort(graph.getAdjList());
+	  Iterable<LogicalOperator> i = Iterables.transform(nodes, new Function<OpNode, LogicalOperator>(){
+	    public LogicalOperator apply(OpNode o){
+	      return o.getNodeValue();
+	    }
+	  });
+	  return Lists.newArrayList(i);
+	}
+
+
+	@JsonProperty("head")
+  public PlanProperties getProperties() {
+    return properties;
+  }
 
+
+	@JsonProperty("sources") 
+  public List<DataSource> getDataSources() {
+    return new ArrayList<DataSource>(dataSources.values());
+  }
+	
+	
+	
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/OperatorGraph.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/OperatorGraph.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/OperatorGraph.java
new file mode 100644
index 0000000..6ff8184
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/OperatorGraph.java
@@ -0,0 +1,106 @@
+package org.apache.drill.common.logical;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.visitors.OpVisitor;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.SinkOperator;
+import org.apache.drill.common.logical.data.SourceOperator;
+import org.apache.drill.common.logical.graph.AdjacencyList;
+import org.apache.drill.common.logical.graph.GraphAlgos;
+import org.apache.drill.common.logical.graph.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OperatorGraph {
+
+  static final Logger logger = LoggerFactory.getLogger(OperatorGraph.class);
+  
+  private AdjacencyList<OpNode> adjList;
+  private final Collection<SourceOperator> sources;
+  private final Collection<SinkOperator> sinks;
+  
+  public OperatorGraph(List<LogicalOperator> operators){
+    AdjacencyListBuilder b = new AdjacencyListBuilder();
+    for(LogicalOperator o : operators){
+      o.accept(b);
+    }
+    
+    adjList = b.getAdjacencyList();
+    
+    List<List<OpNode>> cyclicReferences = GraphAlgos.checkDirected(adjList);
+    if(cyclicReferences.size() > 0){
+      throw new IllegalArgumentException("A logical plan must be a valid DAG.  You have cyclic references in your graph.  " + cyclicReferences);
+    }
+    sources = convert(adjList.getStartNodes(), SourceOperator.class, "Error determing list of source operators.");
+//    logger.debug("Source list {}", sources);
+    sinks = convert(adjList.getTerminalNodes(), SinkOperator.class, "Error determing list of source operators.");
+//    logger.debug("Sink list {}", sinks);
+    
+  }
+  
+  public AdjacencyList<OpNode> getAdjList() {
+    return adjList;
+  }
+
+  public Collection<SourceOperator> getSources() {
+    return sources;
+  }
+
+  public Collection<SinkOperator> getSinks() {
+    return sinks;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T extends LogicalOperator> Collection<T> convert(Collection<OpNode> nodes, Class<T> classIdentifier, String error){
+    List<T> out = new ArrayList<T>(nodes.size());
+    for(OpNode o : nodes){
+      LogicalOperator lo = o.getNodeValue();
+      if(classIdentifier.isAssignableFrom(lo.getClass())){
+        out.add( (T) lo);
+      }else{
+        throw new UnexpectedOperatorType(classIdentifier, lo, error);
+      }
+    }
+    return out;
+  }
+  
+  public class AdjacencyListBuilder implements OpVisitor{
+    Map<LogicalOperator, OpNode> ops = new HashMap<LogicalOperator, OpNode>();
+    
+    @Override
+    public void visit(LogicalOperator o) {
+      if(!ops.containsKey(o)){
+//        logger.debug("Adding node {}", o);
+        ops.put(o,  new OpNode(o));
+      }
+    }
+    
+    public AdjacencyList<OpNode> getAdjacencyList(){
+      AdjacencyList<OpNode> a = new AdjacencyList<OpNode>();
+      for(OpNode from : ops.values()){
+//        logger.debug("Adding edges for {}", from);
+        for(LogicalOperator t : from.getNodeValue()){
+//          logger.debug("\t\tAdding edges to {}", t);
+          OpNode to = ops.get(t);
+          a.addEdge(from, to, 0);
+        }
+        
+      }
+      return a;
+    }
+    
+  }
+  
+  public static class OpNode extends Node<LogicalOperator>{
+
+    public OpNode(LogicalOperator operator) {
+      super(operator);
+    }
+  }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/UnexpectedOperatorType.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/UnexpectedOperatorType.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/UnexpectedOperatorType.java
new file mode 100644
index 0000000..b1190ad
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/UnexpectedOperatorType.java
@@ -0,0 +1,19 @@
+package org.apache.drill.common.logical;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+
+public class UnexpectedOperatorType extends ValidationError{
+
+  public UnexpectedOperatorType(String message){
+    super(message);
+  }
+  
+  public <A extends LogicalOperator, B extends LogicalOperator> UnexpectedOperatorType(Class<A> expected, B operator, String message) {
+    super(message + " Expected operator of type " + expected.getSimpleName() + " but received operator of type " + operator.getClass().getCanonicalName());
+  }
+
+
+  
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/ValidationError.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/ValidationError.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/ValidationError.java
index dd6b9bd..e9f0374 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/ValidationError.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/ValidationError.java
@@ -1,5 +1,25 @@
 package org.apache.drill.common.logical;
 
-public class ValidationError {
+public class ValidationError extends RuntimeException{
+
+  public ValidationError() {
+    super();
+  }
+
+  public ValidationError(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+  public ValidationError(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ValidationError(String message) {
+    super(message);
+  }
+
+  public ValidationError(Throwable cause) {
+    super(cause);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Combine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Combine.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Combine.java
index a14eb3c..15ea1c2 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Combine.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Combine.java
@@ -1,12 +1,38 @@
 package org.apache.drill.common.logical.data;
 
+import org.apache.drill.common.expression.FieldReference;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 
 @JsonTypeName("combine")
 public class Combine extends SingleInputOperator{
+  private final FieldReference ref;
+
+  @JsonCreator
+  public Combine(@JsonProperty("ref") FieldReference ref) {
+    this.ref = ref;
+  }
+  
+  @JsonCreator
+  public Combine(@JsonProperty("input") LogicalOperator input, @JsonProperty("ref") FieldReference ref) {
+    this.ref = ref;
+    this.setInput(input);
+  }
+
+  @JsonProperty("ref")
+  public FieldReference getRef() {
+    return ref;
+  }
 
-	public String name;
-	
-	
+  @Override
+  public int getNestLevel() {
+    return super.getNestLevel()-1;
+  }
+  
+  
+  
+   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Explode.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Explode.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Explode.java
index ac09a54..09c72c6 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Explode.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Explode.java
@@ -2,6 +2,7 @@ package org.apache.drill.common.logical.data;
 
 import org.apache.drill.common.expression.FieldReference;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
@@ -11,8 +12,21 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 @JsonTypeName("explode")
 public class Explode extends SingleInputOperator{
 		
-	@JsonProperty("ref") public FieldReference reference;
-	
-	public Explode(){}
+	private final FieldReference ref;
+  // Jackson creator. An "input" property, when present, is applied afterwards through the inherited setInput.
+  @JsonCreator
+  public Explode(@JsonProperty(value="ref", required=true) FieldReference ref) {
+    this.ref = ref;
+  }
+  // Convenience constructor for plans assembled in code. It is intentionally not a Jackson creator:
+  // Jackson rejects a class that declares more than one property-based creator.
+  public Explode(LogicalOperator input, FieldReference ref) {
+    this.ref = ref;
+    this.setInput(input);
+  }
 	
+  @Override
+  public int getNestLevel() {
+    return super.getNestLevel()+1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Filter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Filter.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Filter.java
index 108782d..8d923fd 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Filter.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Filter.java
@@ -5,7 +5,6 @@ import org.apache.drill.common.expression.LogicalExpression;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("filter")
-public class Filter extends LogicalOperatorBase{
-	public int input;
+public class Filter extends SingleInputOperator{
 	public LogicalExpression expr;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Flatten.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Flatten.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Flatten.java
index d87e5f3..cba73e7 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Flatten.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Flatten.java
@@ -4,9 +4,11 @@ import java.util.List;
 
 import org.apache.drill.common.expression.LogicalExpression;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 
-public class Flatten {
-	@JsonProperty("input") public int inputNode;
+@JsonTypeName("flatten")
+public class Flatten extends SingleInputOperator{
+  
 	public List<LogicalExpression> exprs;
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Group.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Group.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Group.java
index ac1ea7b..b03a5b3 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Group.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Group.java
@@ -7,7 +7,13 @@ import org.apache.drill.common.expression.LogicalExpression;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("group")
-public class Group extends LogicalOperatorBase{
-	public int input;
+public class Group extends SingleInputOperator{
 	public List<LogicalExpression> exprs;
+  // Nest level is the inherited level plus one per entry in exprs; exprs must be non-null when this is called.
+  @Override
+  public int getNestLevel() {
+    return super.getNestLevel() + exprs.size();
+  }
+	
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
index 7187108..22c1183 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Join.java
@@ -3,19 +3,47 @@ package org.apache.drill.common.logical.data;
 import java.util.List;
 
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.OpVisitor;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("join")
 public class Join extends LogicalOperatorBase{
-	public int left;
-	public int right;
+	private LogicalOperator left;
+	private LogicalOperator right;
 	public List<JoinCondition> conditions;
 	
 	public static class JoinCondition{
 		public String relationship;
-		@JsonProperty("left-expr") public LogicalExpression leftExpression;
-		@JsonProperty("right-expr") public LogicalExpression rightExpression;
+		@JsonProperty("left") public LogicalExpression leftExpression;
+		@JsonProperty("right") public LogicalExpression rightExpression;
 	}
+
+  public LogicalOperator getLeft() {
+    return left;
+  }
+  // Stores the left input and registers this join as one of its subscribers.
+  public void setLeft(LogicalOperator left) {
+    this.left = left;
+    left.registerAsSubscriber(this);
+  }
+
+  public LogicalOperator getRight() {
+    return right;
+  }
+  // Stores the right input and registers this join as one of its subscribers.
+  public void setRight(LogicalOperator right) {
+    this.right = right;
+    right.registerAsSubscriber(this);
+  }
+  // NOTE(review): -5 is a hard-coded value that ignores both inputs; looks like a placeholder - confirm.
+  @Override
+  public int getNestLevel() {
+    return -5;
+  }
+
+	
+  
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
index 83e9c61..28f383a 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
@@ -3,9 +3,11 @@ package org.apache.drill.common.logical.data;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.drill.common.expression.visitors.OpVisitor;
 import org.apache.drill.common.logical.ValidationError;
 
 import com.fasterxml.jackson.annotation.JsonIdentityInfo;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -14,11 +16,12 @@ import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 @JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, property="@id")
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="op")
 @JsonInclude(Include.NON_DEFAULT)
-public interface LogicalOperator {
+public interface LogicalOperator extends Iterable<LogicalOperator>{
 	
 	public static final Class<?>[] SUB_TYPES = {Sequence.class, Combine.class, Explode.class, Filter.class, Group.class, Join.class, Nest.class, Order.class, Project.class, Scan.class, Transform.class, Union.class};
 	
+	public void accept(OpVisitor visitor);
 	public void registerAsSubscriber(LogicalOperator operator);
 	public void setupAndValidate(List<LogicalOperator> operators, Collection<ValidationError> errors);
-
+	@JsonIgnore public int getNestLevel();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
index 5632e84..6ed1abd 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
@@ -2,12 +2,18 @@ package org.apache.drill.common.logical.data;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.drill.common.expression.visitors.OpVisitor;
 import org.apache.drill.common.logical.ValidationError;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+
 public abstract class LogicalOperatorBase implements LogicalOperator{
 	private List<LogicalOperator> children = new ArrayList<LogicalOperator>();
+	@JsonInclude(Include.NON_DEFAULT) public String memo;
 	
 	public LogicalOperatorBase(){}
 	
@@ -19,7 +25,28 @@ public abstract class LogicalOperatorBase implements LogicalOperator{
   public void registerAsSubscriber(LogicalOperator operator) {
     children.add(operator);
   }
-	
+
+  @Override
+  public void accept(OpVisitor visitor) {
+    visitor.visit(this);
+    for(LogicalOperator o : children){
+      visitor.visit(o);
+    }
+  }
+
+  @Override
+  public Iterator<LogicalOperator> iterator() {
+    return children.iterator();
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName() + " [memo=" + memo + "]";
+  }
+
+  
+  
+ 
 	
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Nest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Nest.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Nest.java
index 534a562..208388c 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Nest.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Nest.java
@@ -2,16 +2,20 @@ package org.apache.drill.common.logical.data;
 
 import java.util.List;
 
+import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("nest")
-public class Nest extends LogicalOperatorBase{
-  
-  
-	public LogicalOperator input;
-	public String name;
+public class Nest extends SingleInputOperator{
+	public FieldReference ref;
 	public List<LogicalExpression> exprs;
 	
+  @Override
+  public int getNestLevel() {
+    return super.getNestLevel()-1;
+  }
+	
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/OperatorListDeserializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/OperatorListDeserializer.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/OperatorListDeserializer.java
index a6cec8a..d6914f3 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/OperatorListDeserializer.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/OperatorListDeserializer.java
@@ -56,7 +56,7 @@ public class OperatorListDeserializer {
           LogicalOperator o = jp.readValueAs(LogicalOperator.class);
 
           if (pos == 0) {
-            if (!(o instanceof SingleInputOperator) && !(o instanceof ZeroInputOperator))
+            if (!(o instanceof SingleInputOperator) && !(o instanceof SourceOperator))
               throwE(
                   l,
                   "The first operator in a sequence must be either a ZeroInput or SingleInput operator.  The provided first operator was not. It was of type "
@@ -87,7 +87,7 @@ public class OperatorListDeserializer {
     if (first == null)
       throwE(start, "A sequence must include at least one operator.");
     if ((parent == null && first instanceof SingleInputOperator)
-        || (parent != null && first instanceof ZeroInputOperator))
+        || (parent != null && first instanceof SourceOperator))
       throwE(start,
           "A sequence must either start with a ZeroInputOperator or have a provided input. It cannot have both or neither.");
 
@@ -95,6 +95,7 @@ public class OperatorListDeserializer {
       ((SingleInputOperator) first).setInput(parent);
     }
 
+    
     return first;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
index 3680056..288a34e 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
@@ -8,12 +8,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("order")
-public class Order extends LogicalOperatorBase{
-	public int input;
+public class Order extends SingleInputOperator{
+
 	@JsonProperty("orders") public List<Ordering> orderings;
 	
 	public static class Ordering {
 		public String direction;
 		public LogicalExpression expr;
 	}
+	
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Project.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Project.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Project.java
index 3943421..3702175 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Project.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Project.java
@@ -2,12 +2,20 @@ package org.apache.drill.common.logical.data;
 
 import java.util.List;
 
+import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("project")
-public class Project extends LogicalOperatorBase{
-	public int input;
-	public List<LogicalExpression>[] exprs;
+public class Project extends SinkOperator{
+  
+  @JsonProperty("exprs") public List<FieldSelection> selections;
+  
+  public static class FieldSelection {
+    public LogicalExpression expr;
+    public FieldReference ref;
+  }
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Scan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Scan.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Scan.java
index fb02159..7222a19 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Scan.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Scan.java
@@ -1,10 +1,19 @@
 package org.apache.drill.common.logical.data;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("scan")
-public class Scan extends LogicalOperatorBase{
-	public String source;
-	public String name;
-	public String table;
+public class Scan extends SourceOperator{
+	@JsonProperty("source") public String sourceName;
+
+	public String space;
+	
+	
+  @Override
+  public int getNestLevel() {
+    return 0;
+  }
+	
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Sequence.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Sequence.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Sequence.java
index 99814aa..46c2dcc 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Sequence.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Sequence.java
@@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
 import com.fasterxml.jackson.databind.deser.impl.ReadableObjectId;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 
@@ -29,7 +30,7 @@ import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
  */
 @JsonDeserialize(using = De.class)
 @JsonTypeName("sequence")
-public class Sequence extends LogicalOperatorBase {
+public abstract class Sequence extends LogicalOperatorBase {
   static final Logger logger = LoggerFactory.getLogger(Sequence.class);
 
   public boolean openTop;
@@ -44,74 +45,107 @@ public class Sequence extends LogicalOperatorBase {
     }
 
     @Override
-    public LogicalOperator deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
-      
+    public LogicalOperator deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
+        JsonProcessingException {
+      ObjectIdGenerator<Integer> idGenerator = new ObjectIdGenerators.IntSequenceGenerator();
       JsonLocation start = jp.getCurrentLocation();
       JsonToken t = jp.getCurrentToken();
       LogicalOperator parent = null;
       LogicalOperator first = null;
       LogicalOperator prev = null;
-      
-      while(true){
-        String token = jp.getText();
-        logger.debug("Working on token '{}'", token);
-        jp.nextToken();
-        switch(token){ // switch on field names.
+      Integer id = null;
+      // Walk the fields of the sequence object; each iteration reads the current text as a field name.
+      while (true) {
+        String fieldName = jp.getText();
+        // logger.debug("Working on field name '{}'", fieldName);
+        t = jp.nextToken();
+        switch (fieldName) { // switch on field names.
+        case "@id":
+          id = _parseIntPrimitive(jp, ctxt);
+          break;
         case "input":
           JavaType tp = ctxt.constructType(LogicalOperator.class);
           JsonDeserializer<Object> d = ctxt.findRootValueDeserializer(tp);
           parent = (LogicalOperator) d.deserialize(jp, ctxt);
           break;
+
         case "do":
-          if(!jp.isExpectedStartArrayToken()) throwE(jp, "The do parameter of sequence should be an array of SimpleOperators.  Expected a JsonToken.START_ARRAY token but received a " + t.name() + "token.");
-          
+          if (!jp.isExpectedStartArrayToken())
+            throwE(
+                jp,
+                "The do parameter of sequence should be an array of SimpleOperators.  Expected a JsonToken.START_ARRAY token but received a "
+                    + t.name() + " token.");
+
           int pos = 0;
-          while( (t = jp.nextToken()) != JsonToken.END_ARRAY){
-            logger.debug("Reading sequence child {}.", pos);
-            JsonLocation l = jp.getCurrentLocation(); // get current location first so we can correctly reference the start of the object in the case that the type is wrong.
+          while ((t = jp.nextToken()) != JsonToken.END_ARRAY) {
+            // logger.debug("Reading sequence child {}.", pos);
+            JsonLocation l = jp.getCurrentLocation(); // get current location
+                                                      // first so we can
+                                                      // correctly reference the
+                                                      // start of the object in
+                                                      // the case that the type
+                                                      // is wrong.
             LogicalOperator o = jp.readValueAs(LogicalOperator.class);
-            
-            if(pos == 0){
-              if( !(o instanceof SingleInputOperator) && !(o instanceof ZeroInputOperator)) throwE(l, "The first operator in a sequence must be either a ZeroInput or SingleInput operator.  The provided first operator was not. It was of type " + o.getClass().getName());
+
+            if (pos == 0) {
+              if (!(o instanceof SingleInputOperator) && !(o instanceof SourceOperator))
+                throwE(
+                    l,
+                    "The first operator in a sequence must be either a ZeroInput or SingleInput operator.  The provided first operator was not. It was of type "
+                        + o.getClass().getName());
               first = o;
-            }else{
-              if( !(o instanceof SingleInputOperator)) throwE(l, "All operators after the first must be single input operators.  The operator at position " + pos + " was not. It was of type " + o.getClass().getName());
+            } else {
+              if (!(o instanceof SingleInputOperator))
+                throwE(l, "All operators after the first must be single input operators.  The operator at position "
+                    + pos + " was not. It was of type " + o.getClass().getName());
               SingleInputOperator now = (SingleInputOperator) o;
               now.setInput(prev);
             }
             prev = o;
-            
+
             pos++;
           }
           break;
         default:
           throwE(jp, "Unknown field name provided for Sequence: " + jp.getText());
         }
-        
+
         t = jp.nextToken();
-        if(t == JsonToken.END_OBJECT){
+        if (t == JsonToken.END_OBJECT) {
           break;
         }
       }
-      
-      if(first == null) throwE(start, "A sequence must include at least one operator."); 
-      if( (parent == null && first instanceof SingleInputOperator) ||
-          (parent != null && first instanceof ZeroInputOperator) ) 
-        throwE(start, "A sequence must either start with a ZeroInputOperator or have a provided input. It cannot have both or neither.");
-      
-      if(parent != null && first instanceof SingleInputOperator){
+
+      if (first == null)
+        throwE(start, "A sequence must include at least one operator.");
+      if ((parent == null && first instanceof SingleInputOperator)
+          || (parent != null && first instanceof SourceOperator))
+        throwE(start,
+            "A sequence must either start with a ZeroInputOperator or have a provided input. It cannot have both or neither.");
+
+      if (parent != null && first instanceof SingleInputOperator) {
         ((SingleInputOperator) first).setInput(parent);
       }
-      
+
+      // Bind the sequence's "@id", when one was given, to the last operator read from "do".
+      if (id != null) {
+
+        ReadableObjectId rid = ctxt.findObjectId(id, idGenerator);
+        rid.bindItem(prev);
+        // logger.debug("Binding id {} to item {}.", rid.id, rid.item);
+
+      }
+
       return first;
     }
-    
-    
+
   }
-  private static void throwE(JsonLocation l, String e) throws JsonParseException{
+
+  private static void throwE(JsonLocation l, String e) throws JsonParseException {
     throw new JsonParseException(e, l);
   }
-  private static void throwE(JsonParser jp, String e) throws JsonParseException{
+
+  private static void throwE(JsonParser jp, String e) throws JsonParseException {
     throw new JsonParseException(e, jp.getCurrentLocation());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
index 3657622..1c76c1d 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
@@ -1,5 +1,7 @@
 package org.apache.drill.common.logical.data;
 
+import org.apache.drill.common.logical.UnexpectedOperatorType;
+
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
@@ -7,11 +9,26 @@ import com.fasterxml.jackson.annotation.JsonProperty;
  */
 public abstract class SingleInputOperator extends LogicalOperatorBase{
 
-  public LogicalOperator input;
+  private LogicalOperator input;
   
-  @JsonProperty
-  public void setInput(LogicalOperator input){
+  @JsonProperty("input")
+  public LogicalOperator getInput() {
+    return input;
+  }
+  // Rejects SinkOperator inputs, stores the input and registers this operator as its subscriber. NOTE(review): a null input throws NullPointerException at registerAsSubscriber.
+  @JsonProperty(value="input", required=true)
+  public void setInput(LogicalOperator input) {
+    if(input instanceof SinkOperator) throw new UnexpectedOperatorType("You have set the input of a sink node of type ["+input.getClass().getSimpleName()+ "] as the input for another node of type ["+this.getClass().getSimpleName()+ "].  This is invalid.");
     this.input = input;
     input.registerAsSubscriber(this);
   }
+  // Delegates to the input's nest level; requires the input to have been set beforehand.
+  @Override
+  public int getNestLevel() {
+    return input.getNestLevel();
+  }
+  
+  
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SinkOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SinkOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SinkOperator.java
new file mode 100644
index 0000000..5dde7d4
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SinkOperator.java
@@ -0,0 +1,9 @@
+package org.apache.drill.common.logical.data;
+
+
+/**
+ * An operator that cannot be subscribed to.
+ */
+public class SinkOperator extends SingleInputOperator{
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SourceOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SourceOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SourceOperator.java
new file mode 100644
index 0000000..d3f9ee9
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SourceOperator.java
@@ -0,0 +1,9 @@
+package org.apache.drill.common.logical.data;
+
+
+/**
+ * An operator that produces data without any parents.  (zero input operator)
+ */
+public abstract class SourceOperator extends LogicalOperatorBase{
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Transform.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Transform.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Transform.java
index 84a4d8c..6f3d640 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Transform.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Transform.java
@@ -2,6 +2,7 @@ package org.apache.drill.common.logical.data;
 
 import java.util.List;
 
+import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -12,8 +13,7 @@ public class Transform extends SingleInputOperator{
 	public List<Transformation> transforms;
 	
 	public static class Transformation{
-		public String name;
+		public FieldReference name;
 		public LogicalExpression expr;
-		
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
index f1462fd..0d941cf 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java
@@ -1,9 +1,29 @@
 package org.apache.drill.common.logical.data;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("union")
-public class Union extends LogicalOperatorBase{
-	public int[] inputs;
-	public boolean dedupe;
+public class Union extends LogicalOperatorBase {
+  private LogicalOperator[] inputs;
+  public boolean dedupe;
+  // Stores the inputs and registers this union as a subscriber of each one; the array must be non-null.
+  @JsonProperty("inputs")
+  public void setInputs(LogicalOperator[] inputs) {
+    this.inputs = inputs;
+    for (LogicalOperator o : inputs) {
+      o.registerAsSubscriber(this);
+    }
+  }
+
+  public LogicalOperator[] getInputs() {
+    return inputs;
+  }
+  // Nest level is not supported for unions: every call throws UnsupportedOperationException.
+  @Override
+  public int getNestLevel() {
+    throw new UnsupportedOperationException();
+  }
+
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/ZeroInputOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/ZeroInputOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/ZeroInputOperator.java
deleted file mode 100644
index 9678a52..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/ZeroInputOperator.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.drill.common.logical.data;
-
-public abstract class ZeroInputOperator extends LogicalOperatorBase{
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/AdjacencyList.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/AdjacencyList.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/AdjacencyList.java
new file mode 100644
index 0000000..256028b
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/AdjacencyList.java
@@ -0,0 +1,98 @@
+package org.apache.drill.common.logical.graph;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class AdjacencyList<N extends Node<?>> {
+  private Set<N> allNodes = new HashSet<N>();
+  private Map<N, List<Edge<N>>> adjacencies = new HashMap<N, List<Edge<N>>>();
+
+  /** Adds a directed edge and records both of its endpoints as known nodes. */
+  public void addEdge(N source, N target, int weight) {
+    List<Edge<N>> outbound = adjacencies.get(source);
+    if (outbound == null) {
+      // first edge leaving this source: start its edge list
+      outbound = new ArrayList<Edge<N>>();
+      adjacencies.put(source, outbound);
+    }
+    outbound.add(new Edge<N>(source, target, weight));
+    allNodes.add(source);
+    allNodes.add(target);
+  }
+
+  /** Returns the edges leaving the given node, or null when the edge map has no entry for it. */
+  public List<Edge<N>> getAdjacent(N source) {
+    return adjacencies.get(source);
+  }
+
+  /** Removes the edge from its source's list, then adds the opposite edge with the same weight. */
+  public void reverseEdge(Edge<N> e) {
+    List<Edge<N>> outbound = adjacencies.get(e.from);
+    outbound.remove(e);
+    addEdge(e.to, e.from, e.weight);
+  }
+
+  /** Swaps this graph's edge map for the one built by {@link #getReversedList()}. */
+  public void reverseGraph() {
+    AdjacencyList<N> flipped = getReversedList();
+    adjacencies = flipped.adjacencies;
+  }
+
+  /** Builds a new adjacency list holding every edge of this one with its direction swapped. */
+  public AdjacencyList<N> getReversedList() {
+    AdjacencyList<N> reversed = new AdjacencyList<N>();
+    for (Edge<N> e : getAllEdges()) {
+      reversed.addEdge(e.to, e.from, e.weight);
+    }
+    return reversed;
+  }
+
+  /**
+   * Returns the live key set of the edge map. Only nodes that own an edge list are in it; a
+   * node that only ever appears as an edge target is not.
+   */
+  public Set<N> getNodeSet() {
+    return adjacencies.keySet();
+  }
+
+  /**
+   * Get a list of nodes that have no outbound edges (edge list missing or empty).
+   */
+  public Collection<N> getTerminalNodes() {
+    // walk allNodes rather than the edge map, otherwise destination-only nodes won't be found
+    List<N> terminals = new LinkedList<N>();
+    for (N candidate : allNodes) {
+      List<Edge<N>> outbound = getAdjacent(candidate);
+      if (outbound == null || outbound.isEmpty()) {
+        terminals.add(candidate);
+      }
+    }
+    return terminals;
+  }
+
+  /** Get all nodes that have no incoming edges: the edge-map keys that no edge points at. */
+  public Collection<N> getStartNodes() {
+    Set<N> starts = new HashSet<N>(getNodeSet());
+    for (Edge<N> e : getAllEdges()) {
+      starts.remove(e.to);
+    }
+    return starts;
+  }
+
+  /** Collects the edges of every source node into one new list. */
+  public Collection<Edge<N>> getAllEdges() {
+    List<Edge<N>> all = new LinkedList<Edge<N>>();
+    for (List<Edge<N>> perSource : adjacencies.values()) {
+      all.addAll(perSource);
+    }
+    return all;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/Edge.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/Edge.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/Edge.java
new file mode 100644
index 0000000..f837b87
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/Edge.java
@@ -0,0 +1,25 @@
+package org.apache.drill.common.logical.graph;
+
+
+/** A directed edge with an integer weight; the natural ordering looks at the weight only. */
+public class Edge<N> implements Comparable<Edge<N>> {
+  final N from, to;
+  final int weight;
+
+  public Edge(final N argFrom, final N argTo, final int argWeight) {
+    from = argFrom;
+    to = argTo;
+    weight = argWeight;
+  }
+
+  /** Orders edges by ascending weight. Compares instead of subtracting, which could overflow. */
+  @Override
+  public int compareTo(final Edge<N> argEdge) {
+    return weight < argEdge.weight ? -1 : (weight == argEdge.weight ? 0 : 1);
+  }
+
+  @Override
+  public String toString() {
+    return "Edge [from=" + from + ", to=" + to + "]";
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/GraphAlgos.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/GraphAlgos.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/GraphAlgos.java
new file mode 100644
index 0000000..d51ab34
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/GraphAlgos.java
@@ -0,0 +1,122 @@
+package org.apache.drill.common.logical.graph;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GraphAlgos {
+  static final Logger logger = LoggerFactory.getLogger(GraphAlgos.class);
+
+  /**
+   * Depth-first topological sort. The walk starts at the nodes without outbound edges and
+   * follows the edges backwards, using the visited flag stored on each node.
+   */
+  public static class TopoSorter<N extends Node<?>> {
+    final List<N> sorted = new LinkedList<N>();
+    final AdjacencyList<N> rGraph;
+
+    private TopoSorter(AdjacencyList<N> graph) {
+      this.rGraph = graph.getReversedList();
+      Collection<N> sourceNodes = rGraph.getStartNodes();
+
+      // getNodeSet() only holds nodes that own an edge list, so the forward graph misses the
+      // sinks and the reversed graph misses the sources. Clearing both covers every node;
+      // otherwise a flag left set by an earlier sort makes visit() skip the sinks.
+      clearVisited(graph.getNodeSet());
+      clearVisited(rGraph.getNodeSet());
+      for (N n : sourceNodes) {
+        visit(n);
+      }
+    }
+
+    /** Marks every given node as not yet visited. */
+    private void clearVisited(Collection<N> nodes) {
+      for (N n : nodes) {
+        n.visited = false;
+      }
+    }
+
+    /** Post-order walk: a node is appended only after everything it points at in rGraph. */
+    private void visit(N n) {
+      if (n.visited) {
+        return;
+      }
+
+      n.visited = true;
+      List<Edge<N>> edges = rGraph.getAdjacent(n);
+      if (edges != null) {
+        for (Edge<N> e : edges) {
+          visit(e.to);
+        }
+      }
+
+      sorted.add(n);
+    }
+
+    /**
+     * Execute a depth-first sort on the reversed DAG.
+     * 
+     * @param graph
+     *          The adjacency list for the DAG.
+     * @return the nodes ordered so that, for a DAG, a node follows every node that has an edge
+     *         into it
+     */
+    public static <N extends Node<?>> List<N> sort(AdjacencyList<N> graph) {
+      TopoSorter<N> ts = new TopoSorter<N>(graph);
+      return ts.sorted;
+    }
+  }
+
+  /**
+   * Runs Tarjan's algorithm and drops every single-node component.
+   * NOTE(review): a node whose only cycle is an edge to itself forms a single-node component
+   * and is therefore dropped as well.
+   *
+   * @param graph
+   *          The adjacency list to inspect.
+   * @return the strongly connected components with more than one node; empty when there are none
+   */
+  public static <N extends Node<?>> List<List<N>> checkDirected(AdjacencyList<N> graph) {
+    Tarjan<N> t = new Tarjan<N>();
+    List<List<N>> subgraphs = t.executeTarjan(graph);
+    for (Iterator<List<N>> i = subgraphs.iterator(); i.hasNext();) {
+      List<N> l = i.next();
+      if (l.size() == 1) {
+        i.remove();
+      }
+    }
+    return subgraphs;
+  }
+
+  /** Tarjan's strongly-connected-components search; index/lowlink are kept on the nodes. */
+  public static class Tarjan<N extends Node<?>> {
+
+    private int index = 0;
+    private List<N> stack = new LinkedList<N>();
+    private List<List<N>> SCC = new LinkedList<List<N>>();
+
+    /**
+     * Finds the strongly connected components of the graph. Every node's index/lowlink is
+     * reset first, so nodes numbered by an earlier run are searched again instead of being
+     * skipped.
+     *
+     * @param graph
+     *          The adjacency list to search; null yields an empty result.
+     * @return the internal component list, which is cleared and reused by the next call
+     */
+    public List<List<N>> executeTarjan(AdjacencyList<N> graph) {
+      SCC.clear();
+      index = 0;
+      stack.clear();
+      if (graph != null) {
+        List<N> nodeList = new LinkedList<N>(graph.getNodeSet());
+        for (N node : nodeList) {
+          reset(node);
+        }
+        for (Edge<N> e : graph.getAllEdges()) {
+          reset(e.to); // edge targets that own no edge list are not in getNodeSet()
+        }
+        for (N node : nodeList) {
+          if (node.index == -1) {
+            tarjan(node, graph);
+          }
+        }
+      }
+      return SCC;
+    }
+
+    /** Puts a node back into the "not numbered yet" state the search checks for. */
+    private void reset(N node) {
+      node.index = -1;
+      node.lowlink = -1;
+    }
+
+    /** Recursive step: numbers v, explores its successors and pops v's component if v is a root. */
+    private List<List<N>> tarjan(N v, AdjacencyList<N> list) {
+      v.index = index;
+      v.lowlink = index;
+      index++;
+      stack.add(0, v);
+      List<Edge<N>> l = list.getAdjacent(v);
+      if (l != null) {
+        for (Edge<N> e : l) {
+          N n = e.to;
+          if (n.index == -1) {
+            // not numbered yet: recurse, then inherit whatever it could reach
+            tarjan(n, list);
+            v.lowlink = Math.min(v.lowlink, n.lowlink);
+          } else if (stack.contains(n)) {
+            // still on the stack, so n belongs to the component being built
+            v.lowlink = Math.min(v.lowlink, n.index);
+          }
+        }
+      }
+      if (v.lowlink == v.index) {
+        // v is the root of a component: pop the stack down to and including v
+        N n;
+        List<N> component = new LinkedList<N>();
+        do {
+          n = stack.remove(0);
+          component.add(n);
+        } while (n != v);
+        SCC.add(component);
+      }
+      return SCC;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/Node.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/Node.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/Node.java
new file mode 100644
index 0000000..ee2b314
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/graph/Node.java
@@ -0,0 +1,35 @@
+package org.apache.drill.common.logical.graph;
+
+
+/** Graph node wrapping a value, plus the scratch state the graph algorithms keep on it. */
+public class Node<T> implements Comparable<Node<T>> {
+  final T nodeValue;
+  boolean visited = false; // traversal mark, cleared and set by the topological sort
+  int lowlink = -1; // used for Tarjan's algorithm
+  int index = -1; // used for Tarjan's algorithm; -1 means not numbered yet
+
+  public Node(final T operator) {
+    nodeValue = operator;
+  }
+
+  /**
+   * Identity compare: 0 for this instance, -1 for any other node. NOTE(review): distinct nodes
+   * compare as -1 in both directions, so sorted collections cannot rely on this ordering.
+   */
+  public int compareTo(final Node<T> argNode) {
+    return this != argNode ? -1 : 0;
+  }
+
+  /** Delegates to the wrapped value's hash code. */
+  @Override
+  public int hashCode() {
+    return nodeValue.hashCode();
+  }
+
+  public T getNodeValue() { return nodeValue; }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("Node [val=").append(nodeValue).append("]").toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/sources/DataSource.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/sources/DataSource.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/sources/DataSource.java
index 680e9ec..d36c061 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/sources/DataSource.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/sources/DataSource.java
@@ -2,11 +2,13 @@ package org.apache.drill.common.logical.sources;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
+
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="type")
 public interface DataSource {
 	
 	public static final Class<?>[] SUB_TYPES = {Text.class, Mongo.class, Mysql.class};
 	
+
 	public String getName();
 	
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/eaa84d65/sandbox/prototype/common/src/test/resources/simple_plan.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/simple_plan.json b/sandbox/prototype/common/src/test/resources/simple_plan.json
index a7678d3..b6a2d88 100644
--- a/sandbox/prototype/common/src/test/resources/simple_plan.json
+++ b/sandbox/prototype/common/src/test/resources/simple_plan.json
@@ -6,7 +6,6 @@
       type:"manual",
       info:"na"
     }
-    
   },
   sources:[
     {
@@ -21,7 +20,6 @@
         type:"first-row",
         delimiter:","
       }
-      
     },
     {
       type:"mongo",
@@ -33,18 +31,19 @@
       name:"mysql",
       connection:"jdbc:mysql://localhost/main"
     }
-    
   ],
   query:[
     {
       @id:"1",
       op:"scan",
+      memo:"initial_scan",
       source:"local-logs",
-      name:"activity"
+      space:"activity"
     },
     {
       @id:"2",
       input:"1",
+      memo:"transform1",
       op:"transform",
       transforms:[
         {
@@ -55,13 +54,12 @@
           name:"session",
           expr:"regex('activity.cookie', \"session=([^;]*)\")"
         }
-        
       ]
-      
     },
     {
       @id:"3",
       input:"2",
+      memo:"transform2",
       op:"transform",
       transforms:[
         {
@@ -72,42 +70,71 @@
           name:"session",
           expr:"regex('activity.cookie', \"session=([^;]*)\")"
         }
-        
       ]
-      
     },
     {
+      @id:"7",
       input:"3",
       op:"sequence",
       do:[
         {
           op:"transform",
+          memo:"seq_transform",
           transforms:[
             {
               name:"happy",
               expr:"regex('ep2', \"dink\")"
             }
-            
           ]
-          
         }
         ,
         {
           op:"transform",
+          memo:"last_transform",
           transforms:[
             {
               name:"abc",
-              expr:"2"
+              expr:"123"
             }
-            
           ]
-          
         }
-        
       ]
+    },
+    {
+      @id:"10",
+      input:"3",
+      op:"transform",
+      memo:"t3",
+      transforms:[
+        {
+          name:"happy",
+          expr:"regex('ep2', \"dink\")"
+        }
+      ]
+    },
+    {
+      @id:12,
+      op:"join",
+      left:"7",
+      right:"10"
       
     }
+    ,
+    {
+      input:"12",
+      op:"project",
+      memo:"prj1",
+      exprs:[
+        {
+          ref:"blue",
+          expr:"hello"
+        },
+        {
+          ref:"red",
+          expr:"users"
+        }
+      ]
+    }
     
   ]
-  
 }
\ No newline at end of file