You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:36 UTC
[15/53] [abbrv] Update typing system. Update RPC system. Add
Fragmenting Implementation. Working single node. Distributed failing due to
threading issues.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
new file mode 100644
index 0000000..0044628
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
@@ -0,0 +1,58 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.StorageEngine;
+import org.apache.drill.exec.store.StorageEngine.ReadEntry;
+
+import com.google.common.collect.ListMultimap;
+
+/**
+ * Placeholder storage engine. It reports that reads are supported, but each of the
+ * lookup methods below is a stub that returns {@code null}.
+ */
+public class MockStorageEngine extends AbstractStorageEngine{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+
+ /** @return always {@code true} */
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ /**
+ * Stub: the scan is ignored.
+ *
+ * @param scan the logical scan (unused)
+ * @return always {@code null}
+ */
+ @Override
+ public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+ return null;
+ }
+
+ /**
+ * Stub: the entries are ignored.
+ *
+ * @param entries the read entries (unused)
+ * @return always {@code null}
+ */
+ @Override
+ public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
+ return null;
+ }
+
+ /**
+ * Stub: no reader is created.
+ *
+ * @param context the fragment context (unused)
+ * @param readEntry the entry to read (unused)
+ * @return always {@code null}
+ */
+ @Override
+ public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
+ return null;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
new file mode 100644
index 0000000..639d0d2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
@@ -0,0 +1,75 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractStore;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("mock-store")
+public class MockStorePOP extends AbstractStore {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorePOP.class);
+
+ @JsonCreator
+ public MockStorePOP(@JsonProperty("child") PhysicalOperator child) {
+ super(child);
+ }
+
+ public int getMaxWidth() {
+ return 1;
+ }
+
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+
+ }
+
+ @Override
+ public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
+ return new MockStorePOP(child);
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(1,getSize().getRecordCount()*getSize().getRecordSize(),1,1);
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new MockStorePOP(child);
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java
new file mode 100644
index 0000000..eb77eeb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.common.expression.LogicalExpression;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class PartitionRange {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionRange.class);
+
+ private LogicalExpression start;
+ private LogicalExpression finish;
+
+ @JsonCreator
+ public PartitionRange(@JsonProperty("start") LogicalExpression start, @JsonProperty("finish") LogicalExpression finish) {
+ super();
+ this.start = start;
+ this.finish = finish;
+ }
+
+ public LogicalExpression getStart() {
+ return start;
+ }
+
+ public LogicalExpression getFinish() {
+ return finish;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
new file mode 100644
index 0000000..e869393
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("project")
+public class Project extends AbstractSingle{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Project.class);
+
+ private final List<NamedExpression> exprs;
+
+ @JsonCreator
+ public Project(@JsonProperty("exprs") List<NamedExpression> exprs, @JsonProperty("child") PhysicalOperator child) {
+ super(child);
+ this.exprs = exprs;
+ }
+
+ public List<NamedExpression> getExprs() {
+ return exprs;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+ return physicalVisitor.visitProject(this, value);
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(0, 0, 1000, child.getSize().getRecordCount());
+ }
+
+ @Override
+ public Size getSize() {
+ //TODO: This should really change the row width...
+ return child.getSize();
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new Project(exprs, child);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
new file mode 100644
index 0000000..ed41586
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
@@ -0,0 +1,83 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractReceiver;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+/**
+ * Receiver registered with Jackson under the type name {@code random-receiver}.
+ * It accepts batches from a list of sending endpoints and reports that it supports
+ * out-of-order exchange.
+ */
+@JsonTypeName("random-receiver")
+public class RandomReceiver extends AbstractReceiver{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
+
+  private List<DrillbitEndpoint> senders;
+
+  // Kept locally so getOppositeMajorFragmentId() can return the configured value without
+  // depending on how AbstractReceiver stores it (that class is not visible from here).
+  private final int senderMajorFragmentId;
+
+  /**
+   * @param oppositeMajorFragmentId major fragment id of the sending side
+   * @param senders endpoints that will send data to this receiver
+   */
+  @JsonCreator
+  public RandomReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId, @JsonProperty("senders") List<DrillbitEndpoint> senders){
+    super(oppositeMajorFragmentId);
+    this.senderMajorFragmentId = oppositeMajorFragmentId;
+    this.senders = senders;
+  }
+
+  /** @return the sender endpoints supplied at construction */
+  @Override
+  public List<DrillbitEndpoint> getProvidingEndpoints() {
+    return senders;
+  }
+
+  /** @return always {@code true} */
+  @Override
+  public boolean supportsOutOfOrderExchange() {
+    return true;
+  }
+
+  /** @return a placeholder cost of (1, 1, 1, 1) */
+  @Override
+  public OperatorCost getCost() {
+    //TODO: deal with receiver cost through exchange.
+    return new OperatorCost(1,1,1,1);
+  }
+
+  /** Dispatches to {@code visitRandomReceiver} on the supplied visitor. */
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitRandomReceiver(this, value);
+  }
+
+  /** @return a placeholder size of (1, 1) */
+  @Override
+  public Size getSize() {
+    //TODO: deal with size info through exchange.
+    return new Size(1,1);
+  }
+
+  /**
+   * @return the major fragment id passed to the constructor (previously this was
+   *         hard-coded to 0, which discarded the configured value)
+   */
+  @Override
+  public int getOppositeMajorFragmentId() {
+    return senderMajorFragmentId;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
new file mode 100644
index 0000000..7d64dba
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.AbstractSender;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("range-sender")
+public class RangeSender extends AbstractSender{
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RangeSender.class);
+
+ List<EndpointPartition> partitions;
+
+ @JsonCreator
+ public RangeSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("partitions") List<EndpointPartition> partitions) {
+ super(oppositeMajorFragmentId, child);
+ this.partitions = partitions;
+ }
+
+ @Override
+ public List<DrillbitEndpoint> getDestinations() {
+ return null;
+ }
+
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new RangeSender(oppositeMajorFragmentId, child, partitions);
+ }
+
+
+ public static class EndpointPartition{
+ private final PartitionRange range;
+ private final DrillbitEndpoint endpoint;
+
+ @JsonCreator
+ public EndpointPartition(@JsonProperty("range") PartitionRange range, @JsonProperty("endpoint") DrillbitEndpoint endpoint) {
+ super();
+ this.range = range;
+ this.endpoint = endpoint;
+ }
+ public PartitionRange getRange() {
+ return range;
+ }
+ public DrillbitEndpoint getEndpoint() {
+ return endpoint;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
new file mode 100644
index 0000000..86a201d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
@@ -0,0 +1,106 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractStore;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Root;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("screen")
+public class Screen extends AbstractStore implements Root{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Screen.class);
+
+ private final DrillbitEndpoint endpoint;
+
+ public Screen(@JsonProperty("child") PhysicalOperator child, @JacksonInject DrillbitEndpoint endpoint) {
+ super(child);
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+ return Collections.singletonList(new EndpointAffinity(endpoint, 1000000000000l));
+ }
+
+ @Override
+ public int getMaxWidth() {
+ return 1;
+ }
+
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+ // we actually don't have to do anything since nothing should have changed. we'll check just check that things
+ // didn't get screwed up.
+ if (endpoints.size() != 1) throw new PhysicalOperatorSetupException("A Screen operator can only be assigned to a single node.");
+ DrillbitEndpoint endpoint = endpoints.iterator().next();
+ logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
+ if (!endpoint.equals(this.endpoint)) {
+ throw new PhysicalOperatorSetupException(String.format(
+ "A Screen operator can only be assigned to its home node. Expected endpoint %s, Actual endpoint: %s",
+ this.endpoint, endpoint));
+ }
+ }
+
+ @Override
+ public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
+ return new Screen(child, endpoint);
+ }
+
+ @JsonIgnore
+ public DrillbitEndpoint getEndpoint() {
+ return endpoint;
+ }
+
+ @Override
+ public String toString() {
+ return "Screen [endpoint=" + endpoint + ", getChild()=" + getChild() + "]";
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(0, 0, 1000, child.getSize().getRecordCount());
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new Screen(child, endpoint);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitScreen(this, value);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
new file mode 100644
index 0000000..79d937a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
@@ -0,0 +1,78 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSender;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Sender that pushes all data to a single destination node.
+ */
+@JsonTypeName("single-sender")
+public class SingleSender extends AbstractSender {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSender.class);
+
+ private final DrillbitEndpoint destination;
+
+ @JsonCreator
+ public SingleSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("destination") DrillbitEndpoint destination) {
+ super(oppositeMajorFragmentId, child);
+ this.destination = destination;
+ }
+
+ @Override
+ @JsonIgnore
+ public List<DrillbitEndpoint> getDestinations() {
+ return Collections.singletonList(destination);
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ long recordSize = child.getSize().getRecordSize() * child.getSize().getRecordCount();
+ return new OperatorCost((float) recordSize, recordSize, 0, child.getSize().getRecordCount()/(1<<16));
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new SingleSender(oppositeMajorFragmentId, child, destination);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSingleSender(this, value);
+ }
+
+
+ public DrillbitEndpoint getDestination() {
+ return destination;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
new file mode 100644
index 0000000..e4ece6b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.common.defs.OrderDef;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Physical sort operator, registered with Jackson under the type name {@code sort}.
+ * Holds the orderings to sort by and a reverse flag for its single child.
+ */
+@JsonTypeName("sort")
+public class Sort extends AbstractSingle{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Sort.class);
+
+  private final List<OrderDef> orderings;
+  private final boolean reverse;
+
+  /**
+   * @param child the operator supplying input
+   * @param orderings the orderings to sort by
+   * @param reverse the reverse flag
+   */
+  @JsonCreator
+  public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("reverse") boolean reverse) {
+    super(child);
+    this.orderings = orderings;
+    this.reverse = reverse;
+  }
+
+  /** @return the orderings supplied at construction */
+  public List<OrderDef> getOrderings() {
+    return orderings;
+  }
+
+  /** @return the reverse flag supplied at construction */
+  public boolean getReverse() {
+    return reverse;
+  }
+
+  /** Dispatches to {@code visitSort} on the supplied visitor. */
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitSort(this, value);
+  }
+
+  /**
+   * Estimates the cost of sorting the child's output in {@code k} chunks of {@code n/k}
+   * records each: CPU is {@code k * (n/k) * log2(n/k) + n * log2(k)}, disk is
+   * {@code 2 * n * width}, and memory is {@code (n/k) * width}.
+   *
+   * @return the estimated cost; the CPU component is finite even when the child
+   *         reports fewer than {@code k} records
+   */
+  @Override
+  public OperatorCost getCost() {
+    Size childSize = child.getSize();
+    long n = childSize.getRecordCount();
+    long width = childSize.getRecordSize();
+
+    //TODO: Magic Number, let's assume 1/10 of data can fit in memory.
+    int k = 10;
+    long n2 = n/k;
+    double ln2 = Math.log(2);
+
+    // For n < k the chunk size n2 is 0; Math.log(0) is -Infinity and 0 * -Infinity is NaN,
+    // which would poison the whole CPU estimate. An empty chunk costs nothing to sort.
+    double chunkSortCost = n2 > 0 ? k * n2 * (Math.log(n2) / ln2) : 0;
+    double mergeCost = n * (Math.log(k) / ln2);
+    double cpuCost = chunkSortCost + mergeCost;
+    double diskCost = n*width*2;
+
+    return new OperatorCost(0, (float) diskCost, (float) n2*width, (float) cpuCost);
+  }
+
+  /** @return a new {@code Sort} with the same orderings and reverse flag over {@code child} */
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new Sort(child, orderings, reverse);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
new file mode 100644
index 0000000..56467ce
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -0,0 +1,79 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractExchange;
+import org.apache.drill.exec.physical.base.ExchangeCost;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.physical.base.Sender;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Exchange that funnels the output of any number of sending fragments into exactly one
+ * receiving endpoint.
+ */
+@JsonTypeName("union-exchange")
+public class UnionExchange extends AbstractExchange{
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionExchange.class);
+
+  /** The single endpoint that receives the data; set by {@link #setupReceivers(List)}. */
+  private DrillbitEndpoint destinationLocation;
+  /** Endpoints of the sending side; set by {@link #setupSenders(List)}. */
+  private List<DrillbitEndpoint> senderLocations;
+
+  public UnionExchange(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+
+  /** Remembers the sender endpoints for later use by {@link #getReceiver(int)}. */
+  @Override
+  public void setupSenders(List<DrillbitEndpoint> senderLocations) {
+    this.senderLocations = senderLocations;
+  }
+
+  /**
+   * Records the one receiver endpoint.
+   *
+   * @throws PhysicalOperatorSetupException if the list does not hold exactly one endpoint
+   */
+  @Override
+  protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
+    final boolean exactlyOne = receiverLocations.size() == 1;
+    if (!exactlyOne) {
+      throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint.");
+    }
+    destinationLocation = receiverLocations.get(0);
+  }
+
+  /** Builds a sender that targets the single destination; the minor fragment id is not used. */
+  @Override
+  public Sender getSender(int minorFragmentId, PhysicalOperator child) {
+    final Sender sender = new SingleSender(receiverMajorFragmentId, child, destinationLocation);
+    return sender;
+  }
+
+  /** Builds a receiver over all sender endpoints; the minor fragment id is not used. */
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    final Receiver receiver = new RandomReceiver(senderMajorFragmentId, senderLocations);
+    return receiver;
+  }
+
+  /** @return {@link Integer#MAX_VALUE} */
+  @Override
+  public int getMaxSendWidth() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new UnionExchange(child);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
new file mode 100644
index 0000000..9a7df56
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Factory that turns a physical operator configuration into a {@link RecordBatch}.
+ *
+ * @param <T> the operator configuration type this creator handles
+ */
+public interface BatchCreator<T extends PhysicalOperator> {
+  // Interface fields are implicitly public static final.
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class);
+
+  /**
+   * Creates the record batch for the given operator configuration.
+   *
+   * @param context  the fragment context the batch is created for
+   * @param config   the operator configuration
+   * @param children the already-created batches of the operator's children
+   * @return the record batch for {@code config}
+   * @throws ExecutionSetupException if the batch cannot be set up
+   */
+  RecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
new file mode 100644
index 0000000..6592ca1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
@@ -0,0 +1,108 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.vector.SelectionVector;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+public abstract class FilterRecordBatch implements RecordBatch {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+ /*
+ * Record batch that wraps an upstream batch ("incoming") and delegates getContext() and kill() to it.
+ * NOTE(review): this class looks unfinished -- getRecordCount(), getValueVector(), close() and most of
+ * next() are placeholders; see the notes on the individual members.
+ */
+ private RecordBatch incoming;
+ private SelectionVector selectionVector; // never assigned anywhere in this class
+ private BatchSchema schema; // set in next() when the upstream reports OK_NEW_SCHEMA
+ private FilteringRecordBatchTransformer transformer; // set in next() when the upstream reports OK_NEW_SCHEMA
+ private int outstanding; // never assigned in this class, so it keeps its default of 0
+
+ public FilterRecordBatch(RecordBatch batch) {
+ this.incoming = batch;
+ }
+
+ @Override
+ public FragmentContext getContext() {
+ return incoming.getContext();
+ }
+
+ @Override
+ public BatchSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public int getRecordCount() {
+ return 0; // placeholder: always reports zero records
+ }
+
+ @Override
+ public void kill() {
+ incoming.kill();
+ }
+
+ @Override
+ public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+ return null; // placeholder: no value vectors are exposed yet
+ }
+ // Hook for subclasses; not called anywhere in this class.
+ abstract int applyFilter(SelectionVector vector, int count);
+
+ /**
+ * Release all assets.
+ * The body is currently empty, so nothing is released yet.
+ */
+ private void close() {
+
+ }
+ /*
+ * Pulls the next outcome from the upstream batch. As written, every outcome listed in the switch
+ * (OK_NEW_SCHEMA, OK, NONE, STOP) ends in close() and returns IterOutcome.STOP; the code after the
+ * switch only runs for outcomes that are not listed, and that path ends with "return null".
+ */
+ @Override
+ public IterOutcome next() {
+ while (true) {
+ IterOutcome o = incoming.next();
+ switch (o) {
+ case OK_NEW_SCHEMA:
+ transformer = incoming.getContext().getFilteringExpression(null);
+ schema = transformer.getSchema();
+ // fall through to ok.
+ case OK:
+ // NOTE(review): no break here, so OK_NEW_SCHEMA and OK also fall into the NONE/STOP branch below -- confirm intent.
+ case NONE:
+ case STOP:
+ close();
+ return IterOutcome.STOP;
+ }
+
+ if (outstanding > 0) {
+ // move data to output location.
+
+ for (int i = incoming.getRecordCount() - outstanding; i < incoming.getRecordCount(); i++) {
+
+ }
+ }
+
+ // make sure the bit vector is as large as the current record batch.
+ if (selectionVector.capacity() < incoming.getRecordCount()) { // NOTE(review): selectionVector is never assigned in this class; this would throw NullPointerException if reached
+ selectionVector.allocateNew(incoming.getRecordCount());
+ }
+
+ return null; // placeholder: no IterOutcome is produced on this path
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
new file mode 100644
index 0000000..191521a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
@@ -0,0 +1,58 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.vector.SelectionVector;
+
+/**
+ * Base class for filter implementations that work on an incoming record batch together with a
+ * selection vector. The schema is obtained once, at construction time, from {@link #innerSetup()}.
+ */
+public abstract class FilteringRecordBatchTransformer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilteringRecordBatchTransformer.class);
+
+  final RecordBatch incoming;
+  final SelectionVector selectionVector;
+  final BatchSchema schema;
+
+  /**
+   * Stores the incoming batch and selection vector, then asks the subclass for the schema.
+   *
+   * @param incoming        the batch to filter
+   * @param output          output mutator; not used by this base class
+   * @param selectionVector the selection vector to work with
+   */
+  public FilteringRecordBatchTransformer(RecordBatch incoming, OutputMutator output, SelectionVector selectionVector) {
+    this.selectionVector = selectionVector;
+    this.incoming = incoming;
+    // Both fields above are assigned before the subclass hook runs so it may read them.
+    schema = innerSetup();
+  }
+
+  /**
+   * Subclass hook invoked from the constructor.
+   *
+   * @return the schema that {@link #getSchema()} will report
+   */
+  public abstract BatchSchema innerSetup();
+
+  /**
+   * Applies the filter to the selection index, ignoring any values already present in the
+   * selection vector.
+   *
+   * @return an int result defined by the implementation (presumably the number of selected
+   *         records -- TODO confirm)
+   */
+  public abstract int apply();
+
+  /**
+   * Applies the filter to the selection index, using the existing selection index and evaluating
+   * only those records.
+   *
+   * @return an int result defined by the implementation (presumably the number of selected
+   *         records -- TODO confirm)
+   */
+  public abstract int applyWithSelection();
+
+  /** @return the schema produced by {@link #innerSetup()} during construction */
+  public BatchSchema getSchema() {
+    return schema;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
new file mode 100644
index 0000000..d98c107
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -0,0 +1,102 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.config.MockScanBatchCreator;
+import org.apache.drill.exec.physical.config.MockScanPOP;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Physical-plan visitor that builds the executable operator tree of one fragment. Non-root
+ * operators are returned as {@link RecordBatch} instances; root operators (screen, single sender)
+ * are stored as the {@link RootExec} of this creator and return {@code null} from their visit
+ * method.
+ */
+public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
+
+  private final MockScanBatchCreator msc = new MockScanBatchCreator();
+  private final ScreenCreator sc = new ScreenCreator();
+  private final RandomReceiverCreator rrc = new RandomReceiverCreator();
+  private final SingleSenderCreator ssc = new SingleSenderCreator();
+  /** The fragment's root; assigned at most once while the plan is visited. */
+  private RootExec root = null;
+
+  // Instances are only created through getExec().
+  private ImplCreator(){}
+
+  /** @return the root created while visiting, or {@code null} if no root operator was visited */
+  public RootExec getRoot(){
+    return root;
+  }
+
+  /**
+   * Creates the batch for a scan. Only {@link MockScanPOP} is handled here; every other scan
+   * type is delegated to the superclass.
+   */
+  @Override
+  public RecordBatch visitScan(Scan<?> scan, FragmentContext context) throws ExecutionSetupException {
+    Preconditions.checkNotNull(scan);
+    Preconditions.checkNotNull(context);
+
+    if(scan instanceof MockScanPOP){
+      return msc.getBatch(context, (MockScanPOP) scan, Collections.<RecordBatch> emptyList());
+    }else{
+      return super.visitScan(scan, context);
+    }
+  }
+
+  /** Creates the screen root of the fragment. A root must not have been created before. */
+  @Override
+  public RecordBatch visitScreen(Screen op, FragmentContext context) throws ExecutionSetupException {
+    Preconditions.checkArgument(root == null);
+    root = sc.getRoot(context, op, getChildren(op, context));
+    return null;
+  }
+
+  /** Creates the single-sender root of the fragment. A root must not have been created before. */
+  @Override
+  public RecordBatch visitSingleSender(SingleSender op, FragmentContext context) throws ExecutionSetupException {
+    // Same guard as visitScreen: silently replacing an existing root would drop part of the plan.
+    Preconditions.checkArgument(root == null);
+    root = ssc.getRoot(context, op, getChildren(op, context));
+    return null;
+  }
+
+  /** Creates the batch for a receiver, which has no child batches. */
+  @Override
+  public RecordBatch visitRandomReceiver(RandomReceiver op, FragmentContext context) throws ExecutionSetupException {
+    // Pass an empty list rather than null, matching how visitScan calls its creator.
+    return rrc.getBatch(context, op, Collections.<RecordBatch> emptyList());
+  }
+
+  /** Visits every child of {@code op} and collects the resulting batches in iteration order. */
+  private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException{
+    List<RecordBatch> children = Lists.newArrayList();
+    for(PhysicalOperator child : op){
+      children.add(child.accept(this, context));
+    }
+    return children;
+  }
+
+  /**
+   * Builds the executable root for a fragment.
+   *
+   * @param context the fragment context passed to every operator creator
+   * @param root    the root operator of the fragment's physical plan
+   * @return the executable root
+   * @throws ExecutionSetupException if visiting the plan did not produce a root
+   */
+  public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException{
+    ImplCreator i = new ImplCreator();
+    root.accept(i, context);
+    if(i.root == null) throw new ExecutionSetupException("The provided fragment did not have a root node that correctly created a RootExec value.");
+    return i.getRoot();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
new file mode 100644
index 0000000..ce0cf66
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+/**
+ * Callback through which the fields of an output batch are changed.
+ */
+public interface OutputMutator {
+  /**
+   * Removes the field registered under the given id.
+   *
+   * @throws SchemaChangeException if the change cannot be applied
+   */
+  void removeField(int fieldId) throws SchemaChangeException;
+
+  /**
+   * Registers a vector under the given field id.
+   *
+   * @throws SchemaChangeException if the change cannot be applied
+   */
+  void addField(int fieldId, ValueVector<?> vector) throws SchemaChangeException;
+
+  /**
+   * Signals that the field changes made so far form the new schema.
+   *
+   * @throws SchemaChangeException if the new schema cannot be established
+   */
+  void setNewSchema() throws SchemaChangeException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java
new file mode 100644
index 0000000..9995bc2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java
@@ -0,0 +1,29 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation, retained at runtime, that carries a single class reference.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface PhysicalConfig {
+  /** The class associated with the annotated type. */
+  Class<?> value();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
new file mode 100644
index 0000000..4b991f8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.batch.RawBatchBuffer;
+
+/**
+ * Creates the record batch for a {@link RandomReceiver} from the fragment's incoming buffers.
+ */
+public class RandomReceiverCreator implements BatchCreator<RandomReceiver>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiverCreator.class);
+
+  /**
+   * Wraps the single raw buffer registered for the receiver's opposite major fragment in a
+   * {@link WireRecordBatch}. The expectations (no children, buffers present, exactly one
+   * buffer) are checked with assertions only.
+   */
+  @Override
+  public RecordBatch getBatch(FragmentContext context, RandomReceiver receiver, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    assert children == null || children.isEmpty();
+
+    final IncomingBuffers incoming = context.getBuffers();
+    assert incoming != null : "IncomingBuffers must be defined for any place a receiver is declared.";
+
+    final int oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
+    final RawBatchBuffer[] senderBuffers = incoming.getBuffers(oppositeMajorFragmentId);
+    assert senderBuffers.length == 1;
+
+    return new WireRecordBatch(context, senderBuffers[0]);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
new file mode 100644
index 0000000..80def05
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Factory that turns a root operator configuration into a {@link RootExec}.
+ *
+ * @param <T> the operator configuration type this creator handles
+ */
+public interface RootCreator<T extends PhysicalOperator> {
+  // Interface fields are implicitly public static final.
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootCreator.class);
+
+  /**
+   * Creates the executable root for the given operator configuration.
+   *
+   * @param context  the fragment context the root is created for
+   * @param config   the operator configuration
+   * @param children the already-created batches of the operator's children
+   * @return the executable root for {@code config}
+   * @throws ExecutionSetupException if the root cannot be set up
+   */
+  RootExec getRoot(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
new file mode 100644
index 0000000..3f8aac7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -0,0 +1,40 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+
+/**
+ * A RootExec is the last processing node of a fragment in a query plan. Root nodes include
+ * exchange output nodes and storage nodes. They are the driving force behind the completion of a
+ * query.
+ */
+public interface RootExec {
+  // Interface fields are implicitly public static final.
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootExec.class);
+
+  /**
+   * Does the next batch of work.
+   *
+   * @return whether additional batches of work are necessary; {@code false} means that this
+   *         fragment is done
+   */
+  boolean next();
+
+  /**
+   * Informs all children to clean up and go away.
+   */
+  void stop();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
new file mode 100644
index 0000000..33c1e29
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -0,0 +1,172 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.vector.ValueVector;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+import com.google.common.collect.Lists;
+
+/**
+ * Record batch used for a particular scan. Operates against one or more record readers, which are
+ * consumed one after another.
+ */
+public class ScanBatch implements RecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
+
+  /** Value vectors of the current schema, keyed by field id. */
+  private IntObjectOpenHashMap<ValueVector<?>> fields = new IntObjectOpenHashMap<ValueVector<?>>();
+  private BatchSchema schema;
+  private int recordCount;
+  /** True until the next call to {@link #next()} has reported the schema change downstream. */
+  private boolean schemaChanged = true;
+  private final FragmentContext context;
+  private Iterator<RecordReader> readers;
+  private RecordReader currentReader;
+  private final Mutator mutator = new Mutator();
+
+  /**
+   * Creates the batch and sets up the first reader.
+   *
+   * @param context the fragment context this scan runs in
+   * @param readers the readers to consume, in order
+   * @throws ExecutionSetupException if {@code readers} is empty or the first reader fails to set up
+   */
+  public ScanBatch(FragmentContext context, Iterator<RecordReader> readers)
+      throws ExecutionSetupException {
+    this.context = context;
+    this.readers = readers;
+    if (!readers.hasNext()) throw new ExecutionSetupException("A scan batch must contain at least one reader.");
+    this.currentReader = readers.next();
+    this.currentReader.setup(mutator);
+  }
+
+  /** Drops the current schema and flags that a new one has to be announced. */
+  private void schemaChanged() {
+    schema = null;
+    schemaChanged = true;
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+  @Override
+  public void kill() {
+    releaseAssets();
+  }
+
+  /** Closes every value vector currently registered in {@link #fields}. */
+  private void releaseAssets() {
+    fields.forEach(new IntObjectProcedure<ValueVector<?>>() {
+      @Override
+      public void apply(int key, ValueVector<?> value) {
+        value.close();
+      }
+    });
+  }
+
+  /**
+   * Returns the value vector registered under {@code fieldId}, typed as {@code clazz}.
+   *
+   * @param fieldId id of the requested field
+   * @param clazz   the vector type the caller expects
+   * @return the registered vector
+   * @throws InvalidValueAccessor if no vector is registered under {@code fieldId} or the
+   *           registered vector is not an instance of {@code clazz}
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+    // The lookup must fail for UNKNOWN ids; the format argument is required by %d.
+    if (!fields.containsKey(fieldId)) {
+      throw new InvalidValueAccessor(String.format("Unknown value accesor for field id %d.", fieldId));
+    }
+    // lget() returns the value found by the successful containsKey() call directly above.
+    ValueVector<?> vector = this.fields.lget();
+    // The cast below is only safe when the vector is an instance of the requested type.
+    if (clazz.isAssignableFrom(vector.getClass())) {
+      return (T) vector;
+    } else {
+      throw new InvalidValueAccessor(String.format(
+          "You requested a field accessor of type %s for field id %d but the actual type was %s.",
+          clazz.getCanonicalName(), fieldId, vector.getClass().getCanonicalName()));
+    }
+  }
+
+  /**
+   * Reads the next non-empty batch, moving on to the next reader whenever the current one
+   * returns zero records.
+   *
+   * @return {@code NONE} when all readers are exhausted, {@code STOP} when setting up the next
+   *         reader fails, {@code OK_NEW_SCHEMA} for the first batch after a schema change, and
+   *         {@code OK} otherwise
+   */
+  @Override
+  public IterOutcome next() {
+    while ((recordCount = currentReader.next()) == 0) {
+      try {
+        if (!readers.hasNext()) {
+          currentReader.cleanup();
+          releaseAssets();
+          return IterOutcome.NONE;
+        }
+        currentReader.cleanup();
+        currentReader = readers.next();
+        currentReader.setup(mutator);
+      } catch (ExecutionSetupException e) {
+        this.context.fail(e);
+        releaseAssets();
+        return IterOutcome.STOP;
+      }
+    }
+
+    if (schemaChanged) {
+      schemaChanged = false;
+      return IterOutcome.OK_NEW_SCHEMA;
+    } else {
+      return IterOutcome.OK;
+    }
+  }
+
+  /** Mutator handed to the readers so they can add and remove fields of this batch. */
+  private class Mutator implements OutputMutator {
+    private SchemaBuilder builder = BatchSchema.newBuilder();
+
+    /** Removes and closes the vector of the given field. */
+    @Override
+    public void removeField(int fieldId) throws SchemaChangeException {
+      schemaChanged();
+      ValueVector<?> v = fields.remove(fieldId);
+      if (v == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
+      v.close();
+    }
+
+    /** Registers the vector, adds its field to the schema builder and closes any vector it replaces. */
+    @Override
+    public void addField(int fieldId, ValueVector<?> vector) {
+      schemaChanged();
+      ValueVector<?> v = fields.put(fieldId, vector);
+      builder.addField(vector.getField());
+      if (v != null) v.close();
+    }
+
+    /** Builds the schema from the fields added so far and flags the change. */
+    @Override
+    public void setNewSchema() throws SchemaChangeException {
+      ScanBatch.this.schema = this.builder.build();
+      ScanBatch.this.schemaChanged = true;
+    }
+
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return WritableBatch.get(this.getRecordCount(), fields);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
new file mode 100644
index 0000000..c0711db
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -0,0 +1,90 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
+import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+
+import com.google.common.base.Preconditions;
+
+ /**
+  * Builds the root executor for a {@link Screen} operator. The executor pulls
+  * batches from its single child and sends each one to the client connection
+  * obtained from the fragment context.
+  */
+ public class ScreenCreator implements RootCreator<Screen>{
+   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
+
+   /**
+    * @param context  fragment context supplying the client connection
+    * @param config   screen operator configuration (not read here)
+    * @param children must contain exactly one incoming batch
+    * @return a root executor wrapping the single child
+    * @throws IllegalArgumentException if {@code children} does not hold exactly one element
+    */
+   @Override
+   public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
+     Preconditions.checkArgument(children.size() == 1);
+     return new ScreenRoot(context, children.iterator().next());
+   }
+
+
+   private static class ScreenRoot implements RootExec{
+
+     final RecordBatch incoming;
+     final FragmentContext context;
+     final UserClientConnection connection;
+     // Created lazily; replaced whenever the incoming batch reports a new schema.
+     private RecordMaterializer materializer;
+
+     public ScreenRoot(FragmentContext context, RecordBatch incoming){
+       assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client. As such, this should always be true.";
+
+       this.context = context;
+       this.incoming = incoming;
+       this.connection = context.getConnection();
+     }
+
+     /**
+      * Advances the incoming batch once and forwards the result to the client.
+      *
+      * @return {@code true} while more batches may follow, {@code false} once
+      *         the incoming batch reported NONE or STOP
+      * @throws UnsupportedOperationException for any other outcome
+      */
+     @Override
+     public boolean next() {
+       IterOutcome outcome = incoming.next();
+       switch(outcome){
+       case NONE:
+       case STOP:
+         // Terminal outcome: send one final chunk flagged as last.
+         sendBatch(true);
+         return false;
+
+       case OK_NEW_SCHEMA:
+         materializer = new VectorRecordMaterializer(context, incoming);
+         // fall through
+       case OK:
+         sendBatch(false);
+         return true;
+       default:
+         throw new UnsupportedOperationException("Unexpected outcome: " + outcome);
+       }
+     }
+
+     /**
+      * Sends the current incoming batch to the client and updates the
+      * completion counters on the fragment context.
+      */
+     private void sendBatch(boolean isLast) {
+       // The incoming batch may finish (or deliver data) without ever reporting
+       // OK_NEW_SCHEMA; create the materializer on demand instead of hitting a
+       // null reference.
+       if (materializer == null) {
+         materializer = new VectorRecordMaterializer(context, incoming);
+       }
+       connection.sendResult(materializer.convertNext(isLast));
+       context.batchesCompleted.inc(1);
+       context.recordsCompleted.inc(incoming.getRecordCount());
+     }
+
+     /** Stops execution by killing the incoming batch. */
+     @Override
+     public void stop() {
+       incoming.kill();
+     }
+
+
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
new file mode 100644
index 0000000..60c2d78
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -0,0 +1,89 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+
+ /**
+  * Builds the root executor for a {@link SingleSender} operator. The executor
+  * pulls batches from its single child and sends each one through the tunnel
+  * to the destination named in the sender configuration.
+  */
+ public class SingleSenderCreator implements RootCreator<SingleSender>{
+
+   /**
+    * @param context  fragment context supplying the handle and communicator
+    * @param config   sender configuration (destination, opposite major fragment id)
+    * @param children must contain exactly one incoming batch
+    * @return a root executor wrapping the single child
+    * @throws IllegalArgumentException if {@code children} is null or does not hold exactly one element
+    */
+   @Override
+   public RootExec getRoot(FragmentContext context, SingleSender config, List<RecordBatch> children)
+       throws ExecutionSetupException {
+     // An assert is a no-op unless the JVM runs with -ea, so validate explicitly.
+     if (children == null || children.size() != 1) {
+       throw new IllegalArgumentException("A single sender requires exactly one child batch.");
+     }
+     return new SingleSenderRootExec(context, children.iterator().next(), config);
+   }
+
+
+   private static class SingleSenderRootExec implements RootExec{
+     static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
+     private final RecordBatch incoming;
+     private final BitTunnel tunnel;
+     private final FragmentHandle handle;
+     private final int recMajor;
+     private final FragmentContext context;
+
+     public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
+       logger.debug("Creating single sender root exec base on config: {}", config);
+       this.incoming = batch;
+       this.handle = context.getHandle();
+       this.recMajor = config.getOppositeMajorFragmentId();
+       this.tunnel = context.getCommunicator().getTunnel(config.getDestination());
+       this.context = context;
+     }
+
+     /**
+      * Advances the incoming batch once and forwards the result through the tunnel.
+      *
+      * @return {@code true} after an OK / OK_NEW_SCHEMA batch was sent,
+      *         {@code false} after the batch for a STOP / NONE outcome was sent
+      * @throws IllegalStateException for NOT_YET or any other outcome
+      */
+     @Override
+     public boolean next() {
+       IterOutcome out = incoming.next();
+       logger.debug("Outcome of sender next {}", out);
+       switch(out){
+       case STOP:
+       case NONE:
+         // NOTE(review): the leading flag is false for the terminal batch and true
+         // for data batches, the opposite of what ScreenRoot passes to
+         // convertNext(isLast). If that constructor argument means "is last batch"
+         // the two values are inverted - confirm against FragmentWritableBatch.
+         sendBatch(false);
+         return false;
+
+       case OK:
+       case OK_NEW_SCHEMA:
+         sendBatch(true);
+         return true;
+
+       case NOT_YET:
+       default:
+         throw new IllegalStateException();
+       }
+     }
+
+     /**
+      * Wraps the incoming batch's writable form in a {@link FragmentWritableBatch}
+      * addressed to minor fragment 0 of the receiving major fragment and sends it.
+      */
+     private void sendBatch(boolean leadingFlag) {
+       FragmentWritableBatch outgoing = new FragmentWritableBatch(leadingFlag, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+       tunnel.sendRecordBatch(context, outgoing);
+     }
+
+     /** Stops execution by killing the incoming batch, matching ScreenRoot.stop(). */
+     @Override
+     public void stop() {
+       incoming.kill();
+     }
+
+
+
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
new file mode 100644
index 0000000..fc7f833
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -0,0 +1,99 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.record.RawFragmentBatchProvider;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+ /**
+  * {@link RecordBatch} whose data comes from raw fragment batches handed out by
+  * a {@link RawFragmentBatchProvider}; each raw batch is loaded into a
+  * {@link RecordBatchLoader}, which then serves record count, vectors and the
+  * writable form.
+  */
+ public class WireRecordBatch implements RecordBatch{
+   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WireRecordBatch.class);
+
+   private final RecordBatchLoader batchLoader;
+   private final RawFragmentBatchProvider fragProvider;
+   private final FragmentContext context;
+
+
+   public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) {
+     this.context = context;
+     this.fragProvider = fragProvider;
+     this.batchLoader = new RecordBatchLoader(context.getAllocator());
+   }
+
+   @Override
+   public FragmentContext getContext() {
+     return context;
+   }
+
+   /** This batch does not expose a schema; callers always receive {@code null}. */
+   @Override
+   public BatchSchema getSchema() {
+     return null;
+   }
+
+   @Override
+   public int getRecordCount() {
+     return batchLoader.getRecordCount();
+   }
+
+   /** Forwards the kill request to the fragment provider. */
+   @Override
+   public void kill() {
+     fragProvider.kill(context);
+   }
+
+   @Override
+   public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+     return batchLoader.getValueVector(fieldId, clazz);
+   }
+
+   /**
+    * Fetches the next raw batch and loads it.
+    *
+    * @return NONE when the provider has no batch, OK_NEW_SCHEMA or OK depending
+    *         on what the loader reports, or STOP after a schema change failure
+    *         (which is also reported to the fragment context)
+    */
+   @Override
+   public IterOutcome next() {
+     final RawFragmentBatch raw = this.fragProvider.getNext();
+     if (raw == null) {
+       return IterOutcome.NONE;
+     }
+
+     try {
+       final boolean newSchema = batchLoader.load(raw.getHeader().getDef(), raw.getBody());
+       return newSchema ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
+     } catch (SchemaChangeException ex) {
+       context.fail(ex);
+       return IterOutcome.STOP;
+     }
+   }
+
+   @Override
+   public WritableBatch getWritableBatch() {
+     return batchLoader.getWritableBatch();
+   }
+
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
new file mode 100644
index 0000000..187e6e9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.materialize;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.record.WritableBatch;
+
+ /**
+  * Immutable pairing of a {@link QueryResult} header with the buffers that
+  * accompany it.
+  */
+ public class QueryWritableBatch {
+   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
+
+   private final QueryResult header;
+   private final ByteBuf[] buffers;
+
+
+   /**
+    * @param header  result header
+    * @param buffers buffers that go with the header; the array is stored as given
+    */
+   public QueryWritableBatch(QueryResult header, ByteBuf... buffers) {
+     this.buffers = buffers;
+     this.header = header;
+   }
+
+   /** @return the buffer array supplied at construction (not copied) */
+   public ByteBuf[] getBuffers(){
+     return this.buffers;
+   }
+
+   /** @return the header supplied at construction */
+   public QueryResult getHeader() {
+     return this.header;
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
new file mode 100644
index 0000000..17c65e9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.materialize;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.WritableBatch;
+
+ /**
+  * Converts the current state of a record batch into a {@link QueryWritableBatch}.
+  */
+ public interface RecordMaterializer {
+
+   /**
+    * Produces the next writable batch.
+    *
+    * @param isLast whether the produced batch should be marked as the last one
+    * @return the converted batch
+    */
+   QueryWritableBatch convertNext(boolean isLast);
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
new file mode 100644
index 0000000..e2d2eb9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.materialize;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.WritableBatch;
+
+ /**
+  * {@link RecordMaterializer} that wraps a record batch's writable form in a
+  * {@link QueryWritableBatch} whose header carries the query id, row count,
+  * batch definition and last-chunk flag.
+  */
+ public class VectorRecordMaterializer implements RecordMaterializer{
+   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorRecordMaterializer.class);
+
+   private final QueryId queryId;
+   private final RecordBatch batch;
+
+   /**
+    * @param context fragment context; its handle supplies the query id
+    * @param batch   batch whose contents are converted on each {@link #convertNext(boolean)} call
+    */
+   public VectorRecordMaterializer(FragmentContext context, RecordBatch batch) {
+     this.queryId = context.getHandle().getQueryId();
+     this.batch = batch;
+
+     // The schema is only used for debug logging. Some batches return null from
+     // getSchema(), and iterating null would throw, so guard before looping.
+     Iterable<? extends MaterializedField> schema = batch.getSchema();
+     if (schema != null) {
+       for (MaterializedField f : schema) {
+         logger.debug("New Field: {}", f);
+       }
+     }
+   }
+
+   /**
+    * Builds a result header for the batch's current contents and pairs it with
+    * the batch's buffers.
+    *
+    * @param isLast value for the header's last-chunk flag
+    * @return header plus buffers for the current batch contents
+    */
+   @Override
+   public QueryWritableBatch convertNext(boolean isLast) {
+     WritableBatch w = batch.getWritableBatch();
+
+     QueryResult header = QueryResult.newBuilder() //
+         .setQueryId(queryId) //
+         .setRowCount(batch.getRecordCount()) //
+         .setDef(w.getDef()).setIsLastChunk(isLast).build();
+     // Named distinctly so the local does not shadow the 'batch' field.
+     QueryWritableBatch result = new QueryWritableBatch(header, w.getBuffers());
+     return result;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java
new file mode 100644
index 0000000..9b2cb85
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+
+ /**
+  * Physical-plan visitor carrying a {@link Wrapper} that splits exchange visits
+  * into a "sending" and a "receiving" variant, depending on whether the
+  * exchange is the sending exchange of the wrapper's node.
+  */
+ public abstract class AbstractOpWrapperVisitor<RET, EXCEP extends Throwable> extends
+     AbstractPhysicalVisitor<RET, Wrapper, EXCEP> {
+   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractOpWrapperVisitor.class);
+
+   /**
+    * Routes to {@link #visitSendingExchange} when {@code exchange} is the very
+    * same object as the wrapper node's sending exchange, and to
+    * {@link #visitReceivingExchange} otherwise.
+    */
+   @Override
+   public final RET visitExchange(Exchange exchange, Wrapper wrapper) throws EXCEP {
+     // Identity comparison, not equals().
+     final boolean sending = wrapper.getNode().getSendingExchange() == exchange;
+     return sending
+         ? visitSendingExchange(exchange, wrapper)
+         : visitReceivingExchange(exchange, wrapper);
+   }
+
+   /** Default handling for the sending side: delegates to {@code visitOp}. */
+   public RET visitSendingExchange(Exchange exchange, Wrapper wrapper) throws EXCEP {
+     return visitOp(exchange, wrapper);
+   }
+
+   /** Default handling for the receiving side: delegates to {@code visitOp}. */
+   public RET visitReceivingExchange(Exchange exchange, Wrapper wrapper) throws EXCEP {
+     return visitOp(exchange, wrapper);
+   }
+
+ }