You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ts...@apache.org on 2013/01/22 03:35:07 UTC
[2/6] Basic reference interpreter implementation
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java
new file mode 100644
index 0000000..21f220d
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.io.IOException;
+
+/**
+ * Callback-style sink used to serialize a record one piece at a time.
+ *
+ * <p>Structure is announced through the paired start/end methods (record, array, array element,
+ * map, map value) and leaf values are emitted through the typed {@code write*} methods. Every
+ * method may fail with an {@link IOException}. The exact call-ordering contract is not spelled
+ * out here; it is defined by the callers and implementations.
+ */
+public interface DataWriter {
+
+  // ---- record boundaries ----
+
+  /** Marks the beginning of a record. */
+  void startRecord() throws IOException;
+
+  /** Marks the end of a record. */
+  void endRecord() throws IOException;
+
+  // ---- arrays ----
+
+  /**
+   * Marks the beginning of an array.
+   *
+   * @param length presumably the number of elements that will follow (verify against callers)
+   */
+  void writeArrayStart(int length) throws IOException;
+
+  /** Marks the beginning of a single array element. */
+  void writeArrayElementStart() throws IOException;
+
+  /** Marks the end of a single array element. */
+  void writeArrayElementEnd() throws IOException;
+
+  /** Marks the end of an array. */
+  void writeArrayEnd() throws IOException;
+
+  // ---- maps ----
+
+  /** Marks the beginning of a map. */
+  void writeMapStart() throws IOException;
+
+  /**
+   * Writes the key of the next map entry.
+   *
+   * @param seq the key text
+   */
+  void writeMapKey(CharSequence seq) throws IOException;
+
+  /** Marks the beginning of a map entry's value. */
+  void writeMapValueStart() throws IOException;
+
+  /** Marks the end of a map entry's value. */
+  void writeMapValueEnd() throws IOException;
+
+  /** Marks the end of a map. */
+  void writeMapEnd() throws IOException;
+
+  // ---- leaf values ----
+
+  /** Writes a boolean value. */
+  void writeBoolean(boolean b) throws IOException;
+
+  /** Writes a 32-bit signed integer value. */
+  void writeSInt32(int value) throws IOException;
+
+  /** Writes a 64-bit signed integer value. */
+  void writeSInt64(long value) throws IOException;
+
+  /** Writes a binary value. */
+  void writeBytes(byte[] bytes) throws IOException;
+
+  /** Writes a 64-bit floating point value. */
+  void writeSFloat64(double value) throws IOException;
+
+  /** Writes a 32-bit floating point value. */
+  void writeSFloat32(float value) throws IOException;
+
+  /** Writes a null value. */
+  void writeNullValue() throws IOException;
+
+  /** Writes a textual value. */
+  void writeCharSequence(CharSequence value) throws IOException;
+
+  // ---- lifecycle ----
+
+  /** Closes the writer. */
+  void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java
new file mode 100644
index 0000000..e87c3fb
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.common.logical.data.Filter;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BooleanEvaluator;
+
+/**
+ * Reference operator that forwards only those incoming records for which the configured filter
+ * expression evaluates to {@code true}.
+ */
+public class FilterROP extends SingleInputROPBase<Filter> {
+
+  private FilterIterator iter;
+  private BooleanEvaluator filterEval;
+
+  public FilterROP(Filter config) {
+    super(config);
+  }
+
+  /** Builds the boolean evaluator for the configured filter expression over {@code record}. */
+  @Override
+  protected void setupEvals(EvaluatorFactory builder) {
+    filterEval = builder.getBooleanEvaluator(record, config.getExpr());
+  }
+
+  /** Wraps the upstream iterator in a filtering iterator. */
+  @Override
+  public void setInput(RecordIterator incoming) {
+    iter = new FilterIterator(incoming);
+  }
+
+  /** Returns the filtering iterator created by {@link #setInput(RecordIterator)}. */
+  @Override
+  public RecordIterator getIteratorInternal() {
+    return iter;
+  }
+
+  /** Iterator that skips upstream records rejected by the filter evaluator. */
+  private class FilterIterator implements RecordIterator {
+    private final RecordIterator source;
+
+    public FilterIterator(RecordIterator incoming) {
+      this.source = incoming;
+    }
+
+    /**
+     * Advances the upstream iterator until a record passes the filter.
+     *
+     * @return the upstream outcome for the first accepted record, or {@code NONE_LEFT} once the
+     *         upstream iterator is exhausted
+     */
+    @Override
+    public NextOutcome next() {
+      for (NextOutcome outcome = source.next();
+          outcome != NextOutcome.NONE_LEFT;
+          outcome = source.next()) {
+        if (filterEval.eval()) {
+          return outcome;
+        }
+      }
+      return NextOutcome.NONE_LEFT;
+    }
+
+    @Override
+    public ROP getParent() {
+      return FilterROP.this;
+    }
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return record;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/GroupROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/GroupROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/GroupROP.java
new file mode 100644
index 0000000..b236635
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/GroupROP.java
@@ -0,0 +1,75 @@
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.common.logical.data.Group;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
+import org.apache.drill.exec.ref.rops.MultiLevelMap.GroupingEntry;
+import org.apache.drill.exec.ref.rops.MultiLevelMap.MultiLevelIterator;
+
+/**
+ * Blocking operator that buffers every input record in a {@link MultiLevelMap} keyed by the
+ * configured grouping expressions and then replays the records in the map's iteration order.
+ */
+public class GroupROP extends AbstractBlockingOperator<Group> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GroupROP.class);
+
+  MultiLevelMap map;
+
+  public GroupROP(Group g) {
+    super(g);
+  }
+
+  /** Creates one evaluator per grouping expression and the map that is keyed by them. */
+  @Override
+  protected void setupEvals(EvaluatorFactory builder) {
+    NamedExpression[] groupings = config.getGroupings();
+    BasicEvaluator[] evaluators = new BasicEvaluator[groupings.length];
+    int slot = 0;
+    for (NamedExpression grouping : groupings) {
+      evaluators[slot++] = builder.getBasicEvaluator(inputRecord, grouping.getExpr());
+    }
+    map = new MultiLevelMap(evaluators);
+  }
+
+  /** Stores a copy of the current input record in the grouping map. */
+  @Override
+  protected void consumeRecord() {
+    map.add(inputRecord.copy());
+  }
+
+  /** Returns an iterator that replays the buffered records group by group. */
+  @Override
+  protected RecordIterator doWork() {
+    return new LevelIterator(map.getIterator());
+  }
+
+  /** Adapts a {@link MultiLevelIterator} to the {@link RecordIterator} contract. */
+  private class LevelIterator implements RecordIterator {
+
+    final MultiLevelIterator iter;
+
+    public LevelIterator(MultiLevelIterator iter) {
+      this.iter = iter;
+    }
+
+    /**
+     * Points {@code outputRecord} at the next buffered record.
+     *
+     * @return {@code INCREMENTED_SCHEMA_CHANGED} for every record, {@code NONE_LEFT} when the
+     *         underlying iterator returns {@code null}
+     */
+    @Override
+    public NextOutcome next() {
+      GroupingEntry entry = iter.next();
+      if (entry != null) {
+        outputRecord.setRecord(entry.record);
+        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+      }
+      return NextOutcome.NONE_LEFT;
+    }
+
+    @Override
+    public ROP getParent() {
+      return GroupROP.this;
+    }
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return outputRecord;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONDataWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONDataWriter.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONDataWriter.java
new file mode 100644
index 0000000..e9b1b26
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONDataWriter.java
@@ -0,0 +1,140 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+
+/**
+ * {@link DataWriter} that emits JSON through a Jackson {@link JsonGenerator} writing UTF-8 to the
+ * supplied stream with the default pretty printer enabled.
+ *
+ * <p>Structural callbacks that have no JSON counterpart (record start, array element start/end,
+ * map value start/end) are no-ops.
+ */
+public class JSONDataWriter implements DataWriter {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONDataWriter.class);
+
+  private final JsonGenerator generator;
+
+  /**
+   * @param out stream that receives the UTF-8 encoded JSON
+   * @throws IOException if the generator cannot be created
+   */
+  public JSONDataWriter(OutputStream out) throws IOException {
+    JsonFactory factory = new JsonFactory();
+    this.generator = factory.createJsonGenerator(out, JsonEncoding.UTF8);
+    this.generator.useDefaultPrettyPrinter();
+  }
+
+  /** Converts a character sequence to the {@code String} form Jackson expects. */
+  private String asString(CharSequence seq) {
+    // String.toString() returns the same instance, so no special case is needed for Strings.
+    return seq.toString();
+  }
+
+  /** No-op: nothing is written when a record starts. */
+  @Override
+  public void startRecord() throws IOException {
+  }
+
+  /** Opens a JSON array; {@code length} is not used. */
+  @Override
+  public void writeArrayStart(int length) throws IOException {
+    generator.writeStartArray();
+  }
+
+  /** No-op. */
+  @Override
+  public void writeArrayElementStart() throws IOException {
+  }
+
+  /** No-op. */
+  @Override
+  public void writeArrayElementEnd() throws IOException {
+  }
+
+  /** Closes the current JSON array. */
+  @Override
+  public void writeArrayEnd() throws IOException {
+    generator.writeEndArray();
+  }
+
+  /** Opens a JSON object. */
+  @Override
+  public void writeMapStart() throws IOException {
+    generator.writeStartObject();
+  }
+
+  /** Writes {@code seq} as the next field name. */
+  @Override
+  public void writeMapKey(CharSequence seq) throws IOException {
+    generator.writeFieldName(asString(seq));
+  }
+
+  /** No-op. */
+  @Override
+  public void writeMapValueStart() throws IOException {
+  }
+
+  /** No-op. */
+  @Override
+  public void writeMapValueEnd() throws IOException {
+  }
+
+  /** Closes the current JSON object. */
+  @Override
+  public void writeMapEnd() throws IOException {
+    generator.writeEndObject();
+  }
+
+  @Override
+  public void writeBoolean(boolean b) throws IOException {
+    generator.writeBoolean(b);
+  }
+
+  @Override
+  public void writeSInt32(int value) throws IOException {
+    generator.writeNumber(value);
+  }
+
+  @Override
+  public void writeSInt64(long value) throws IOException {
+    generator.writeNumber(value);
+  }
+
+  /** Writes the bytes through {@link JsonGenerator#writeBinary(byte[])}. */
+  @Override
+  public void writeBytes(byte[] bytes) throws IOException {
+    generator.writeBinary(bytes);
+  }
+
+  @Override
+  public void writeSFloat64(double value) throws IOException {
+    generator.writeNumber(value);
+  }
+
+  @Override
+  public void writeSFloat32(float value) throws IOException {
+    generator.writeNumber(value);
+  }
+
+  @Override
+  public void writeNullValue() throws IOException {
+    generator.writeNull();
+  }
+
+  /** Writes {@code value} as a JSON string. */
+  @Override
+  public void writeCharSequence(CharSequence value) throws IOException {
+    generator.writeString(asString(value));
+  }
+
+  /** Emits a raw newline after each record. */
+  @Override
+  public void endRecord() throws IOException {
+    generator.writeRawValue("\n");
+  }
+
+  /** Closes the underlying generator. */
+  @Override
+  public void close() throws IOException {
+    generator.close();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONScanner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONScanner.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONScanner.java
new file mode 100644
index 0000000..924f3b1
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONScanner.java
@@ -0,0 +1,183 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.exceptions.RecordException;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.SimpleArrayValue;
+import org.apache.drill.exec.ref.values.SimpleMapValue;
+import org.apache.drill.exec.ref.values.ScalarValues.BooleanScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.BytesScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.DoubleScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.IntegerScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.LongScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.StringScalar;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonParser.Feature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
+
+public class JSONScanner extends ROPBase<Scan> {
+ private static final Logger logger = LoggerFactory.getLogger(JSONScanner.class);
+
+ private ObjectMapper mapper = new ObjectMapper();
+ private InputStreamReader input;
+ private String file;
+ private SchemaPath rootPath;
+ private JsonParser parser;
+ private UnbackedRecord record = new UnbackedRecord();
+
+ public JSONScanner(Scan scan, String file) throws IOException {
+ super(scan);
+ JsonFactory factory = new JsonFactory();
+ factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+ factory.configure(Feature.ALLOW_COMMENTS, true);
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ if(!file.contains("://")){
+ String current = new java.io.File( "." ).getCanonicalPath();
+ file = "file://" + current + "/" + file;
+ }
+ FSDataInputStream stream = fs.open(new Path(file));
+ this.input = new InputStreamReader(stream, Charsets.UTF_8);
+ this.parser = factory.createJsonParser(fs.open(new Path(file)));
+ this.file = file;
+ this.rootPath = config.getOutputReference();
+ }
+
+ private class NodeIter implements RecordIterator {
+
+ @Override
+ public NextOutcome next() {
+// logger.debug("Next Record Called");
+ try {
+ if (parser.nextToken() == null) {
+// logger.debug("No current token, returning.");
+ return NextOutcome.NONE_LEFT;
+ }
+ JsonNode n = mapper.readTree(parser);
+ if (n == null) {
+// logger.debug("Nothing was returned for read tree, returning.");
+ return NextOutcome.NONE_LEFT;
+ }
+// logger.debug("Record found, returning new json record.");
+ record.setClearAndSetRoot(rootPath, convert(n));
+ // todo, add schema checking here.
+ return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+ } catch (IOException e) {
+ throw new RecordException("Failure while reading record", null, e);
+ }
+ }
+
+ @Override
+ public ROP getParent() {
+ return JSONScanner.this;
+ }
+
+ @Override
+ public RecordPointer getRecordPointer() {
+ return record;
+ }
+
+ }
+
+ public static DataValue convert(JsonNode node) {
+ if (node == null || node.isNull() || node.isMissingNode()) {
+ return DataValue.NULL_VALUE;
+ } else if (node.isArray()) {
+ SimpleArrayValue arr = new SimpleArrayValue(node.size());
+ for (int i = 0; i < node.size(); i++) {
+ arr.addToArray(i, convert(node.get(i)));
+ }
+ return arr;
+ } else if (node.isObject()) {
+ SimpleMapValue map = new SimpleMapValue();
+ String name;
+ for (Iterator<String> iter = node.fieldNames(); iter.hasNext();) {
+ name = iter.next();
+ map.setByName(name, convert(node.get(name)));
+ }
+ return map;
+ } else if (node.isBinary()) {
+ try {
+ return new BytesScalar(node.binaryValue());
+ } catch (IOException e) {
+ throw new RuntimeException("Failure converting binary value.", e);
+ }
+ } else if (node.isBigDecimal()) {
+ throw new UnsupportedOperationException();
+// return new BigDecimalScalar(node.decimalValue());
+ } else if (node.isBigInteger()) {
+ throw new UnsupportedOperationException();
+// return new BigIntegerScalar(node.bigIntegerValue());
+ } else if (node.isBoolean()) {
+ return new BooleanScalar(node.asBoolean());
+ } else if (node.isFloatingPointNumber()) {
+ if (node.isBigDecimal()) {
+ throw new UnsupportedOperationException();
+// return new BigDecimalScalar(node.decimalValue());
+ } else {
+ return new DoubleScalar(node.asDouble());
+ }
+ } else if (node.isInt()) {
+ return new IntegerScalar(node.asInt());
+ } else if (node.isLong()) {
+ return new LongScalar(node.asLong());
+ } else if (node.isTextual()) {
+ return new StringScalar(node.asText());
+ } else {
+ throw new UnsupportedOperationException(String.format("Don't know how to convert value of type %s.", node
+ .getClass().getCanonicalName()));
+ }
+
+ }
+
+ @Override
+ protected RecordIterator getIteratorInternal() {
+ return new NodeIter();
+ }
+
+ @Override
+ public void cleanup() {
+ try {
+ parser.close();
+ this.input.close();
+ } catch (IOException e) {
+ logger.warn("Error while closing InputStream for file {}", file, e);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONWriter.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONWriter.java
new file mode 100644
index 0000000..5dd30d4
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JSONWriter.java
@@ -0,0 +1,62 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.io.IOException;
+
+import org.apache.drill.common.logical.data.Write;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Sink operator that writes every record it receives as JSON to the file named by the
+ * {@link Write} configuration.
+ */
+public class JSONWriter extends BaseSinkROP<Write> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONWriter.class);
+
+  private FSDataOutputStream output;
+  private JSONDataWriter writer;
+
+  public JSONWriter(Write config) {
+    super(config);
+  }
+
+  /**
+   * Creates the output file and the JSON writer on top of it.
+   *
+   * @throws IOException if the file system or the file cannot be opened
+   */
+  @Override
+  protected void setupSink() throws IOException {
+    FileSystem fs = FileSystem.get(new Configuration());
+    output = fs.create(new Path(config.getFileName()));
+    writer = new JSONDataWriter(output);
+  }
+
+  /**
+   * Serializes one record.
+   *
+   * @param r the record to write
+   * @return the output stream's position after the record was handed to the writer
+   * @throws IOException if writing fails
+   */
+  @Override
+  public long sinkRecord(RecordPointer r) throws IOException {
+    r.write(writer);
+    return output.getPos();
+  }
+
+  /**
+   * Closes the JSON writer and the output stream. Both are optional (they are {@code null} when
+   * {@link #setupSink()} did not run or failed part-way) and each is closed independently so that
+   * a failure closing the writer still releases the stream. Failures are logged, not thrown.
+   */
+  @Override
+  public void cleanup() {
+    try {
+      if (writer != null) {
+        writer.close();
+      }
+    } catch (IOException e) {
+      logger.warn("Error while closing output stream.", e);
+    }
+    try {
+      if (output != null) {
+        output.close();
+      }
+    } catch (IOException e) {
+      logger.warn("Error while closing output stream.", e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/MultiLevelMap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/MultiLevelMap.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/MultiLevelMap.java
new file mode 100644
index 0000000..2c74b79
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/MultiLevelMap.java
@@ -0,0 +1,200 @@
+package org.apache.drill.exec.ref.rops;
+
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.ScalarValues.BytesScalar;
+
+/**
+ * MultiLevel map is a map of maps of maps (etc>. Each map is keyed by a data value. At the lowest level, the value is a
+ * list or record as opposed to a another straight map. A 4 way group by clause will therefore effectively look like
+ * Map<group1, Map<group2, Map<group3, Map<group4, record>>>>.
+ */
+public class MultiLevelMap {
+ private BasicEvaluator[] fields;
+ private final int deepestLevel;
+ private final Level rootLevel;
+
+ public MultiLevelMap(BasicEvaluator[] fields) {
+ if (fields.length > 8)
+ throw new UnsupportedOperationException("Only groupings to eight levels are currently supported.");
+ this.fields = fields;
+ deepestLevel = fields.length - 1;
+ rootLevel = fields.length == 1 ? new LastLevel(fields[0]) : new IntermediateLevel(fields[0]);
+ }
+
+ public void add(RecordPointer r) {
+ rootLevel.add(r, 0);
+ }
+
+ private class IntermediateLevel implements Level {
+ final BasicEvaluator grouping;
+ Map<DataValue, Level> levelMap = new HashMap<DataValue, Level>();
+
+ public IntermediateLevel(BasicEvaluator field) {
+ super();
+ this.grouping = field;
+ }
+
+ @Override
+ public void add(RecordPointer r, int level) {
+ DataValue v = grouping.eval();
+ if (v == null) v = DataValue.NULL_VALUE;
+ Level l = levelMap.get(v);
+
+ level++;
+
+ if (l == null) {
+ l = (level == deepestLevel) ? new LastLevel(fields[level]) : new IntermediateLevel(fields[level]);
+ levelMap.put(v, l);
+ }
+
+ l.add(r, level);
+ }
+
+ @Override
+ public Iterator<?> iter() {
+ return levelMap.entrySet().iterator();
+ }
+
+ }
+
+ private class LastLevel implements Level {
+ private final BasicEvaluator grouping;
+ Map<DataValue, List<RecordPointer>> recordMap = new HashMap<DataValue, List<RecordPointer>>();
+
+ public LastLevel(BasicEvaluator grouping) {
+ super();
+ this.grouping = grouping;
+ }
+
+ @Override
+ public void add(RecordPointer r, int level) {
+ DataValue dv = grouping.eval();
+ if (dv == null) dv = DataValue.NULL_VALUE;
+ List<RecordPointer> list = recordMap.get(dv);
+ if (list == null) {
+ list = new LinkedList<RecordPointer>();
+ recordMap.put(dv, list);
+ }
+ list.add(r);
+
+ }
+
+ @Override
+ public Iterator<?> iter() {
+ return recordMap.entrySet().iterator();
+ }
+ }
+
+ public MultiLevelIterator getIterator() {
+ return new MultiLevelIterator();
+ }
+
+ /**
+ * Iterator for working through a MultiLevel map. The iterator only works if every sub map has at least one entry.
+ * This should always be the case.
+ */
+ public class MultiLevelIterator {
+ private final GroupingEntry ge;
+ private final Iterator<?>[] iterators;
+ private final DataValue[] values;
+ private Iterator<RecordPointer> lowestLevelRecordIterator = null;
+
+ private MultiLevelIterator() {
+ this.iterators = new Iterator[fields.length];
+ this.values = new DataValue[fields.length];
+ this.ge = new GroupingEntry(values);
+ iterators[0] = rootLevel.iter();
+
+ }
+
+ public GroupingEntry next() {
+ if (lowestLevelRecordIterator != null && lowestLevelRecordIterator.hasNext()) {
+ ge.record = lowestLevelRecordIterator.next();
+ return ge;
+ }
+
+ ge.bitset.clear();
+ int level = deepestLevel;
+
+ // pop empty iterators.
+ for (; level > 0; level--) {
+
+ // if the current iterator is null or has no next, we'll need it replaced.
+ if ((iterators[level] == null) || !iterators[level].hasNext()) {
+ // set change bit
+ ge.bitset.set(level);
+ continue;
+ } else {
+ break;
+ }
+ }
+
+ // if we're at the top level and the current iterator doesn't have any more items, we're done iterating.
+ if (level == 0) {
+ if (!iterators[0].hasNext()) return null;
+ ge.bitset.set(0);
+ }
+
+ // go through all but the bottom most level.
+ for (int i = level; i < deepestLevel; i++) {
+ int nextLevel = i + 1;
+ Iterator<?> iter = iterators[i];
+
+ @SuppressWarnings("unchecked")
+ Map.Entry<DataValue, Level> e = (Entry<DataValue, Level>) iter.next();
+ values[nextLevel] = e.getKey();
+ iterators[nextLevel] = e.getValue().iter();
+ }
+
+ @SuppressWarnings("unchecked")
+ Entry<DataValue, List<RecordPointer>> lowest = (Entry<DataValue, List<RecordPointer>>) iterators[deepestLevel]
+ .next();
+ lowestLevelRecordIterator = lowest.getValue().iterator();
+ ge.record = lowestLevelRecordIterator.next();
+ return ge;
+ }
+
+ }
+
+ /**
+ * A grouping entry that gives you the value for each of the grouping keys. Changebits informs you of whether
+ * particular nested groups are restarting.
+ */
+ public class GroupingEntry {
+ public final BasicEvaluator[] keys;
+ public final DataValue[] values;
+ public final BitSet bitset;
+ public RecordPointer record;
+ private BytesScalar groupKey;
+
+ public GroupingEntry(DataValue[] values) {
+ super();
+ this.groupKey = new BytesScalar(new byte[1]);
+ this.values = values;
+ this.keys = MultiLevelMap.this.fields;
+ bitset = new BitSet(keys.length);
+ }
+
+ public BytesScalar getGroupKey() {
+ this.groupKey = new BytesScalar(bitset.toByteArray());
+ return groupKey;
+ }
+
+ }
+
+ public interface Level {
+ public void add(RecordPointer r, int level);
+
+ public Iterator<?> iter();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java
new file mode 100644
index 0000000..8046803
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java
@@ -0,0 +1,165 @@
+package org.apache.drill.exec.ref.rops;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.Order.Direction;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
+import org.apache.drill.exec.ref.values.ComparableValue;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+
+/**
+ * Blocking operator that buffers all input records together with their sort-key values, sorts
+ * them according to the configured orderings and then replays them in sorted order.
+ */
+public class OrderROP extends AbstractBlockingOperator<Order> {
+
+  private final List<CompoundValue> records = new ArrayList<CompoundValue>();
+  private SortDefinition[] defs;
+
+  public OrderROP(Order config) {
+    super(config);
+  }
+
+  /** Creates one {@link SortDefinition} per configured ordering. */
+  @Override
+  protected void setupEvals(EvaluatorFactory builder) {
+    Ordering[] orderings = config.getOrderings();
+    defs = new SortDefinition[orderings.length];
+    for (int i = 0; i < orderings.length; i++) {
+      defs[i] = new SortDefinition(builder.getBasicEvaluator(inputRecord, orderings[i].getExpr()), orderings[i].getDirection() == Direction.ASC);
+    }
+  }
+
+  /** Buffers a copy of the current input record along with its evaluated sort keys. */
+  @Override
+  protected void consumeRecord() {
+    DataValue[] values = new DataValue[defs.length];
+
+    RecordPointer r = inputRecord.copy();
+    for (int i = 0; i < defs.length; i++) {
+      values[i] = defs[i].evaluator.eval();
+    }
+    CompoundValue v = new CompoundValue(r, values);
+    records.add(v);
+  }
+
+  /** Sorts the buffered records in place and returns an iterator over them. */
+  @Override
+  protected RecordIterator doWork() {
+    StackedComparator r = new StackedComparator(defs);
+    QuickSort qs = new QuickSort();
+    qs.sort(r, 0, records.size());
+    return new OrderIterator();
+  }
+
+  /** One sort key: the evaluator that produces it plus its direction and null placement. */
+  public class SortDefinition {
+    boolean forward;
+    // Never assigned in this class, so it keeps the default value false.
+    boolean nullsLast;
+    BasicEvaluator evaluator;
+
+    /**
+     * @param evaluator produces the sort key for the current record
+     * @param forward {@code true} for ascending order
+     */
+    public SortDefinition(BasicEvaluator evaluator, boolean forward) {
+      this.evaluator = evaluator;
+      this.forward = forward;
+    }
+  }
+
+  /** A buffered record together with its pre-computed sort-key values. */
+  private class CompoundValue {
+    DataValue[] values;
+    RecordPointer record;
+
+    public CompoundValue(RecordPointer record, DataValue[] values) {
+      super();
+      this.record = record;
+      this.values = values;
+    }
+  }
+
+  /** Adapter that lets Hadoop's {@link QuickSort} sort {@code records} in place. */
+  private class StackedComparator implements IndexedSortable {
+
+    /**
+     * @param defs currently unused; the comparator reads the enclosing operator's definitions
+     */
+    public StackedComparator(SortDefinition[] defs) {
+    }
+
+    @Override
+    public void swap(int index0, int index1) {
+      CompoundValue v = records.get(index0);
+      records.set(index0, records.get(index1));
+      records.set(index1, v);
+    }
+
+    /**
+     * Compares two buffered records key by key; the first key that differs decides.
+     *
+     * <p>The sort direction is applied only to the comparison of two non-null values; the
+     * placement of nulls depends solely on {@code nullsLast}.
+     */
+    @Override
+    public int compare(int index0, int index1) {
+      int result = 0;
+      CompoundValue v1 = records.get(index0);
+      CompoundValue v2 = records.get(index1);
+
+      for (int i = 0; i < defs.length; i++) {
+        boolean nullLast = defs[i].nullsLast;
+        boolean asc = defs[i].forward;
+        DataValue dv1 = v1.values[i];
+        DataValue dv2 = v2.values[i];
+        if (dv1 == null) {
+          if (dv2 == null) {
+            result = 0;
+          } else {
+            result = nullLast ? 1 : -1;
+          }
+        } else if (dv2 == null) {
+          result = nullLast ? -1 : 1;
+        } else {
+          if (dv1 instanceof ComparableValue && ((ComparableValue) dv1).supportsCompare(dv2)) {
+            result = ((ComparableValue) dv1).compareTo(dv2);
+            if (!asc) result = -result;
+          } else {
+            return 0; // we break even though there may be more evaluators because we should always return the same ordering for non-comparable values no matter the compare order.
+          }
+        }
+        if (result != 0) return result;
+      }
+      return result;
+    }
+
+  }
+
+  /** Replays the sorted records through {@code outputRecord}. */
+  public class OrderIterator implements RecordIterator {
+    final Iterator<CompoundValue> iter;
+
+    public OrderIterator() {
+      this.iter = records.iterator();
+    }
+
+    /**
+     * Points {@code outputRecord} at the next sorted record.
+     *
+     * @return {@code INCREMENTED_SCHEMA_CHANGED} for every record, {@code NONE_LEFT} when done
+     */
+    @Override
+    public NextOutcome next() {
+      if (iter.hasNext()) {
+        outputRecord.setRecord(iter.next().record);
+        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+      }
+
+      return NextOutcome.NONE_LEFT;
+    }
+
+    @Override
+    public ROP getParent() {
+      return OrderROP.this;
+    }
+
+    /**
+     * Returns the pointer that {@link #next()} updates. This must be {@code outputRecord}:
+     * {@code inputRecord} is not advanced by this iterator, so handing it out would expose a
+     * stale record instead of the sorted ones.
+     */
+    @Override
+    public RecordPointer getRecordPointer() {
+      return outputRecord;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProjectROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProjectROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProjectROP.java
new file mode 100644
index 0000000..a8ebad6
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProjectROP.java
@@ -0,0 +1,95 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
+
+public class ProjectROP extends SingleInputROPBase<Project> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectROP.class);
+
+ private final UnbackedRecord outputRecord = new UnbackedRecord();
+ private final PathSegment[] paths;
+ private final BasicEvaluator[] evaluators;
+ private final NamedExpression[] selections;
+ private RecordIterator incoming;
+
+ public ProjectROP(Project config) {
+ super(config);
+ this.selections = config.getSelections();
+ this.paths = new PathSegment[selections.length];
+ this.evaluators = new BasicEvaluator[selections.length];
+ for(int i=0; i < selections.length; i++){
+ paths[i] = selections[i].getRef().getRootSegment().getChild();
+ }
+ }
+
+ @Override
+ protected void setInput(RecordIterator incoming) {
+ this.incoming = incoming;
+ }
+
+ @Override
+ protected RecordIterator getIteratorInternal() {
+ return new ProjectionIterator();
+ }
+
+ @Override
+ protected void setupEvals(EvaluatorFactory builder) {
+ for(int i =0; i < evaluators.length; i++){
+ evaluators[i] = builder.getBasicEvaluator(record, selections[i].getExpr());
+ }
+ }
+
+
+ private class ProjectionIterator implements RecordIterator{
+
+ @Override
+ public RecordPointer getRecordPointer() {
+ return outputRecord;
+ }
+
+ @Override
+ public NextOutcome next() {
+ outputRecord.clear();
+ NextOutcome n = incoming.next();
+ if(n != NextOutcome.NONE_LEFT){
+ for(int i =0; i < evaluators.length; i++){
+ if(paths[i] == null){
+ outputRecord.merge(evaluators[i].eval());
+ }else{
+ outputRecord.addField(paths[i], evaluators[i].eval());
+ }
+ }
+ }
+ return n;
+ }
+
+ @Override
+ public ROP getParent() {
+ return ProjectROP.this;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java
new file mode 100644
index 0000000..5aea366
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ProxySimpleRecord.java
@@ -0,0 +1,74 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.io.IOException;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.values.DataValue;
+
+/**
+ * A {@link RecordPointer} that forwards every call to another, swappable record pointer.
+ * The target is unset until {@link #setRecord(RecordPointer)} has been called.
+ */
+public class ProxySimpleRecord implements RecordPointer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProxySimpleRecord.class);
+
+  // The record pointer all calls are forwarded to.
+  private RecordPointer delegate;
+
+  /** Returns the record pointer currently being proxied. */
+  public RecordPointer getRecord() {
+    return delegate;
+  }
+
+  /** Replaces the record pointer being proxied. */
+  public void setRecord(RecordPointer record) {
+    delegate = record;
+  }
+
+  @Override
+  public DataValue getField(SchemaPath field) {
+    return delegate.getField(field);
+  }
+
+  @Override
+  public void addField(SchemaPath field, DataValue value) {
+    delegate.addField(field, value);
+  }
+
+  @Override
+  public void addField(PathSegment segment, DataValue value) {
+    delegate.addField(segment, value);
+  }
+
+  @Override
+  public void write(DataWriter writer) throws IOException {
+    delegate.write(writer);
+  }
+
+  @Override
+  public void copyFrom(RecordPointer r) {
+    delegate.copyFrom(r);
+  }
+
+  /** Returns a copy of the proxied record (the result of its own copy()), not of this proxy. */
+  @Override
+  public RecordPointer copy() {
+    return delegate.copy();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROP.java
new file mode 100644
index 0000000..fe932c2
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROP.java
@@ -0,0 +1,12 @@
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+/**
+ * A reference operator: something that is initialised against an iterator registry and an
+ * evaluator factory, exposes its results as a {@link RecordIterator}, and can be cleaned up.
+ */
+public interface ROP {
+
+  /**
+   * Prepares the operator for execution.
+   *
+   * @param registry registry of operator iterators
+   * @param builder factory used to obtain expression evaluators
+   * @throws SetupException if the operator cannot be set up
+   */
+  void init(IteratorRegistry registry, EvaluatorFactory builder) throws SetupException;
+
+  /** Returns the iterator over this operator's output records. */
+  RecordIterator getOutput();
+
+  /** Gives the operator a chance to release whatever it holds. */
+  void cleanup();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROPBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROPBase.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROPBase.java
new file mode 100644
index 0000000..e7672fd
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ROPBase.java
@@ -0,0 +1,62 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+/**
+ * Skeleton {@link ROP} implementation. It holds the logical operator configuration, drives
+ * the setup hooks from {@link #init} and ensures the output iterator is requested only once.
+ *
+ * @param <T> type of the logical operator this reference operator implements
+ */
+public abstract class ROPBase<T extends LogicalOperator> implements ROP {
+
+  protected final T config;
+  // Flipped by the first getOutput() call; any later call is rejected.
+  private boolean outputRequested = false;
+
+  /**
+   * @param config logical operator definition; must not be null
+   * @throws IllegalArgumentException if {@code config} is null
+   */
+  public ROPBase(T config) {
+    if (config == null) {
+      throw new IllegalArgumentException("Config must be defined.");
+    }
+    this.config = config;
+  }
+
+  /** Hook for building evaluators; does nothing unless overridden. */
+  protected void setupEvals(EvaluatorFactory builder) throws SetupException {
+  }
+
+  /** Hook for wiring up input iterators; does nothing unless overridden. */
+  protected void setupIterators(IteratorRegistry registry) throws SetupException {
+  }
+
+  /** Supplies the iterator returned by {@link #getOutput()}. */
+  protected abstract RecordIterator getIteratorInternal();
+
+  /** Registers this operator under its config, then runs the iterator and evaluator hooks. */
+  @Override
+  public void init(IteratorRegistry registry, EvaluatorFactory builder) throws SetupException {
+    // config is final and the constructor rejects null, so it is always set here.
+    registry.register(config, this);
+    setupIterators(registry);
+    setupEvals(builder);
+  }
+
+  /**
+   * Returns the output iterator.
+   *
+   * @throws IllegalStateException if the output has already been requested
+   */
+  @Override
+  public final RecordIterator getOutput() {
+    if (outputRequested) {
+      throw new IllegalStateException("You can only request the ouput of a reference operator once.");
+    }
+    outputRequested = true;
+    return getIteratorInternal();
+  }
+
+  /** No-op by default. */
+  @Override
+  public void cleanup() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperator.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperator.java
new file mode 100644
index 0000000..d24c4c0
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperator.java
@@ -0,0 +1,9 @@
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.exec.ref.RecordIterator;
+
+/** An operator that accepts one incoming record iterator and exposes an output iterator. */
+public interface ReferenceOperator {
+
+  /** Supplies the iterator this operator reads from. */
+  void setInput(RecordIterator incoming);
+
+  /** Returns the iterator over this operator's output. */
+  RecordIterator getOutput();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperatorBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperatorBase.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperatorBase.java
new file mode 100644
index 0000000..bad5443
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ReferenceOperatorBase.java
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.exec.ref.RecordIterator;
+
+/**
+ * Base {@link ReferenceOperator} that ignores its input by default and allows the output
+ * iterator to be requested only once.
+ */
+public abstract class ReferenceOperatorBase implements ReferenceOperator {
+
+  // Flipped by the first getOutput() call; any later call is rejected.
+  private boolean outputRequested = false;
+
+  /** Supplies the iterator returned by {@link #getOutput()}. */
+  protected abstract RecordIterator getIteratorInternal();
+
+  /** Does nothing; subclasses that consume input override this. */
+  @Override
+  public void setInput(RecordIterator incoming) {
+  }
+
+  /**
+   * Returns the output iterator.
+   *
+   * @throws IllegalStateException if the output has already been requested
+   */
+  @Override
+  public final RecordIterator getOutput() {
+    if (outputRequested) {
+      throw new IllegalStateException("You can only request the ouput of a reference operator once.");
+    }
+    outputRequested = true;
+    return getIteratorInternal();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SingleInputROPBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SingleInputROPBase.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SingleInputROPBase.java
new file mode 100644
index 0000000..b88d7b4
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SingleInputROPBase.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.util.List;
+
+import org.apache.drill.common.logical.data.SingleInputOperator;
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+/**
+ * Base class for reference operators that read from exactly one upstream operator. During
+ * setup it looks up the iterator registered for the configured input, captures that
+ * iterator's record pointer in {@link #record} and hands the iterator to
+ * {@link #setInput(RecordIterator)}.
+ *
+ * @param <T> type of the single-input logical operator being implemented
+ */
+public abstract class SingleInputROPBase<T extends SingleInputOperator> extends ROPBase<T> {
+
+  /** Record pointer of the single input iterator; assigned in {@link #setupIterators}. */
+  protected RecordPointer record;
+
+  public SingleInputROPBase(T config) {
+    super(config);
+  }
+
+  /**
+   * Resolves the single input iterator and wires it into this operator.
+   *
+   * @throws IllegalArgumentException if the registry does not hold exactly one iterator
+   *         for the configured input
+   * @throws SetupException if the input iterator exposes a null record pointer; declared
+   *         here so the signature matches the hook in {@code ROPBase} that it overrides
+   */
+  @Override
+  protected void setupIterators(IteratorRegistry registry) throws SetupException {
+    List<RecordIterator> iters = registry.getOperator(config.getInput());
+    if (iters.size() != 1) {
+      throw new IllegalArgumentException(String.format("Expected one input iterator for class %s. Received %d", this.getClass().getCanonicalName(), iters.size()));
+    }
+    RecordIterator i = iters.get(0);
+    this.record = i.getRecordPointer();
+    if (record == null) {
+      throw new SetupException(String.format("The %s op iterator return a null record pointer.", i.getParent()));
+    }
+    // Only hand over the iterator once its record pointer is known to be usable.
+    setInput(i);
+  }
+
+  /** Receives the single input iterator once it has been resolved. */
+  protected abstract void setInput(RecordIterator incoming);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SinkROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SinkROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SinkROP.java
new file mode 100644
index 0000000..50c5509
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/SinkROP.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.exec.ref.RunOutcome;
+
+/** A terminal operator that is driven by calling {@link #run(StatusHandle)}. */
+public interface SinkROP {
+
+  /**
+   * Executes the sink.
+   *
+   * @param handle callback handle passed to the sink for the duration of the run
+   * @return the outcome of the run
+   */
+  RunOutcome run(StatusHandle handle);
+
+  /** Callback handle given to a sink while it runs. */
+  interface StatusHandle {
+
+    /** Reports progress as a byte count and a record count. */
+    void progress(long bytes, long records);
+
+    /** Tells the sink whether it may keep going. */
+    boolean okToContinue();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/TransformROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/TransformROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/TransformROP.java
new file mode 100644
index 0000000..a98438d
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/TransformROP.java
@@ -0,0 +1,89 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Transform;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+import org.apache.drill.exec.ref.eval.EvaluatorTypes.ConnectedEvaluator;
+
+/**
+ * Reference operator for {@link Transform}. For every incoming record it runs each
+ * configured connected evaluator and exposes the input record pointer as its output.
+ */
+public class TransformROP extends SingleInputROPBase<Transform> {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TransformROP.class);
+
+  private RecordIterator iter;
+  private ConnectedEvaluator[] transformers;
+
+  public TransformROP(Transform transform) {
+    super(transform);
+  }
+
+  /** Wraps the incoming iterator; the wrapper is what {@link #getIteratorInternal()} returns. */
+  @Override
+  public void setInput(RecordIterator incoming) {
+    this.iter = new TransformIterator(incoming);
+  }
+
+  @Override
+  public RecordIterator getIteratorInternal() {
+    return iter;
+  }
+
+  /** Creates one connected evaluator per configured transform, bound to the input record. */
+  @Override
+  protected void setupEvals(EvaluatorFactory builder) {
+    final NamedExpression[] expressions = config.getTransforms();
+    transformers = new ConnectedEvaluator[expressions.length];
+    for (int slot = 0; slot < expressions.length; slot++) {
+      transformers[slot] = builder.getConnectedEvaluator(record, expressions[slot]);
+    }
+  }
+
+  /** Iterator that applies the transforms to each record pulled from the input. */
+  private class TransformIterator implements RecordIterator {
+    private final RecordIterator incoming;
+
+    public TransformIterator(RecordIterator incoming) {
+      this.incoming = incoming;
+    }
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return record;
+    }
+
+    @Override
+    public ROP getParent() {
+      return TransformROP.this;
+    }
+
+    /**
+     * Advances the input and, when a record is available, runs every transformer.
+     *
+     * @return {@code NONE_LEFT} when the input is exhausted, otherwise always
+     *         {@code INCREMENTED_SCHEMA_CHANGED}
+     */
+    @Override
+    public NextOutcome next() {
+      final NextOutcome upstream = incoming.next();
+      if (upstream != NextOutcome.NONE_LEFT) {
+        // The values returned by eval() are not used here; only the call itself matters.
+        for (ConnectedEvaluator transformer : transformers) {
+          transformer.eval();
+        }
+        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+      }
+      return upstream;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
new file mode 100644
index 0000000..aca0b5a
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java
@@ -0,0 +1,82 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import java.util.List;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Union;
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.eval.EvaluatorFactory;
+
+/**
+ * Reference operator for {@link Union}. It drains its input iterators one after another
+ * and exposes whichever input is currently being read through a single proxy record.
+ */
+public class UnionROP extends ROPBase<LogicalOperator> {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionROP.class);
+
+  private List<RecordIterator> incoming;
+  // Must exist before setupIterators() runs; it was previously left null, which made
+  // setupIterators() fail with a NullPointerException.
+  private final ProxySimpleRecord record = new ProxySimpleRecord();
+
+  public UnionROP(Union config) {
+    super(config);
+  }
+
+  /** A union evaluates no expressions, so there is nothing to set up. */
+  @Override
+  protected void setupEvals(EvaluatorFactory builder) {
+  }
+
+  /** Fetches the input iterators and points the proxy record at the first of them. */
+  @Override
+  protected void setupIterators(IteratorRegistry builder) {
+    // NOTE(review): this asks the registry for the iterators registered under the union's
+    // own config - confirm that this is how the registry exposes a union's inputs.
+    incoming = builder.getOperator(config);
+    if (!incoming.isEmpty()) {
+      record.setRecord(incoming.get(0).getRecordPointer());
+    }
+  }
+
+  @Override
+  protected RecordIterator getIteratorInternal() {
+    return new MultiIterator();
+  }
+
+  /** Iterator that concatenates the inputs in list order. */
+  private class MultiIterator implements RecordIterator {
+    // Index of the input currently being drained.
+    private int current = 0;
+
+    /**
+     * Advances the current input, moving on to the next input whenever the current one
+     * reports {@code NONE_LEFT}.
+     *
+     * @return the outcome of the input that produced a record, or {@code NONE_LEFT}
+     *         once every input is exhausted
+     */
+    @Override
+    public NextOutcome next() {
+      while (current < incoming.size()) {
+        NextOutcome n = incoming.get(current).next();
+        if (n != NextOutcome.NONE_LEFT) {
+          return n;
+        }
+        // The current input is exhausted: step to the next one and retarget the proxy.
+        // The bounds check keeps us from indexing past the last input.
+        current++;
+        if (current < incoming.size()) {
+          record.setRecord(incoming.get(current).getRecordPointer());
+        }
+      }
+      return NextOutcome.NONE_LEFT;
+    }
+
+    @Override
+    public ROP getParent() {
+      return UnionROP.this;
+    }
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return record;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
new file mode 100644
index 0000000..bbfbc1c
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
@@ -0,0 +1,56 @@
+package org.apache.drill.exec.ref.rops;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.drill.common.logical.data.SingleInputOperator;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+
+/**
+ * For simplification purposes, the Windowing reference implementation takes the lazy approach of finishing a window
+ * before it outputs any values from that window. While this is necessary in the ALL:ALL scenario, other scenarios could
+ * be implemented more efficiently with an appropriately sized open window.
+ *
+ * <p>This class is currently a placeholder: the constructor always throws
+ * {@link NotImplementedException} and none of the methods do any work.
+ */
+public class WindowingROP extends SingleInputROPBase<SingleInputOperator> {
+
+  private List<RecordPointer> records = new LinkedList<RecordPointer>();
+  private WindowManager[] windows;
+  private Window[] windowPerKey;
+
+  // the place where we should start the next batch.
+  private int internalWindowPosition;
+
+  /**
+   * @param config logical operator definition
+   * @throws NotImplementedException always, because windowing is not implemented yet
+   */
+  public WindowingROP(SingleInputOperator config) {
+    super(config);
+    throw new NotImplementedException();
+  }
+
+  /** Not implemented; the incoming iterator is ignored. */
+  @Override
+  protected void setInput(RecordIterator incoming) {
+  }
+
+  /** Not implemented; returns {@code null}. */
+  @Override
+  protected RecordIterator getIteratorInternal() {
+    return null;
+  }
+
+  /** Placeholder for the state of a single window. */
+  private class Window {
+
+    int ending;
+    List<RecordPointer> records = new ArrayList<RecordPointer>();
+
+    /** Not implemented. */
+    public void reset(int curPos) {
+
+    }
+  }
+
+  /** Placeholder for the logic that will advance windows. */
+  private class WindowManager {
+    /** Not implemented. */
+    private void increment() {
+
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/BatchBreaker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/BatchBreaker.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/BatchBreaker.java
new file mode 100644
index 0000000..8966280
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/BatchBreaker.java
@@ -0,0 +1,29 @@
+package org.apache.drill.exec.ref.util;
+
+import org.apache.drill.exec.ref.RecordPointer;
+
+/** Decides, record by record, whether a batch should be closed after the given record. */
+public interface BatchBreaker {
+
+  /**
+   * @param record the record that has just been added to the current batch
+   * @return {@code true} if the batch should end after this record
+   */
+  public boolean shouldBreakAfter(RecordPointer record);
+
+  /** Breaks a batch once it holds {@code max} records. */
+  public static class CountBreaker implements BatchBreaker {
+    private final int max;
+    // Number of records seen since the last break.
+    private int count = 0;
+
+    /**
+     * @param max maximum number of records per batch
+     */
+    public CountBreaker(int max) {
+      this.max = max;
+    }
+
+    /**
+     * Counts the record and reports a break when the batch has reached {@code max}
+     * records, resetting the counter for the next batch.
+     */
+    @Override
+    public boolean shouldBreakAfter(RecordPointer record) {
+      count++;
+      // ">=" rather than ">": the record just counted already belongs to the batch, so
+      // waiting for count to exceed max would let max + 1 records through.
+      if (count >= max) {
+        count = 0;
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/ByteRange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/ByteRange.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/ByteRange.java
new file mode 100644
index 0000000..40287b5
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/util/ByteRange.java
@@ -0,0 +1,17 @@
+package org.apache.drill.exec.ref.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+
+/**
+ * Facade interface for ByteBuffer and byte[]
+ */
+public interface ByteRange {
+
+  /** Returns the byte at the given index. */
+  byte getByte(int index);
+
+  /** Returns the length of the range. */
+  int getLength();
+
+  /** Copies this range into {@code buffer} at {@code offset}. */
+  void copyTo(byte[] buffer, int offset);
+
+  // NOTE(review): the target is an InputStream, which cannot be written to - this looks
+  // like it was meant to be an OutputStream; confirm against the implementations.
+  void copyTo(InputStream is) throws IOException;
+
+  /** Copies this range into the given buffer. */
+  void copyTo(ByteBuffer bb);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java
new file mode 100644
index 0000000..9b14e63
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.types.DataType;
+
+
+/**
+ * Common behaviour for array-typed values: path-based insertion and lookup on top of the
+ * storage primitives supplied by subclasses.
+ */
+public abstract class BaseArrayValue extends BaseDataValue implements ContainerValue {
+
+  /** Stores {@code v} at position {@code index}. */
+  protected abstract void addToArray(int index, DataValue v);
+
+  /** Returns the value held at {@code index}; callers in this class handle a null result. */
+  protected abstract DataValue getByArrayIndex(int index);
+
+  /** Returns the index used when a value is added without an explicit array position. */
+  protected abstract int getNextIndex();
+
+  public abstract void append(BaseArrayValue container);
+
+  public abstract int size();
+
+  /**
+   * Adds a value under the given path segment. The value is first wrapped according to
+   * the segment's child path; it is then merged into the addressed slot when the segment
+   * is an array segment, or placed at the next index otherwise.
+   */
+  @Override
+  public void addValue(PathSegment segment, DataValue v) {
+    final DataValue expanded = ValueUtils.getIntermediateValues(segment.getChild(), v);
+    if (!segment.isArray()) {
+      // No explicit position: add to end of array.
+      addToArray(getNextIndex(), expanded);
+      return;
+    }
+    // An explicit position was given: combine with whatever already sits in that slot.
+    final int slot = segment.getArraySegment().getIndex();
+    final DataValue combined =
+        ValueUtils.getMergedDataValue(segment.getCollisionBehavior(), getByArrayIndex(slot), expanded);
+    addToArray(slot, combined);
+  }
+
+  /**
+   * Resolves a path against this array.
+   *
+   * @param segment remaining path; {@code null} selects the whole array
+   * @return this array, the value found further down the path, or
+   *         {@code DataValue.NULL_VALUE} when the path names a field or an empty slot
+   */
+  @Override
+  public DataValue getValue(PathSegment segment) {
+    if (segment == null) {
+      return this;
+    }
+    if (!segment.isArray()) {
+      // A named value was requested from within an array; there is none to return.
+      return DataValue.NULL_VALUE;
+    }
+    final DataValue element = getByArrayIndex(segment.getArraySegment().getIndex());
+    return element == null ? DataValue.NULL_VALUE : element.getValue(segment.getChild());
+  }
+
+  @Override
+  public ContainerValue getAsContainer() {
+    return this;
+  }
+
+  @Override
+  public DataType getDataType() {
+    return DataType.ARRAY;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java
new file mode 100644
index 0000000..ac4cde9
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseDataValue.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.PathSegment;
+
+/**
+ * Base {@link DataValue} implementation whose defaults describe a value that has no children and cannot be
+ * viewed as any of the typed value kinds. Subclasses override the accessors that apply to them.
+ */
+public abstract class BaseDataValue implements DataValue{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseDataValue.class);
+
+  /**
+   * Resolves a path against a value that has nothing beneath it.
+   *
+   * @param segment remaining path, or {@code null} when the value itself is requested
+   * @return this value when {@code segment} is {@code null}; otherwise {@link DataValue#NULL_VALUE}
+   */
+  @Override
+  public DataValue getValue(PathSegment segment) {
+    if(segment == null){
+      return this;
+    }else{ // looking for a lower level value when there is none.
+      return DataValue.NULL_VALUE;
+    }
+  }
+
+  /**
+   * Default for non-container values: adding a child is never valid.
+   *
+   * @throws IllegalArgumentException always
+   */
+  @Override
+  public void addValue(PathSegment segment, DataValue v) {
+    throw new IllegalArgumentException("You can't add a value to a non-container type.");
+  }
+
+  /**
+   * @throws DrillRuntimeException always, naming the concrete class that is not a NumericValue
+   */
+  @Override
+  public NumericValue getAsNumeric() {
+    throw new DrillRuntimeException(String.format("A %s value is not a NumericValue.", this.getClass().getCanonicalName()));
+  }
+
+  /**
+   * @throws DrillRuntimeException always, naming the concrete class that is not a ContainerValue
+   */
+  @Override
+  public ContainerValue getAsContainer() {
+    throw new DrillRuntimeException(String.format("A %s value is not a ContainerValue.", this.getClass().getCanonicalName()));
+  }
+
+  /**
+   * @throws DrillRuntimeException always, naming the concrete class that is not a StringValue
+   */
+  @Override
+  public StringValue getAsStringValue() {
+    throw new DrillRuntimeException(String.format("A %s value is not a StringValue.", this.getClass().getCanonicalName()));
+  }
+
+  /**
+   * @throws DrillRuntimeException always, naming the concrete class that is not a BytesValue
+   */
+  @Override
+  public BytesValue getAsBytesValue(){
+    throw new DrillRuntimeException(String.format("A %s value is not a BytesValue.", this.getClass().getCanonicalName()));
+  }
+
+  /**
+   * @throws DrillRuntimeException always, naming the concrete class that is not a BooleanValue
+   */
+  @Override
+  public BooleanValue getAsBooleanValue(){
+    throw new DrillRuntimeException(String.format("A %s value is not a BooleanValue.", this.getClass().getCanonicalName()));
+  }
+
+  /**
+   * Re-declared abstract so that every concrete value class must supply its own hash code instead of
+   * inheriting the identity-based one from {@link Object}.
+   */
+  @Override
+  public abstract int hashCode();
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java
new file mode 100644
index 0000000..fd6a968
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java
@@ -0,0 +1,90 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+import java.util.Map;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.ValueExpressions.CollisionBehavior;
+import org.apache.drill.common.expression.types.DataType;
+import org.apache.drill.exec.ref.exceptions.RecordException;
+
+public abstract class BaseMapValue extends BaseDataValue implements ContainerValue,
+ Iterable<Map.Entry<CharSequence, DataValue>> {
+
+ @Override
+ public void addValue(PathSegment segment, DataValue v) {
+ if(v == null) throw new RecordException("You attempted to add a null value to a map.", null);
+ if (segment.isArray())
+ throw new RecordException(
+ "You're attempted to save something at a particular array location while the location of that setting was a Map.", null);
+
+ CharSequence name = segment.getNameSegment().getPath();
+ DataValue current = getByNameNoNulls(name);
+ if (!segment.isLastPath() && current != DataValue.NULL_VALUE) {
+ current.addValue(segment.getChild(), v);
+ return;
+ } else {
+ DataValue fullPathValue = ValueUtils.getIntermediateValues(segment.getChild(), v);
+ DataValue mergedValue = ValueUtils.getMergedDataValue(segment.getCollisionBehavior(), getByNameNoNulls(name),
+ fullPathValue);
+ setByName(name, mergedValue);
+
+ }
+
+ }
+
+ protected abstract void setByName(CharSequence name, DataValue v);
+
+ protected abstract DataValue getByName(CharSequence name);
+
+ private DataValue getByNameNoNulls(CharSequence name) {
+ DataValue v = getByName(name);
+ if (v == null) return NULL_VALUE;
+ return v;
+ }
+
+ @Override
+ public DataValue getValue(PathSegment segment) {
+ if (segment == null) return this;
+ if (segment.isArray()) return NULL_VALUE;
+ return getByNameNoNulls(segment.getNameSegment().getPath()).getValue(segment.getChild());
+ }
+
+ @Override
+ public ContainerValue getAsContainer() {
+ return this;
+ }
+
+ @Override
+ public DataType getDataType() {
+ return DataType.MAP;
+ }
+
+ public void merge(BaseMapValue otherMap) {
+ for (Map.Entry<CharSequence, DataValue> e : otherMap) {
+ final DataValue oldValue = getByName(e.getKey());
+ if (oldValue == DataValue.NULL_VALUE || oldValue == null) {
+ setByName(e.getKey(), e.getValue());
+ } else {
+ setByName(e.getKey(), ValueUtils.getMergedDataValue(CollisionBehavior.MERGE_OVERRIDE, oldValue, e.getValue()));
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BooleanValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BooleanValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BooleanValue.java
new file mode 100644
index 0000000..a3a9a85
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BooleanValue.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+/**
+ * A {@link DataValue} that can be read as a Java {@code boolean}.
+ */
+public interface BooleanValue extends DataValue {
+
+  /**
+   * @return this value as a primitive {@code boolean}
+   */
+  boolean getBoolean();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BytesValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BytesValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BytesValue.java
new file mode 100644
index 0000000..460c358
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BytesValue.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+/**
+ * A comparable {@link DataValue} holding a sequence of bytes, readable either as a whole array or one
+ * position at a time.
+ */
+public interface BytesValue extends DataValue, ComparableValue {
+
+  /**
+   * @return the bytes as an array (NOTE(review): whether this is a copy or the backing array is up to the
+   *         implementation - confirm before mutating the result)
+   */
+  byte[] getAsArray();
+
+  // ByteBuffer-like positional access.
+
+  /**
+   * @return the number of bytes in this value
+   */
+  int getLength();
+
+  /**
+   * @param pos position of the byte to read
+   * @return the byte at {@code pos}
+   */
+  byte get(int pos);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ComparableValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ComparableValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ComparableValue.java
new file mode 100644
index 0000000..69a75c7
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ComparableValue.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+/**
+ * A {@link DataValue} that can be ordered relative to other values, provided the pairing is supported.
+ */
+public interface ComparableValue extends Comparable<DataValue>, DataValue {
+
+  /**
+   * Tells whether or not comparisons between the current ComparableValue and the provided DataValue are possible.
+   *
+   * @param dv2 The other DataValue
+   * @return True if comparable. False if not comparable.
+   */
+  boolean supportsCompare(DataValue dv2);
+
+  /**
+   * Similar to the standard {@link Comparable#compareTo(Object)}. However, the expectation is that
+   * {@link #supportsCompare(DataValue)} is called first; the outcome is likely to be unexpected if it is not.
+   *
+   * @param other the value to compare this value against
+   * @return a negative integer, zero, or a positive integer as this value is less than, equal to, or greater
+   *         than {@code other}, following the {@link Comparable} contract
+   */
+  @Override
+  int compareTo(DataValue other);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ContainerValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ContainerValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ContainerValue.java
new file mode 100644
index 0000000..42d1a97
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ContainerValue.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.values;
+
+
+
+/**
+ * A {@link DataValue} that holds other values, viewable either as an array or as a map.
+ */
+public interface ContainerValue extends DataValue {
+
+  /**
+   * @return this container viewed as a {@link BaseArrayValue} (NOTE(review): behaviour when the container is
+   *         actually a map is defined by the implementation - confirm)
+   */
+  BaseArrayValue getAsArray();
+
+  /**
+   * @return this container viewed as a {@link BaseMapValue} (NOTE(review): behaviour when the container is
+   *         actually an array is defined by the implementation - confirm)
+   */
+  BaseMapValue getAsMap();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/238ca97d/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
new file mode 100644
index 0000000..b78b8a1
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java
@@ -0,0 +1,25 @@
+package org.apache.drill.exec.ref.values;
+
+import java.io.IOException;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.types.DataType;
+import org.apache.drill.exec.ref.rops.DataWriter;
+
+
+/**
+ * Root abstraction for every value handled by the reference interpreter: supports path-based access,
+ * serialization through a {@link DataWriter}, and conversion to the typed value views.
+ */
+public interface DataValue {
+
+  /** Shared instance used wherever a path resolves to nothing. */
+  DataValue NULL_VALUE = new ScalarValues.NullValue();
+
+  /**
+   * Resolves a path relative to this value.
+   *
+   * @param segment remaining path; the base implementations in this package return the value itself for
+   *                {@code null} and {@link #NULL_VALUE} when the path cannot be resolved
+   * @return the value found at the path
+   */
+  DataValue getValue(PathSegment segment);
+
+  /**
+   * Stores {@code v} at the location described by {@code segment}. {@link BaseDataValue} rejects this with an
+   * {@link IllegalArgumentException} for values that are not containers.
+   *
+   * @param segment path at which to store the value
+   * @param v       value to store
+   */
+  void addValue(PathSegment segment, DataValue v);
+
+  /**
+   * Emits this value through the supplied writer.
+   *
+   * @param writer destination for the value
+   * @throws IOException if the writer fails
+   */
+  void write(DataWriter writer) throws IOException;
+
+  /** @return the {@link DataType} of this value */
+  DataType getDataType();
+
+  // Typed views. BaseDataValue's defaults throw a DrillRuntimeException when the value is not of the
+  // requested kind.
+
+  /** @return this value as a {@link NumericValue} */
+  NumericValue getAsNumeric();
+
+  /** @return this value as a {@link ContainerValue} */
+  ContainerValue getAsContainer();
+
+  /** @return this value as a {@link StringValue} */
+  StringValue getAsStringValue();
+
+  /** @return this value as a {@link BooleanValue} */
+  BooleanValue getAsBooleanValue();
+
+  /** @return this value as a {@link BytesValue} */
+  BytesValue getAsBytesValue();
+
+  /**
+   * Value equality against another DataValue. Note that this is an overload of, not an override of,
+   * {@link Object#equals(Object)}.
+   *
+   * @param v the value to compare with
+   * @return {@code true} if the two values are considered equal
+   */
+  boolean equals(DataValue v);
+
+  /** @return a hash code for this value */
+  int hashCode();
+}