You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:24 UTC
[03/53] [abbrv] WIP fragmentation, physical plan, byte compiling,
some vector work
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
new file mode 100644
index 0000000..0fc7a1f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops;
+
+/**
+ * Works on one incoming batch at a time. Creates one output batch for each input batch.
+ * As written, this class is only a skeleton: it declares a logger and no batch-handling logic.
+ */
+public class StreamingRecordBatch {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingRecordBatch.class); // package-private SLF4J logger named after this class
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
new file mode 100644
index 0000000..07d7099
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops.exchange;
+
+public class ExchangeRecordBatch { // skeleton: declares only a logger, no exchange logic yet
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExchangeRecordBatch.class); // package-private SLF4J logger named after this class
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java
deleted file mode 100644
index 6640ef2..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.exchange;
-
-
-public class PartitioningSender {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitioningSender.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java
deleted file mode 100644
index c9f8147..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.exchange;
-
-public class RandomReceiver {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
new file mode 100644
index 0000000..5bef612
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
@@ -0,0 +1,109 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops.filter;
+
+import org.apache.drill.exec.ops.FilteringRecordBatchTransformer;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.vector.SelectionVector;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+/**
+ * Work-in-progress record batch that wraps an incoming batch and is meant to expose only the
+ * records accepted by {@link #applyFilter(SelectionVector, int)}.
+ *
+ * Context and kill requests are delegated to the incoming batch. Record counting and value-vector
+ * access are still stubs.
+ */
+public abstract class FilterRecordBatch implements RecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+
+  /** Upstream batch that supplies the records to filter. */
+  private RecordBatch incoming;
+  /** Selection vector resized in {@link #next()}. NOTE(review): nothing in this class assigns it yet. */
+  private SelectionVector selectionVector;
+  /** Schema taken from the transformer the last time the incoming schema changed. */
+  private BatchSchema schema;
+  /** Transformer obtained from the fragment context on OK_NEW_SCHEMA. */
+  private FilteringRecordBatchTransformer transformer;
+  /** Count of trailing incoming records still to be moved; never updated in this class yet. */
+  private int outstanding;
+
+  /**
+   * @param batch the upstream batch to read from
+   */
+  public FilterRecordBatch(RecordBatch batch) {
+    this.incoming = batch;
+  }
+
+  /** Returns the fragment context of the incoming batch. */
+  @Override
+  public FragmentContext getContext() {
+    return incoming.getContext();
+  }
+
+  /** Returns the schema captured on the last OK_NEW_SCHEMA, or null before the first one. */
+  @Override
+  public BatchSchema getSchema() {
+    return schema;
+  }
+
+  /** Not implemented yet: always reports zero records. */
+  @Override
+  public int getRecordCount() {
+    return 0;
+  }
+
+  /** Forwards the kill request to the incoming batch. */
+  @Override
+  public void kill() {
+    incoming.kill();
+  }
+
+  /** Not implemented yet: always returns null. */
+  @Override
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+    return null;
+  }
+
+  /**
+   * Filter hook supplied by subclasses. It is not called anywhere in this class yet.
+   * NOTE(review): presumably fills {@code vector} with the accepted indices among the first
+   * {@code count} records and returns how many were accepted; confirm against implementations.
+   */
+  abstract int applyFilter(SelectionVector vector, int count);
+
+  /**
+   * Release all assets.
+   */
+  private void close() {
+
+  }
+
+  /**
+   * Advances the incoming batch and reports its outcome.
+   *
+   * Previously every case fell through into the STOP branch, so this method always closed the
+   * batch and returned STOP, even for OK and OK_NEW_SCHEMA, and NONE was reported as STOP. Each
+   * outcome is now handled separately: data-bearing outcomes continue to the preparation code,
+   * and terminal outcomes are closed and passed through unchanged.
+   *
+   * @return the outcome reported by the incoming batch
+   */
+  @Override
+  public IterOutcome next() {
+    while (true) {
+      IterOutcome o = incoming.next();
+      switch (o) {
+      case OK_NEW_SCHEMA:
+        transformer = incoming.getContext().getFilteringExpression(null);
+        schema = transformer.getSchema();
+        // fall through to ok.
+      case OK:
+        // data is available: continue with the preparation below.
+        break;
+      case NONE:
+        close();
+        return IterOutcome.NONE;
+      case STOP:
+        close();
+        return IterOutcome.STOP;
+      default:
+        // an outcome this class does not know how to handle: let the caller decide.
+        return o;
+      }
+
+      if (outstanding > 0) {
+        // move data to output location.
+
+        for (int i = incoming.getRecordCount() - outstanding; i < incoming.getRecordCount(); i++) {
+          // TODO: copying of outstanding records is not implemented yet.
+        }
+      }
+
+      // make sure the bit vector is as large as the current record batch.
+      // The vector is never assigned in this class, so skip resizing until it is, rather than
+      // failing with a NullPointerException now that this code is reachable.
+      if (selectionVector != null && selectionVector.size() < incoming.getRecordCount()) {
+        selectionVector.allocateNew(incoming.getRecordCount());
+      }
+
+      // report the upstream outcome instead of null, which callers cannot switch on.
+      return o;
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
new file mode 100644
index 0000000..218a19a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
@@ -0,0 +1,80 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops.filter;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.DirectBufferAllocator;
+import org.apache.drill.exec.record.vector.NullableInt32Vector;
+import org.apache.drill.exec.record.vector.UInt16Vector;
+import org.codehaus.janino.ExpressionEvaluator;
+
+/**
+ * Prototype of a hand-written filter that fills a 16-bit selection vector, plus two ad-hoc
+ * entry points used to experiment with it and with Janino expression evaluation.
+ */
+public class SelectionVectorUpdater {
+  //static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVectorUpdater.class);
+
+  /** Largest record count whose indices (0..65535) still fit in the {@code char} written per entry. */
+  private static final int MAX_SELECTABLE_RECORDS = Character.MAX_VALUE + 1;
+
+  /**
+   * Adds a selection vector to a record batch for the predicate {@code a + b < 10}.
+   *
+   * Each accepted record index is written to the next free slot of {@code selectionVector}.
+   *
+   * @param recordCount number of records to inspect, at most 65536
+   * @param a first operand column
+   * @param b second operand column
+   * @param selectionVector destination for the accepted record indices
+   * @return the number of indices written
+   * @throws IllegalArgumentException if {@code recordCount} is too large for 16-bit indices;
+   *         the cast to {@code char} used to wrap such indices around silently
+   */
+  public static int applyToBatch(final int recordCount, final NullableInt32Vector a, final NullableInt32Vector b,
+      final UInt16Vector selectionVector) {
+    if (recordCount > MAX_SELECTABLE_RECORDS) {
+      throw new IllegalArgumentException("recordCount " + recordCount
+          + " exceeds the " + MAX_SELECTABLE_RECORDS + " records a 16-bit selection vector can address");
+    }
+    int selectionIndex = 0;
+    for (int i = 0; i < recordCount; i++) {
+      // NOTE(review): this flag is named isNotNull but is the product of two isNull() results;
+      // confirm which value isNull() returns for a populated slot.
+      int isNotNull = a.isNull(i) * b.isNull(i);
+      if (isNotNull > 0 && a.get(i) + b.get(i) < 10) {
+        selectionVector.set(selectionIndex, (char) i);
+        selectionIndex++;
+      }
+    }
+    return selectionIndex;
+  }
+
+  /**
+   * Allocates two 1024-entry input vectors and one selection vector, runs
+   * {@link #applyToBatch} 1500 times and prints the summed result.
+   * NOTE(review): presumably a disabled benchmark entry point, given the name; the vectors
+   * allocated here are not released by this method.
+   */
+  public static void mai2n(String[] args) {
+    int size = 1024;
+    BufferAllocator allocator = new DirectBufferAllocator();
+    NullableInt32Vector a = new NullableInt32Vector(0, allocator);
+    NullableInt32Vector b = new NullableInt32Vector(1, allocator);
+    UInt16Vector select = new UInt16Vector(2, allocator);
+    a.allocateNew(size);
+    b.allocateNew(size);
+    select.allocateNew(size);
+    int r = 0;
+    for (int i = 0; i < 1500; i++) {
+      r += applyToBatch(size, a, b, select);
+    }
+
+    System.out.println(r);
+  }
+
+  /**
+   * Compiles the expression {@code c > d ? c : d} with Janino, evaluates it for 10 and 11 and
+   * prints the result.
+   */
+  public static void main(String[] args) throws Exception{
+    ExpressionEvaluator ee = new ExpressionEvaluator(
+        "c > d ? c : d", // expression
+        int.class, // expressionType
+        new String[] { "c", "d" }, // parameterNames
+        new Class<?>[] { int.class, int.class } // parameterTypes
+    );
+
+    // Integer.valueOf replaces the deprecated boxing constructors.
+    Integer res = (Integer) ee.evaluate(
+        new Object[] { // parameterValues
+            Integer.valueOf(10),
+            Integer.valueOf(11),
+        }
+    );
+    System.out.println("res = " + res);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java
index 9554bf3..7b76810 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java
@@ -17,11 +17,18 @@
******************************************************************************/
package org.apache.drill.exec.planner;
+import org.apache.drill.common.physical.PhysicalPlan;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.foreman.QueryWorkUnit;
+import org.apache.drill.exec.ops.QueryContext;
+
/**
* Decides level of paralellization.
* Generates smaller physical plans
*/
-public class ExecPlanner {
+public interface ExecPlanner {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecPlanner.class);
+
+ public QueryWorkUnit getWorkUnit(QueryContext context, PhysicalPlan plan, int maxWidth) throws FragmentSetupException;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentMaterializer.java
new file mode 100644
index 0000000..51b0691
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentMaterializer.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.pop.base.AbstractPhysicalVisitor;
+import org.apache.drill.common.physical.pop.base.AbstractStore;
+import org.apache.drill.common.physical.pop.base.Exchange;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.Scan;
+import org.apache.drill.common.physical.pop.base.Store;
+import org.apache.drill.exec.exception.FragmentSetupException;
+
+/**
+ * Physical-plan visitor that swaps exchanges, scans and stores for the operator specific to one
+ * minor fragment. Any other operator is returned unchanged.
+ */
+public class FragmentMaterializer extends AbstractPhysicalVisitor<PhysicalOperator, FragmentMaterializer.IndexedFragmentNode, FragmentSetupException>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentMaterializer.class);
+
+  /**
+   * Returns the sender when {@code exchange} is the fragment's sending exchange (compared by
+   * identity), otherwise the receiver, in both cases for the minor fragment id in {@code iNode}.
+   */
+  @Override
+  public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNode) throws FragmentSetupException {
+    final boolean receiving = exchange != iNode.getNode().getSendingExchange();
+    if (receiving) {
+      // receiving exchange.
+      return exchange.getReceiver(iNode.getMinorFragmentId());
+    }
+
+    // this is a sending exchange.
+    final PhysicalOperator input = exchange.getChild();
+    return exchange.getSender(iNode.getMinorFragmentId(), input);
+  }
+
+  /** Returns the scan specific to the minor fragment id in {@code iNode}. */
+  @Override
+  public PhysicalOperator visitScan(Scan<?> scan, IndexedFragmentNode iNode) throws FragmentSetupException {
+    final int minorId = iNode.getMinorFragmentId();
+    return scan.getSpecificScan(minorId);
+  }
+
+  /** Returns the store specific to the minor fragment id in {@code iNode}, over the store's existing child. */
+  @Override
+  public PhysicalOperator visitStore(Store store, IndexedFragmentNode iNode) throws FragmentSetupException {
+    final PhysicalOperator input = store.getChild();
+    return store.getSpecificStore(input, iNode.getMinorFragmentId());
+  }
+
+  /** Operators with no dedicated visit method are handed back as they are. */
+  @Override
+  public PhysicalOperator visitUnknown(PhysicalOperator op, IndexedFragmentNode iNode) throws FragmentSetupException {
+    return op;
+  }
+
+  /** Visitor argument pairing a fragment wrapper with the minor fragment id being materialized. */
+  public static class IndexedFragmentNode{
+    final FragmentWrapper info;
+    final int minorFragmentId;
+
+    /**
+     * @param minorFragmentId id of the minor fragment being materialized
+     * @param info wrapper of the fragment the minor fragment belongs to
+     */
+    public IndexedFragmentNode(int minorFragmentId, FragmentWrapper info) {
+      this.minorFragmentId = minorFragmentId;
+      this.info = info;
+    }
+
+    /** Returns the fragment node held by the wrapper. */
+    public FragmentNode getNode() {
+      return getInfo().getNode();
+    }
+
+    public int getMinorFragmentId() {
+      return this.minorFragmentId;
+    }
+
+    public FragmentWrapper getInfo() {
+      return this.info;
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentNode.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentNode.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentNode.java
new file mode 100644
index 0000000..f53240e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentNode.java
@@ -0,0 +1,138 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.physical.pop.base.Exchange;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.exception.FragmentSetupException;
+
+import com.google.common.collect.Lists;
+
+public class FragmentNode implements Iterable<FragmentNode.ExchangeFragmentPair>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentNode.class);
+
+ private PhysicalOperator root;
+ private Exchange sendingExchange;
+ private final List<ExchangeFragmentPair> receivingExchangePairs = Lists.newLinkedList();
+ private FragmentStats stats = new FragmentStats();
+
+ public void addOperator(PhysicalOperator o){
+ if(root == null){
+ root = o;
+ }
+ }
+
+ public void addSendExchange(Exchange e) throws FragmentSetupException{
+ if(sendingExchange != null) throw new FragmentSetupException("Fragment was trying to add a second SendExchange. ");
+ sendingExchange = e;
+ }
+
+ public void addReceiveExchange(Exchange e, FragmentNode fragment){
+ this.receivingExchangePairs.add(new ExchangeFragmentPair(e, fragment));
+ }
+
+ @Override
+ public Iterator<ExchangeFragmentPair> iterator() {
+ return this.receivingExchangePairs.iterator();
+ }
+
+ public List<ExchangeFragmentPair> getReceivingExchangePairs() {
+ return receivingExchangePairs;
+ }
+
+ public PhysicalOperator getRoot() {
+ return root;
+ }
+
+ public Exchange getSendingExchange() {
+ return sendingExchange;
+ }
+
+ public <T, V> T accept(FragmentVisitor<T, V> visitor, V extra){
+ return visitor.visit(this, extra);
+ }
+
+ public FragmentStats getStats(){
+ return stats;
+ }
+
+ public class ExchangeFragmentPair {
+ private Exchange exchange;
+ private FragmentNode node;
+ public ExchangeFragmentPair(Exchange exchange, FragmentNode node) {
+ super();
+ this.exchange = exchange;
+ this.node = node;
+ }
+ public Exchange getExchange() {
+ return exchange;
+ }
+ public FragmentNode getNode() {
+ return node;
+ }
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + FragmentNode.this.hashCode();
+ result = prime * result + ((exchange == null) ? 0 : exchange.hashCode());
+ result = prime * result + ((node == null) ? 0 : node.hashCode());
+ return result;
+ }
+
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((receivingExchangePairs == null) ? 0 : receivingExchangePairs.hashCode());
+ result = prime * result + ((root == null) ? 0 : root.hashCode());
+ result = prime * result + ((sendingExchange == null) ? 0 : sendingExchange.hashCode());
+ result = prime * result + ((stats == null) ? 0 : stats.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ FragmentNode other = (FragmentNode) obj;
+ if (receivingExchangePairs == null) {
+ if (other.receivingExchangePairs != null) return false;
+ } else if (!receivingExchangePairs.equals(other.receivingExchangePairs)) return false;
+ if (root == null) {
+ if (other.root != null) return false;
+ } else if (!root.equals(other.root)) return false;
+ if (sendingExchange == null) {
+ if (other.sendingExchange != null) return false;
+ } else if (!sendingExchange.equals(other.sendingExchange)) return false;
+ if (stats == null) {
+ if (other.stats != null) return false;
+ } else if (!stats.equals(other.stats)) return false;
+ return true;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentPlanningSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentPlanningSet.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentPlanningSet.java
new file mode 100644
index 0000000..3f7c3a9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentPlanningSet.java
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.ops.QueryContext;
+
+/**
+ * Holds one {@link FragmentWrapper} per {@link FragmentNode} while a query is planned. A wrapper
+ * is created the first time a node is referenced and is given the next major fragment id.
+ */
+public class FragmentPlanningSet implements Iterable<FragmentWrapper>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentPlanningSet.class);
+
+  // The map was never initialised before, so every method threw NullPointerException.
+  // Fully qualified so that the file's import block does not need to change.
+  private final Map<FragmentNode, FragmentWrapper> fragmentMap = new java.util.HashMap<FragmentNode, FragmentWrapper>();
+  private int majorFragmentIdIndex = 0;
+  private QueryContext context;
+
+  /**
+   * @param context query context this planning set belongs to; stored but not read in this class
+   */
+  public FragmentPlanningSet(QueryContext context){
+    this.context = context;
+  }
+
+  /** Sets the stats on the wrapper for {@code node}, creating the wrapper if needed. */
+  public void setStats(FragmentNode node, FragmentStats stats){
+    get(node).setStats(stats);
+  }
+
+  /** Adds an endpoint affinity to the wrapper for {@code n}, creating the wrapper if needed. */
+  public void addAffinity(FragmentNode n, DrillbitEndpoint endpoint, float affinity){
+    get(n).addEndpointAffinity(endpoint, affinity);
+  }
+
+  /** Sets the width on the wrapper for {@code n}, creating the wrapper if needed. */
+  public void setWidth(FragmentNode n, int width){
+    get(n).setWidth(width);
+  }
+
+  /**
+   * Returns the wrapper registered for {@code node}, creating and registering one on first use.
+   *
+   * The newly created wrapper used to be returned without being stored, so each call produced a
+   * throw-away wrapper, consumed another major fragment id and left the set empty.
+   */
+  private FragmentWrapper get(FragmentNode node){
+    FragmentWrapper info = fragmentMap.get(node);
+    if(info == null){
+      info = new FragmentWrapper(node, majorFragmentIdIndex++);
+      fragmentMap.put(node, info);
+    }
+    return info;
+  }
+
+  /** Iterates the wrappers registered so far, in the map's iteration order. */
+  @Override
+  public Iterator<FragmentWrapper> iterator() {
+    return this.fragmentMap.values().iterator();
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentRunnable.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentRunnable.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentRunnable.java
new file mode 100644
index 0000000..d551aa4
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentRunnable.java
@@ -0,0 +1,124 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.foreman.CancelableQuery;
+import org.apache.drill.exec.foreman.StatusProvider;
+import org.apache.drill.exec.metrics.SingleThreadNestedCounter;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentConverter;
+import org.apache.drill.exec.ops.FragmentRoot;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.yammer.metrics.MetricRegistry;
+import com.yammer.metrics.Timer;
+
+/**
+ * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation
+ * messages.
+ *
+ * The lifecycle is tracked in {@link #state} using the numeric values of {@link FragmentState}: the runnable starts in
+ * AWAITING_ALLOCATION, moves to RUNNING when {@link #run()} starts, and ends in FINISHED, CANCELLED or FAILED.
+ */
+public class FragmentRunnable implements Runnable, CancelableQuery, StatusProvider {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentRunnable.class);
+
+  private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
+  private final FragmentRoot root;
+  private final FragmentContext context;
+
+  /**
+   * Looks up the fragment in the Drillbit's cache and converts it into an executable root.
+   *
+   * @param dbContext context of the Drillbit that will run the fragment
+   * @param fragmentId id used to look the fragment up in the cache
+   * @throws FragmentSetupException if no fragment is cached under the given id
+   */
+  public FragmentRunnable(DrillbitContext dbContext, long fragmentId) throws FragmentSetupException {
+    PlanFragment fragment = dbContext.getCache().getFragment(fragmentId);
+    if (fragment == null) throw new FragmentSetupException(String.format("The provided fragment id [%d] was unknown.", fragmentId));
+    this.context = new FragmentContext(dbContext, fragment);
+    this.root = FragmentConverter.getFragment(this.context);
+  }
+
+  /**
+   * Builds a status snapshot from the counters and allocator of the fragment context.
+   */
+  @Override
+  public FragmentStatus getStatus() {
+    return FragmentStatus.newBuilder() //
+        .setBatchesCompleted(context.batchesCompleted.get()) //
+        .setDataProcessed(context.dataProcessed.get()) //
+        .setMemoryUse(context.getAllocator().getAllocatedMemory()) //
+        .build();
+  }
+
+  /**
+   * Marks the fragment as cancelled if it belongs to the given query. The run loop observes the state change before
+   * requesting the next batch.
+   *
+   * @return true if this fragment belongs to the query and was marked cancelled, false otherwise
+   */
+  @Override
+  public boolean cancel(long queryId) {
+    if (context.getFragment().getQueryId() == queryId) {
+      state.set(FragmentState.CANCELLED_VALUE);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Reports the failure to the fragment context and moves the fragment to the FAILED state.
+   */
+  private void fail(Throwable cause){
+    context.fail(cause);
+    state.set(FragmentState.FAILED_VALUE);
+  }
+
+  /**
+   * Sets up and drives the fragment until it finishes, is cancelled or fails. Refuses to run unless the fragment is
+   * currently awaiting allocation.
+   */
+  @Override
+  public void run() {
+    if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
+      fail(new RuntimeException(String.format("Run was called when fragment was in %s state.  FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
+      return;
+    }
+
+    Timer.Context t = context.fragmentTime.time();
+    try{
+      // setup the query.
+      try{
+        root.setup();
+      }catch(FragmentSetupException e){
+        // go through fail() so the state leaves RUNNING as well as notifying the context.
+        fail(e);
+        return;
+      }
+
+      // run the query.
+      while(state.get() == FragmentState.RUNNING_VALUE){
+        if(!root.next()){
+          // a concurrent cancel may already have changed the state; in that case the CAS fails and the loop still exits.
+          updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+        }
+      }
+    }catch(Exception ex){
+      fail(ex);
+    }finally{
+      // always stop the timer, including on setup failure and on exceptions.
+      t.stop();
+    }
+
+  }
+
+  /**
+   * Atomically moves the state from {@code current} to {@code update}.
+   *
+   * @param exceptionOnFailure when true, a failed transition is also reported to the fragment context
+   * @return true if the transition happened, false if the state was not {@code current}
+   */
+  private boolean updateState(FragmentState current, FragmentState update, boolean exceptionOnFailure) {
+    boolean success = state.compareAndSet(current.getNumber(), update.getNumber());
+    if (!success && exceptionOnFailure) {
+      context.fail(new RuntimeException(String.format(
+          "State was different than expected.  Attempting to update state from %s to %s however current state was %s.",
+          current.name(), update.name(), FragmentState.valueOf(state.get()))));
+    }
+
+    // report the real outcome; callers rely on it to detect an unexpected state.
+    return success;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentScheduler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentScheduler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentScheduler.java
new file mode 100644
index 0000000..168072a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentScheduler.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.exec.foreman.ResourceRequest;
+import org.apache.drill.exec.foreman.ResourceRequest.ResourceAllocation;
+
+import com.google.common.util.concurrent.ListenableFutureTask;
+
+/**
+ * Entry point for requesting the resources needed to run fragments. Currently a stub: its only method has an empty
+ * body.
+ */
+public class FragmentScheduler {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentScheduler.class);
+ 
+ /**
+ * Not implemented yet: the body is empty, so neither argument is used and {@code listener} is never invoked.
+ *
+ * @param resources the resource request
+ * @param listener callback; presumably meant to run once the resources have been granted -- TODO confirm
+ */
+ public void getRunningResources(ResourceRequest resources, Runnable listener){
+ // request the resource.
+ 
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStats.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStats.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStats.java
new file mode 100644
index 0000000..512b5d0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStats.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.OperatorCost;
+
+/**
+ * Running totals of the estimated costs of a fragment, together with the smallest width bound reported for it so far.
+ */
+public class FragmentStats {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStats.class);
+
+  // starts unbounded; only ever shrinks.
+  private int maxWidth = Integer.MAX_VALUE;
+  private float networkCost;
+  private float diskCost;
+  private float memoryCost;
+  private float cpuCost;
+
+  /**
+   * Tightens the width bound: the stored maximum becomes the smaller of the current bound and {@code width}.
+   */
+  public void addMaxWidth(int width){
+    if (width < this.maxWidth) {
+      this.maxWidth = width;
+    }
+  }
+
+  /**
+   * Adds each component of the given operator cost to the matching running total.
+   */
+  public void addCost(OperatorCost cost){
+    this.networkCost += cost.getNetwork();
+    this.diskCost += cost.getDisk();
+    this.memoryCost += cost.getMemory();
+    this.cpuCost += cost.getCpu();
+  }
+
+  /** @return the smallest width bound added so far, or {@link Integer#MAX_VALUE} if none was added. */
+  public int getMaxWidth() {
+    return this.maxWidth;
+  }
+
+  /** @return the accumulated network cost. */
+  public float getNetworkCost() {
+    return this.networkCost;
+  }
+
+  /** @return the accumulated disk cost. */
+  public float getDiskCost() {
+    return this.diskCost;
+  }
+
+  /** @return the accumulated memory cost. */
+  public float getMemoryCost() {
+    return this.memoryCost;
+  }
+
+  /** @return the accumulated cpu cost. */
+  public float getCpuCost() {
+    return this.cpuCost;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStatsCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStatsCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStatsCollector.java
new file mode 100644
index 0000000..a0dcde3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStatsCollector.java
@@ -0,0 +1,109 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.pop.base.AbstractPhysicalVisitor;
+import org.apache.drill.common.physical.pop.base.Exchange;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.Scan;
+import org.apache.drill.common.physical.pop.base.Store;
+import org.apache.drill.exec.planner.FragmentNode.ExchangeFragmentPair;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Walks a fragment tree, gathering a {@link FragmentStats} per fragment and storing it in the planning set.
+ */
+public class FragmentStatsCollector implements FragmentVisitor<Void, FragmentStats> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatsCollector.class);
+
+  private final StatsCollector opCollector = new StatsCollector();
+  private final FragmentPlanningSet planningSet;
+
+  /**
+   * @param planningSet destination for the stats collected for each fragment
+   */
+  public FragmentStatsCollector(FragmentPlanningSet planningSet){
+    this.planningSet = planningSet;
+  }
+
+  /**
+   * Collects stats for one fragment into {@code stats}, recurses into the fragments feeding it (each with a fresh
+   * stats object), and finally stores {@code stats} for the fragment in the planning set.
+   *
+   * @return always null
+   */
+  @Override
+  public Void visit(FragmentNode n, FragmentStats stats) {
+    Preconditions.checkNotNull(stats);
+    Preconditions.checkNotNull(n);
+
+    // operators inside this fragment.
+    n.getRoot().accept(opCollector, stats);
+
+    // the exchange this fragment sends through, if there is one.
+    final Exchange sender = n.getSendingExchange();
+    if (sender != null) {
+      stats.addCost(sender.getAggregateSendCost());
+      stats.addMaxWidth(sender.getMaxSendWidth());
+    }
+
+    // the exchanges this fragment receives from: charge the receive cost here, then descend into the child fragment.
+    for (ExchangeFragmentPair pair : n) {
+      stats.addCost(pair.getExchange().getAggregateReceiveCost());
+      pair.getNode().accept(this, new FragmentStats());
+    }
+
+    // store the stats for later use.
+    planningSet.setStats(n, stats);
+
+    return null;
+  }
+
+  /**
+   * Collects stats for the whole tree rooted at the given fragment.
+   */
+  public void collectStats(FragmentNode rootFragment) {
+    rootFragment.accept(this, new FragmentStats());
+  }
+
+  /**
+   * Operator-level visitor: accumulates operator costs and width limits for the operators of a single fragment.
+   */
+  private class StatsCollector extends AbstractPhysicalVisitor<Void, FragmentStats, RuntimeException> {
+
+    @Override
+    public Void visitExchange(Exchange exchange, FragmentStats stats) throws RuntimeException {
+      // Exchange costs are added by the fragment-level visit, and an exchange marks the edge of a different
+      // fragment, so stop here without descending.
+      return null;
+    }
+
+    @Override
+    public Void visitScan(Scan<?> scan, FragmentStats stats) {
+      // width is bounded by the number of read entries of the scan.
+      final int readEntryCount = scan.getReadEntries().size();
+      stats.addMaxWidth(readEntryCount);
+      return super.visitScan(scan, stats);
+    }
+
+    @Override
+    public Void visitStore(Store store, FragmentStats stats) {
+      final int storeWidth = store.getMaxWidth();
+      stats.addMaxWidth(storeWidth);
+      return super.visitStore(store, stats);
+    }
+
+    @Override
+    public Void visitUnknown(PhysicalOperator op, FragmentStats stats) {
+      stats.addCost(op.getCost());
+      for (PhysicalOperator input : op) {
+        input.accept(this, stats);
+      }
+      return null;
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentVisitor.java
new file mode 100644
index 0000000..12d2b9f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentVisitor.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+/**
+ * Visitor over {@link FragmentNode}s.
+ *
+ * @param <T> the type returned by a visit
+ * @param <V> the type of the extra value handed to a visit
+ */
+public interface FragmentVisitor<T, V> {
+ /**
+ * Visits a single fragment node.
+ *
+ * @param n the node being visited
+ * @param extra additional value supplied by the caller
+ * @return the result of the visit
+ */
+ public T visit(FragmentNode n, V extra);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentWrapper.java
new file mode 100644
index 0000000..a1e4f81
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentWrapper.java
@@ -0,0 +1,127 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.drill.common.physical.EndpointAffinity;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.planner.FragmentNode.ExchangeFragmentPair;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Planning-time state attached to a single {@link FragmentNode}: its major fragment id, stats, width, endpoint
+ * affinities and, once {@link #assignEndpoints(Collection)} has run, the endpoint chosen for each minor fragment.
+ */
+public class FragmentWrapper {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWrapper.class);
+
+  private final FragmentNode node;
+  private final int majorFragmentId;
+  // -1 means "not set yet"; see setWidth.
+  private int width = -1;
+  private FragmentStats stats;
+  private boolean endpointsAssigned;
+  private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinity = Maps.newHashMap();
+
+  // a list of assigned endpoints. Technically, there could be repeated endpoints in this list if we'd like to assign
+  // the same fragment multiple times to the same endpoint. Looked up by index, hence array-backed.
+  private List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+
+  public FragmentWrapper(FragmentNode node, int majorFragmentId) {
+    this.majorFragmentId = majorFragmentId;
+    this.node = node;
+  }
+
+  public FragmentStats getStats() {
+    return stats;
+  }
+
+  public void setStats(FragmentStats stats) {
+    this.stats = stats;
+  }
+
+  /**
+   * Adds affinity for an endpoint, creating its affinity entry on first use.
+   *
+   * @throws IllegalStateException if endpoints have already been assigned
+   */
+  public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
+    Preconditions.checkState(!endpointsAssigned);
+    EndpointAffinity ea = endpointAffinity.get(endpoint);
+    if (ea == null) {
+      ea = new EndpointAffinity(endpoint);
+      endpointAffinity.put(endpoint, ea);
+    }
+
+    ea.addAffinity(affinity);
+  }
+
+  public int getMajorFragmentId() {
+    return majorFragmentId;
+  }
+
+  public int getWidth() {
+    return width;
+  }
+
+  /**
+   * Sets the width of this fragment. May only be called while the width is still unset.
+   *
+   * @throws IllegalStateException if a width was already set
+   */
+  public void setWidth(int width) {
+    // check the field, not the (shadowing) parameter.
+    Preconditions.checkState(this.width == -1);
+    this.width = width;
+  }
+
+  public FragmentNode getNode() {
+    return node;
+  }
+
+  /**
+   * Picks {@code width} endpoints for this fragment and hands them to the exchanges of the node. Without any
+   * recorded affinity, endpoints are taken round robin from {@code allPossible} starting at a random position;
+   * otherwise the endpoints with the highest affinity are used.
+   *
+   * @param allPossible every endpoint that may be used; must not be empty when no affinity was recorded
+   * @throws IllegalStateException if endpoints were already assigned
+   * @throws IllegalArgumentException if no affinity was recorded and {@code allPossible} is empty
+   * @throws NotImplementedException if affinity was recorded for fewer endpoints than the width
+   */
+  public void assignEndpoints(Collection<DrillbitEndpoint> allPossible) {
+    Preconditions.checkState(!endpointsAssigned);
+
+    List<EndpointAffinity> values = Lists.newArrayList();
+    values.addAll(endpointAffinity.values());
+
+    if(values.size() == 0){
+      // no affinity information: choose from the full endpoint list (the affinity list is empty here).
+      final List<DrillbitEndpoint> all = Lists.newArrayList(allPossible);
+      final int div = all.size();
+      Preconditions.checkArgument(div > 0, "Unable to assign endpoints since no endpoints were provided.");
+      int start = ThreadLocalRandom.current().nextInt(div);
+      // round robin with random start.
+      for(int i = start; i < start + width; i++){
+        endpoints.add(all.get(i % div));
+      }
+    }else if(values.size() < width){
+      throw new NotImplementedException("Haven't implemented a scenario where we have some node affinity but the affinity list is smaller than the expected width.");
+    }else{
+      // get nodes with highest affinity.
+      Collections.sort(values);
+      values = Lists.reverse(values);
+      for (int i = 0; i < width; i++) {
+        endpoints.add(values.get(i).getEndpoint());
+      }
+    }
+
+    // only flag the assignment once endpoints were actually chosen.
+    endpointsAssigned = true;
+
+    // a fragment may have no sending exchange, so guard against null.
+    if (node.getSendingExchange() != null) {
+      node.getSendingExchange().setupSenders(endpoints);
+    }
+    for (ExchangeFragmentPair e : node.getReceivingExchangePairs()) {
+      e.getExchange().setupReceivers(endpoints);
+    }
+  }
+
+  /**
+   * @return the endpoint assigned to the given minor fragment
+   * @throws IllegalStateException if endpoints have not been assigned yet
+   */
+  public DrillbitEndpoint getAssignedEndpoint(int minorFragmentId) {
+    Preconditions.checkState(endpointsAssigned);
+    return this.endpoints.get(minorFragmentId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentingPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentingPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentingPhysicalVisitor.java
new file mode 100644
index 0000000..5f67617
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentingPhysicalVisitor.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.pop.base.AbstractPhysicalVisitor;
+import org.apache.drill.common.physical.pop.base.Exchange;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.exec.exception.FragmentSetupException;
+
+/**
+ * Responsible for breaking a plan into its constituent Fragments. Operators are added to the fragment currently being
+ * built; every Exchange starts a new fragment that is linked to the current one through that exchange.
+ */
+public class FragmentingPhysicalVisitor extends AbstractPhysicalVisitor<FragmentNode, FragmentNode, FragmentSetupException> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentingPhysicalVisitor.class);
+
+  private FragmentNode rootFragment = new FragmentNode();
+
+  public FragmentingPhysicalVisitor(){
+  }
+
+
+  /**
+   * Splits the plan at the exchange: the current fragment receives from it, and a new fragment sends to it and
+   * collects the operators below the exchange.
+   *
+   * @param value the fragment currently being built; must not be null
+   * @return {@code value}
+   * @throws FragmentSetupException if no current fragment was supplied
+   */
+  @Override
+  public FragmentNode visitExchange(Exchange exchange, FragmentNode value) throws FragmentSetupException {
+//    logger.debug("Visiting Exchange {}", exchange);
+    if(value == null) throw new FragmentSetupException("The simple fragmenter was called without a FragmentBuilder value.  This will only happen if the initial call to SimpleFragmenter is by a Exchange node.  This should never happen since an Exchange node should never be the root node of a plan.");
+    FragmentNode next = getNextBuilder();
+    value.addReceiveExchange(exchange, next);
+    next.addSendExchange(exchange);
+    // populate the fragment that was just linked to the exchange, rather than a second, orphaned one.
+    exchange.getChild().accept(this, next);
+    return value;
+  }
+
+  /**
+   * Adds the operator to the current fragment (the root fragment when none was supplied) and visits its children
+   * with that same fragment.
+   *
+   * @return the fragment the operator was added to
+   */
+  @Override
+  public FragmentNode visitUnknown(PhysicalOperator op, FragmentNode value)  throws FragmentSetupException{
+//    logger.debug("Visiting Other {}", op);
+    value = ensureBuilder(value);
+    value.addOperator(op);
+    for(PhysicalOperator child : op){
+      child.accept(this, value);
+    }
+    return value;
+  }
+
+  /**
+   * @return {@code value} if it is non-null, otherwise the root fragment
+   */
+  private FragmentNode ensureBuilder(FragmentNode value) throws FragmentSetupException{
+    if(value != null){
+      return value;
+    }else{
+      return rootFragment;
+    }
+  }
+
+  /**
+   * @return a new, empty fragment
+   */
+  public FragmentNode getNextBuilder(){
+    return new FragmentNode();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/MaterializedFragment.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/MaterializedFragment.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/MaterializedFragment.java
new file mode 100644
index 0000000..d1c85cb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/MaterializedFragment.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.OperatorCost;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+/**
+ * Immutable holder for one fragment instance: its root operator, the endpoint associated with it, its identifiers
+ * and its cost.
+ */
+public class MaterializedFragment {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaterializedFragment.class);
+
+  final PhysicalOperator root;
+  final DrillbitEndpoint endpoint;
+  final long queryId;
+  final int majorFragmentId;
+  final int minorFragmentId;
+  final OperatorCost cost;
+
+  /**
+   * Stores every argument as-is in the field of the same name.
+   */
+  public MaterializedFragment(PhysicalOperator root, DrillbitEndpoint endpoint, long queryId, int majorFragmentId,
+      int minorFragmentId, OperatorCost cost) {
+    // identifiers first, then the payload.
+    this.queryId = queryId;
+    this.majorFragmentId = majorFragmentId;
+    this.minorFragmentId = minorFragmentId;
+    this.endpoint = endpoint;
+    this.root = root;
+    this.cost = cost;
+  }
+
+  /** @return the root operator of the fragment. */
+  public PhysicalOperator getRoot() {
+    return this.root;
+  }
+
+  /** @return the endpoint associated with the fragment. */
+  public DrillbitEndpoint getEndpoint() {
+    return this.endpoint;
+  }
+
+  /** @return the id of the owning query. */
+  public long getQueryId() {
+    return this.queryId;
+  }
+
+  /** @return the major fragment id. */
+  public int getMajorFragmentId() {
+    return this.majorFragmentId;
+  }
+
+  /** @return the minor fragment id. */
+  public int getMinorFragmentId() {
+    return this.minorFragmentId;
+  }
+
+  /** @return the cost given at construction. */
+  public OperatorCost getCost() {
+    return this.cost;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
new file mode 100644
index 0000000..ff31cd9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.io.IOException;
+
+import org.apache.drill.common.physical.PhysicalPlan;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+
+/**
+ * Deserializes a {@link PhysicalPlan} from JSON, with a {@link DrillbitEndpoint} registered as an injectable value
+ * for deserialization.
+ */
+public class PhysicalPlanReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
+
+  private final ObjectReader reader;
+
+  /**
+   * @param mapper the mapper from which the plan reader is derived
+   * @param endpoint the endpoint registered as injectable value under {@code DrillbitEndpoint.class}
+   */
+  public PhysicalPlanReader(ObjectMapper mapper, DrillbitEndpoint endpoint) {
+    final InjectableValues.Std injected = new InjectableValues.Std();
+    injected.addValue(DrillbitEndpoint.class, endpoint);
+
+    final ObjectReader planReader = mapper.reader(PhysicalPlan.class);
+    this.reader = planReader.with(injected);
+  }
+
+  /**
+   * Parses the given JSON text into a physical plan.
+   *
+   * @throws JsonProcessingException declared by the underlying reader
+   * @throws IOException declared by the underlying reader
+   */
+  public PhysicalPlan read(String json) throws JsonProcessingException, IOException {
+    final PhysicalPlan plan = reader.readValue(json);
+    return plan;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ScanFinder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ScanFinder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ScanFinder.java
new file mode 100644
index 0000000..ff81d90
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ScanFinder.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.pop.base.AbstractPhysicalVisitor;
+import org.apache.drill.common.physical.pop.base.Exchange;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.Store;
+
+/**
+ * Determines whether an operator tree contains a Scan without crossing an Exchange, i.e. within a single fragment.
+ *
+ * NOTE(review): this previously answered true on reaching a Store and never on a Scan, which contradicts the class
+ * and method names; it now matches on Scan. Confirm no caller depended on the Store-based answer.
+ */
+public class ScanFinder extends AbstractPhysicalVisitor<Boolean, Void, RuntimeException> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanFinder.class);
+
+  private static final ScanFinder finder = new ScanFinder();
+
+  private ScanFinder(){}
+
+  @Override
+  public Boolean visitExchange(Exchange exchange, Void value) throws RuntimeException {
+    // whatever is behind an exchange belongs to another fragment; don't look there.
+    return false;
+  }
+
+  // fully qualified so that this class does not need an additional import.
+  @Override
+  public Boolean visitScan(org.apache.drill.common.physical.pop.base.Scan<?> scan, Void value) throws RuntimeException {
+    return true;
+  }
+
+  @Override
+  public Boolean visitUnknown(PhysicalOperator op, Void value) throws RuntimeException {
+    for(PhysicalOperator child : op){
+      if(child.accept(this, null)) return true;
+    }
+    return false;
+  }
+
+  /**
+   * @param op root of the operator tree to inspect
+   * @return true if a Scan is reachable from {@code op} without passing through an Exchange
+   */
+  public static boolean containsScan(PhysicalOperator op){
+    return op.accept(finder, null);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
new file mode 100644
index 0000000..d1c3add
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.util.List;
+
+import org.apache.drill.common.physical.PhysicalPlan;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.foreman.QueryWorkUnit;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+
+/**
+ * Parallelization is based on available nodes with source or target data. Nodes that are "overloaded" are excluded from execution.
+ */
+public class SimpleExecPlanner implements ExecPlanner{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleExecPlanner.class);
+
+ private FragmentingPhysicalVisitor fragmenter = new FragmentingPhysicalVisitor();
+ private SimpleParallelizer parallelizer = new SimpleParallelizer();
+ /** Splits the plan into fragments from its first sorted operator, collects stats, then delegates to SimpleParallelizer.getFragments with maxWidth. */
+ @Override
+ public QueryWorkUnit getWorkUnit(QueryContext context, PhysicalPlan plan, int maxWidth) throws FragmentSetupException {
+
+ // get the root physical operator and split the plan into sub fragments.
+ PhysicalOperator root = plan.getSortedOperators(false).iterator().next(); // NOTE(review): assumes the plan has at least one operator
+ FragmentNode fragmentRoot = root.accept(fragmenter, null);
+
+ // generate a planning set and collect stats.
+ FragmentPlanningSet planningSet = new FragmentPlanningSet(context);
+ FragmentStatsCollector statsCollector = new FragmentStatsCollector(planningSet);
+ statsCollector.collectStats(fragmentRoot);
+
+ return parallelizer.getFragments(context, fragmentRoot, planningSet, maxWidth);
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleParallelizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleParallelizer.java
new file mode 100644
index 0000000..a52abaa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleParallelizer.java
@@ -0,0 +1,147 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.foreman.QueryWorkUnit;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.FragmentMaterializer.IndexedFragmentNode;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+
+public class SimpleParallelizer {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class);
+
+ private final FragmentMaterializer materializer = new FragmentMaterializer();
+
+ /**
+ * Generate a set of assigned fragments based on the provided planningSet. Do not allow parallelization stages to go
+ * beyond the global max width.
+ *
+ * @param context
+ * The current QueryContext.
+ * @param planningSet
+ * The set of fragments with collected statistics that we'll work with.
+ * @param globalMaxWidth
+ * The maximum level of parallelization any stage of the query can do. Note that while this might be the
+ * number of active Drillbits, realistically, this could be well beyond that number if we want to do things
+ * like speed results return.
+ * @return A QueryWorkUnit holding the root PlanFragment plus the other generated PlanFragment protobuf objects to be assigned out to the individual nodes.
+ * @throws FragmentSetupException if rootNode's fragment has a width other than 1, or a fragment cannot be written as JSON.
+ */
+ public QueryWorkUnit getFragments(QueryContext context, FragmentNode rootNode, FragmentPlanningSet planningSet,
+ int globalMaxWidth) throws FragmentSetupException {
+ assignEndpoints(context.getActiveEndpoints(), planningSet, globalMaxWidth);
+ return generateWorkUnit(context.getQueryId(), context.getMapper(), rootNode, planningSet);
+ }
+ /** Builds one PlanFragment per (fragment, minor fragment id) pair in planningSet; rootNode's fragment is returned separately from the rest. */
+ private QueryWorkUnit generateWorkUnit(long queryId, ObjectMapper mapper, FragmentNode rootNode,
+ FragmentPlanningSet planningSet) throws FragmentSetupException {
+
+ List<PlanFragment> fragments = Lists.newArrayList();
+
+ PlanFragment rootFragment = null;
+
+ // now we generate all the individual plan fragments and associated assignments. Note, we need all endpoints
+ // assigned before we can materialize, so we start a new loop here rather than utilizing the previous one.
+ for (FragmentWrapper info : planningSet) {
+
+ FragmentNode node = info.getNode();
+ FragmentStats stats = node.getStats();
+ PhysicalOperator abstractRoot = node.getRoot();
+ boolean isRootNode = rootNode == node; // identity comparison, not equals()
+
+ if (isRootNode && info.getWidth() != 1)
+ throw new FragmentSetupException(
+ String
+ .format(
+ "Failure while trying to setup fragment. The root fragment must always have parallelization one. In the current case, the width was set to %d.",
+ info.getWidth()));
+ // a fragment is self driven if it doesn't rely on any other exchanges.
+ boolean selfDriven = node.getReceivingExchangePairs().size() == 0;
+
+ // Create a minorFragment for each major fragment.
+ for (int minorFragmentId = 0; minorFragmentId < info.getWidth(); minorFragmentId++) {
+ IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, info);
+ PhysicalOperator root = abstractRoot.accept(materializer, iNode);
+
+ // get plan as JSON
+ String plan;
+ try {
+ plan = mapper.writeValueAsString(root);
+ } catch (JsonProcessingException e) {
+ throw new FragmentSetupException("Failure while trying to convert fragment into json.", e);
+ }
+
+ PlanFragment fragment = PlanFragment.newBuilder() //
+ .setCpuCost(stats.getCpuCost()) //
+ .setDiskCost(stats.getDiskCost()) //
+ .setMemoryCost(stats.getMemoryCost()) //
+ .setNetworkCost(stats.getNetworkCost()) //
+ .setFragmentJson(plan) //
+ .setMinorFragmentId(minorFragmentId) //
+ .setMajorFragmentId(info.getMajorFragmentId()).setQueryId(queryId) //
+ .setAssignment(info.getAssignedEndpoint(minorFragmentId)).setSelfDriven(selfDriven).build();
+
+ if (isRootNode) {
+ rootFragment = fragment; // width was checked to be 1 above, so this is assigned once per matching wrapper
+ } else {
+ fragments.add(fragment);
+ }
+ }
+ }
+
+ return new QueryWorkUnit(rootFragment, fragments); // NOTE(review): rootFragment is still null if rootNode never appeared in planningSet
+
+ }
+ /** For every fragment in planningSet: picks a width in [1, min(stats max width, globalMaxWidth)] capped by disk cost, then has the wrapper assign endpoints from allNodes. */
+ private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, FragmentPlanningSet planningSet,
+ int globalMaxWidth) {
+ // First we determine the amount of parallelization for a fragment. This will be between 1 and maxWidth based on
+ // cost. (Later could also be based on cluster operation.) then we decide endpoints based on affinity (later this
+ // could be based on endpoint load)
+ for (FragmentWrapper info : planningSet) {
+
+ FragmentStats stats = info.getStats();
+
+ // figure out width.
+ int width = Math.min(stats.getMaxWidth(), globalMaxWidth);
+ float diskCost = stats.getDiskCost();
+
+ // TODO: right now we'll just assume that each task is cost 1 so we'll set the breadth at the lesser of the number
+ // of tasks or the maximum width of the fragment.
+ if (diskCost < width) {
+ width = (int) diskCost; // the cast truncates toward zero; the floor below covers results under 1
+ }
+
+ if (width < 1) width = 1;
+ info.setWidth(width);
+
+ // figure out endpoint assignments. also informs the exchanges about their respective endpoints.
+ info.assignEndpoints(allNodes);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
new file mode 100644
index 0000000..562d109
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.pop.receiver;
+
+import java.util.List;
+
+import org.apache.drill.common.physical.pop.base.AbstractReceiver;
+import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+/** Receiver with Jackson type name "nway-ordering-receiver". NOTE(review): every method returns a fixed value - presumably WIP stubs. */
+@JsonTypeName("nway-ordering-receiver")
+public class NWayOrderingReceiver extends AbstractReceiver{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NWayOrderingReceiver.class);
+ /** Always returns null; callers must not assume a non-null list. */
+ @Override
+ public List<DrillbitEndpoint> getProvidingEndpoints() {
+ return null;
+ }
+ /** Always false. */
+ @Override
+ public boolean supportsOutOfOrderExchange() {
+ return false;
+ }
+ /** Always 0. */
+ @Override
+ public int getSenderCount() {
+ return 0;
+ }
+ /** NOTE(review): ignores physicalVisitor and value and returns null, so a visitor never sees this operator. */
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
new file mode 100644
index 0000000..487c645
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
@@ -0,0 +1,55 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.pop.receiver;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.physical.pop.base.AbstractReceiver;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+/** Receiver with Jackson type name "receiver-random". NOTE(review): every method returns a fixed value - presumably WIP stubs. */
+@JsonTypeName("receiver-random")
+public class RandomReceiver extends AbstractReceiver{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
+ /** Always returns null; callers must not assume a non-null list. */
+ @Override
+ public List<DrillbitEndpoint> getProvidingEndpoints() {
+ return null;
+ }
+ /** Always false. */
+ @Override
+ public boolean supportsOutOfOrderExchange() {
+ return false;
+ }
+ /** Always 0. */
+ @Override
+ public int getSenderCount() {
+ return 0;
+ }
+ /** NOTE(review): ignores physicalVisitor and value and returns null, so a visitor never sees this operator. */
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return null;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
new file mode 100644
index 0000000..b0fb51c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.pop.sender;
+
+import java.util.List;
+
+import org.apache.drill.common.physical.pop.base.AbstractSender;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+/** Sender with Jackson type name "hash-partition-sender". NOTE(review): getDestinations() and accept() return null - presumably WIP stubs. */
+@JsonTypeName("hash-partition-sender")
+public class HashPartitionSender extends AbstractSender {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartitionSender.class);
+ /** Passes child straight through to the AbstractSender constructor. */
+ public HashPartitionSender(PhysicalOperator child) {
+ super(child);
+ }
+
+ /** Always returns null; callers must not assume a non-null list. */
+ @Override
+ public List<DrillbitEndpoint> getDestinations() {
+ return null;
+ }
+ /** NOTE(review): ignores physicalVisitor and value and returns null, so a visitor never sees this operator. */
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return null;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 1d32340..d3e4b23 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -32,10 +32,12 @@ import com.google.common.collect.Lists;
public class BatchSchema implements Iterable<MaterializedField>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class);
- private List<MaterializedField> fields = Lists.newArrayList();
+ private final List<MaterializedField> fields;
+ private final boolean hasSelectionVector;
- private BatchSchema(List<MaterializedField> fields) {
+ private BatchSchema(boolean hasSelectionVector, List<MaterializedField> fields) {
this.fields = fields;
+ this.hasSelectionVector = hasSelectionVector;
}
@Override
@@ -59,10 +61,13 @@ public class BatchSchema implements Iterable<MaterializedField>{
private IntObjectOpenHashMap<MaterializedField> fields = new IntObjectOpenHashMap<MaterializedField>();
private IntObjectOpenHashMap<MaterializedField> expectedFields = new IntObjectOpenHashMap<MaterializedField>();
+ private boolean hasSelectionVector;
+
public BatchSchemaBuilder(BatchSchema expected){
for(MaterializedField f: expected){
expectedFields.put(f.getFieldId(), f);
}
+ hasSelectionVector = expected.hasSelectionVector;
}
public BatchSchemaBuilder(){
@@ -80,6 +85,10 @@ public class BatchSchema implements Iterable<MaterializedField>{
addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
}
+ public void setSelectionVector(boolean hasSelectionVector){ // records the flag on this builder
+ this.hasSelectionVector = hasSelectionVector; // later handed to the BatchSchema constructor by buildAndClear()
+ }
+
private void setTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) throws SchemaChangeException{
MaterializedField f = new MaterializedField(fieldId, type, nullable, mode, valueClass);
if(expectedFields != null){
@@ -99,13 +108,13 @@ public class BatchSchema implements Iterable<MaterializedField>{
setTypedField(fieldId, type, nullable, mode, valueClass);
}
- public void addVector(ValueVector<?> v){
-
- }
-
- public void replaceVector(ValueVector<?> oldVector, ValueVector<?> newVector){
-
- }
+// public void addVector(ValueVector<?> v){
+//
+// }
+//
+// public void replaceVector(ValueVector<?> oldVector, ValueVector<?> newVector){
+//
+// }
public BatchSchema buildAndClear() throws SchemaChangeException{
@@ -116,7 +125,7 @@ public class BatchSchema implements Iterable<MaterializedField>{
if(f != null) fieldList.add(f);
}
Collections.sort(fieldList);
- return new BatchSchema(fieldList);
+ return new BatchSchema(this.hasSelectionVector, fieldList);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 3cadf89..2e941a2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -70,6 +70,10 @@ public class MaterializedField implements Comparable<MaterializedField>{
check("valueMode", this.mode, expected.mode);
}
+ public MaterializedField getNullableVersion(Class<?> valueClass){ // returns a new field; this instance is not modified
+ return new MaterializedField(fieldId, type, true, mode, valueClass); // same fieldId/type/mode, nullable forced to true, caller-supplied valueClass
+ }
+
@Override
public int compareTo(MaterializedField o) {
return Integer.compare(this.fieldId, o.fieldId);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
index 735493d..912e02d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.record.vector;
import io.netty.buffer.ByteBuf;
-import org.apache.drill.exec.BufferAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
/**
* Abstract class that fixed value vectors are derived from.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
index 33a81e5..8d524b2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.record.vector;
import io.netty.buffer.ByteBuf;
-import org.apache.drill.exec.BufferAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.DeadBuf;
public abstract class BaseValueVector<T extends BaseValueVector<T>> implements ValueVector<T>{
@@ -37,7 +37,9 @@ public abstract class BaseValueVector<T extends BaseValueVector<T>> implements V
public final void allocateNew(int valueCount){
int allocationSize = getAllocationSize(valueCount);
- resetAllocation(valueCount, allocator.buffer(allocationSize));
+ ByteBuf newBuf = allocator.buffer(allocationSize);
+ newBuf.retain();
+ resetAllocation(valueCount, newBuf);
}
protected abstract int getAllocationSize(int valueCount);