You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ts...@apache.org on 2013/01/22 03:35:07 UTC

[2/6] Basic reference interpreter implementation

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java
new file mode 100644
index 0000000..21f220d
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.io.IOException;
+
+/**
+ * Event-style sink into which a record is serialized as a sequence of record, array, map and
+ * scalar events. Every method may fail with an {@link IOException}.
+ *
+ * <p>{@code JSONDataWriter} in this package is one implementation. The exact call protocol
+ * (for example whether element/value start and end calls must bracket every nested value) is
+ * defined by the callers, which are not shown here.
+ */
+public interface DataWriter {
+
+  // ---- record and writer lifecycle ----
+
+  /** Signals that a new record is about to be written. */
+  void startRecord() throws IOException;
+
+  /** Signals that the current record is complete. */
+  void endRecord() throws IOException;
+
+  /** Releases whatever resources the writer holds. */
+  void close() throws IOException;
+
+  // ---- arrays ----
+
+  /**
+   * Opens an array.
+   *
+   * @param length number of elements supplied by the caller; implementations may ignore it
+   */
+  void writeArrayStart(int length) throws IOException;
+
+  void writeArrayElementStart() throws IOException;
+
+  void writeArrayElementEnd() throws IOException;
+
+  /** Closes the array opened by {@link #writeArrayStart(int)}. */
+  void writeArrayEnd() throws IOException;
+
+  // ---- maps ----
+
+  /** Opens a map. */
+  void writeMapStart() throws IOException;
+
+  /** Writes the key of the next map entry. */
+  void writeMapKey(CharSequence seq) throws IOException;
+
+  void writeMapValueStart() throws IOException;
+
+  void writeMapValueEnd() throws IOException;
+
+  /** Closes the map opened by {@link #writeMapStart()}. */
+  void writeMapEnd() throws IOException;
+
+  // ---- scalar values ----
+
+  void writeBoolean(boolean b) throws IOException;
+
+  void writeSInt32(int value) throws IOException;
+
+  void writeSInt64(long value) throws IOException;
+
+  void writeBytes(byte[] bytes) throws IOException;
+
+  void writeSFloat64(double value) throws IOException;
+
+  void writeSFloat32(float value) throws IOException;
+
+  void writeNullValue() throws IOException;
+
+  void writeCharSequence(CharSequence value) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java
new file mode 100644
index 0000000..e87c3fb
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.common.logical.data.Filter;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BooleanEvaluator;
+
+/**
+ * Reference operator that forwards only those incoming records for which the configured
+ * filter expression evaluates to {@code true}.
+ */
+public class FilterROP extends SingleInputROPBase<Filter> {
+
+  private BooleanEvaluator filterEval;
+  private FilterIterator iter;
+
+  public FilterROP(Filter config) {
+    super(config);
+  }
+
+  /** Builds the boolean evaluator for the filter expression over this operator's record. */
+  @Override
+  protected void setupEvals(EvaluatorFactory builder) {
+    this.filterEval = builder.getBooleanEvaluator(record, config.getExpr());
+  }
+
+  /** Wraps the upstream iterator in the filtering iterator handed out by this operator. */
+  @Override
+  public void setInput(RecordIterator incoming) {
+    this.iter = new FilterIterator(incoming);
+  }
+
+  @Override
+  public RecordIterator getIteratorInternal() {
+    return iter;
+  }
+
+  /** Iterator that skips every upstream record rejected by {@code filterEval}. */
+  private class FilterIterator implements RecordIterator {
+    private final RecordIterator upstream;
+
+    public FilterIterator(RecordIterator incoming) {
+      this.upstream = incoming;
+    }
+
+    /**
+     * Pulls from upstream until a record passes the filter or upstream is exhausted.
+     *
+     * @return the upstream outcome of the first accepted record, or {@code NONE_LEFT}
+     */
+    @Override
+    public NextOutcome next() {
+      NextOutcome outcome = upstream.next();
+      // The evaluator is only consulted while upstream still delivers records.
+      while (outcome != NextOutcome.NONE_LEFT && !filterEval.eval()) {
+        outcome = upstream.next();
+      }
+      return outcome;
+    }
+
+    @Override
+    public ROP getParent() {
+      return FilterROP.this;
+    }
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return record;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/GroupROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/GroupROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/GroupROP.java
new file mode 100644
index 0000000..b236635
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/GroupROP.java
@@ -0,0 +1,75 @@
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.common.logical.data.Group;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
+import org.apache.drill.exec.ref.rops.MultiLevelMap.GroupingEntry;
+import org.apache.drill.exec.ref.rops.MultiLevelMap.MultiLevelIterator;
+
+/**
+ * Blocking reference operator that buffers a copy of every input record in a
+ * {@link MultiLevelMap} keyed by the configured grouping expressions and then replays the
+ * records in the order produced by the map's iterator.
+ */
+public class GroupROP extends AbstractBlockingOperator<Group> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GroupROP.class);
+
+  MultiLevelMap map;
+
+  public GroupROP(Group g) {
+    super(g);
+  }
+
+  /** Creates one evaluator per grouping expression and the map that is keyed by them. */
+  @Override
+  protected void setupEvals(EvaluatorFactory builder) {
+    final NamedExpression[] groupings = config.getGroupings();
+    final BasicEvaluator[] keyEvals = new BasicEvaluator[groupings.length];
+    int slot = 0;
+    for (NamedExpression grouping : groupings) {
+      keyEvals[slot++] = builder.getBasicEvaluator(inputRecord, grouping.getExpr());
+    }
+    this.map = new MultiLevelMap(keyEvals);
+  }
+
+  /** Stores a copy of the current input record in the grouping map. */
+  @Override
+  protected void consumeRecord() {
+    final RecordPointer snapshot = inputRecord.copy();
+    map.add(snapshot);
+  }
+
+  /** Returns an iterator that replays the buffered records. */
+  @Override
+  protected RecordIterator doWork() {
+    final MultiLevelIterator grouped = map.getIterator();
+    return new LevelIterator(grouped);
+  }
+
+  /** Adapts a {@link MultiLevelIterator} to the {@link RecordIterator} contract. */
+  private class LevelIterator implements RecordIterator {
+
+    final MultiLevelIterator iter;
+
+    public LevelIterator(MultiLevelIterator iter) {
+      this.iter = iter;
+    }
+
+    /**
+     * Points the output record at the next buffered record.
+     *
+     * @return {@code INCREMENTED_SCHEMA_CHANGED} for every record, {@code NONE_LEFT} at the end
+     */
+    @Override
+    public NextOutcome next() {
+      final GroupingEntry entry = iter.next();
+      if (entry != null) {
+        outputRecord.setRecord(entry.record);
+        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+      }
+      return NextOutcome.NONE_LEFT;
+    }
+
+    @Override
+    public ROP getParent() {
+      return GroupROP.this;
+    }
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return outputRecord;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONDataWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONDataWriter.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONDataWriter.java
new file mode 100644
index 0000000..e9b1b26
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONDataWriter.java
@@ -0,0 +1,140 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+
+/**
+ * {@link DataWriter} that renders the events it receives as pretty-printed, UTF-8 encoded JSON
+ * through a Jackson {@link JsonGenerator}.
+ */
+public class JSONDataWriter implements DataWriter {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONDataWriter.class);
+
+  private final JsonGenerator g;
+
+  /**
+   * @param out stream that receives the JSON text
+   * @throws IOException if the generator cannot be created
+   */
+  public JSONDataWriter(OutputStream out) throws IOException {
+    this.g = new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8);
+    this.g.useDefaultPrettyPrinter();
+  }
+
+  /** Converts a character sequence to the {@code String} form the generator API expects. */
+  private String asString(CharSequence seq) {
+    // String.toString() returns the receiver itself, so no special case is needed for Strings.
+    return seq.toString();
+  }
+
+  // ---- record boundaries ----
+
+  @Override
+  public void startRecord() throws IOException {
+    // No output is produced at the start of a record.
+  }
+
+  /** Emits a raw newline through the generator's raw-value API after each record. */
+  @Override
+  public void endRecord() throws IOException {
+    g.writeRawValue("\n");
+  }
+
+  // ---- arrays ----
+
+  /** Opens a JSON array; {@code length} is not used. */
+  @Override
+  public void writeArrayStart(int length) throws IOException {
+    g.writeStartArray();
+  }
+
+  @Override
+  public void writeArrayElementStart() throws IOException {
+    // No output is produced for this event.
+  }
+
+  @Override
+  public void writeArrayElementEnd() throws IOException {
+    // No output is produced for this event.
+  }
+
+  @Override
+  public void writeArrayEnd() throws IOException {
+    g.writeEndArray();
+  }
+
+  // ---- maps ----
+
+  @Override
+  public void writeMapStart() throws IOException {
+    g.writeStartObject();
+  }
+
+  @Override
+  public void writeMapKey(CharSequence seq) throws IOException {
+    final String fieldName = asString(seq);
+    g.writeFieldName(fieldName);
+  }
+
+  @Override
+  public void writeMapValueStart() throws IOException {
+    // No output is produced for this event.
+  }
+
+  @Override
+  public void writeMapValueEnd() throws IOException {
+    // No output is produced for this event.
+  }
+
+  @Override
+  public void writeMapEnd() throws IOException {
+    g.writeEndObject();
+  }
+
+  // ---- scalar values ----
+
+  @Override
+  public void writeBoolean(boolean b) throws IOException {
+    g.writeBoolean(b);
+  }
+
+  @Override
+  public void writeSInt32(int value) throws IOException {
+    g.writeNumber(value);
+  }
+
+  @Override
+  public void writeSInt64(long value) throws IOException {
+    g.writeNumber(value);
+  }
+
+  @Override
+  public void writeSFloat32(float value) throws IOException {
+    g.writeNumber(value);
+  }
+
+  @Override
+  public void writeSFloat64(double value) throws IOException {
+    g.writeNumber(value);
+  }
+
+  @Override
+  public void writeBytes(byte[] bytes) throws IOException {
+    g.writeBinary(bytes);
+  }
+
+  @Override
+  public void writeCharSequence(CharSequence value) throws IOException {
+    final String text = asString(value);
+    g.writeString(text);
+  }
+
+  @Override
+  public void writeNullValue() throws IOException {
+    g.writeNull();
+  }
+
+  /** Closes the underlying generator. */
+  @Override
+  public void close() throws IOException {
+    g.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONScanner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONScanner.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONScanner.java
new file mode 100644
index 0000000..924f3b1
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONScanner.java
@@ -0,0 +1,183 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.exceptions.RecordException;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.SimpleArrayValue;
+import org.apache.drill.exec.ref.values.SimpleMapValue;
+import org.apache.drill.exec.ref.values.ScalarValues.BooleanScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.BytesScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.DoubleScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.IntegerScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.LongScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.StringScalar;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonParser.Feature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
+
+/**
+ * Scan operator that reads a sequence of JSON values from a file and exposes each top-level
+ * value as one record, rooted at the scan's output reference.
+ *
+ * <p>The parser is configured to accept unquoted field names and comments.
+ */
+public class JSONScanner extends ROPBase<Scan> {
+  private static final Logger logger = LoggerFactory.getLogger(JSONScanner.class);
+
+  private final ObjectMapper mapper = new ObjectMapper();
+  private final FSDataInputStream input;
+  private final String file;
+  private final SchemaPath rootPath;
+  private final JsonParser parser;
+  private final UnbackedRecord record = new UnbackedRecord();
+
+  /**
+   * Opens {@code file} on the local Hadoop file system and prepares a parser over it.
+   *
+   * @param scan logical scan definition; supplies the output reference used as record root
+   * @param file location to read; a value without {@code ://} is resolved against the current
+   *             working directory and turned into a {@code file://} location
+   * @throws IOException if the file cannot be opened or the parser cannot be created
+   */
+  public JSONScanner(Scan scan, String file) throws IOException {
+    super(scan);
+    JsonFactory factory = new JsonFactory();
+    factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+    factory.configure(Feature.ALLOW_COMMENTS, true);
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    if (!file.contains("://")) {
+      // NOTE(review): this also prefixes the working directory to absolute paths given
+      // without a scheme -- confirm whether callers rely on that.
+      String current = new java.io.File(".").getCanonicalPath();
+      file = "file://" + current + "/" + file;
+    }
+    // Open the file exactly once and let the parser read from that single stream. The stream
+    // is kept in a field only so that cleanup() can close it explicitly.
+    this.input = fs.open(new Path(file));
+    this.parser = factory.createJsonParser(this.input);
+    this.file = file;
+    this.rootPath = config.getOutputReference();
+  }
+
+  /** Iterator that materializes one JSON value per call into the scanner's shared record. */
+  private class NodeIter implements RecordIterator {
+
+    /**
+     * Reads the next top-level JSON value and stores its converted form in the record.
+     *
+     * @return {@code INCREMENTED_SCHEMA_CHANGED} for every value read, {@code NONE_LEFT} once
+     *         the parser yields no further token or tree
+     * @throws RecordException if reading from the underlying file fails
+     */
+    @Override
+    public NextOutcome next() {
+      try {
+        if (parser.nextToken() == null) {
+          return NextOutcome.NONE_LEFT;
+        }
+        JsonNode n = mapper.readTree(parser);
+        if (n == null) {
+          return NextOutcome.NONE_LEFT;
+        }
+        record.setClearAndSetRoot(rootPath, convert(n));
+        // todo, add schema checking here.
+        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+      } catch (IOException e) {
+        throw new RecordException("Failure while reading record", null, e);
+      }
+    }
+
+    @Override
+    public ROP getParent() {
+      return JSONScanner.this;
+    }
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return record;
+    }
+
+  }
+
+  /**
+   * Recursively converts a Jackson tree node into the reference interpreter's value model.
+   *
+   * @param node node to convert; {@code null}, JSON null and missing nodes all map to
+   *             {@link DataValue#NULL_VALUE}
+   * @return the converted array, map or scalar value
+   * @throws UnsupportedOperationException for big-decimal and big-integer nodes and for any
+   *         node type not handled below
+   */
+  public static DataValue convert(JsonNode node) {
+    if (node == null || node.isNull() || node.isMissingNode()) {
+      return DataValue.NULL_VALUE;
+    } else if (node.isArray()) {
+      SimpleArrayValue arr = new SimpleArrayValue(node.size());
+      for (int i = 0; i < node.size(); i++) {
+        arr.addToArray(i, convert(node.get(i)));
+      }
+      return arr;
+    } else if (node.isObject()) {
+      SimpleMapValue map = new SimpleMapValue();
+      for (Iterator<String> iter = node.fieldNames(); iter.hasNext();) {
+        String name = iter.next();
+        map.setByName(name, convert(node.get(name)));
+      }
+      return map;
+    } else if (node.isBinary()) {
+      try {
+        return new BytesScalar(node.binaryValue());
+      } catch (IOException e) {
+        throw new RuntimeException("Failure converting binary value.", e);
+      }
+    } else if (node.isBigDecimal()) {
+      throw new UnsupportedOperationException("BigDecimal values are not supported.");
+    } else if (node.isBigInteger()) {
+      throw new UnsupportedOperationException("BigInteger values are not supported.");
+    } else if (node.isBoolean()) {
+      return new BooleanScalar(node.asBoolean());
+    } else if (node.isFloatingPointNumber()) {
+      // Big decimals were already rejected above, so only plain floating point remains.
+      return new DoubleScalar(node.asDouble());
+    } else if (node.isInt()) {
+      return new IntegerScalar(node.asInt());
+    } else if (node.isLong()) {
+      return new LongScalar(node.asLong());
+    } else if (node.isTextual()) {
+      return new StringScalar(node.asText());
+    } else {
+      throw new UnsupportedOperationException(String.format("Don't know how to convert value of type %s.", node
+          .getClass().getCanonicalName()));
+    }
+  }
+
+  @Override
+  protected RecordIterator getIteratorInternal() {
+    return new NodeIter();
+  }
+
+  /**
+   * Closes the parser and the underlying stream. Each resource is closed independently so a
+   * failure on one does not prevent the other from being released; failures are only logged.
+   */
+  @Override
+  public void cleanup() {
+    closeAndLog(parser);
+    closeAndLog(input);
+  }
+
+  /** Closes {@code resource}, logging instead of propagating any {@link IOException}. */
+  private void closeAndLog(java.io.Closeable resource) {
+    try {
+      resource.close();
+    } catch (IOException e) {
+      logger.warn("Error while closing InputStream for file {}", file, e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONWriter.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONWriter.java
new file mode 100644
index 0000000..5dd30d4
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONWriter.java
@@ -0,0 +1,62 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.io.IOException;
+
+import org.apache.drill.common.logical.data.Write;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Sink operator that serializes every record it receives as JSON into the file named by the
+ * {@link Write} configuration.
+ */
+public class JSONWriter extends BaseSinkROP<Write> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONWriter.class);
+
+  private FSDataOutputStream output;
+  private JSONDataWriter writer;
+
+  public JSONWriter(Write config) {
+    super(config);
+  }
+
+  /**
+   * Creates the target file and the JSON writer on top of it.
+   *
+   * @throws IOException if the file or the writer cannot be created
+   */
+  @Override
+  protected void setupSink() throws IOException {
+    FileSystem fs = FileSystem.get(new Configuration());
+    output = fs.create(new Path(config.getFileName()));
+    writer = new JSONDataWriter(output);
+  }
+
+  /**
+   * Writes one record.
+   *
+   * @param r record to serialize
+   * @return the output stream's position as reported after the write
+   * @throws IOException if writing fails
+   */
+  @Override
+  public long sinkRecord(RecordPointer r) throws IOException {
+    r.write(writer);
+    return output.getPos();
+  }
+
+  /**
+   * Closes the writer and the output stream. Each is closed independently so a failure on the
+   * writer does not leave the stream open, and either may still be {@code null} if
+   * {@link #setupSink()} did not complete. Failures are only logged.
+   */
+  @Override
+  public void cleanup() {
+    if (writer != null) {
+      try {
+        writer.close();
+      } catch (IOException e) {
+        logger.warn("Error while closing output stream.", e);
+      }
+    }
+    if (output != null) {
+      try {
+        output.close();
+      } catch (IOException e) {
+        logger.warn("Error while closing output stream.", e);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/MultiLevelMap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/MultiLevelMap.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/MultiLevelMap.java
new file mode 100644
index 0000000..2c74b79
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/MultiLevelMap.java
@@ -0,0 +1,200 @@
+package org.apache.drill.exec.ref.rops;
+
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.ScalarValues.BytesScalar;
+
+/**
+ * MultiLevel map is a map of maps of maps (etc.). Each map is keyed by a data value. At the lowest level, the value is
+ * a list of records as opposed to another map. A 4 way group by clause will therefore effectively look like
+ * {@code Map<group1, Map<group2, Map<group3, Map<group4, List<record>>>>>}.
+ */
+public class MultiLevelMap {
+  private final BasicEvaluator[] fields;
+  private final int deepestLevel;
+  private final Level rootLevel;
+
+  /**
+   * @param fields one evaluator per grouping level, outermost first
+   * @throws IllegalArgumentException if no grouping is given
+   * @throws UnsupportedOperationException if more than eight groupings are given
+   */
+  public MultiLevelMap(BasicEvaluator[] fields) {
+    if (fields.length == 0)
+      throw new IllegalArgumentException("At least one grouping is required.");
+    if (fields.length > 8)
+      throw new UnsupportedOperationException("Only groupings to eight levels are currently supported.");
+    this.fields = fields;
+    deepestLevel = fields.length - 1;
+    rootLevel = fields.length == 1 ? new LastLevel(fields[0]) : new IntermediateLevel(fields[0]);
+  }
+
+  /**
+   * Files {@code r} under the key values the grouping evaluators currently produce. The
+   * evaluators are invoked without arguments, so they are not applied to {@code r} itself.
+   */
+  public void add(RecordPointer r) {
+    rootLevel.add(r, 0);
+  }
+
+  /** A non-leaf level: maps one grouping value to the next nested level. */
+  private class IntermediateLevel implements Level {
+    final BasicEvaluator grouping;
+    Map<DataValue, Level> levelMap = new HashMap<DataValue, Level>();
+
+    public IntermediateLevel(BasicEvaluator field) {
+      super();
+      this.grouping = field;
+    }
+
+    @Override
+    public void add(RecordPointer r, int level) {
+      DataValue v = grouping.eval();
+      // A null evaluation result is grouped under the shared NULL_VALUE key.
+      if (v == null) v = DataValue.NULL_VALUE;
+      Level l = levelMap.get(v);
+
+      // From here on, level is the index of the child level.
+      level++;
+
+      if (l == null) {
+        l = (level == deepestLevel) ? new LastLevel(fields[level]) : new IntermediateLevel(fields[level]);
+        levelMap.put(v, l);
+      }
+
+      l.add(r, level);
+    }
+
+    @Override
+    public Iterator<?> iter() {
+      return levelMap.entrySet().iterator();
+    }
+
+  }
+
+  /** The leaf level: maps the last grouping value to the list of records in that group. */
+  private class LastLevel implements Level {
+    private final BasicEvaluator grouping;
+    Map<DataValue, List<RecordPointer>> recordMap = new HashMap<DataValue, List<RecordPointer>>();
+
+    public LastLevel(BasicEvaluator grouping) {
+      super();
+      this.grouping = grouping;
+    }
+
+    @Override
+    public void add(RecordPointer r, int level) {
+      DataValue dv = grouping.eval();
+      if (dv == null) dv = DataValue.NULL_VALUE;
+      List<RecordPointer> list = recordMap.get(dv);
+      if (list == null) {
+        list = new LinkedList<RecordPointer>();
+        recordMap.put(dv, list);
+      }
+      list.add(r);
+
+    }
+
+    @Override
+    public Iterator<?> iter() {
+      return recordMap.entrySet().iterator();
+    }
+  }
+
+  public MultiLevelIterator getIterator() {
+    return new MultiLevelIterator();
+  }
+
+  /**
+   * Iterator for working through a MultiLevel map. The iterator only works if every sub map has at least one entry.
+   * This should always be the case.
+   *
+   * <p>The same {@link GroupingEntry} instance is returned from every call to {@link #next()}; only its contents
+   * change.
+   */
+  public class MultiLevelIterator {
+    private final GroupingEntry ge;
+    private final Iterator<?>[] iterators;
+    private final DataValue[] values;
+    private Iterator<RecordPointer> lowestLevelRecordIterator = null;
+
+    private MultiLevelIterator() {
+      this.iterators = new Iterator[fields.length];
+      this.values = new DataValue[fields.length];
+      this.ge = new GroupingEntry(values);
+      iterators[0] = rootLevel.iter();
+
+    }
+
+    /**
+     * Advances to the next record.
+     *
+     * @return the shared entry positioned on the next record, or {@code null} when the map is exhausted
+     */
+    public GroupingEntry next() {
+      // Still inside the current leaf group: only the record changes, keys and bitset stay as they are.
+      if (lowestLevelRecordIterator != null && lowestLevelRecordIterator.hasNext()) {
+        ge.record = lowestLevelRecordIterator.next();
+        return ge;
+      }
+
+      ge.bitset.clear();
+      int level = deepestLevel;
+
+      // pop empty iterators.
+      for (; level > 0; level--) {
+
+        // if the current iterator is null or has no next, we'll need it replaced.
+        if ((iterators[level] == null) || !iterators[level].hasNext()) {
+          // set change bit
+          ge.bitset.set(level);
+          continue;
+        } else {
+          break;
+        }
+      }
+
+      // if we're at the top level and the current iterator doesn't have any more items, we're done iterating.
+      if (level == 0) {
+        if (!iterators[0].hasNext()) return null;
+        ge.bitset.set(0);
+      }
+
+      // go through all but the bottom most level.
+      for (int i = level; i < deepestLevel; i++) {
+        Iterator<?> iter = iterators[i];
+
+        @SuppressWarnings("unchecked")
+        Map.Entry<DataValue, Level> e = (Entry<DataValue, Level>) iter.next();
+        // Record the key under its own level so that values[i] lines up with keys[i].
+        values[i] = e.getKey();
+        iterators[i + 1] = e.getValue().iter();
+      }
+
+      @SuppressWarnings("unchecked")
+      Entry<DataValue, List<RecordPointer>> lowest = (Entry<DataValue, List<RecordPointer>>) iterators[deepestLevel]
+          .next();
+      values[deepestLevel] = lowest.getKey();
+      lowestLevelRecordIterator = lowest.getValue().iterator();
+      ge.record = lowestLevelRecordIterator.next();
+      return ge;
+    }
+
+  }
+
+  /**
+   * A grouping entry that gives you the value for each of the grouping keys. Changebits informs you of whether
+   * particular nested groups are restarting.
+   */
+  public class GroupingEntry {
+    /** The grouping evaluators of the enclosing map, outermost first. */
+    public final BasicEvaluator[] keys;
+    /** Current key value per grouping level; this array is shared with the iterator and updated in place. */
+    public final DataValue[] values;
+    /** Change bits written by the iterator: a set bit marks a level whose iterator was replaced. */
+    public final BitSet bitset;
+    /** The record the iterator is currently positioned on. */
+    public RecordPointer record;
+    private BytesScalar groupKey;
+
+    public GroupingEntry(DataValue[] values) {
+      super();
+      this.groupKey = new BytesScalar(new byte[1]);
+      this.values = values;
+      this.keys = MultiLevelMap.this.fields;
+      bitset = new BitSet(keys.length);
+    }
+
+    /** Returns the current change bits packed into a freshly created {@link BytesScalar}. */
+    public BytesScalar getGroupKey() {
+      this.groupKey = new BytesScalar(bitset.toByteArray());
+      return groupKey;
+    }
+
+  }
+
+  /** One nesting level of the map. */
+  public interface Level {
+    /**
+     * Adds a record at or below this level.
+     *
+     * @param r record to store
+     * @param level index of this level, 0 being the root
+     */
+    public void add(RecordPointer r, int level);
+
+    /** Iterates over this level's map entries; the entry value type depends on the level kind. */
+    public Iterator<?> iter();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java
new file mode 100644
index 0000000..8046803
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java
@@ -0,0 +1,165 @@
+package org.apache.drill.exec.ref.rops;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.Order.Direction;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
+import org.apache.drill.exec.ref.values.ComparableValue;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+
+/**
+ * Reference operator for the logical {@link Order} operator.
+ *
+ * <p>{@link #consumeRecord()} buffers a copy of the current input record together with its evaluated
+ * sort keys; {@link #doWork()} sorts that buffer in place and returns an iterator that replays the
+ * buffered records in sorted order through {@code outputRecord}.
+ */
+public class OrderROP extends AbstractBlockingOperator<Order>{
+
+  /** Buffered records with their pre-computed sort keys, in arrival order until sorted. */
+  private List<CompoundValue> records = new ArrayList<CompoundValue>();
+  /** One definition per ordering expression, in the order given by the configuration. */
+  private SortDefinition[] defs;
+
+  public OrderROP(Order config) {
+    super(config);
+  }
+
+  /**
+   * Creates one evaluator per configured ordering. An ordering is treated as ascending only when its
+   * direction is {@link Direction#ASC}.
+   */
+  @Override
+  protected void setupEvals(EvaluatorFactory builder) {
+    Ordering[] orderings = config.getOrderings();
+    defs = new SortDefinition[orderings.length];
+    for(int i =0; i < orderings.length; i++){
+      defs[i] = new SortDefinition(builder.getBasicEvaluator(inputRecord, orderings[i].getExpr()), orderings[i].getDirection() == Direction.ASC);
+    }
+  }
+
+  /**
+   * Copies the current input record, evaluates every sort key and appends both to the buffer.
+   */
+  @Override
+  protected void consumeRecord() {
+    DataValue[] values = new DataValue[defs.length];
+
+    RecordPointer r = inputRecord.copy();
+    for(int i =0; i < defs.length; i++){
+      values[i] = defs[i].evaluator.eval();
+    }
+    CompoundValue v = new CompoundValue(r, values);
+    records.add(v);
+  }
+
+  /**
+   * Sorts the buffered records in place and returns an iterator over the sorted buffer.
+   */
+  @Override
+  protected RecordIterator doWork() {
+    StackedComparator r = new StackedComparator(defs);
+    QuickSort qs = new QuickSort();
+    qs.sort(r, 0, records.size());
+    return new OrderIterator();
+  }
+
+  /**
+   * A single sort key: the evaluator producing the key and how it is ordered.
+   */
+  public class SortDefinition{
+    /** True for ascending order, false for descending. */
+    boolean forward;
+    /** Whether nulls sort after non-null values. Never assigned in this class, so it stays false (nulls first). */
+    boolean nullsLast;
+    BasicEvaluator evaluator;
+
+    public SortDefinition(BasicEvaluator evaluator, boolean forward) {
+      this.evaluator = evaluator;
+      this.forward = forward;
+    }
+  }
+
+  /** A buffered record paired with the sort-key values evaluated for it. */
+  private class CompoundValue{
+    DataValue[] values;
+    RecordPointer record;
+    public CompoundValue(RecordPointer record, DataValue[] values) {
+      super();
+      this.record = record;
+      this.values = values;
+    }
+  }
+
+  /**
+   * Adapter that lets the sorter compare and swap entries of {@code records} by index, applying the
+   * sort definitions of the enclosing operator in order.
+   */
+  private class StackedComparator implements IndexedSortable{
+
+    /**
+     * @param defs unused; the comparator reads the enclosing operator's definitions directly. The
+     *             parameter is kept so the existing construction site keeps working.
+     */
+    public StackedComparator(SortDefinition[] defs){
+    }
+
+    @Override
+    public void swap(int index0, int index1) {
+      CompoundValue v = records.get(index0);
+      records.set(index0, records.get(index1));
+      records.set(index1, v);
+    }
+
+    /**
+     * Compares two buffered records key by key and returns the first non-zero result.
+     * The null placement is decided by {@code nullsLast} alone and is not reversed for descending keys.
+     */
+    @Override
+    public int compare(int index0, int index1) {
+      int result = 0;
+      CompoundValue v1 = records.get(index0);
+      CompoundValue v2 = records.get(index1);
+
+      for(int i =0; i < defs.length; i++){
+        boolean nullLast = defs[i].nullsLast;
+        boolean asc = defs[i].forward;
+        DataValue dv1 = v1.values[i];
+        DataValue dv2 = v2.values[i];
+        if(dv1 == null){
+          if(dv2 == null){
+            result = 0;
+          }else{
+            result = nullLast ? 1 : -1;
+          }
+        }else if(dv2 == null){
+          result = nullLast ? -1 : 1;
+        }else{
+          if(dv1 instanceof ComparableValue && ((ComparableValue) dv1).supportsCompare(dv2)){
+            result = ((ComparableValue)dv1).compareTo(dv2);
+            if(!asc) result = -result;
+          }else{
+            return 0;  // we break even though there may be more evaluators because we should always return the same ordering for non-comparable values no matter the compare order.
+          }
+        }
+        if(result != 0) return result;
+      }
+      return result;
+    }
+
+  }
+
+  /**
+   * Replays the sorted buffer one record at a time by pointing {@code outputRecord} at each buffered
+   * record in turn.
+   */
+  public class OrderIterator implements RecordIterator{
+    final Iterator<CompoundValue> iter;
+    public OrderIterator() {
+      this.iter = records.iterator();
+    }
+
+    /**
+     * Advances to the next sorted record.
+     *
+     * @return {@code INCREMENTED_SCHEMA_CHANGED} while records remain, {@code NONE_LEFT} afterwards
+     */
+    @Override
+    public NextOutcome next() {
+      if(iter.hasNext()) {
+        outputRecord.setRecord(iter.next().record);
+        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+      }
+
+      return NextOutcome.NONE_LEFT;
+    }
+
+    @Override
+    public ROP getParent() {
+      return OrderROP.this;
+    }
+
+    /**
+     * Returns the pointer that {@link #next()} updates. This must be the output proxy rather than the
+     * input record, otherwise consumers would never observe the sorted rows.
+     */
+    @Override
+    public RecordPointer getRecordPointer() {
+      return outputRecord;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProjectROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProjectROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProjectROP.java
new file mode 100644
index 0000000..a8ebad6
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProjectROP.java
@@ -0,0 +1,95 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
+
+public class ProjectROP extends SingleInputROPBase<Project> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectROP.class);
+
+  private final UnbackedRecord outputRecord = new UnbackedRecord();
+  private final PathSegment[] paths; 
+  private final BasicEvaluator[] evaluators;
+  private final NamedExpression[] selections;
+  private RecordIterator incoming;
+  
+  public ProjectROP(Project config) {
+    super(config);
+    this.selections = config.getSelections();
+    this.paths = new PathSegment[selections.length];
+    this.evaluators = new BasicEvaluator[selections.length];
+    for(int i=0; i < selections.length; i++){
+      paths[i] = selections[i].getRef().getRootSegment().getChild();
+    }
+  }
+
+  @Override
+  protected void setInput(RecordIterator incoming) {
+    this.incoming = incoming;
+  }
+
+  @Override
+  protected RecordIterator getIteratorInternal() {
+    return new ProjectionIterator();
+  }
+
+  @Override
+  protected void setupEvals(EvaluatorFactory builder)  {
+    for(int i =0; i < evaluators.length; i++){
+      evaluators[i] = builder.getBasicEvaluator(record, selections[i].getExpr());
+    }
+  }
+
+  
+  private class ProjectionIterator implements RecordIterator{
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return outputRecord;
+    }
+
+    @Override
+    public NextOutcome next() {
+      outputRecord.clear();
+      NextOutcome n = incoming.next();
+      if(n != NextOutcome.NONE_LEFT){
+        for(int i =0; i < evaluators.length; i++){
+          if(paths[i] == null){
+            outputRecord.merge(evaluators[i].eval());
+          }else{
+            outputRecord.addField(paths[i], evaluators[i].eval());
+          }
+        }
+      }
+      return n;
+    }
+
+    @Override
+    public ROP getParent() {
+      return ProjectROP.this;
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java
new file mode 100644
index 0000000..5aea366
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java
@@ -0,0 +1,74 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.io.IOException;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.values.DataValue;
+
+/**
+ * A {@link RecordPointer} that forwards every call to another, replaceable record pointer.
+ * The delegate is set with {@link #setRecord(RecordPointer)}; until then it is null and every
+ * forwarding method will fail with a NullPointerException.
+ */
+public class ProxySimpleRecord implements RecordPointer{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProxySimpleRecord.class);
+  
+  // Current delegate; null until setRecord() is called.
+  private RecordPointer record;
+
+  /** Forwards to the delegate. */
+  @Override
+  public DataValue getField(SchemaPath field) {
+    return record.getField(field);
+  }
+
+  /** Forwards to the delegate. */
+  @Override
+  public void addField(SchemaPath field, DataValue value) {
+    record.addField(field, value);
+  }
+  
+
+  /** Forwards to the delegate. */
+  @Override
+  public void addField(PathSegment segment, DataValue value) {
+    record.addField(segment, value);
+  }
+
+  /** Forwards to the delegate. */
+  @Override
+  public void write(DataWriter writer) throws IOException {
+    record.write(writer);
+  }
+
+  /** Forwards to the delegate, i.e. copies into the delegate rather than replacing it. */
+  @Override
+  public void copyFrom(RecordPointer r) {
+    record.copyFrom(r);
+  }
+
+  /** Returns the delegate's copy; the result is not wrapped in a new proxy. */
+  @Override
+  public RecordPointer copy() {
+    return record.copy();
+  }
+
+  /** Returns the current delegate (may be null). */
+  public RecordPointer getRecord() {
+    return record;
+  }
+
+  /** Replaces the delegate that all subsequent calls are forwarded to. */
+  public void setRecord(RecordPointer record) {
+    this.record = record;
+  }
+  
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROP.java
new file mode 100644
index 0000000..fe932c2
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROP.java
@@ -0,0 +1,12 @@
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+/**
+ * A reference operator: a unit that is initialised against an iterator registry and an evaluator
+ * factory and then exposes its result as a {@link RecordIterator}.
+ */
+public interface ROP {
+  /**
+   * Prepares the operator for execution.
+   *
+   * @param registry registry used to look up and register record iterators
+   * @param builder  factory used to build expression evaluators
+   * @throws SetupException if the operator cannot be set up
+   */
+  public void init(IteratorRegistry registry, EvaluatorFactory builder) throws SetupException;
+  /** Returns the iterator over this operator's output records. */
+  public RecordIterator getOutput();
+  /** Releases whatever the operator holds on to (NOTE(review): timing of the call is decided by the caller, not shown here). */
+  public void cleanup();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROPBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROPBase.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROPBase.java
new file mode 100644
index 0000000..e7672fd
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROPBase.java
@@ -0,0 +1,62 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+/**
+ * Skeleton implementation of {@link ROP} that holds the logical operator configuration, drives the
+ * setup hooks in a fixed order and guarantees that the output iterator is handed out only once.
+ *
+ * @param <T> type of the logical operator this reference operator implements
+ */
+public abstract class ROPBase<T extends LogicalOperator> implements ROP{
+
+  /** Set once {@link #getOutput()} has been called. */
+  private boolean alreadyUsed = false;
+  /** The logical operator configuration; never null. */
+  protected final T config;
+
+  /**
+   * @param config the logical operator configuration
+   * @throws IllegalArgumentException if {@code config} is null
+   */
+  public ROPBase(T config) {
+    if(config == null) throw new IllegalArgumentException("Config must be defined.");
+    this.config = config;
+  }
+
+  /** Hook for building evaluators; does nothing by default. */
+  protected void setupEvals(EvaluatorFactory builder) throws SetupException{}
+  /** Hook for wiring up input iterators; does nothing by default. */
+  protected void setupIterators(IteratorRegistry registry) throws SetupException{}
+  /** Creates the iterator returned by {@link #getOutput()}. */
+  protected abstract RecordIterator getIteratorInternal();
+
+  /**
+   * Registers this operator under its configuration, then runs {@link #setupIterators} followed by
+   * {@link #setupEvals}.
+   */
+  @Override
+  public void init(IteratorRegistry registry, EvaluatorFactory builder) throws SetupException {
+    // config cannot be null here: the constructor rejects it.
+    registry.register(config, this);
+    setupIterators(registry);
+    setupEvals(builder);
+  }
+
+  /**
+   * Returns the output iterator of this operator.
+   *
+   * @throws IllegalStateException if the output has already been requested
+   */
+  @Override
+  public final RecordIterator getOutput() {
+    if(alreadyUsed) throw new IllegalStateException("You can only request the output of a reference operator once.");
+    alreadyUsed = true;
+    return getIteratorInternal();
+  }
+
+  /** No-op by default. */
+  @Override
+  public void cleanup() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperator.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperator.java
new file mode 100644
index 0000000..d24c4c0
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperator.java
@@ -0,0 +1,9 @@
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.exec.ref.RecordIterator;
+
+/**
+ * An operator that is given an input iterator and exposes an output iterator.
+ */
+public interface ReferenceOperator {
+  
+  /** Supplies the iterator this operator reads its records from. */
+  public void setInput(RecordIterator incoming);
+  /** Returns the iterator over this operator's output records. */
+  public RecordIterator getOutput();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperatorBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperatorBase.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperatorBase.java
new file mode 100644
index 0000000..bad5443
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperatorBase.java
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.exec.ref.RecordIterator;
+
+/**
+ * Skeleton implementation of {@link ReferenceOperator} that guarantees the output iterator is handed
+ * out only once.
+ */
+public abstract class ReferenceOperatorBase implements ReferenceOperator{
+
+  /** Set once {@link #getOutput()} has been called. */
+  private boolean alreadyUsed = false;
+
+  /**
+   * Does nothing: the supplied iterator is discarded. Subclasses that need their input must override
+   * this method and keep the reference themselves.
+   */
+  @Override
+  public void setInput(RecordIterator incoming) {
+  }
+
+  /** Creates the iterator returned by {@link #getOutput()}. */
+  protected abstract RecordIterator getIteratorInternal();
+
+  /**
+   * Returns the output iterator of this operator.
+   *
+   * @throws IllegalStateException if the output has already been requested
+   */
+  @Override
+  public final RecordIterator getOutput() {
+    if(alreadyUsed) throw new IllegalStateException("You can only request the output of a reference operator once.");
+    alreadyUsed = true;
+    return getIteratorInternal();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SingleInputROPBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SingleInputROPBase.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SingleInputROPBase.java
new file mode 100644
index 0000000..b88d7b4
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SingleInputROPBase.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.util.List;
+
+import org.apache.drill.common.logical.data.SingleInputOperator;
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+/**
+ * Base class for reference operators that read from exactly one input. During setup it looks up the
+ * iterator registered for the configured input, exposes that iterator's record pointer as
+ * {@link #record} and passes the iterator to {@link #setInput(RecordIterator)}.
+ *
+ * @param <T> type of the single-input logical operator this reference operator implements
+ */
+public abstract class SingleInputROPBase<T extends SingleInputOperator> extends ROPBase<T>{
+
+  /** Record pointer of the input iterator; assigned in {@link #setupIterators}. */
+  protected RecordPointer record;
+
+  public SingleInputROPBase(T config) {
+    super(config);
+  }
+
+  /**
+   * Resolves the single input iterator and hands it to {@link #setInput(RecordIterator)}.
+   *
+   * @throws IllegalArgumentException if the registry does not return exactly one iterator for the input
+   * @throws SetupException if the input iterator exposes a null record pointer
+   */
+  @Override
+  protected void setupIterators(IteratorRegistry registry) throws SetupException {
+    List<RecordIterator> iters = registry.getOperator(config.getInput());
+    if(iters.size() != 1) throw new IllegalArgumentException(String.format("Expected one input iterator for class %s.  Received %d", this.getClass().getCanonicalName(), iters.size()));
+    RecordIterator i = iters.get(0);
+    this.record = i.getRecordPointer();
+    if(record == null) throw new SetupException(String.format("The %s op iterator return a null record pointer.", i.getParent()));
+    setInput(i);
+  }
+
+  /** Receives the resolved input iterator; called once from {@link #setupIterators}. */
+  protected abstract void setInput(RecordIterator incoming);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SinkROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SinkROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SinkROP.java
new file mode 100644
index 0000000..50c5509
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SinkROP.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.exec.ref.RunOutcome;
+
+/**
+ * A reference operator that is run to completion rather than iterated, reporting through a
+ * {@link StatusHandle} while it works.
+ */
+public interface SinkROP {
+  
+  /**
+   * Runs the sink.
+   *
+   * @param handle callback used to report progress and to ask whether work should continue
+   * @return the outcome of the run
+   */
+  public RunOutcome run(StatusHandle handle);
+  
+  /** Callback interface passed to {@link SinkROP#run(StatusHandle)}. */
+  public interface StatusHandle{
+    /**
+     * Reports progress.
+     * NOTE(review): whether the counts are cumulative or per-call is not visible here; confirm with callers.
+     */
+    public void progress(long bytes, long records);
+    /** Returns whether the sink may keep working. */
+    public boolean okToContinue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/TransformROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/TransformROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/TransformROP.java
new file mode 100644
index 0000000..a98438d
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/TransformROP.java
@@ -0,0 +1,89 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Transform;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.ConnectedEvaluator;
+
+/**
+ * Reference operator for the logical {@link Transform} operator. Each time the input advances, every
+ * configured transform evaluator is run against the input record, which is also the record exposed
+ * downstream.
+ */
+public class TransformROP extends SingleInputROPBase<Transform>{
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TransformROP.class);
+
+  private ConnectedEvaluator[] transformers;
+  private RecordIterator iter;
+
+  public TransformROP(Transform transform){
+    super(transform);
+  }
+
+  /** Builds one connected evaluator per configured transform, in configuration order. */
+  @Override
+  protected void setupEvals(EvaluatorFactory builder) {
+    final NamedExpression[] expressions = config.getTransforms();
+    transformers = new ConnectedEvaluator[expressions.length];
+    for(int slot = 0; slot < expressions.length; slot++){
+      transformers[slot] = builder.getConnectedEvaluator(record, expressions[slot]);
+    }
+  }
+
+  /** Wraps the input in the transforming iterator that is later returned as this operator's output. */
+  @Override
+  public void setInput(RecordIterator incoming) {
+    this.iter = new TransformIterator(incoming);
+  }
+
+  /** Iterator that applies all transforms after each successful advance of the input. */
+  private class TransformIterator implements RecordIterator{
+    private final RecordIterator incoming;
+
+    public TransformIterator(RecordIterator incoming){
+      this.incoming = incoming;
+    }
+
+    /**
+     * Advances the input and runs every transform.
+     *
+     * @return {@code NONE_LEFT} when the input is exhausted, otherwise always
+     *         {@code INCREMENTED_SCHEMA_CHANGED} regardless of what the input reported
+     */
+    @Override
+    public NextOutcome next() {
+      final NextOutcome upstream = incoming.next();
+      if(upstream != NextOutcome.NONE_LEFT){
+        for(ConnectedEvaluator transformer : transformers){
+          transformer.eval();
+        }
+        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+      }
+      return upstream;
+    }
+
+    @Override
+    public ROP getParent() {
+      return TransformROP.this;
+    }
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return record;
+    }
+
+  }
+
+  @Override
+  public RecordIterator getIteratorInternal() {
+    return iter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
new file mode 100644
index 0000000..aca0b5a
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
@@ -0,0 +1,82 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.util.List;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Union;
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+
+/**
+ * Reference operator for the logical {@link Union} operator. It drains its input iterators one after
+ * another and exposes whichever input is currently being read through a single proxy record, so
+ * downstream operators can hold on to one stable record pointer.
+ */
+public class UnionROP extends ROPBase<LogicalOperator>{
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionROP.class);
+
+  private List<RecordIterator> incoming;
+  // Created eagerly: setupIterators() and the iterator both dereference it.
+  private final ProxySimpleRecord record = new ProxySimpleRecord();
+
+  public UnionROP(Union config) {
+    super(config);
+  }
+
+  @Override
+  protected void setupEvals(EvaluatorFactory builder) {
+  }
+
+  /** Looks up the input iterators and points the proxy record at the first input's record. */
+  @Override
+  protected void setupIterators(IteratorRegistry builder) {
+    incoming = builder.getOperator(config);
+    record.setRecord(incoming.get(0).getRecordPointer());
+  }
+
+  @Override
+  protected RecordIterator getIteratorInternal() {
+    return new MultiIterator();
+  }
+
+  /** Iterator that concatenates all inputs in list order. */
+  private class MultiIterator implements RecordIterator{
+    /** Index of the input currently being drained. */
+    private int current = 0;
+
+    /**
+     * Advances the current input; when it is exhausted, moves on to the next input and re-targets
+     * the proxy record at it.
+     *
+     * @return the outcome reported by the input that produced a record, or {@code NONE_LEFT} once
+     *         every input has been drained
+     */
+    @Override
+    public NextOutcome next() {
+      while(current < incoming.size()){
+        NextOutcome n = incoming.get(current).next();
+        if(n != NextOutcome.NONE_LEFT) return n;
+
+        // Current input is exhausted: step to the next one, if any, before trying again.
+        current++;
+        if(current < incoming.size()){
+          record.setRecord(incoming.get(current).getRecordPointer());
+        }
+      }
+      return NextOutcome.NONE_LEFT;
+    }
+
+    @Override
+    public ROP getParent() {
+      return UnionROP.this;
+    }
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return record;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
new file mode 100644
index 0000000..bbfbc1c
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
@@ -0,0 +1,56 @@
+package org.apache.drill.exec.ref.rops;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.drill.common.logical.data.SingleInputOperator;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+
+/**
+ * For simplification purposes, the Windowing reference implementation takes the lazy approach of finishing a window
+ * before it outputs any values from that window. While this is necessary in the ALL:ALL scenario, other scenarios could
+ * be implemented more efficiently with an appropriately sized open window.
+ *
+ * <p>This class is currently a stub: the constructor always throws {@link NotImplementedException}, so no
+ * instance can be created, and the remaining members are placeholders.
+ */
+public class WindowingROP extends SingleInputROPBase<SingleInputOperator> {
+
+  private List<RecordPointer> records = new LinkedList<RecordPointer>();
+  private WindowManager[] windows;
+  private Window[] windowPerKey;
+
+  // the place where we should start the next batch.
+  private int internalWindowPosition;
+
+  /**
+   * @throws NotImplementedException always; windowing is not implemented yet
+   */
+  public WindowingROP(SingleInputOperator config) {
+    super(config);
+    throw new NotImplementedException();
+  }
+
+  /** Placeholder: the input iterator is ignored. */
+  @Override
+  protected void setInput(RecordIterator incoming) {
+  }
+
+  /** Placeholder: returns null. */
+  @Override
+  protected RecordIterator getIteratorInternal() {
+    return null;
+  }
+
+  /** Placeholder for the state of a single window. */
+  private class Window {
+
+    int ending;
+    List<RecordPointer> records = new ArrayList<RecordPointer>();
+
+    public void reset(int curPos) {
+
+    }
+  }
+
+  /** Placeholder for the component that advances windows. */
+  private class WindowManager {
+    private void increment() {
+
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/BatchBreaker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/BatchBreaker.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/BatchBreaker.java
new file mode 100644
index 0000000..8966280
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/BatchBreaker.java
@@ -0,0 +1,29 @@
+package org.apache.drill.exec.ref.util;
+
+import org.apache.drill.exec.ref.RecordPointer;
+
+/**
+ * Strategy that is consulted once per record to decide whether a break should happen after that record.
+ */
+public interface BatchBreaker {
+
+  /**
+   * Reports whether a break should occur after the given record.
+   *
+   * @param record the record under consideration
+   * @return {@code true} to break after this record, {@code false} to continue
+   */
+  public boolean shouldBreakAfter(RecordPointer record);
+
+  /**
+   * Breaker that looks only at how many times it has been consulted; the record itself is never inspected.
+   */
+  public static class CountBreaker implements BatchBreaker {
+    private final int max;
+    private int count = 0;
+
+    /**
+     * @param max threshold for the internal call counter; a break is signalled on the call that takes the counter
+     *        above this value
+     */
+    public CountBreaker(int max) {
+      this.max = max;
+    }
+
+    /**
+     * Counts this call and signals a break once the counter exceeds {@code max}, resetting the counter to zero when
+     * it does. For a non-negative {@code max} this means a break is reported on every {@code (max + 1)}-th call.
+     *
+     * <p>NOTE(review): if {@code max} is meant to be the exact batch size, the comparison would have to be
+     * {@code >=}; the existing threshold is kept unchanged here -- confirm the intent with callers.
+     *
+     * @param record ignored
+     * @return {@code true} on the call that pushes the counter past {@code max}, otherwise {@code false}
+     */
+    @Override
+    public boolean shouldBreakAfter(RecordPointer record) {
+      count++;
+      if (count > max) {
+        count = 0;
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/ByteRange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/ByteRange.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/ByteRange.java
new file mode 100644
index 0000000..40287b5
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/ByteRange.java
@@ -0,0 +1,17 @@
+package org.apache.drill.exec.ref.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+
+/**
+ * Facade interface for ByteBuffer and byte[].
+ */
+public interface ByteRange {
+
+  /** Returns the byte stored at {@code index}. */
+  byte getByte(int index);
+
+  /** Returns the length of this range. */
+  int getLength();
+
+  /**
+   * Copies this range into {@code buffer}.
+   * Presumably {@code offset} is the position in {@code buffer} where writing starts -- confirm with implementations.
+   */
+  void copyTo(byte[] buffer, int offset);
+
+  /**
+   * Copy operation involving a stream.
+   * NOTE(review): the parameter is an {@link InputStream}, which cannot be written to; an OutputStream may have been
+   * intended -- confirm before relying on this method.
+   *
+   * @throws IOException declared for stream failures
+   */
+  void copyTo(InputStream is) throws IOException;
+
+  /** Copies this range into the given {@link ByteBuffer}. */
+  void copyTo(ByteBuffer bb);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java
new file mode 100644
index 0000000..9b14e63
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.types.DataType;
+
+
+/**
+ * Base class for array-typed values. It implements path-based reads and writes on top of the index-based primitives
+ * that concrete subclasses supply.
+ */
+public abstract class BaseArrayValue extends BaseDataValue implements ContainerValue {
+
+  /**
+   * Stores {@code v} under the given path segment. The value is first wrapped via
+   * {@code ValueUtils.getIntermediateValues} using the segment's child path. An array segment targets its explicit
+   * index and merges with whatever is already there; any other segment appends at the next index.
+   */
+  @Override
+  public void addValue(PathSegment segment, DataValue v) {
+    final DataValue wrapped = ValueUtils.getIntermediateValues(segment.getChild(), v);
+    if (!segment.isArray()) {
+      // No explicit position: add to the end of the array.
+      addToArray(getNextIndex(), wrapped);
+      return;
+    }
+    // Explicit position: combine with the current occupant according to the segment's collision behavior.
+    final int position = segment.getArraySegment().getIndex();
+    final DataValue merged =
+        ValueUtils.getMergedDataValue(segment.getCollisionBehavior(), getByArrayIndex(position), wrapped);
+    addToArray(position, merged);
+  }
+
+  /** Stores {@code v} at {@code index}. */
+  protected abstract void addToArray(int index, DataValue v);
+
+  /** Returns the element at {@code index}; callers in this class treat {@code null} as "nothing stored". */
+  protected abstract DataValue getByArrayIndex(int index);
+
+  /** Returns the index used when a value is appended. */
+  protected abstract int getNextIndex();
+
+  /** Appends the contents of another array value to this one. */
+  public abstract void append(BaseArrayValue container);
+
+  /** Returns the size of this array. */
+  public abstract int size();
+
+  /**
+   * Resolves a path against this array.
+   *
+   * @param segment {@code null} to address the array itself, otherwise the next path step
+   * @return this array for a {@code null} segment; {@code NULL_VALUE} for a non-array segment or an empty slot;
+   *         otherwise the result of resolving the remaining path on the indexed element
+   */
+  @Override
+  public DataValue getValue(PathSegment segment) {
+    if (segment == null) {
+      return this;
+    }
+    if (!segment.isArray()) {
+      // A named value was requested from within an array; there is nothing to return.
+      return DataValue.NULL_VALUE;
+    }
+    final DataValue element = getByArrayIndex(segment.getArraySegment().getIndex());
+    return element == null ? DataValue.NULL_VALUE : element.getValue(segment.getChild());
+  }
+
+  @Override
+  public ContainerValue getAsContainer() {
+    return this;
+  }
+
+  @Override
+  public DataType getDataType() {
+    return DataType.ARRAY;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java
new file mode 100644
index 0000000..ac4cde9
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.PathSegment;
+
+/**
+ * Skeleton implementation of {@link DataValue}. Every conversion and {@code addValue} fails by default, and path
+ * lookups only succeed for the value itself; subclasses override whichever operations they actually support.
+ */
+public abstract class BaseDataValue implements DataValue {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseDataValue.class);
+
+  /**
+   * @param segment {@code null} to address this value itself
+   * @return this value for a {@code null} segment, otherwise {@code NULL_VALUE} because there is no lower level to
+   *         look into
+   */
+  @Override
+  public DataValue getValue(PathSegment segment) {
+    return segment == null ? this : DataValue.NULL_VALUE;
+  }
+
+  /**
+   * @throws IllegalArgumentException always; this default is for non-container values
+   */
+  @Override
+  public void addValue(PathSegment segment, DataValue v) {
+    throw new IllegalArgumentException("You can't add a value to a non-container type.");
+  }
+
+  /** @throws DrillRuntimeException always, unless overridden */
+  @Override
+  public NumericValue getAsNumeric() {
+    throw notA("NumericValue");
+  }
+
+  /** @throws DrillRuntimeException always, unless overridden */
+  @Override
+  public ContainerValue getAsContainer() {
+    throw notA("ContainerValue");
+  }
+
+  /** @throws DrillRuntimeException always, unless overridden */
+  @Override
+  public StringValue getAsStringValue() {
+    throw notA("StringValue");
+  }
+
+  /** @throws DrillRuntimeException always, unless overridden */
+  @Override
+  public BytesValue getAsBytesValue() {
+    throw notA("BytesValue");
+  }
+
+  /** @throws DrillRuntimeException always, unless overridden */
+  @Override
+  public BooleanValue getAsBooleanValue() {
+    throw notA("BooleanValue");
+  }
+
+  /** Redeclared abstract so that every concrete value type has to provide its own hash code. */
+  @Override
+  public abstract int hashCode();
+
+  /** Builds the shared "A <class> value is not a <type>." exception used by the failing conversions. */
+  private DrillRuntimeException notA(String expectedType) {
+    return new DrillRuntimeException(
+        String.format("A %s value is not a %s.", this.getClass().getCanonicalName(), expectedType));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java
new file mode 100644
index 0000000..fd6a968
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java
@@ -0,0 +1,90 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+import java.util.Map;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.ValueExpressions.CollisionBehavior;
+import org.apache.drill.common.expression.types.DataType;
+import org.apache.drill.exec.ref.exceptions.RecordException;
+
+public abstract class BaseMapValue extends BaseDataValue implements ContainerValue,
+    Iterable<Map.Entry<CharSequence, DataValue>> {
+
+  @Override
+  public void addValue(PathSegment segment, DataValue v) {
+    if(v == null) throw new RecordException("You attempted to add a null value to a map.", null);
+    if (segment.isArray())
+      throw new RecordException(
+          "You're attempted to save something at a particular array location while the location of that setting was a Map.", null);
+
+    CharSequence name = segment.getNameSegment().getPath();
+    DataValue current = getByNameNoNulls(name);
+    if (!segment.isLastPath() && current != DataValue.NULL_VALUE) {
+      current.addValue(segment.getChild(), v);
+      return;
+    } else {
+      DataValue fullPathValue = ValueUtils.getIntermediateValues(segment.getChild(), v);
+      DataValue mergedValue = ValueUtils.getMergedDataValue(segment.getCollisionBehavior(), getByNameNoNulls(name),
+          fullPathValue);
+      setByName(name, mergedValue);
+
+    }
+
+  }
+
+  protected abstract void setByName(CharSequence name, DataValue v);
+
+  protected abstract DataValue getByName(CharSequence name);
+
+  private DataValue getByNameNoNulls(CharSequence name) {
+    DataValue v = getByName(name);
+    if (v == null) return NULL_VALUE;
+    return v;
+  }
+
+  @Override
+  public DataValue getValue(PathSegment segment) {
+    if (segment == null) return this;
+    if (segment.isArray()) return NULL_VALUE;
+    return getByNameNoNulls(segment.getNameSegment().getPath()).getValue(segment.getChild());
+  }
+
+  @Override
+  public ContainerValue getAsContainer() {
+    return this;
+  }
+
+  @Override
+  public DataType getDataType() {
+    return DataType.MAP;
+  }
+
+  public void merge(BaseMapValue otherMap) {
+    for (Map.Entry<CharSequence, DataValue> e : otherMap) {
+      final DataValue oldValue = getByName(e.getKey());
+      if (oldValue == DataValue.NULL_VALUE || oldValue == null) {
+        setByName(e.getKey(), e.getValue());
+      } else {
+        setByName(e.getKey(), ValueUtils.getMergedDataValue(CollisionBehavior.MERGE_OVERRIDE, oldValue, e.getValue()));
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BooleanValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BooleanValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BooleanValue.java
new file mode 100644
index 0000000..a3a9a85
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BooleanValue.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+/**
+ * A {@link DataValue} that can be read as a primitive boolean.
+ */
+public interface BooleanValue extends DataValue {
+
+  /** Returns this value as a {@code boolean}. */
+  boolean getBoolean();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BytesValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BytesValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BytesValue.java
new file mode 100644
index 0000000..460c358
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BytesValue.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+/**
+ * A comparable {@link DataValue} holding bytes, readable either as a whole array or byte by byte.
+ */
+public interface BytesValue extends DataValue, ComparableValue {
+
+  /** Returns the content as a {@code byte[]}. */
+  byte[] getAsArray();
+
+  // ByteBuffer-like positional access.
+
+  /** Returns the number of bytes available through {@link #get(int)}. */
+  int getLength();
+
+  /** Returns the byte at position {@code pos}. */
+  byte get(int pos);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ComparableValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ComparableValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ComparableValue.java
new file mode 100644
index 0000000..69a75c7
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ComparableValue.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+/**
+ * A {@link DataValue} that can be ordered against other data values, subject to a compatibility check.
+ */
+public interface ComparableValue extends Comparable<DataValue>, DataValue {
+
+  /**
+   * Tells whether a comparison between this value and the supplied one is possible.
+   *
+   * @param dv2 the other DataValue
+   * @return true if the two values are comparable, false otherwise
+   */
+  boolean supportsCompare(DataValue dv2);
+
+  /**
+   * Works like the standard {@link Comparable#compareTo(Object)}, with the expectation that
+   * {@link #supportsCompare(DataValue)} has been called first. Skipping that check will likely give an unexpected
+   * outcome.
+   *
+   * @param other the value to compare against
+   * @return the comparison result, following the usual {@code Comparable} sign convention
+   */
+  int compareTo(DataValue other);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ContainerValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ContainerValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ContainerValue.java
new file mode 100644
index 0000000..42d1a97
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ContainerValue.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+
+
+/**
+ * A {@link DataValue} that holds other values and can be viewed as an array or as a map.
+ */
+public interface ContainerValue extends DataValue {
+
+  /** Returns this container viewed as a {@link BaseArrayValue}. */
+  BaseArrayValue getAsArray();
+
+  /** Returns this container viewed as a {@link BaseMapValue}. */
+  BaseMapValue getAsMap();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
new file mode 100644
index 0000000..b78b8a1
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
@@ -0,0 +1,25 @@
+package org.apache.drill.exec.ref.values;
+
+import java.io.IOException;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.types.DataType;
+import org.apache.drill.exec.ref.rops.DataWriter;
+
+
+/**
+ * Common contract for all values in this package: path-based access, output through a {@link DataWriter}, a
+ * {@link DataType}, and conversions to the more specific value interfaces.
+ */
+public interface DataValue {
+
+  /** Shared instance used in place of a missing value. */
+  DataValue NULL_VALUE = new ScalarValues.NullValue();
+
+  /** Resolves the value found at {@code segment}, relative to this value. */
+  DataValue getValue(PathSegment segment);
+
+  /** Stores {@code v} at {@code segment}, relative to this value. */
+  void addValue(PathSegment segment, DataValue v);
+
+  /**
+   * Writes this value to {@code writer}.
+   *
+   * @throws IOException if the writer fails
+   */
+  void write(DataWriter writer) throws IOException;
+
+  /** Returns the type of this value. */
+  DataType getDataType();
+
+  /** Returns this value viewed as a {@link NumericValue}. */
+  NumericValue getAsNumeric();
+
+  /** Returns this value viewed as a {@link ContainerValue}. */
+  ContainerValue getAsContainer();
+
+  /** Returns this value viewed as a {@link StringValue}. */
+  StringValue getAsStringValue();
+
+  /** Returns this value viewed as a {@link BooleanValue}. */
+  BooleanValue getAsBooleanValue();
+
+  /** Returns this value viewed as a {@link BytesValue}. */
+  BytesValue getAsBytesValue();
+
+  /**
+   * Value equality against another DataValue. Because the parameter type is {@code DataValue}, this is an overload
+   * of, not an override of, {@link Object#equals(Object)}.
+   */
+  boolean equals(DataValue v);
+
+  /** Hash code of this value. */
+  int hashCode();
+}