You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/14 04:35:07 UTC

[3/9] git commit: Add flatten and join test executions. Abstract graph classes. Update storage engine definition to be a map. Move plan properties to use enum for plan type. Remove unused tests/resources. Update sql parser for change in storage engine definition. Add basic physical plan implementation.

Add flatten and join test executions.  Abstract graph classes.  Update storage engine definition to be a map.  Move plan properties to use enum for plan type.  Remove unused tests/resources.  Update sql parser for change in storage engine definition.  Add basic physical plan implementation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2a6e1b33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2a6e1b33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2a6e1b33

Branch: refs/heads/execwork
Commit: 2a6e1b33e93824c1147e7e257a46aceb768da8d8
Parents: b48d0f0
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Mar 16 17:56:35 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Mar 16 18:26:42 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/drill/common/JSONOptions.java  |  106 ++++++++
 .../org/apache/drill/common/PlanProperties.java    |   32 +++
 .../drill/common/config/CommonConstants.java       |    1 +
 .../apache/drill/common/config/DrillConfig.java    |    2 +
 .../org/apache/drill/common/defs/OrderDef.java     |   60 +++++
 .../org/apache/drill/common/defs/PartitionDef.java |   58 +++++
 .../drill/common/expression/types/DataType.java    |  104 +++++++-
 .../common/expression/types/LateBindType.java      |    2 +-
 .../common/expression/visitors/OpVisitor.java      |    6 +-
 .../apache/drill/common/graph/AdjacencyList.java   |  185 +++++++++++++++
 .../drill/common/graph/AdjacencyListBuilder.java   |   74 ++++++
 .../java/org/apache/drill/common/graph/Edge.java   |   42 ++++
 .../java/org/apache/drill/common/graph/Graph.java  |   68 ++++++
 .../org/apache/drill/common/graph/GraphAlgos.java  |  145 +++++++++++
 .../org/apache/drill/common/graph/GraphValue.java  |   26 ++
 .../apache/drill/common/graph/GraphVisitor.java    |   25 ++
 .../org/apache/drill/common/graph/Visitable.java   |   22 ++
 .../apache/drill/common/logical/JSONOptions.java   |  120 ----------
 .../apache/drill/common/logical/LogicalPlan.java   |  121 +++-------
 .../apache/drill/common/logical/OperatorGraph.java |  145 -----------
 .../drill/common/logical/PlanProperties.java       |   30 ---
 .../drill/common/logical/StorageEngineConfig.java  |    1 -
 .../common/logical/StorageEngineConfigBase.java    |    9 -
 .../common/logical/UnexpectedOperatorType.java     |    5 +-
 .../drill/common/logical/data/LogicalOperator.java |    7 +-
 .../common/logical/data/LogicalOperatorBase.java   |    4 +-
 .../apache/drill/common/logical/data/Order.java    |   49 +----
 .../org/apache/drill/common/logical/data/Scan.java |    2 +-
 .../apache/drill/common/logical/data/Store.java    |   49 +----
 .../apache/drill/common/logical/defs/OrderDef.java |   60 +++++
 .../drill/common/logical/defs/PartitionDef.java    |   55 +++++
 .../drill/common/logical/graph/AdjacencyList.java  |  123 ----------
 .../apache/drill/common/logical/graph/Edge.java    |   42 ----
 .../drill/common/logical/graph/GraphAlgos.java     |  137 -----------
 .../apache/drill/common/logical/graph/Node.java    |   53 ----
 .../org/apache/drill/common/physical/FieldSet.java |   97 ++++++++
 .../apache/drill/common/physical/POPConfig.java    |   24 ++
 .../org/apache/drill/common/physical/POPCost.java  |   34 +++
 .../apache/drill/common/physical/PhysicalPlan.java |   93 ++++++++
 .../apache/drill/common/physical/ReadEntry.java    |   25 ++
 .../apache/drill/common/physical/RecordField.java  |   79 ++++++
 .../org/apache/drill/common/physical/SetSpec.java  |   36 +++
 .../apache/drill/common/physical/StitchDef.java    |   48 ++++
 .../drill/common/physical/pop/ExchangePOP.java     |   56 +++++
 .../apache/drill/common/physical/pop/POPBase.java  |   65 +++++
 .../common/physical/pop/PhysicalOperator.java      |   35 +++
 .../common/physical/pop/QuickNWaySortPOP.java      |   50 ++++
 .../apache/drill/common/physical/pop/ScanPOP.java  |   75 ++++++
 .../drill/common/physical/pop/SingleChildPOP.java  |   41 ++++
 .../apache/drill/common/physical/pop/SinkPOP.java  |   22 ++
 .../drill/common/physical/pop/SourcePOP.java       |   22 ++
 .../apache/drill/common/physical/pop/StorePOP.java |   54 +++++
 .../drill/common/physical/props/OrderProp.java     |   45 ++++
 .../drill/common/physical/props/PartitionProp.java |   36 +++
 .../drill/common/physical/props/PhysicalProp.java  |   24 ++
 .../drill/common/physical/props/SegmentProp.java   |   42 ++++
 .../common/src/main/resources/drill-default.conf   |   17 +-
 .../test/java/org/apache/drill/ExpressionTest.java |   14 -
 .../drill/common/physical/ParsePhysicalPlan.java   |   37 +++
 .../drill/storage/MockStorageEngineConfig.java     |    4 +-
 .../common/src/test/resources/dsort-physical.json  |   76 ++++++
 .../common/src/test/resources/example1.sql         |  136 -----------
 .../common/src/test/resources/example2.sql         |   98 --------
 .../common/src/test/resources/example3.sql         |    3 -
 .../common/src/test/resources/logback.xml          |    9 +-
 .../common/src/test/resources/logical_plan1.json   |  139 -----------
 .../common/src/test/resources/simple_plan.json     |  134 -----------
 .../src/test/resources/storage_engine_plan.json    |    9 +-
 sandbox/prototype/exec/ref/pom.xml                 |    8 +-
 .../org/apache/drill/exec/ref/rops/OrderROP.java   |    6 +-
 .../apache/drill/exec/ref/rse/ClasspathRSE.java    |    6 +-
 .../org/apache/drill/exec/ref/rse/ConsoleRSE.java  |   22 +-
 .../apache/drill/exec/ref/rse/FileSystemRSE.java   |    8 +-
 .../org/apache/drill/exec/ref/rse/QueueRSE.java    |   12 +-
 .../org/apache/drill/exec/ref/RunSimplePlan.java   |   24 ++
 .../exec/ref/src/test/resources/simple_join.json   |   24 +--
 .../exec/ref/src/test/resources/simple_plan.json   |   24 +--
 .../src/test/resources/simple_plan_flattened.json  |   28 +-
 .../main/java/org/apache/drill/jdbc/Driver.java    |    2 +-
 .../org/apache/drill/optiq/DrillImplementor.java   |   10 +-
 .../java/org/apache/drill/optiq/DrillScan.java     |    3 +-
 81 files changed, 2330 insertions(+), 1496 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java
new file mode 100644
index 0000000..ad4926b
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java
@@ -0,0 +1,106 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common;
+
+import java.io.IOException;
+
+import org.apache.drill.common.JSONOptions.De;
+import org.apache.drill.common.JSONOptions.Se;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.LogicalPlanParsingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonLocation;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+@JsonSerialize(using = Se.class) // written out by the nested Se serializer
+@JsonDeserialize(using = De.class) // read in by the nested De deserializer
+public class JSONOptions { // holds a raw JsonNode so that binding to a concrete type is deferred until getWith() is called
+  
+  final static Logger logger = LoggerFactory.getLogger(JSONOptions.class);
+  
+  private JsonNode root; // the captured JSON subtree
+  private JsonLocation location; // parser token location captured by De; used here only to build the getWith() error message
+  
+  private JSONOptions(JsonNode n, JsonLocation location){ // private: within this file the only caller is De.deserialize
+    this.root = n;
+    this.location = location;
+  }
+  
+  public <T> T getWith(DrillConfig config, Class<T> c){ // converts the stored tree to an instance of c using the mapper obtained from config
+    try {
+      //logger.debug("Read tree {}", root);
+      return config.getMapper().treeToValue(root, c);
+    } catch (JsonProcessingException e) { // wrapped in LogicalPlanParsingException; the message names the target type and the original line/column, and e is kept as the cause
+      throw new LogicalPlanParsingException(String.format("Failure while trying to convert late bound json options to type of %s. Reference was originally located at line %d, column %d.", c.getCanonicalName(), location.getLineNr(), location.getColumnNr()), e);
+    }
+  }
+  
+  public JsonNode path(String name){ // delegates to JsonNode.path(name) on the stored tree
+    return root.path(name);
+  }
+  
+  public static class De extends StdDeserializer<JSONOptions> { // Jackson deserializer: reads the value as a tree rather than binding it to a type
+    
+    public De() {
+      super(JSONOptions.class);
+      logger.debug("Creating Deserializer.");
+    }
+
+    @Override
+    public JSONOptions deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
+        JsonProcessingException {
+      JsonLocation l = jp.getTokenLocation(); // captured before the tree is read, so it is taken before the parser advances
+//      logger.debug("Reading tree.");
+      TreeNode n = jp.readValueAsTree();
+//      logger.debug("Tree {}", n);
+      if(n instanceof JsonNode){ // readValueAsTree() is typed as TreeNode; only a JsonNode can be stored in root
+        return new JSONOptions( (JsonNode) n, l); 
+      }else{
+        throw new IllegalArgumentException(String.format("Received something other than a JsonNode %s", n));
+      }
+    }
+
+  }
+
+  public static class Se extends StdSerializer<JSONOptions> { // Jackson serializer: writes the wrapped tree directly
+
+    public Se() {
+      super(JSONOptions.class);
+    }
+
+    @Override
+    public void serialize(JSONOptions value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
+        JsonGenerationException {
+      jgen.writeTree(value.root); // location is not written; only the tree is emitted
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/PlanProperties.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/PlanProperties.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/PlanProperties.java
new file mode 100644
index 0000000..57d367a
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/PlanProperties.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common;
+
+
+public class PlanProperties { // plain holder of public, mutable plan header fields; declares no methods
+  public static enum PlanType {APACHE_DRILL_LOGICAL, APACHE_DRILL_PHYSICAL} // the two plan types this class can be tagged with
+
+  public PlanType type; // null until assigned
+  public int version; // defaults to 0 until assigned
+	public Generator generator = new Generator(); // initialized eagerly, so non-null unless reassigned
+	
+	public static class Generator{ // two free-form string fields, both null until assigned
+		public String type;
+		public String info;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/CommonConstants.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/CommonConstants.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/CommonConstants.java
index 0687979..95bc979 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/CommonConstants.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/CommonConstants.java
@@ -22,6 +22,7 @@ public interface CommonConstants {
   public static final String CONFIG_OVERRIDE = "drill-override.conf";
   
   public static final String LOGICAL_OPERATOR_SCAN_PACKAGES = "drill.logical.operator.packages";
+  public static final String PHYSICAL_OPERATOR_SCAN_PACKAGES = "drill.physical.operator.packages";
   public static final String STORAGE_ENGINE_CONFIG_SCAN_PACKAGES = "drill.logical.storage.packages";
   public static final String DRILL_JAR_MARKER_FILE = "drill-module.conf";
   public static final String LOGICAL_FUNCTION_SCAN_PACKAGES = "drill.logical.function.packages";

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index 460b5fa..a775867 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.StorageEngineConfigBase;
 import org.apache.drill.common.logical.data.LogicalOperatorBase;
+import org.apache.drill.common.physical.pop.POPBase;
 import org.apache.drill.common.util.PathScanner;
 
 import com.fasterxml.jackson.core.JsonParser.Feature;
@@ -52,6 +53,7 @@ public final class DrillConfig extends NestedConfig{
     mapper.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
     mapper.configure(Feature.ALLOW_COMMENTS, true);
     mapper.registerSubtypes(LogicalOperatorBase.getSubTypes(this));
+    mapper.registerSubtypes(POPBase.getSubTypes(this));
     mapper.registerSubtypes(StorageEngineConfigBase.getSubTypes(this));
     
   };

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/OrderDef.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/OrderDef.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/OrderDef.java
new file mode 100644
index 0000000..e873496
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/OrderDef.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.defs;
+
+import org.apache.drill.common.expression.LogicalExpression;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class OrderDef { // immutable pairing of an expression with a sort direction
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderDef.class);
+
+  private final Direction direction; // never null: the constructor substitutes ASC for null
+  private final LogicalExpression expr; // stored as given; not null-checked
+
+  @JsonCreator
+  public OrderDef(@JsonProperty("order") Direction direction, @JsonProperty("expr") LogicalExpression expr) { // JSON property "order" supplies direction, "expr" supplies expr
+    this.expr = expr;
+    // default to ascending unless desc is provided.
+    this.direction = direction == null ? Direction.ASC : direction;
+  }
+  
+  @JsonIgnore // kept out of JSON; the direction is exposed through getOrder() instead
+  public Direction getDirection() {
+    return direction;
+  }
+
+  public LogicalExpression getExpr() {
+    return expr;
+  }
+
+  public String getOrder() { // returns the direction's lowercase description ("asc" or "desc"). NOTE(review): the creator binds "order" to the Direction enum - confirm the mapper accepts the lowercase form on read
+    return direction.description;
+  }
+
+  public static enum Direction {
+    ASC("asc"), DESC("desc");
+    public final String description; // lowercase label returned by OrderDef.getOrder()
+
+    Direction(String d) {
+      description = d;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/PartitionDef.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/PartitionDef.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/PartitionDef.java
new file mode 100644
index 0000000..181c327
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/PartitionDef.java
@@ -0,0 +1,58 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.defs;
+
+import org.apache.drill.common.expression.LogicalExpression;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class PartitionDef { // immutable holder of a partition mode plus two expression arrays, bound to JSON properties "mode", "exprs" and "starts"
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionDef.class);
+
+  private final PartitionType partitionType; // JSON "mode"
+  private final LogicalExpression[] expressions; // JSON "exprs"; the array is stored without copying
+  private final LogicalExpression[] starts; // JSON "starts"; the array is stored without copying
+  
+  @JsonCreator
+  public PartitionDef(@JsonProperty("mode") PartitionType partitionType, @JsonProperty("exprs") LogicalExpression[] expressions, @JsonProperty("starts") LogicalExpression[] starts) { // no validation or defaulting: each argument is stored as given, including null
+    this.partitionType = partitionType;
+    this.expressions = expressions;
+    this.starts = starts;
+  }
+
+  @JsonProperty("mode")
+  public PartitionType getPartitionType() {
+    return partitionType;
+  }
+
+  @JsonProperty("exprs")
+  public LogicalExpression[] getExpressions() { // returns the internal array itself, not a copy
+    return expressions;
+  }
+
+  @JsonProperty("starts")
+  public LogicalExpression[] getStarts() { // returns the internal array itself, not a copy
+    return starts;
+  }
+  
+
+  public static enum PartitionType{ // the partition modes accepted for "mode"
+    RANDOM, HASH, RANGE;
+  };
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
index ac9aec7..776a9e8 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
@@ -17,7 +17,28 @@
  ******************************************************************************/
 package org.apache.drill.common.expression.types;
 
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+@JsonSerialize(using = DataType.Se.class)
+@JsonDeserialize(using = DataType.De.class)
 public abstract class DataType {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataType.class);
   
   public static enum Comparability{
     UNKNOWN, NONE, EQUAL, ORDERED;
@@ -30,21 +51,78 @@ public abstract class DataType {
   public abstract Comparability getComparability();
   public abstract boolean isNumericType();
   
-  
   public static final DataType LATEBIND = new LateBindType();
-  public static final DataType BOOLEAN = new AtomType("boolean", Comparability.EQUAL, false);
-  public static final DataType BYTES = new AtomType("bytes", Comparability.ORDERED, false);
-  public static final DataType NVARCHAR = new AtomType("varchar", Comparability.ORDERED, false);
-  public static final DataType FLOAT32 = new AtomType("float32", Comparability.ORDERED, true);
-  public static final DataType FLOAT64 = new AtomType("float64", Comparability.ORDERED, true);
-  public static final DataType INT64 = new AtomType("int64", Comparability.ORDERED, true);
-  public static final DataType INT32 = new AtomType("int32", Comparability.ORDERED, true);
+  public static final DataType BOOLEAN = new AtomType("BOOLEAN", Comparability.EQUAL, false);
+  public static final DataType BYTES = new AtomType("BYTES", Comparability.ORDERED, false);
+  public static final DataType NVARCHAR = new AtomType("VARCHAR", Comparability.ORDERED, false);
+  public static final DataType FLOAT32 = new AtomType("FLOAT32", Comparability.ORDERED, true);
+  public static final DataType FLOAT64 = new AtomType("FLOAT64", Comparability.ORDERED, true);
+  public static final DataType INT64 = new AtomType("INT64", Comparability.ORDERED, true);
+  public static final DataType INT32 = new AtomType("INT32", Comparability.ORDERED, true);
 //  public static final DataType INT16 = new AtomType("int16", Comparability.ORDERED, true);
 //  public static final DataType BIG_INTEGER = new AtomType("bigint", Comparability.ORDERED, true);
 //  public static final DataType BIG_DECIMAL = new AtomType("bigdecimal", Comparability.ORDERED, true);
-  public static final DataType DATE = new AtomType("date", Comparability.ORDERED, false);
-  public static final DataType DATETIME = new AtomType("datetime", Comparability.ORDERED, false);
-  public static final DataType MAP = new AtomType("map", Comparability.NONE, false);
-  public static final DataType ARRAY = new AtomType("array", Comparability.NONE, false);
-  public static final DataType NULL = new AtomType("null", Comparability.NONE, false);
+  public static final DataType DATE = new AtomType("DATE", Comparability.ORDERED, false);
+  public static final DataType DATETIME = new AtomType("DATETIME", Comparability.ORDERED, false);
+  public static final DataType MAP = new AtomType("MAP", Comparability.NONE, false);
+  public static final DataType ARRAY = new AtomType("ARRAY", Comparability.NONE, false);
+  public static final DataType NULL = new AtomType("NULL", Comparability.NONE, false);
+  
+  
+  static final Map<String, DataType> TYPES; // name -> DataType lookup, built once by the static block below and then unmodifiable
+  static { // static initializers run in textual order, so this block must stay below the constant declarations it reads
+    Field[] fields = DataType.class.getFields(); // getFields() returns public fields only
+    Map<String, DataType> types = new HashMap<String, DataType>();
+    for(Field f : fields){
+      //logger.debug("Reviewing {}, Field: {}", f.getClass(), f);
+      if(Modifier.isStatic(f.getModifiers())){ // only static fields can be read with a null receiver
+        try {
+          Object o = f.get(null);
+          //logger.debug("Object {}", o);
+          
+          if(o instanceof DataType) types.put(((DataType) o).getName(), (DataType) o); // keyed by getName(); a later field with the same name replaces the earlier entry
+        } catch (IllegalArgumentException | IllegalAccessException e) { // a field that cannot be read is logged and skipped rather than failing class initialization
+          logger.warn("Failure while reading DataType.", e);
+        }
+      }
+    }
+    TYPES = Collections.unmodifiableMap(types);
+    
+  }
+  
+  public static DataType getDataType(String name){ // looks name up in TYPES by exact, case-sensitive match on getName()
+    if(TYPES.containsKey(name)){
+      return TYPES.get(name);
+    }else{ // unknown names are rejected with an exception instead of returning null
+      throw new IllegalArgumentException(String.format("Unknown type requested of [%s].", name));
+    }
+  }
+
+  public static class De extends StdDeserializer<DataType> { // Jackson deserializer: maps a type name string to a registered DataType
+
+    public De() {
+      super(DataType.class);
+    }
+
+    @Override
+    public DataType deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
+        JsonProcessingException {
+      return getDataType(this._parseString(jp, ctxt)); // getDataType throws IllegalArgumentException when the parsed name is not registered
+    }
+
+  }
+
+  public static class Se extends StdSerializer<DataType> { // Jackson serializer: writes a DataType as its name string
+
+    public Se() {
+      super(DataType.class);
+    }
+
+    @Override
+    public void serialize(DataType value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
+        JsonGenerationException {
+      jgen.writeString(value.getName()); // the same getName() value is the key De uses to look the type up again
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/LateBindType.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/LateBindType.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/LateBindType.java
index 8938e28..da3d87a 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/LateBindType.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/LateBindType.java
@@ -21,7 +21,7 @@ class LateBindType extends DataType {
 
   @Override
   public String getName() {
-    return "late";
+    return "LATE";
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/visitors/OpVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/visitors/OpVisitor.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/visitors/OpVisitor.java
index 8dd7e16..983a258 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/visitors/OpVisitor.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/visitors/OpVisitor.java
@@ -17,10 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.common.expression.visitors;
 
+import org.apache.drill.common.graph.GraphVisitor;
 import org.apache.drill.common.logical.data.LogicalOperator;
 
-public interface OpVisitor {
-  public boolean enter(LogicalOperator o);
-  public void leave(LogicalOperator o);
-  public boolean visit(LogicalOperator o);
+public interface OpVisitor extends GraphVisitor<LogicalOperator> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyList.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyList.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyList.java
new file mode 100644
index 0000000..f23eeca
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyList.java
@@ -0,0 +1,210 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.graph;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimaps;
+
+/**
+ * Directed graph kept as a multimap from each source node to its outbound edges.
+ */
+class AdjacencyList<V extends GraphValue<V>> {
+  // Hands out creation-order ids that give nodes a total order in Node.compareTo.
+  private static final AtomicLong NODE_IDS = new AtomicLong();
+
+  private Set<Node> allNodes = new HashSet<Node>();
+  private ListMultimap<Node, Edge<Node>> adjacencies = ArrayListMultimap.create();
+
+  void addEdge(Node source, Node target, int weight) {
+    adjacencies.put(source, new Edge<Node>(source, target, weight));
+    allNodes.add(source);
+    allNodes.add(target);
+  }
+
+  void clearVisited() {
+    for (Edge<Node> e : adjacencies.values()) {
+      e.from.visited = false;
+      e.to.visited = false;
+    }
+  }
+
+  Node getNewNode(V value){
+    return new Node(value);
+  }
+  
+  public List<Edge<Node>> getAdjacent(AdjacencyList<V>.Node source) {
+    return adjacencies.get(source);
+  }
+
+  public void printEdges() {
+    for (Edge<Node> e : adjacencies.values()) {
+      System.out.println(e.from.index + " -> " + e.to.index);
+    }
+  }
+
+  public AdjacencyList<V> getReversedList() {
+    AdjacencyList<V> newlist = new AdjacencyList<V>();
+    for (Edge<Node> e : adjacencies.values()) {
+      newlist.addEdge(e.to, e.from, e.weight);
+    }
+    return newlist;
+  }
+
+  /**
+   * Get the nodes that have at least one outbound edge. Nodes that only occur as the target of an edge are not
+   * part of this set.
+   */
+  public Set<Node> getNodeSet() {
+    return adjacencies.keySet();
+  }
+
+
+  Collection<Node> getInternalLeafNodes() {
+    // we have to use the allNodes list as otherwise destination only nodes won't be found.
+    List<Node> nodes = new LinkedList<Node>(allNodes);
+
+    for (Iterator<Node> i = nodes.iterator(); i.hasNext();) {
+      final Node n = i.next();
+
+      // remove any nodes that have one or more outbound edges.
+      List<Edge<Node>> adjList = this.getAdjacent(n);
+      if (adjList != null && !adjList.isEmpty()) i.remove();
+
+    }
+    return nodes;
+  }
+  
+  /**
+   * Get a list of nodes that have no outbound edges.
+   * 
+   * @return the values of all nodes without outbound edges
+   */
+  public Collection<V> getLeafNodes(){
+    return convert(getInternalLeafNodes());
+  }
+
+
+  Collection<Node> getInternalRootNodes() {
+    Set<Node> nodes = new HashSet<Node>(getNodeSet());
+    for (Edge<Node> e : adjacencies.values()) {
+      nodes.remove(e.to);
+    }
+    return nodes;
+  }
+
+  /**
+   * Get a list of all nodes that have no incoming edges.
+   * 
+   * @return the values of all nodes without incoming edges
+   */
+  public List<V> getRootNodes(){
+    return convert(getInternalRootNodes());
+  }
+  
+  public Collection<Edge<Node>> getAllEdges() {
+    return adjacencies.values();
+  }
+
+  /**
+   * Makes the edge map and the node set unmodifiable and optionally verifies that the graph has no cycles.
+   *
+   * @param requireDirected whether to check the graph for cyclic references
+   * @throws IllegalArgumentException if the check is requested and cyclic references are found
+   */
+  public void fix(boolean requireDirected) {
+    adjacencies = Multimaps.unmodifiableListMultimap(adjacencies);
+    allNodes = Collections.unmodifiableSet(allNodes);
+
+    if (requireDirected) {
+      List<List<Node>> cyclicReferences = GraphAlgos.checkDirected(this);
+      if (cyclicReferences.size() > 0) {
+        throw new IllegalArgumentException(
+            "A logical plan must be a valid DAG.  You have cyclic references in your graph.  " + cyclicReferences);
+      }
+    }
+  }
+
+  List<V> convert(Collection<Node> nodes) {
+    List<V> out = new ArrayList<V>(nodes.size());
+    for (Node o : nodes) {
+      out.add(o.getNodeValue());
+    }
+    return out;
+  }
+
+  class Node implements Comparable<Node> {
+    final V nodeValue;
+    // Unique id taken in creation order; used only to order nodes in compareTo.
+    private final long id = NODE_IDS.getAndIncrement();
+    boolean visited = false; // used for Kosaraju's algorithm and Edmonds's
+                             // algorithm
+    int lowlink = -1; // used for Tarjan's algorithm
+    int index = -1; // used for Tarjan's algorithm
+
+    public Node(final V operator) {
+      if (operator == null) throw new IllegalArgumentException("Operator node was null.");
+      this.nodeValue = operator;
+    }
+
+    /**
+     * Orders nodes by creation order. Only the node itself compares as equal, matching the identity-based
+     * equality of nodes.
+     */
+    public int compareTo(final Node argNode) {
+      // Returning -1 for every other node, as before, broke the Comparable contract: a.compareTo(b) and
+      // b.compareTo(a) were both negative.
+      if (argNode == this) return 0;
+      return id < argNode.id ? -1 : 1;
+    }
+
+    @Override
+    public int hashCode() {
+      return nodeValue.hashCode();
+    }
+
+    public V getNodeValue() {
+      return nodeValue;
+    }
+
+    @Override
+    public String toString() {
+      return "Node [val=" + nodeValue + "]";
+    }
+
+  }
+  
+  public static <V extends GraphValue<V>> AdjacencyList<V> newInstance(Collection<V> nodes){
+    AdjacencyList<V> list = new AdjacencyList<V>();
+    AdjacencyListBuilder<V> builder = new AdjacencyListBuilder<V>(list);
+    for(V v : nodes){
+      v.accept(builder); 
+    }
+    return builder.getAdjacencyList();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
new file mode 100644
index 0000000..1668477
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
@@ -0,0 +1,100 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.graph;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link GraphVisitor} that registers a node for every value it is shown and then links each value to the
+ * values it iterates over, producing an {@link AdjacencyList}.
+ */
+ class AdjacencyListBuilder<V extends GraphValue<V>> implements GraphVisitor<V> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AdjacencyListBuilder.class);
+
+  private Map<V, AdjacencyList<V>.Node> ops = new HashMap<V, AdjacencyList<V>.Node>();
+  private final AdjacencyList<V> parent;
+  
+  public AdjacencyListBuilder(AdjacencyList<V> parent) {
+    this.parent = parent;
+  }
+
+
+  /** Whether the finished graph has to be free of cycles; handed to {@link AdjacencyList#fix(boolean)}. */
+  protected boolean requireDirected() {
+    return true;
+  }
+
+  @Override
+  public boolean enter(V o) {
+    visit(o);
+    return true;
+  }
+
+  @Override
+  public void leave(V o) {
+  }
+
+  /**
+   * Registers a node for the value the first time it is seen.
+   *
+   * @return always {@code true}
+   * @throws IllegalArgumentException if the value is null
+   */
+  @Override
+  public boolean visit(V o) {
+    if (o == null) throw new IllegalArgumentException("Null operator.");
+
+    if (!ops.containsKey(o)) {
+      ops.put(o, parent.getNewNode(o));
+    }
+
+    return true;
+  }
+
+  /**
+   * Creates a new adjacency list holding one edge of weight 0 from every registered value to each value it
+   * iterates over, then fixes the list.
+   *
+   * @throws IllegalArgumentException if a value iterates over a value this builder never registered, or if
+   *           {@link AdjacencyList#fix(boolean)} rejects the finished list
+   */
+  public AdjacencyList<V> getAdjacencyList() {
+    // Log the collection itself: an Object[] argument is taken as the logger's argument array, so only its
+    // first element would fill the single placeholder.
+    logger.debug("Values; {}", ops.values());
+    AdjacencyList<V> a = new AdjacencyList<V>();
+
+    for (AdjacencyList<V>.Node from : ops.values()) {
+      for (V t : from.getNodeValue()) {
+        AdjacencyList<V>.Node to = ops.get(t);
+        if (to == null) {
+          // addEdge would accept a null target and the failure would only show up later as a NullPointerException.
+          throw new IllegalArgumentException(
+              "Value " + from.getNodeValue() + " refers to " + t + ", which was never registered with this builder.");
+        }
+        a.addEdge(from, to, 0);
+      }
+
+    }
+    // Honour the requireDirected() hook instead of hard-coding true.
+    a.fix(requireDirected());
+    return a;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Edge.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Edge.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Edge.java
new file mode 100644
index 0000000..d444b9b
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Edge.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.graph;
+
+
+/**
+ * A directed, weighted edge between two graph nodes.
+ *
+ * @param <N> type of the nodes the edge connects
+ */
+class Edge<N> implements Comparable<Edge<N>> {
+
+  final N from, to;
+  final int weight;
+
+  public Edge(final N argFrom, final N argTo, final int argWeight) {
+    from = argFrom;
+    to = argTo;
+    weight = argWeight;
+  }
+
+  /**
+   * Orders edges by ascending weight; the end points take no part in the comparison.
+   */
+  public int compareTo(final Edge<N> argEdge) {
+    // Compare explicitly instead of subtracting: weight - argEdge.weight overflows for large operands of
+    // opposite sign, which would report the wrong order.
+    return weight < argEdge.weight ? -1 : (weight == argEdge.weight ? 0 : 1);
+  }
+
+  @Override
+  public String toString() {
+    return "Edge [from=" + from + ", to=" + to + "]";
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Graph.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Graph.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Graph.java
new file mode 100644
index 0000000..295a81a
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Graph.java
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.graph;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.drill.common.logical.UnexpectedOperatorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A graph of values whose roots and leaves are checked against required types on construction.
+ *
+ * @param <G> common type of all values in the graph
+ * @param <R> type every root (a value without incoming edges) must have
+ * @param <T> type every leaf (a value without outbound edges) must have
+ */
+public class Graph<G extends GraphValue<G>, R extends G, T extends G> {
+
+  static final Logger logger = LoggerFactory.getLogger(Graph.class);
+
+  private AdjacencyList<G> adjList;
+  private final List<R> roots;
+  private final List<T> leaves;
+
+  public Graph(List<G> operators, Class<R> root, Class<T> leaf) {
+    adjList = AdjacencyList.newInstance(operators);
+    roots = checkOperatorType(adjList.getRootNodes(), root, String.format("Root nodes must be a subclass of %s.", root.getSimpleName()));
+    leaves = checkOperatorType(adjList.getLeafNodes(), leaf, String.format("Leaf nodes must be a subclass of %s.", leaf.getSimpleName()));
+  }
+
+  /**
+   * Verifies that every value is an instance of the given class and returns the values as a typed list.
+   *
+   * @throws UnexpectedOperatorType if a value is not an instance of {@code classIdentifier}
+   */
+  private <O extends G> List<O> checkOperatorType(Collection<G> ops, Class<O> classIdentifier, String error){
+    // Copy with a checked cast per element instead of an unchecked (List<O>) cast of the collection, which
+    // only worked while the collection handed in happened to be a List.
+    List<O> typed = new ArrayList<O>(ops.size());
+    for(G o : ops){
+      if(!classIdentifier.isInstance(o)){
+        throw new UnexpectedOperatorType(o, error);
+      }
+      typed.add(classIdentifier.cast(o));
+    }
+    return typed;
+  }
+  
+  public AdjacencyList<G> getAdjList() {
+    return adjList;
+  }
+
+  public Collection<R> getRoots() {
+    return roots;
+  }
+
+  public Collection<T> getLeaves() {
+    return leaves;
+  }
+
+  public static <G extends GraphValue<G>, R extends G, T extends G> Graph<G, R, T> newGraph(List<G> operators, Class<R> root, Class<T> leaf){
+    return new Graph<G, R, T>(operators, root, leaf);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphAlgos.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphAlgos.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphAlgos.java
new file mode 100644
index 0000000..b83f1bd
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphAlgos.java
@@ -0,0 +1,178 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.graph;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Algorithms over {@link AdjacencyList}: topological sorting and detection of cyclic references.
+ */
+public class GraphAlgos {
+  static final Logger logger = LoggerFactory.getLogger(GraphAlgos.class);
+
+  /**
+   * Depth-first sorter working on the reversed graph. For an acyclic graph, a node is emitted only after
+   * every node that has an edge to it has been emitted.
+   */
+  public static class TopoSorter<V extends GraphValue<V>> {
+    final List<AdjacencyList<V>.Node> sorted = new LinkedList<AdjacencyList<V>.Node>();
+    final AdjacencyList<V> rGraph;
+
+    private TopoSorter(AdjacencyList<V> graph) {
+      graph.clearVisited();
+
+      this.rGraph = graph.getReversedList();
+      Collection<AdjacencyList<V>.Node> sourceNodes = rGraph.getInternalRootNodes();
+
+      for (AdjacencyList<V>.Node n : sourceNodes) {
+        visit(n);
+      }
+    }
+
+    private void visit(AdjacencyList<V>.Node n) {
+      if (n.visited) return;
+
+      n.visited = true;
+      List<Edge<AdjacencyList<V>.Node>> edges = rGraph.getAdjacent(n);
+      if (edges != null) {
+        for (Edge<AdjacencyList<V>.Node> e : edges) {
+          visit(e.to);
+        }
+      }
+
+      sorted.add(n);
+
+    }
+
+    /**
+     * Execute a depth-first sort on the reversed DAG.
+     * 
+     * @param graph
+     *          The adjacency list for the DAG.
+     * @return the sorted nodes
+     */
+    static <V extends GraphValue<V>> List<AdjacencyList<V>.Node> sortInternal(AdjacencyList<V> graph) {
+      TopoSorter<V> ts = new TopoSorter<V>(graph);
+      return ts.sorted;
+    }
+
+    public static <V extends GraphValue<V>> List<V> sort(Graph<V, ?, ?> graph) {
+      AdjacencyList<V> l = graph.getAdjList();
+      return l.convert(sortInternal(l));
+    }
+  }
+
+  /**
+   * Finds the cyclic references in a graph.
+   *
+   * @return the groups of nodes that form a cycle: every strongly connected component with more than one node
+   *         and every single node that has an edge to itself; empty if there are none
+   */
+  static <V extends GraphValue<V>> List<List<AdjacencyList<V>.Node>> checkDirected(AdjacencyList<V> graph) {
+    Tarjan<V> t = new Tarjan<V>();
+    List<List<AdjacencyList<V>.Node>> subgraphs = t.executeTarjan(graph);
+    for (Iterator<List<AdjacencyList<V>.Node>> i = subgraphs.iterator(); i.hasNext();) {
+      List<AdjacencyList<V>.Node> l = i.next();
+      // A component of one node is a cycle only if that node points at itself.
+      if (l.size() == 1 && !hasSelfLoop(graph, l.get(0))) i.remove();
+    }
+    return subgraphs;
+  }
+
+  /** Tells whether one of the node's outbound edges leads back to the node itself. */
+  private static <V extends GraphValue<V>> boolean hasSelfLoop(AdjacencyList<V> graph, AdjacencyList<V>.Node node) {
+    List<Edge<AdjacencyList<V>.Node>> edges = graph.getAdjacent(node);
+    if (edges == null) return false;
+    for (Edge<AdjacencyList<V>.Node> e : edges) {
+      if (e.to == node) return true;
+    }
+    return false;
+  }
+
+  public static <V extends GraphValue<V>> List<List<AdjacencyList<V>.Node>> checkDirected(Graph<V, ?, ?> graph) {
+    return checkDirected(graph.getAdjList());
+  }
+
+  /**
+   * Tarjan's strongly connected components algorithm; its working state lives in the index and lowlink fields of the nodes.
+   */
+  public static class Tarjan<V extends GraphValue<V>> {
+
+    private int index = 0;
+    private List<AdjacencyList<V>.Node> stack = new LinkedList<AdjacencyList<V>.Node>();
+    private List<List<AdjacencyList<V>.Node>> SCC = new LinkedList<List<AdjacencyList<V>.Node>>();
+
+    public List<List<AdjacencyList<V>.Node>> executeTarjan(AdjacencyList<V> graph) {
+      SCC.clear();
+      index = 0;
+      stack.clear();
+      if (graph != null) {
+        // Start from a clean slate: index and lowlink stay on the nodes after an earlier run, and a node whose
+        // index is not -1 would be skipped below as already explored.
+        for (Edge<AdjacencyList<V>.Node> e : graph.getAllEdges()) {
+          e.from.index = -1;
+          e.from.lowlink = -1;
+          e.to.index = -1;
+          e.to.lowlink = -1;
+        }
+        List<AdjacencyList<V>.Node> nodeList = new LinkedList<AdjacencyList<V>.Node>(graph.getNodeSet());
+        for (AdjacencyList<V>.Node node : nodeList) {
+          if (node.index == -1) {
+            tarjan(node, graph);
+          }
+        }
+      }
+      return SCC;
+    }
+
+    private List<List<AdjacencyList<V>.Node>> tarjan(AdjacencyList<V>.Node v, AdjacencyList<V> list) {
+      v.index = index;
+      v.lowlink = index;
+      index++;
+      stack.add(0, v);
+      List<Edge<AdjacencyList<V>.Node>> l = list.getAdjacent(v);
+      if (l != null) {
+        for (Edge<AdjacencyList<V>.Node> e : l) {
+          AdjacencyList<V>.Node n = e.to;
+          if (n.index == -1) {
+            tarjan(n, list);
+            v.lowlink = Math.min(v.lowlink, n.lowlink);
+          } else if (stack.contains(n)) {
+            v.lowlink = Math.min(v.lowlink, n.index);
+          }
+        }
+      }
+      if (v.lowlink == v.index) {
+        AdjacencyList<V>.Node n;
+        List<AdjacencyList<V>.Node> component = new LinkedList<AdjacencyList<V>.Node>();
+        do {
+          n = stack.remove(0);
+          component.add(n);
+        } while (n != v);
+        SCC.add(component);
+      }
+      return SCC;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphValue.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphValue.java
new file mode 100644
index 0000000..b1eeede
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphValue.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.graph;
+
+
+public interface GraphValue<T> extends Iterable<T>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GraphValue.class);
+  
+  public void accept(GraphVisitor<T> visitor);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphVisitor.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphVisitor.java
new file mode 100644
index 0000000..5ecdaea
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/GraphVisitor.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.graph;
+
+
+public interface GraphVisitor<T> {
+  public boolean enter(T o);
+  public void leave(T o);
+  public boolean visit(T o);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Visitable.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Visitable.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Visitable.java
new file mode 100644
index 0000000..90ee1d1
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/Visitable.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.graph;
+
+public interface Visitable<T extends Visitable<?>> {
+  public void accept(T node);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/JSONOptions.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/JSONOptions.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/JSONOptions.java
deleted file mode 100644
index d4d97f6..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/JSONOptions.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.logical;
-
-import java.io.IOException;
-
-import org.apache.drill.common.exceptions.LogicalPlanParsingException;
-import org.apache.drill.common.logical.JSONOptions.De;
-import org.apache.drill.common.logical.JSONOptions.Se;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.JsonGenerationException;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonLocation;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonParser.Feature;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.TreeNode;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-import com.fasterxml.jackson.databind.ser.std.StdSerializer;
-
-@JsonSerialize(using = Se.class)
-@JsonDeserialize(using = De.class)
-public class JSONOptions {
-  private static volatile ObjectMapper MAPPER;
-  
-  final static Logger logger = LoggerFactory.getLogger(JSONOptions.class);
-  
-  private JsonNode root;
-  private JsonLocation location;
-  
-  private JSONOptions(JsonNode n, JsonLocation location){
-    this.root = n;
-    this.location = location;
-  }
-  
-  public <T> T getWith(Class<T> c){
-    try {
-      //logger.debug("Read tree {}", root);
-      return getMapper().treeToValue(root, c);
-    } catch (JsonProcessingException e) {
-      throw new LogicalPlanParsingException(String.format("Failure while trying to convert late bound json options to type of %s. Reference was originally located at line %d, column %d.", c.getCanonicalName(), location.getLineNr(), location.getColumnNr()), e);
-    }
-  }
-  
-  public JsonNode path(String name){
-    return root.path(name);
-  }
-  
-  private static synchronized ObjectMapper getMapper(){
-    if(MAPPER == null){
-      ObjectMapper mapper = new ObjectMapper();
-      mapper.enable(SerializationFeature.INDENT_OUTPUT);
-      mapper.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
-      mapper.configure(Feature.ALLOW_COMMENTS, true);
-      MAPPER = mapper;
-    }
-    return MAPPER;
-  }
-  
-  public static class De extends StdDeserializer<JSONOptions> {
-    
-    public De() {
-      super(JSONOptions.class);
-      logger.debug("Creating Deserializer.");
-    }
-
-    @Override
-    public JSONOptions deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
-        JsonProcessingException {
-      JsonLocation l = jp.getTokenLocation();
-      //logger.debug("Reading tree.");
-      TreeNode n = jp.readValueAsTree();
-      if(n instanceof JsonNode){
-        return new JSONOptions( (JsonNode) n, l); 
-      }else{
-        ctxt.mappingException("Failure reading json options as JSON value.");
-        return null;
-      }
-    }
-
-  }
-
-  public static class Se extends StdSerializer<JSONOptions> {
-
-    public Se() {
-      super(JSONOptions.class);
-    }
-
-    @Override
-    public void serialize(JSONOptions value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
-        JsonGenerationException {
-      jgen.writeTree(value.root);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
index 74f7ee9..a2f2499 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
@@ -19,17 +19,16 @@ package org.apache.drill.common.logical;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.common.PlanProperties;
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.LogicalPlanParsingException;
-import org.apache.drill.common.logical.OperatorGraph.OpNode;
+import org.apache.drill.common.graph.Graph;
+import org.apache.drill.common.graph.GraphAlgos;
 import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.graph.GraphAlgos;
+import org.apache.drill.common.logical.data.SinkOperator;
+import org.apache.drill.common.logical.data.SourceOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,94 +39,51 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
-@JsonPropertyOrder({"head", "storage", "query"})
+@JsonPropertyOrder({ "head", "storage", "query" })
 public class LogicalPlan {
   static final Logger logger = LoggerFactory.getLogger(LogicalPlan.class);
-  
-	private final PlanProperties properties;
-	private final Map<String, StorageEngineConfig> storageEngines;
-	private final OperatorGraph graph;
-	
-	private static volatile ObjectMapper MAPPER;
-	
-	@SuppressWarnings("unchecked")
+
+  private final PlanProperties properties;
+  private final Map<String, StorageEngineConfig> storageEngineMap;
+  private final Graph<LogicalOperator, SourceOperator, SinkOperator> graph;
+
   @JsonCreator
-	public LogicalPlan(@JsonProperty("head") PlanProperties head, @JsonProperty("storage") List<StorageEngineConfig> storageEngines, @JsonProperty("query") List<LogicalOperator> operators){
-	  if(storageEngines == null) storageEngines = Collections.EMPTY_LIST;
-	  this.properties = head;
-	  this.storageEngines = new HashMap<String, StorageEngineConfig>(storageEngines.size());
-    for(StorageEngineConfig store: storageEngines){
-      StorageEngineConfig old = this.storageEngines.put(store.getName(), store);
-      if(old != null) throw new LogicalPlanParsingException(String.format("Each storage engine must have a unique name.  You provided more than one data source with the same name of '%s'", store.getName()));
-    }
-    
-    this.graph = new OperatorGraph(operators);
-	}
-	
-	@JsonProperty("query")
-	public List<LogicalOperator> getSortedOperators(){
-	  List<OpNode> nodes = GraphAlgos.TopoSorter.sort(graph.getAdjList());
-	  Iterable<LogicalOperator> i = Iterables.transform(nodes, new Function<OpNode, LogicalOperator>(){
-	    public LogicalOperator apply(OpNode o){
-	      return o.getNodeValue();
-	    }
-	  });
-	  return Lists.newArrayList(i);
-	}
+  public LogicalPlan(@JsonProperty("head") PlanProperties head,
+      @JsonProperty("storage") Map<String, StorageEngineConfig> storageEngineMap,
+      @JsonProperty("query") List<LogicalOperator> operators) {
+    this.storageEngineMap = storageEngineMap;
+    this.properties = head;
+    this.graph = Graph.newGraph(operators, SourceOperator.class, SinkOperator.class);
+  }
 
-	public StorageEngineConfig getStorageEngine(String name){
-	  StorageEngineConfig ds = storageEngines.get(name);
-	  if(ds == null) throw new LogicalPlanParsingException(String.format("Unknown data source named [%s].", name));
-	  return ds;
-	}
-	
-	@JsonIgnore
-	public OperatorGraph getGraph(){
-	  return graph;
-	}
-	
-	@JsonProperty("head")
-  public PlanProperties getProperties() {
-    return properties;
+  @JsonProperty("query")
+  public List<LogicalOperator> getSortedOperators() {
+    return GraphAlgos.TopoSorter.sort(graph);
   }
 
+  public StorageEngineConfig getStorageEngine(String name) {
+    return storageEngineMap.get(name);
+  }
+
+  @JsonIgnore
+  public Graph<LogicalOperator, SourceOperator, SinkOperator> getGraph() {
+    return graph;
+  }
 
-	@JsonProperty("storage") 
-  public List<StorageEngineConfig> getStorageEngines() {
-    return new ArrayList<StorageEngineConfig>(storageEngines.values());
+  @JsonProperty("head")
+  public PlanProperties getProperties() {
+    return properties;
   }
-	
-//	public static LogicalPlan readFromString(String planString, DrillConfig config) throws JsonParseException, JsonMappingException, IOException{
-//	  ObjectMapper mapper = config.getMapper();
-//    LogicalPlan plan = mapper.readValue(planString, LogicalPlan.class);
-//    return plan;
-//	}
-//	
-//	public static LogicalPlan readFromResourcePath(String fileName, DrillConfig config) throws IOException{
-//	  URL u = LogicalPlan.class.getResource(fileName);
-//	  if(u == null) throw new FileNotFoundException(String.format("Unable to find file on path %s", fileName));
-//	  return readFromFile(u.getFile(), config);
-//	}
-//	
-//	public static LogicalPlan readFromFile(String fileName, DrillConfig config) throws IOException{
-//	  String planString = Files.toString(new File(fileName), Charsets.UTF_8);
-//	  return readFromString(planString, config);
-//	}
-//	
-	public String toJsonString(DrillConfig config) throws JsonProcessingException{
-    return config.getMapper().writeValueAsString(this);  
-	}
 
+  @JsonProperty("storage")
+  public Map<String, StorageEngineConfig> getStorageEngines() {
+    return storageEngineMap;
+  }
 
-  public static void main(String[] args) throws Exception {
-    DrillConfig config = DrillConfig.create();
-    String externalPlan = Files.toString(new File("src/test/resources/simple_plan.json"), Charsets.UTF_8);
-    LogicalPlan plan = parse(config, externalPlan);
+  public String toJsonString(DrillConfig config) throws JsonProcessingException {
+    return config.getMapper().writeValueAsString(this);
   }
 
   /** Parses a logical plan. */
@@ -135,7 +91,6 @@ public class LogicalPlan {
     ObjectMapper mapper = config.getMapper();
     try {
       LogicalPlan plan = mapper.readValue(planString, LogicalPlan.class);
-      System.out.println(mapper.writeValueAsString(plan));
       return plan;
     } catch (IOException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/OperatorGraph.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/OperatorGraph.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/OperatorGraph.java
deleted file mode 100644
index 8480b8a..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/OperatorGraph.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.logical;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.drill.common.expression.visitors.OpVisitor;
-import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.data.SinkOperator;
-import org.apache.drill.common.logical.data.SourceOperator;
-import org.apache.drill.common.logical.graph.AdjacencyList;
-import org.apache.drill.common.logical.graph.GraphAlgos;
-import org.apache.drill.common.logical.graph.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OperatorGraph {
-
-  static final Logger logger = LoggerFactory.getLogger(OperatorGraph.class);
-
-  private AdjacencyList<OpNode> adjList;
-  private final Collection<SourceOperator> sources;
-  private final Collection<SinkOperator> sinks;
-
-  public OperatorGraph(List<LogicalOperator> operators) {
-    AdjacencyListBuilder b = new AdjacencyListBuilder();
-
-    // Some of these operators are operator chains hidden through the use of sequences. This is okay because the
-    // adjacency list builder is responsible for grabbing these as well.
-    for (LogicalOperator o : operators) {
-      o.accept(b);
-    }
-
-    adjList = b.getAdjacencyList();
-
-     List<List<OpNode>> cyclicReferences = GraphAlgos.checkDirected(adjList);
-     if(cyclicReferences.size() > 0){
-     throw new
-     IllegalArgumentException("A logical plan must be a valid DAG.  You have cyclic references in your graph.  " +
-     cyclicReferences);
-     }
-    sources = convert(adjList.getStartNodes(), SourceOperator.class, "Error determing list of source operators.");
-    // logger.debug("Source list {}", sources);
-    sinks = convert(adjList.getTerminalNodes(), SinkOperator.class, "Error determing list of source operators.");
-    // logger.debug("Sink list {}", sinks);
-
-  }
-
-  public AdjacencyList<OpNode> getAdjList() {
-    return adjList;
-  }
-
-  public Collection<SourceOperator> getSources() {
-    return sources;
-  }
-
-  public Collection<SinkOperator> getSinks() {
-    return sinks;
-  }
-
-  @SuppressWarnings("unchecked")
-  private <T extends LogicalOperator> Collection<T> convert(Collection<OpNode> nodes, Class<T> classIdentifier,
-      String error) {
-    List<T> out = new ArrayList<T>(nodes.size());
-    for (OpNode o : nodes) {
-      LogicalOperator lo = o.getNodeValue();
-      if (classIdentifier.isAssignableFrom(lo.getClass())) {
-        out.add((T) lo);
-      } else {
-        throw new UnexpectedOperatorType(classIdentifier, lo, error);
-      }
-    }
-    return out;
-  }
-
-  public class AdjacencyListBuilder implements OpVisitor {
-    Map<LogicalOperator, OpNode> ops = new HashMap<LogicalOperator, OpNode>();
-
-    public boolean enter(LogicalOperator o) {
-      visit(o);
-      return true;
-    }
-
-    @Override
-    public void leave(LogicalOperator o) {
-//      for (LogicalOperator child : o) {
-//        child.accept(this);
-//      }
-    }
-
-    @Override
-    public boolean visit(LogicalOperator o) {
-      if(o == null) throw new IllegalArgumentException("Null operator.");
-      
-      if (!ops.containsKey(o)) {
-        ops.put(o, new OpNode(o));
-        return true;
-      }
-
-      return true;
-    }
-
-    public AdjacencyList<OpNode> getAdjacencyList() {
-      logger.debug("Values; {}", ops.values().toArray());
-      AdjacencyList<OpNode> a = new AdjacencyList<OpNode>();
-      for (OpNode from : ops.values()) {
-        for (LogicalOperator t : from.getNodeValue()) {
-          OpNode to = ops.get(t);
-          a.addEdge(from, to, 0);
-        }
-
-      }
-      a.fix();
-      return a;
-    }
-
-  }
-
-  public static class OpNode extends Node<LogicalOperator> {
-
-    public OpNode(LogicalOperator operator) {
-      super(operator);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
deleted file mode 100644
index 11b5f6c..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.logical;
-
-
-public class PlanProperties {
-	public String type = "apache_drill_logical_plan";
-	public int version;
-	public Generator generator = new Generator();
-	
-	public static class Generator{
-		public String type;
-		public String info;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
index f08c3d4..3a893d6 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
@@ -23,5 +23,4 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="type")
 public interface StorageEngineConfig{
-  public String getName();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
index 814cc21..853196c 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
@@ -28,15 +28,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 public abstract class StorageEngineConfigBase implements StorageEngineConfig{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StorageEngineConfigBase.class);
   
-  private final String name;
-  
-  public StorageEngineConfigBase(@JsonProperty("name") String name) {
-    this.name = name;
-  }
-
-  public String getName() {
-    return name;
-  }
   
   public synchronized static Class<?>[] getSubTypes(DrillConfig config){
     List<String> packages = config.getStringList(CommonConstants.STORAGE_ENGINE_CONFIG_SCAN_PACKAGES);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/UnexpectedOperatorType.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/UnexpectedOperatorType.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/UnexpectedOperatorType.java
index e2af21d..d515503 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/UnexpectedOperatorType.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/UnexpectedOperatorType.java
@@ -17,7 +17,6 @@
  ******************************************************************************/
 package org.apache.drill.common.logical;
 
-import org.apache.drill.common.logical.data.LogicalOperator;
 
 public class UnexpectedOperatorType extends ValidationError{
 
@@ -25,8 +24,8 @@ public class UnexpectedOperatorType extends ValidationError{
     super(message);
   }
   
-  public <A extends LogicalOperator, B extends LogicalOperator> UnexpectedOperatorType(Class<A> expected, B operator, String message) {
-    super(message + " Expected operator of type " + expected.getSimpleName() + " but received operator of type " + operator.getClass().getCanonicalName());
+  public UnexpectedOperatorType(Object operator, String message) {
+    super(message + " Received node of type " + operator.getClass().getCanonicalName());
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
index e2bda45..a5301bb 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
@@ -20,7 +20,7 @@ package org.apache.drill.common.logical.data;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.drill.common.expression.visitors.OpVisitor;
+import org.apache.drill.common.graph.GraphValue;
 import org.apache.drill.common.logical.ValidationError;
 
 import com.fasterxml.jackson.annotation.JsonIdentityInfo;
@@ -31,11 +31,8 @@ import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 @JsonPropertyOrder({"@id", "memo", "input"}) // op will always be first since it is wrapped.
 @JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, property="@id")
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="op")
-public interface LogicalOperator extends Iterable<LogicalOperator>{
+public interface LogicalOperator extends GraphValue<LogicalOperator>{
 	
-	//public static final Class<?>[] SUB_TYPES = {Write.class, CollapsingAggregate.class, Segment.class, Filter.class, Flatten.class, Join.class, Order.class, Limit.class, Project.class, Scan.class, Sequence.class, Transform.class, Union.class, WindowFrame.class};
-	
-	public void accept(OpVisitor visitor);
 	public void registerAsSubscriber(LogicalOperator operator);
 	public void setupAndValidate(List<LogicalOperator> operators, Collection<ValidationError> errors);
 	

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
index 89ffbb2..5802056 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.drill.common.config.CommonConstants;
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.expression.visitors.OpVisitor;
+import org.apache.drill.common.graph.GraphVisitor;
 import org.apache.drill.common.logical.ValidationError;
 import org.apache.drill.common.util.PathScanner;
 
@@ -54,7 +54,7 @@ public abstract class LogicalOperatorBase implements LogicalOperator{
   }
 
   @Override
-  public void accept(OpVisitor visitor) {
+  public void accept(GraphVisitor<LogicalOperator> visitor) {
     if(visitor.enter(this)){
       for(LogicalOperator o : children){
         o.accept(visitor);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
index 1c8108e..d06f193 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
@@ -17,27 +17,26 @@
  ******************************************************************************/
 package org.apache.drill.common.logical.data;
 
+import org.apache.drill.common.defs.OrderDef;
 import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("order")
 public class Order extends SingleInputOperator {
 
-  private final Ordering[] orderings;
+  private final OrderDef[] orderings;
   private final FieldReference within;
 
   @JsonCreator
-  public Order(@JsonProperty("within") FieldReference within, @JsonProperty("orderings") Ordering... orderings) {
+  public Order(@JsonProperty("within") FieldReference within, @JsonProperty("orderings") OrderDef... orderings) {
     this.orderings = orderings;
     this.within = within;
   }
   
-  public Ordering[] getOrderings() {
+  public OrderDef[] getOrderings() {
     return orderings;
   }
   
@@ -45,46 +44,6 @@ public class Order extends SingleInputOperator {
     return within;
   }
 
-  public static class Ordering {
-
-    private final Direction direction;
-    private final LogicalExpression expr;
-
-    @JsonCreator
-    public Ordering(@JsonProperty("order") String strOrder, @JsonProperty("expr") LogicalExpression expr) {
-      this.expr = expr;
-      this.direction = Direction.DESC.description.equals(strOrder) ? Direction.DESC : Direction.ASC; // default
-                                                                                                     // to
-                                                                                                     // ascending
-                                                                                                     // unless
-                                                                                                     // desc
-                                                                                                     // is
-                                                                                                     // provided.
-    }
-
-    @JsonIgnore
-    public Direction getDirection() {
-      return direction;
-    }
-
-    public LogicalExpression getExpr() {
-      return expr;
-    }
-
-    public String getOrder() {
-      return direction.description;
-    }
-
-  }
-
-  public static enum Direction {
-    ASC("asc"), DESC("desc");
-    public final String description;
-
-    Direction(String d) {
-      description = d;
-    }
-  }
   
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Scan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Scan.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Scan.java
index f22a5bc..c8d396b 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Scan.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Scan.java
@@ -17,8 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.common.logical.data;
 
+import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.logical.JSONOptions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2a6e1b33/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Store.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Store.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Store.java
index 4168468..0569b8a 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Store.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Store.java
@@ -17,9 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.common.logical.data;
 
-import org.apache.drill.common.exceptions.ExpressionParsingException;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.logical.JSONOptions;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.defs.PartitionDef;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -31,10 +30,10 @@ public class Store extends SinkOperator{
   
   private final String storageEngine;
   private final JSONOptions target;
-  private final PartitionOptions partition;
+  private final PartitionDef partition;
 
   @JsonCreator
-  public Store(@JsonProperty("storageengine") String storageEngine, @JsonProperty("target") JSONOptions target, @JsonProperty("partition") PartitionOptions partition) {
+  public Store(@JsonProperty("storageengine") String storageEngine, @JsonProperty("target") JSONOptions target, @JsonProperty("partition") PartitionDef partition) {
     super();
     this.storageEngine = storageEngine;
     this.target = target;
@@ -49,48 +48,10 @@ public class Store extends SinkOperator{
     return target;
   }
 
-  public PartitionOptions getPartition() {
+  public PartitionDef getPartition() {
     return partition;
   }
 
-  public static enum PartitionType{ 
-    RANDOM, HASH, ORDERED;
-    
-    public static PartitionType resolve(String val){
-      for(PartitionType pt : PartitionType.values()){
-        if(pt.name().equalsIgnoreCase(val)) return pt;
-      }
-      throw new ExpressionParsingException(String.format("Unable to determine partitioning type type for value '%s'.", val));
-
-    }
-    
-  };
   
-  public static class PartitionOptions{
-    private final PartitionType partitionType;
-    private final LogicalExpression[] expressions;
-    private final LogicalExpression[] starts;
-    
-    @JsonCreator
-    public PartitionOptions(@JsonProperty("partitionType") String partitionType, @JsonProperty("exprs") LogicalExpression[] expressions, @JsonProperty("starts") LogicalExpression[] starts) {
-      this.partitionType = PartitionType.resolve(partitionType);
-      this.expressions = expressions;
-      this.starts = starts;
-    }
-
-    public PartitionType getPartitionType() {
-      return partitionType;
-    }
-
-    public LogicalExpression[] getExpressions() {
-      return expressions;
-    }
-
-    public LogicalExpression[] getStarts() {
-      return starts;
-    }
-    
-    
-  }
   
 }