You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:30 UTC
[09/53] [abbrv] Update typing system. Update RPC system. Add
Fragmenting Implementation. Working single node. Distributed failing due to
threading issues.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
new file mode 100644
index 0000000..2900d99
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
@@ -0,0 +1,124 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+
+import com.yammer.metrics.Timer;
+
+/**
+ * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation
+ * messages. Two child implementation, root (driving) and child (driven) exist.
+ */
+public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider, Comparable<Object>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentRunner.class);
+
+ private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
+ private final RootExec root;
+ private final FragmentContext context;
+ private final FragmentRunnerListener listener;
+
+ public FragmentRunner(FragmentContext context, RootExec root, FragmentRunnerListener listener){
+ this.context = context;
+ this.root = root;
+ this.listener = listener;
+ }
+
+ @Override
+ public FragmentStatus getStatus() {
+ return FragmentStatus.newBuilder() //
+ .setBatchesCompleted(context.batchesCompleted.get()) //
+ .setDataProcessed(context.dataProcessed.get()) //
+ .setMemoryUse(context.getAllocator().getAllocatedMemory()) //
+ .build();
+ }
+
+ @Override
+ public void cancel() {
+ updateState(FragmentState.CANCELLED);
+ }
+
+ public UserClientConnection getClient(){
+ return context.getConnection();
+ }
+
+ @Override
+ public void run() {
+ if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
+ internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
+ return;
+ }
+
+ Timer.Context t = context.fragmentTime.time();
+
+ // run the query until root.next returns false.
+ try{
+ while(state.get() == FragmentState.RUNNING_VALUE){
+ if(!root.next()){
+ updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+ }
+ }
+
+ // If this isn't a finished stop, we'll inform other batches to finish up.
+ if(state.get() != FragmentState.FINISHED_VALUE){
+ root.stop();
+ }
+
+ }catch(Exception ex){
+ internalFail(ex);
+ }finally{
+ t.stop();
+ }
+
+ }
+
+ private void internalFail(Throwable excep){
+ state.set(FragmentState.FAILED_VALUE);
+ listener.fail(context.getHandle(), "Failure while running fragment.", excep);
+ }
+
+ private void updateState(FragmentState update){
+ state.set(update.getNumber());
+ listener.stateChanged(context.getHandle(), update);
+ }
+
+ private boolean updateState(FragmentState current, FragmentState update, boolean exceptionOnFailure) {
+ boolean success = state.compareAndSet(current.getNumber(), update.getNumber());
+ if (!success && exceptionOnFailure) {
+ internalFail(new RuntimeException(String.format(
+ "State was different than expected. Attempting to update state from %s to %s however current state was %s.",
+ current.name(), update.name(), FragmentState.valueOf(state.get()))));
+ return false;
+ }
+ listener.stateChanged(context.getHandle(), update);
+ return true;
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ return o.hashCode() - this.hashCode();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunnerListener.java
new file mode 100644
index 0000000..d978470
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunnerListener.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+
+/** Callback through which a fragment runner reports failures and state transitions. */
+public interface FragmentRunnerListener {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentRunnerListener.class);
+  void fail(FragmentHandle handle, String message, Throwable excep); // failure of the fragment, with its cause
+  void stateChanged(FragmentHandle handle, FragmentState newState); // the fragment entered newState
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
new file mode 100644
index 0000000..243d677
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Immutable holder for a root operator, a root fragment (both marked "for local" by the author) and a list
+ * of plan fragments.
+ */
+public class QueryWorkUnit {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class);
+
+  private final PlanFragment rootFragment; // for local
+  private final FragmentRoot rootOperator; // for local
+  private final List<PlanFragment> fragments;
+
+  /**
+   * Stores the three arguments after null-checking each of them.
+   * @throws NullPointerException if any argument is null
+   */
+  public QueryWorkUnit(FragmentRoot rootOperator, PlanFragment rootFragment, List<PlanFragment> fragments) {
+    // checkNotNull hands back its argument, so validation and assignment share one statement
+    this.rootFragment = Preconditions.checkNotNull(rootFragment);
+    this.fragments = Preconditions.checkNotNull(fragments);
+    this.rootOperator = Preconditions.checkNotNull(rootOperator);
+  }
+
+  /** @return the root fragment given at construction */
+  public PlanFragment getRootFragment() {
+    return this.rootFragment;
+  }
+
+  /** @return the fragment list given at construction (the same instance, not a copy) */
+  public List<PlanFragment> getFragments() {
+    return this.fragments;
+  }
+
+  /** @return the root operator given at construction */
+  public FragmentRoot getRootOperator() {
+    return this.rootOperator;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RecordOutputStream.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RecordOutputStream.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RecordOutputStream.java
new file mode 100644
index 0000000..abcb312
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RecordOutputStream.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+public class RecordOutputStream { // placeholder: declares nothing but its logger
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordOutputStream.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
new file mode 100644
index 0000000..74fcd2b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.Builder;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+import org.apache.drill.exec.work.foreman.ErrorHelper;
+
+/**
+ * Fragment runner listener that forwards every status change over a {@link BitTunnel}.
+ */
+public class RemotingFragmentRunnerListener extends AbstractFragmentRunnerListener {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemotingFragmentRunnerListener.class);
+  private final BitTunnel tunnel;
+
+  /** Passes {@code context} to the superclass and keeps {@code tunnel} for sending statuses. */
+  public RemotingFragmentRunnerListener(final FragmentContext context, final BitTunnel tunnel) {
+    super(context);
+    this.tunnel = tunnel;
+  }
+
+  /** Sends {@code status} through the tunnel; {@code handle} is not used here. */
+  @Override
+  protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
+    this.tunnel.sendFragmentStatus(status);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/ResourceRequest.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/ResourceRequest.java
new file mode 100644
index 0000000..2e1296e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/ResourceRequest.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+public class ResourceRequest { // plain holder with public, mutable fields
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourceRequest.class);
+
+ public long memoryMin; // presumably the smallest acceptable amount; units are not stated here -- TODO confirm
+ public long memoryDesired; // presumably the preferred amount -- TODO confirm
+
+
+ public static class ResourceAllocation { // nested holder with a single memory figure
+ public long memory; // presumably the amount actually granted -- TODO confirm
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RootNodeDriver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RootNodeDriver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RootNodeDriver.java
new file mode 100644
index 0000000..12da7ba
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RootNodeDriver.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+/** Driver contract consisting of a single step method. */
+public interface RootNodeDriver {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootNodeDriver.class);
+
+  boolean doNext(); // meaning of the result is not visible in this file -- TODO confirm with implementations
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
new file mode 100644
index 0000000..bbd9df7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+
+public interface StatusProvider { // implemented by FragmentRunner to expose its current FragmentStatus
+  FragmentStatus getStatus();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
new file mode 100644
index 0000000..d3664a0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -0,0 +1,168 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.bit.BitCom;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
+import org.apache.drill.exec.work.batch.BitComHandlerImpl;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
+import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
+import org.apache.drill.exec.work.user.UserWorker;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+/**
+ * Queues runnables handed in through the {@link WorkerBee} and has a daemon event thread move them from
+ * {@code pendingTasks} to a four-thread executor. {@link #close()} stops the event thread and the executor.
+ */
+public class WorkManager implements Closeable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class);
+
+  private Set<IncomingFragmentHandler> incomingFragments = Collections.newSetFromMap(Maps.<IncomingFragmentHandler, Boolean> newConcurrentMap());
+  private PriorityBlockingQueue<Runnable> pendingTasks = Queues.newPriorityBlockingQueue();
+  private Map<FragmentHandle, FragmentRunner> runningFragments = Maps.newConcurrentMap();
+  private ConcurrentMap<QueryId, Foreman> queries = Maps.newConcurrentMap();
+  private BootStrapContext bContext;
+  private DrillbitContext dContext;
+
+  private final BitComHandler bitComWorker;
+  private final UserWorker userWorker;
+  private final WorkerBee bee;
+  // Typed as ExecutorService (not Executor) so that close() can shut the pool down.
+  private final java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(4);
+  private final EventThread eventThread;
+
+  public WorkManager(BootStrapContext context){
+    this.bee = new WorkerBee();
+    this.bContext = context;
+    this.bitComWorker = new BitComHandlerImpl(bee);
+    this.userWorker = new UserWorker(bee);
+    this.eventThread = new EventThread();
+  }
+
+  /** Creates the {@link DrillbitContext} from the given services and starts the event thread. */
+  public void start(DrillbitEndpoint endpoint, DistributedCache cache, BitCom com, ClusterCoordinator coord){
+    this.dContext = new DrillbitContext(endpoint, bContext, coord, com, cache);
+    eventThread.start();
+  }
+
+  public BitComHandler getBitComWorker(){
+    return bitComWorker;
+  }
+
+  public UserWorker getUserWorker(){
+    return userWorker;
+  }
+
+  /**
+   * Interrupts the event thread (ending its polling loop) and shuts the executor down. Tasks already handed to
+   * the executor are allowed to finish; tasks still queued in {@code pendingTasks} are not started.
+   */
+  @Override
+  public void close() throws IOException {
+    eventThread.interrupt();
+    executor.shutdown();
+  }
+
+  public DrillbitContext getContext() {
+    return dContext;
+  }
+
+  // create this so items can see the data here whether or not they are in this package.
+  public class WorkerBee{
+    public void addFragmentRunner(FragmentRunner runner){
+      pendingTasks.add(runner);
+    }
+
+    /** Registers the foreman under its query id so lookup and retirement can find it, then queues it. */
+    public void addNewForeman(Foreman foreman){
+      queries.put(foreman.getQueryId(), foreman);
+      pendingTasks.add(foreman);
+    }
+
+    public void addFragmentPendingRemote(IncomingFragmentHandler handler){
+      incomingFragments.add(handler);
+    }
+
+    public void startFragmentPendingRemote(IncomingFragmentHandler handler){
+      incomingFragments.remove(handler);
+      pendingTasks.add(handler.getRunnable());
+    }
+
+    public FragmentRunner getFragmentRunner(FragmentHandle handle){
+      return runningFragments.get(handle);
+    }
+
+    public Foreman getForemanForQueryId(QueryId queryId){
+      return queries.get(queryId);
+    }
+
+    public void retireForeman(Foreman foreman){
+      queries.remove(foreman.getQueryId(), foreman);
+    }
+
+    public DrillbitContext getContext() {
+      return dContext;
+    }
+  }
+
+  /** Daemon thread that polls {@code pendingTasks} and hands each task to the executor until interrupted. */
+  private class EventThread extends Thread{
+    public EventThread(){
+      this.setDaemon(true);
+      this.setName("WorkManager Event Thread");
+    }
+    @Override
+    public void run() {
+      try {
+        while(true){
+          logger.debug("Checking for pending work tasks.");
+          Runnable r = pendingTasks.poll(10, TimeUnit.SECONDS);
+          if(r != null){
+            executor.execute(r);
+          }
+        }
+      } catch (InterruptedException e) {
+        logger.info("Work Manager stopping as it was interrupted.");
+        Thread.currentThread().interrupt(); // keep the interrupt status visible after the loop ends
+      } catch (java.util.concurrent.RejectedExecutionException e) {
+        logger.info("Work Manager stopping as its executor was shut down."); // close() raced with a poll
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
new file mode 100644
index 0000000..5dacb71
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+import com.google.common.base.Preconditions;
+
+/** Base {@link BatchCollector}: notes each sender's first batch and routes every batch to a buffer. */
+public abstract class AbstractFragmentCollector implements BatchCollector{
+  private final List<DrillbitEndpoint> incoming;
+  private final int oppositeMajorFragmentId;
+  private final AtomicIntegerArray remainders;
+  private final AtomicInteger remainingRequired;
+  protected final RawBatchBuffer[] buffers;
+  private final AtomicInteger parentAccounter;
+  private final AtomicInteger finishedStreams = new AtomicInteger();
+
+  /** Validates the arguments, creates {@code minInputsRequired} buffers and initialises the counters;
+   *  {@code parentAccounter} is decremented later, once enough senders have delivered a first batch. */
+  public AbstractFragmentCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired) {
+    Preconditions.checkArgument(minInputsRequired > 0);
+    Preconditions.checkNotNull(receiver);
+    Preconditions.checkNotNull(parentAccounter);
+
+    this.parentAccounter = parentAccounter;
+    this.incoming = receiver.getProvidingEndpoints();
+    this.remainders = new AtomicIntegerArray(incoming.size());
+    this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
+    this.buffers = new RawBatchBuffer[minInputsRequired];
+    for(int slot = 0; slot < minInputsRequired; slot++){
+      buffers[slot] = new UnlmitedRawBatchBuffer();
+    }
+    // Out-of-order exchanges need just one sender to report; otherwise all minInputsRequired must.
+    int required = receiver.supportsOutOfOrderExchange() ? 1 : minInputsRequired;
+    this.remainingRequired = new AtomicInteger(required);
+  }
+
+  public int getOppositeMajorFragmentId() {
+    return oppositeMajorFragmentId;
+  }
+
+  public RawBatchBuffer[] getBuffers(){
+    return buffers;
+  }
+
+  public abstract void streamFinished(int minorFragmentId);
+
+  /** Records the sender's first batch, signals a last batch via streamFinished, then enqueues the batch. */
+  public void batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) {
+    boolean firstFromSender = remainders.compareAndSet(minorFragmentId, 0, 1);
+    if (firstFromSender && remainingRequired.decrementAndGet() == 0) {
+      parentAccounter.decrementAndGet();
+    }
+    if(batch.getHeader().getIsLastBatch()){
+      streamFinished(minorFragmentId);
+    }
+    getBuffer(minorFragmentId).enqueue(throttle, batch);
+  }
+
+  protected abstract RawBatchBuffer getBuffer(int minorFragmentId);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
new file mode 100644
index 0000000..ff091d7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+
+/** Receives raw fragment batches and exposes the buffers that hold them. */
+interface BatchCollector {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCollector.class);
+
+  void batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch);
+  int getOppositeMajorFragmentId();
+  RawBatchBuffer[] getBuffers();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandler.java
new file mode 100644
index 0000000..97064e3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandler.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnection;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
+
+/**
+ * Entry points for work that arrives over, or is registered with, the bit-to-bit communication
+ * layer.
+ */
+public interface BitComHandler {
+
+  /**
+   * Handles a single incoming RPC message.
+   *
+   * @param connection connection the message arrived on
+   * @param rpcType numeric RPC type of the message
+   * @param pBody protobuf portion of the message
+   * @param dBody data portion of the message
+   * @return the response to send back
+   * @throws RpcException if the message cannot be handled
+   */
+  Response handle(BitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
+
+  /**
+   * Starts work on a plan fragment.
+   *
+   * @param fragment the fragment to start
+   */
+  void startNewRemoteFragment(PlanFragment fragment);
+
+  /**
+   * Requests cancellation of the fragment identified by {@code handle}.
+   *
+   * @param handle handle of the fragment to cancel
+   * @return an acknowledgement of the request
+   */
+  Ack cancelFragment(FragmentHandle handle);
+
+  /**
+   * Registers a handler for incoming fragment data.
+   *
+   * @param handler the handler to register
+   */
+  void registerIncomingFragmentHandler(IncomingFragmentHandler handler);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
new file mode 100644
index 0000000..9b227da
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -0,0 +1,205 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import static org.apache.drill.exec.rpc.RpcBus.get;
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
+import org.apache.drill.exec.proto.ExecProtos.BitStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentRecordBatch;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConstants;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnection;
+import org.apache.drill.exec.rpc.bit.BitRpcConfig;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.RemotingFragmentRunnerListener;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
+import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.MessageLite;
+
+/**
+ * {@link BitComHandler} implementation that dispatches incoming bit-to-bit RPC messages and keeps
+ * track of the {@link IncomingFragmentHandler}s that are receiving record batches.
+ */
+public class BitComHandlerImpl implements BitComHandler {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComHandlerImpl.class);
+
+  /** Handlers for incoming record batches, keyed by fragment handle. */
+  private final ConcurrentMap<FragmentHandle, IncomingFragmentHandler> handlers = Maps.newConcurrentMap();
+  private final WorkerBee bee;
+
+  public BitComHandlerImpl(WorkerBee bee) {
+    super();
+    this.bee = bee;
+  }
+
+  /**
+   * Decodes the protobuf body according to {@code rpcType} and routes it to the matching operation.
+   *
+   * @param connection connection the message arrived on
+   * @param rpcType numeric value of the {@link RpcType} of the message
+   * @param pBody protobuf-encoded message body
+   * @param dBody raw data body; only read for record batch messages
+   * @return {@link BitRpcConfig#OK} for every supported message type
+   * @throws RpcException if the message type is not supported or a record batch cannot be set up
+   */
+  @Override
+  public Response handle(BitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Received bit com message of type {}", rpcType);
+
+    switch (rpcType) {
+
+    case RpcType.REQ_CANCEL_FRAGMENT_VALUE:
+      FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+      cancelFragment(handle);
+      return BitRpcConfig.OK;
+
+    case RpcType.REQ_FRAGMENT_STATUS_VALUE:
+      connection.getListenerPool().status( get(pBody, FragmentStatus.PARSER));
+      // TODO: Support a type of message that has no response.
+      return BitRpcConfig.OK;
+
+    case RpcType.REQ_INIATILIZE_FRAGMENT_VALUE:
+      PlanFragment fragment = get(pBody, PlanFragment.PARSER);
+      startNewRemoteFragment(fragment);
+      return BitRpcConfig.OK;
+
+    case RpcType.REQ_RECORD_BATCH_VALUE:
+      try {
+        FragmentRecordBatch header = get(pBody, FragmentRecordBatch.PARSER);
+        // NOTE(review): the Ack returned by incomingRecordBatch is discarded, so a FAIL ack is
+        // still answered with OK. Confirm whether the sender should be told about the failure.
+        incomingRecordBatch(connection, header, dBody);
+        return BitRpcConfig.OK;
+      } catch (FragmentSetupException e) {
+        throw new RpcException("Failure receiving record batch.", e);
+      }
+
+    default:
+      throw new RpcException("Not yet supported.");
+    }
+
+  }
+
+  /**
+   * Reads the fragment's operator tree, builds its executor and hands a {@link FragmentRunner} to
+   * the worker bee. Parsing and setup failures are reported through the runner listener rather
+   * than thrown.
+   *
+   * @param fragment the fragment to start
+   */
+  @Override
+  public void startNewRemoteFragment(PlanFragment fragment){
+    // The "{}" placeholder is required; without it SLF4J silently drops the fragment argument.
+    logger.debug("Received remote fragment start instruction {}", fragment);
+    FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, null);
+    BitTunnel tunnel = bee.getContext().getBitCom().getTunnel(fragment.getForeman());
+    RemotingFragmentRunnerListener listener = new RemotingFragmentRunnerListener(context, tunnel);
+    try{
+      FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+      RootExec exec = ImplCreator.getExec(context, rootOperator);
+      FragmentRunner fr = new FragmentRunner(context, exec, listener);
+      bee.addFragmentRunner(fr);
+
+    }catch(IOException e){
+      listener.fail(fragment.getHandle(), "Failure while parsing fragment execution plan.", e);
+    }catch(ExecutionSetupException e){
+      listener.fail(fragment.getHandle(), "Failure while setting up execution plan.", e);
+    }
+
+  }
+
+  /**
+   * Cancels the fragment identified by {@code handle}. A registered incoming handler is cancelled
+   * if there is one; otherwise the worker bee's runner for the handle is cancelled, if any.
+   *
+   * @param handle handle of the fragment to cancel
+   * @return {@link Acks#OK} in all cases, including when nothing was found to cancel
+   */
+  @Override
+  public Ack cancelFragment(FragmentHandle handle){
+    IncomingFragmentHandler handler = handlers.get(handle);
+    if(handler != null){
+      // try remote fragment cancel.
+      handler.cancel();
+    }else{
+      // then try local cancel.
+      FragmentRunner runner = bee.getFragmentRunner(handle);
+      if(runner != null) runner.cancel();
+    }
+
+    return Acks.OK;
+  }
+
+  /**
+   * Routes a received record batch to the handler for its fragment, creating and registering a
+   * handler first when none exists yet.
+   *
+   * @param connection connection the batch arrived on
+   * @param fragmentBatch header describing the batch
+   * @param body data body of the batch
+   * @return {@link Acks#FAIL} if no handler exists and the fragment is not in the cache,
+   *         otherwise {@link Acks#OK}
+   * @throws FragmentSetupException declared for failures while setting up or feeding the handler
+   */
+  private Ack incomingRecordBatch(RemoteConnection connection, FragmentRecordBatch fragmentBatch, ByteBuf body) throws FragmentSetupException{
+    FragmentHandle handle = fragmentBatch.getHandle();
+    IncomingFragmentHandler handler = handlers.get(handle);
+
+    // Create a handler if there isn't already one.
+    if(handler == null){
+
+      PlanFragment fragment = bee.getContext().getCache().getFragment(handle);
+      if(fragment == null){
+        logger.error("Received batch where fragment was not in cache.");
+        return Acks.FAIL;
+      }
+
+      IncomingFragmentHandler newHandler = new RemoteFragmentHandler(fragment, bee.getContext(), bee.getContext().getBitCom().getTunnel(fragment.getForeman()));
+
+      // since there could be a race condition on the check, we'll use putIfAbsent so we don't have two competing handlers.
+      handler = handlers.putIfAbsent(fragment.getHandle(), newHandler);
+
+      if(handler == null){
+        // we added a handler, inform foreman that we did so. This way, the foreman can track status. We also tell foreman that we don't need inform ourself.
+        bee.addFragmentPendingRemote(newHandler);
+        handler = newHandler;
+      }
+    }
+
+    boolean canRun = handler.handle(connection.getConnectionThrottle(), new RawFragmentBatch(fragmentBatch, body));
+    if(canRun){
+      // if we've reached the canRun threshold, we'll proceed. This expects handler.handle() to only return a single true.
+      bee.startFragmentPendingRemote(handler);
+    }
+    if(handler.isDone()){
+      handlers.remove(handler.getHandle());
+    }
+
+    return Acks.OK;
+  }
+
+  /**
+   * Registers {@code handler} under its own fragment handle. An existing registration is left in
+   * place; with assertions enabled, registering a second handler for the same handle fails.
+   *
+   * @param handler the handler to register
+   */
+  @Override
+  public void registerIncomingFragmentHandler(IncomingFragmentHandler handler){
+    IncomingFragmentHandler old = handlers.putIfAbsent(handler.getHandle(), handler);
+    assert old == null : "You can only register a fragment handler if one hasn't been registered already.";
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
new file mode 100644
index 0000000..20775c5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -0,0 +1,108 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/**
+ * Determines when a particular fragment has enough data for each of its receiving exchanges to commence execution.
+ */
+public class IncomingBuffers {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IncomingBuffers.class);
+
+  /** Decremented each time a batch flagged as a last batch arrives. */
+  private final AtomicInteger streamsRemaining = new AtomicInteger(0);
+  /** Incremented once per receiver found in the plan and shared with every collector. */
+  private final AtomicInteger remainingRequired = new AtomicInteger(0);
+  /** Collectors keyed by the major fragment id each one reports as its opposite. */
+  private final Map<Integer, BatchCollector> fragCounts;
+
+  /**
+   * Walks the operator tree below {@code root}, creating one collector per receiver.
+   *
+   * @param root root of the operator tree to inspect
+   */
+  public IncomingBuffers(PhysicalOperator root) {
+    Map<Integer, BatchCollector> collectors = Maps.newHashMap();
+    CountRequiredFragments visitor = new CountRequiredFragments();
+    root.accept(visitor, collectors);
+
+    int required = remainingRequired.get();
+    streamsRemaining.set(required);
+    fragCounts = ImmutableMap.copyOf(collectors);
+  }
+
+  /**
+   * Passes an arriving batch to the collector registered for its sending major fragment.
+   *
+   * @param throttle throttle of the connection the batch arrived on
+   * @param batch the received batch
+   * @return true if the required counter reads zero after the batch has been handed over
+   * @throws FragmentSetupException if no collector is registered for the sending major fragment
+   */
+  public boolean batchArrived(ConnectionThrottle throttle, RawFragmentBatch batch) throws FragmentSetupException {
+    logger.debug("New Batch Arrived {}", batch);
+
+    boolean lastBatch = batch.getHeader().getIsLastBatch();
+    if (lastBatch) {
+      streamsRemaining.decrementAndGet();
+    }
+
+    int senderMajorId = batch.getHeader().getSendingMajorFragmentId();
+    BatchCollector collector = fragCounts.get(senderMajorId);
+    if (collector == null) {
+      throw new FragmentSetupException("We received a major fragment id that we were not expecting.");
+    }
+
+    int senderMinorId = batch.getHeader().getSendingMinorFragmentId();
+    collector.batchArrived(throttle, senderMinorId, batch);
+    return remainingRequired.get() == 0;
+  }
+
+  /**
+   * @return the current value of the required counter, never less than zero
+   */
+  public int getRemainingRequired() {
+    return Math.max(0, remainingRequired.get());
+  }
+
+  /**
+   * @param senderMajorFragmentId major fragment id the wanted collector is registered under
+   * @return the buffers of that collector
+   */
+  public RawBatchBuffer[] getBuffers(int senderMajorFragmentId){
+    BatchCollector collector = fragCounts.get(senderMajorFragmentId);
+    return collector.getBuffers();
+  }
+
+  /**
+   * @return true once the remaining-streams counter has dropped below one
+   */
+  public boolean isDone(){
+    return streamsRemaining.get() <= 0;
+  }
+
+  /**
+   * Visitor that creates a collector for every receiver it meets and counts the receivers in the
+   * enclosing instance's required counter.
+   */
+  public class CountRequiredFragments extends AbstractPhysicalVisitor<Void, Map<Integer, BatchCollector>, RuntimeException> {
+
+    @Override
+    public Void visitReceiver(Receiver receiver, Map<Integer, BatchCollector> counts) throws RuntimeException {
+      // Receivers that support out-of-order exchange get a merging collector, all others a partitioned one.
+      BatchCollector collector = receiver.supportsOutOfOrderExchange()
+          ? new MergingCollector(remainingRequired, receiver)
+          : new PartitionedCollector(remainingRequired, receiver);
+
+      counts.put(collector.getOppositeMajorFragmentId(), collector);
+      remainingRequired.incrementAndGet();
+      return null;
+    }
+
+    @Override
+    public Void visitOp(PhysicalOperator op, Map<Integer, BatchCollector> value) throws RuntimeException {
+      // Any other operator only forwards the visit to its children.
+      for (PhysicalOperator child : op) {
+        child.accept(this, value);
+      }
+      return null;
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
new file mode 100644
index 0000000..e21d69a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.physical.base.Receiver;
+
+/**
+ * Collector that funnels the batches of every sending minor fragment into one shared buffer and
+ * marks that buffer finished once all sending streams have finished.
+ */
+public class MergingCollector extends AbstractFragmentCollector{
+
+  /** Number of sending streams that have not reported completion yet. */
+  private final AtomicInteger streamsRunning;
+
+  /**
+   * @param parentAccounter accounting counter passed through to the superclass
+   * @param receiver receiver whose incoming streams are merged
+   */
+  public MergingCollector(AtomicInteger parentAccounter, Receiver receiver) {
+    super(parentAccounter, receiver, 1);
+    // One stream per providing endpoint. The parent accounter must not be used as the seed: it
+    // counts receivers registered so far, so the countdown below would never land on zero.
+    streamsRunning = new AtomicInteger(receiver.getProvidingEndpoints().size());
+  }
+
+  /**
+   * @param minorFragmentId ignored; every sender shares the single buffer
+   * @return the one buffer of this collector
+   */
+  @Override
+  protected RawBatchBuffer getBuffer(int minorFragmentId) {
+    return buffers[0];
+  }
+
+  /**
+   * Records that one sending stream is complete and finishes the shared buffer when it was the
+   * last one still running.
+   *
+   * @param minorFragmentId id of the minor fragment whose stream finished
+   */
+  @Override
+  public void streamFinished(int minorFragmentId) {
+    if(streamsRunning.decrementAndGet() == 0) buffers[0].finished();
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
new file mode 100644
index 0000000..116ca26
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.physical.base.Receiver;
+
+public class PartitionedCollector extends AbstractFragmentCollector{
+
+ public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver) {
+ super(parentAccounter, receiver, receiver.getProvidingEndpoints().size());
+ }
+
+ @Override
+ protected RawBatchBuffer getBuffer(int minorFragmentId) {
+ return buffers[minorFragmentId];
+ }
+
+ @Override
+ public void streamFinished(int minorFragmentId) {
+ buffers[minorFragmentId].finished();
+ }
+
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
new file mode 100644
index 0000000..0f10e26
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.record.RawFragmentBatchProvider;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+/**
+ * A {@link RawFragmentBatchProvider} that batches can be added to as they are received.
+ */
+public interface RawBatchBuffer extends RawFragmentBatchProvider{
+
+  // Interface fields are implicitly public, static and final.
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawBatchBuffer.class);
+
+  /**
+   * Adds a received batch to this buffer.
+   *
+   * @param throttle throttle of the connection the batch arrived on
+   * @param batch the batch to add
+   */
+  void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch);
+
+  /**
+   * Tells the buffer that no further records are expected.
+   */
+  void finished();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
new file mode 100644
index 0000000..f97d878
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+import com.google.common.collect.Queues;
+
+/**
+ * {@link RawBatchBuffer} backed by an unbounded queue; the connection throttle passed to
+ * {@link #enqueue} is not used.
+ */
+public class UnlmitedRawBatchBuffer implements RawBatchBuffer{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlmitedRawBatchBuffer.class);
+
+  private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();
+  // Set by finished(); volatile because it is read by getNext() without any other synchronization.
+  private volatile boolean finished = false;
+
+  /**
+   * Appends {@code batch} to the queue.
+   *
+   * @param throttle ignored by this implementation
+   * @param batch the batch to queue
+   */
+  @Override
+  public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) {
+    buffer.add(batch);
+  }
+
+  /**
+   * Currently a no-op.
+   */
+  @Override
+  public void kill(FragmentContext context) {
+    // TODO: Pass back or kill handler?
+  }
+
+  /**
+   * Records that no more batches are expected.
+   */
+  @Override
+  public void finished() {
+    finished = true;
+  }
+
+  /**
+   * Returns the next queued batch. When the queue is empty and the buffer has not been marked
+   * finished, blocks until a batch arrives.
+   *
+   * @return the next batch, or null if the queue is empty and the buffer is finished, or if the
+   *         calling thread is interrupted while waiting
+   */
+  @Override
+  public RawFragmentBatch getNext(){
+
+    RawFragmentBatch b = buffer.poll();
+    if(b != null){
+      // A batch was already waiting; hand it over instead of dropping it.
+      return b;
+    }
+    if(finished){
+      return null;
+    }
+
+    // NOTE(review): if finished() is called after the check above and nothing else is enqueued,
+    // this take() waits indefinitely - confirm how the consumer is expected to be released.
+    try {
+      return buffer.take();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up can still observe the interruption.
+      Thread.currentThread().interrupt();
+      return null;
+    }
+
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
new file mode 100644
index 0000000..d4c4014
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.foreman;
+
+import java.util.UUID;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.slf4j.Logger;
+
+
+/**
+ * Helper for turning a failure into a {@link DrillPBError} while recording it in a log.
+ */
+public class ErrorHelper {
+
+  /**
+   * Builds a {@link DrillPBError} for a failure and logs the failure under a freshly generated
+   * error id, so the id carried by the returned error can be matched to the log entry.
+   *
+   * @param endpoint endpoint to record in the error
+   * @param message description of the failure; when null the throwable's message is used instead
+   * @param t cause of the failure, may be null
+   * @param logger logger the failure is written to
+   * @return the built error, carrying the generated id and the resolved description
+   */
+  public static DrillPBError logAndConvertError(DrillbitEndpoint endpoint, String message, Throwable t, Logger logger){
+    String id = UUID.randomUUID().toString();
+
+    // Resolve one non-null description for both the error and the log line. The builder is never
+    // handed a null message, and the log no longer prints "null" when only a throwable is given.
+    String description = message;
+    if(description == null && t != null){
+      description = t.getMessage();
+    }
+    if(description == null){
+      description = (t != null) ? t.getClass().getName() : "Unknown error";
+    }
+
+    DrillPBError.Builder builder = DrillPBError.newBuilder();
+    builder.setEndpoint(endpoint);
+    builder.setErrorId(id);
+    builder.setMessage(description);
+    builder.setErrorType(0);
+
+    // record the error to the log for later reference.
+    logger.error("Error {}: {}", id, description, t);
+
+    return builder.build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
new file mode 100644
index 0000000..dea8282
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -0,0 +1,272 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.foreman;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.planner.fragment.StatsCollector;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserProtos.RequestResults;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.AtomicState;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Foreman manages all queries where this is the driving/root node.
+ *
+ * A Foreman is built for a single {@link RunQuery} request. {@link #run()} converts the request into a physical
+ * plan, breaks it into fragments and hands those to a {@link RunningFragmentManager}. Failure and cancellation both
+ * end in {@link #cleanupAndSendResult(QueryResult)}, which retires this Foreman and sends the final result to the
+ * initiating client.
+ */
+public class Foreman implements Runnable, Closeable, Comparable<Object>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
+
+  private QueryId queryId;
+  private RunQuery queryRequest;
+  private QueryContext context;
+  private RunningFragmentManager fragmentManager;
+  private WorkerBee bee;
+  private UserClientConnection initiatingClient;
+  // NOTE(review): initialized to PENDING below; nothing in this class moves it to another state.
+  private final AtomicState<QueryState> state;
+
+
+  /**
+   * @param bee          used to retire this Foreman and passed on when fragments are run
+   * @param dContext     drillbit context used to build the query context and the tunnel manager
+   * @param connection   connection to the client that submitted the query
+   * @param queryId      id of the query being driven
+   * @param queryRequest the request to execute
+   */
+  public Foreman(WorkerBee bee, DrillbitContext dContext, UserClientConnection connection, QueryId queryId,
+      RunQuery queryRequest) {
+    this.queryId = queryId;
+    this.queryRequest = queryRequest;
+    this.context = new QueryContext(queryId, dContext);
+    this.initiatingClient = connection;
+    this.fragmentManager = new RunningFragmentManager(new ForemanManagerListener(), new TunnelManager(dContext.getBitCom()));
+    this.bee = bee;
+
+    this.state = new AtomicState<QueryState>(QueryState.PENDING) {
+      protected QueryState getStateFromNumber(int i) {
+        return QueryState.valueOf(i);
+      }
+    };
+  }
+
+  /**
+   * @return false while the query state is PENDING or RUNNING, true for every other state.
+   */
+  private boolean isFinished(){
+    switch(state.getState()){
+    case PENDING:
+    case RUNNING:
+      return false;
+    default:
+      return true;
+    }
+
+  }
+
+  /**
+   * Logs the failure, converts it into an error and sends a final FAILED result to the client.
+   *
+   * @param message description of the failure
+   * @param t       the cause
+   */
+  private void fail(String message, Throwable t) {
+    if(isFinished()){
+      logger.error("Received a failure message query finished of: {}", message, t);
+    }
+    DrillPBError error = ErrorHelper.logAndConvertError(context.getCurrentEndpoint(), message, t, logger);
+    QueryResult result = QueryResult //
+        .newBuilder() //
+        .addError(error) //
+        .setIsLastChunk(true) //
+        .setQueryState(QueryState.FAILED) //
+        .build();
+    cleanupAndSendResult(result);
+  }
+
+
+  /**
+   * Cancels the query unless it has already finished: asks the fragment manager to cancel, then sends a final
+   * CANCELED result to the client.
+   */
+  public void cancel() {
+    if(isFinished()){
+      return;
+    }
+
+    // cancel remote fragments.
+    fragmentManager.cancel();
+
+    QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.CANCELED).setIsLastChunk(true).setQueryId(queryId).build();
+    cleanupAndSendResult(result);
+  }
+
+  /**
+   * Retires this Foreman from the worker bee and sends the given result to the initiating client.
+   *
+   * @param result the result to send
+   */
+  void cleanupAndSendResult(QueryResult result){
+    bee.retireForeman(this);
+    initiatingClient.sendResult(new QueryWritableBatch(result)).addLightListener(new ResponseSendListener());
+  }
+
+  /**
+   * Listener attached to the result send; a failed send is only logged.
+   */
+  private class ResponseSendListener extends RpcOutcomeListener<Ack> {
+    @Override
+    public void failed(RpcException ex) {
+      logger
+          .info(
+              "Failure while trying communicate query result to initating client.  This would happen if a client is disconnected before response notice can be sent.",
+              ex);
+    }
+  }
+
+
+
+  /**
+   * Called by execution pool to do foreman setup. Actual query execution is a separate phase (and can be scheduled).
+   *
+   * Dispatches on the request type. Only PHYSICAL plans are executed; {@link #convert(LogicalPlan)} and
+   * {@link #runSQL(String)} currently throw {@link UnsupportedOperationException}.
+   */
+  public void run() {
+    // convert a run query request into action
+
+    switch (queryRequest.getType()) {
+
+    case LOGICAL:
+      parseAndRunLogicalPlan(queryRequest.getPlan());
+      break;
+    case PHYSICAL:
+      parseAndRunPhysicalPlan(queryRequest.getPlan());
+      break;
+    case SQL:
+      runSQL(queryRequest.getPlan());
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Reads a logical plan from JSON, converts it to a physical plan and runs it. A read failure fails the query.
+   */
+  private void parseAndRunLogicalPlan(String json) {
+    try {
+      LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json);
+      PhysicalPlan physicalPlan = convert(logicalPlan);
+      runPhysicalPlan(physicalPlan);
+    } catch (IOException e) {
+      fail("Failure while parsing logical plan.", e);
+    }
+  }
+
+  /**
+   * Reads a physical plan from JSON and runs it. A read failure fails the query.
+   */
+  private void parseAndRunPhysicalPlan(String json) {
+    try {
+      PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
+      runPhysicalPlan(plan);
+    } catch (IOException e) {
+      fail("Failure while parsing physical plan.", e);
+    }
+  }
+
+  /**
+   * Fragments the plan, parallelizes it and starts execution. Non-leaf fragments are stored in the cache; leaf
+   * fragments are handed to the fragment manager together with the root fragment. Setup failures fail the query.
+   */
+  private void runPhysicalPlan(PhysicalPlan plan) {
+
+    // the first operator of the sorted list is used as the root of the plan.
+    PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+    MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor();
+    Fragment rootFragment;
+    try {
+      rootFragment = rootOperator.accept(makeFragmentsVisitor, null);
+    } catch (FragmentSetupException e) {
+      fail("Failure while fragmenting query.", e);
+      return;
+    }
+    PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
+    SimpleParallelizer parallelizer = new SimpleParallelizer();
+
+    try {
+      // NOTE(review): the trailing literal 10 is presumably a parallelization limit -- confirm against SimpleParallelizer.
+      QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, 10);
+
+      List<PlanFragment> leafFragments = Lists.newArrayList();
+
+      // store fragments in distributed grid.
+      for (PlanFragment f : work.getFragments()) {
+        if (f.getLeafFragment()) {
+          leafFragments.add(f);
+        } else {
+          context.getCache().storeFragment(f);
+        }
+      }
+
+      fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, leafFragments);
+
+
+    } catch (ExecutionSetupException e) {
+      fail("Failure while setting up query.", e);
+    }
+
+  }
+
+  /** Not implemented; always throws {@link UnsupportedOperationException}. */
+  private void runSQL(String json) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented; always throws {@link UnsupportedOperationException}. */
+  private PhysicalPlan convert(LogicalPlan plan) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented; currently always returns null. */
+  public QueryResult getResult(UserClientConnection connection, RequestResults req) {
+
+    return null;
+  }
+
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  /** Currently a no-op. */
+  @Override
+  public void close() throws IOException {
+  }
+
+  QueryState getQueryState(){
+    return this.state.getState();
+  }
+
+  /** Not implemented; always throws {@link UnsupportedOperationException}. */
+  public boolean rootCoorespondsTo(FragmentHandle handle){
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Narrow view of this Foreman given to the {@link RunningFragmentManager}; both methods forward to the
+   * enclosing Foreman.
+   */
+  class ForemanManagerListener{
+    void fail(String message, Throwable t) {
+      // Must target the enclosing Foreman; qualifying with ForemanManagerListener.this would call this method again.
+      Foreman.this.fail(message, t);
+    }
+
+    void cleanupAndSendResult(QueryResult result){
+      // Same as above: forward to the enclosing Foreman rather than recursing.
+      Foreman.this.cleanupAndSendResult(result);
+    }
+
+  }
+
+
+
+  /**
+   * Orders this Foreman against another object by hash code.
+   *
+   * @return a negative value, zero or a positive value when this object's hash code is less than, equal to or
+   *         greater than that of {@code o}
+   */
+  @Override
+  public int compareTo(Object o) {
+    int mine = hashCode();
+    int theirs = o.hashCode();
+    // explicit comparison instead of subtraction, which can overflow.
+    return mine < theirs ? -1 : (mine == theirs ? 0 : 1);
+  }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
new file mode 100644
index 0000000..d906ba2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.foreman;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+
+/**
+ * Callback for receiving fragment status reports.
+ */
+public interface FragmentStatusListener {
+  // interface fields are implicitly public, static and final.
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusListener.class);
+
+  /**
+   * Delivers a status report to this listener.
+   *
+   * @param status the reported fragment status
+   */
+  void statusUpdate(FragmentStatus status);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
new file mode 100644
index 0000000..20797b8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -0,0 +1,266 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.foreman;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.work.AbstractFragmentRunnerListener;
+import org.apache.drill.exec.work.EndpointListener;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.foreman.Foreman.ForemanManagerListener;
+import org.apache.drill.exec.work.fragment.LocalFragmentHandler;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Each Foreman holds its own fragment manager.  This manages the events associated with execution of a particular query across all fragments.
+ *
+ * The manager records one {@code FragmentData} entry per fragment it starts (the local root plus every leaf fragment
+ * it sends out) and reports the overall outcome back through the {@link ForemanManagerListener}.
+ */
+class RunningFragmentManager implements FragmentStatusListener{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RunningFragmentManager.class);
+
+  // NOTE(review): the original trailing comment here read "doesn't need to be" and was left unfinished.
+  public Map<FragmentHandle, FragmentData> map = Maps.newHashMap();
+  private final TunnelManager tun;
+  private ForemanManagerListener foreman;
+  private AtomicInteger remainingFragmentCount;
+  private FragmentRunner rootRunner;
+
+  /**
+   * @param foreman listener used to report the final query result
+   * @param tun     source of tunnels to remote endpoints
+   */
+  public RunningFragmentManager(ForemanManagerListener foreman, TunnelManager tun) {
+    super();
+    this.foreman = foreman;
+    this.tun = tun;
+    this.remainingFragmentCount = new AtomicInteger(0);
+  }
+
+  /**
+   * Sets up the root fragment locally and then sends every leaf fragment to its assigned endpoint.
+   *
+   * @param bee           supplies the drillbit context and accepts the root fragment runner
+   * @param rootFragment  the root plan fragment
+   * @param rootOperator  the root operator of the root fragment
+   * @param rootClient    connection to the client that submitted the query
+   * @param leafFragments fragments to send to remote endpoints
+   * @throws ExecutionSetupException declared for failures while setting up the root fragment
+   */
+  public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments) throws ExecutionSetupException{
+    // one count per leaf fragment plus one for the root.
+    remainingFragmentCount.set(leafFragments.size()+1);
+
+    // set up the root fragment first so we'll have incoming buffers available.
+    {
+      IncomingBuffers buffers = new IncomingBuffers(rootOperator);
+
+      FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment.getHandle(), rootClient, buffers);
+      RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
+      // add fragment to local node.
+      map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
+      rootRunner = new FragmentRunner(rootContext, rootExec, new RootFragmentManager(rootContext, rootFragment));
+      LocalFragmentHandler handler = new LocalFragmentHandler(rootFragment.getHandle(), buffers, rootRunner);
+      if(buffers.isDone()){
+        // buffers already report done, so the root is handed to the bee right away.
+        bee.addFragmentRunner(handler.getRunnable());
+      }else{
+        // otherwise the handler is registered to receive incoming batches.
+        bee.getContext().getBitCom().registerIncomingBatchHandler(handler);
+      }
+
+    }
+
+    // send remote fragments.
+    for (PlanFragment f : leafFragments) {
+      sendRemoteFragment(f);
+    }
+
+  }
+
+  /**
+   * Records the fragment as tracked and sends it to its assigned endpoint.
+   */
+  private void sendRemoteFragment(PlanFragment fragment){
+    map.put(fragment.getHandle(), new FragmentData(fragment.getHandle(), fragment.getAssignment(), false));
+    FragmentSubmitListener listener = new FragmentSubmitListener(fragment.getAssignment(), fragment);
+    tun.get(fragment.getAssignment()).sendFragment(fragment).addLightListener(listener);
+  }
+
+
+  /**
+   * Routes an incoming status report according to its state.
+   *
+   * @throws UnsupportedOperationException for a state not handled below
+   */
+  @Override
+  public void statusUpdate(FragmentStatus status) {
+
+    switch(status.getState()){
+    case AWAITING_ALLOCATION:
+      updateStatus(status);
+      break;
+    case CANCELLED:
+      // we don't care about cancellation messages since we're the only entity that should drive cancellations.
+      break;
+    case FAILED:
+      fail(status);
+      break;
+    case FINISHED:
+      finished(status);
+      break;
+    case RUNNING:
+      updateStatus(status);
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Stores the latest status for the fragment it belongs to.
+   *
+   * @return true when the fragment is tracked by this manager; false when the handle is unknown, in which case the
+   *         report is logged and dropped instead of failing with a NullPointerException
+   */
+  private boolean updateStatus(FragmentStatus status){
+    FragmentData data = map.get(status.getHandle());
+    if(data == null){
+      logger.warn("Received status update for untracked fragment {}.  Ignoring.", status.getHandle());
+      return false;
+    }
+    data.setStatus(status);
+    return true;
+  }
+
+  /**
+   * Handles a FINISHED report; once the remaining count reaches zero a COMPLETED result is sent.
+   */
+  private void finished(FragmentStatus status){
+    if(!updateStatus(status)){
+      // a fragment this manager never started must not count toward completion.
+      return;
+    }
+    int remaining = remainingFragmentCount.decrementAndGet();
+    if(remaining == 0){
+      QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.COMPLETED).build();
+      foreman.cleanupAndSendResult(result);
+    }
+  }
+
+  /**
+   * Handles a FAILED report: stops the query and sends a FAILED result, whether or not the fragment is tracked.
+   */
+  private void fail(FragmentStatus status){
+    updateStatus(status);
+    stopQuery();
+    QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.FAILED).build();
+    foreman.cleanupAndSendResult(result);
+  }
+
+
+  /**
+   * Intended to stop all fragments that are still active.
+   * NOTE(review): the implementation below is commented out, so this method currently does nothing.
+   */
+  private void stopQuery(){
+    // Stop all queries with a currently active status.
+//    for(FragmentData data: map.values()){
+//      FragmentHandle handle = data.getStatus().getHandle();
+//      switch(data.getStatus().getState()){
+//      case SENDING:
+//      case AWAITING_ALLOCATION:
+//      case RUNNING:
+//        if(data.isLocal()){
+//          rootRunner.cancel();
+//        }else{
+//          tun.get(data.getEndpoint()).cancelFragment(handle).addLightListener(new CancelListener(data.endpoint, handle));
+//        }
+//        break;
+//      default:
+//        break;
+//      }
+//    }
+  }
+
+  /** Cancels the query by delegating to {@link #stopQuery()}. */
+  public void cancel(){
+    stopQuery();
+  }
+
+  /**
+   * Listener for the outcome of a cancellation request sent to a remote endpoint.
+   */
+  private class CancelListener extends EndpointListener<Ack, FragmentHandle>{
+
+    public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) {
+      super(endpoint, handle);
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
+    }
+
+    @Override
+    public void success(Ack ack) {
+      // The parameter is deliberately not named 'value' so that 'value' below is the fragment handle held by
+      // this listener rather than the Ack.
+      if(!ack.getOk()){
+        logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
+      }
+      // do nothing.
+    }
+
+  }
+
+  /**
+   * @return a new listener for the outcome of submitting the given fragment to the given endpoint
+   */
+  public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, PlanFragment value){
+    return new FragmentSubmitListener(endpoint, value);
+  }
+
+
+
+  /**
+   * Listener for the outcome of sending a fragment to a remote endpoint; a failed send stops the query.
+   */
+  private class FragmentSubmitListener extends EndpointListener<Ack, PlanFragment>{
+
+    public FragmentSubmitListener(DrillbitEndpoint endpoint, PlanFragment value) {
+      super(endpoint, value);
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      // record the cause; otherwise the reason the query was stopped is lost.
+      logger.error("Failure while sending fragment {} to endpoint {}.", value.getHandle(), endpoint, ex);
+      stopQuery();
+    }
+
+  }
+
+
+  /**
+   * Bookkeeping for one tracked fragment: where it runs and the last status received for it.
+   */
+  private class FragmentData{
+    private final boolean isLocal;
+    private volatile FragmentStatus status;
+    private volatile long lastStatusUpdate = 0;
+    private final DrillbitEndpoint endpoint;
+
+    /**
+     * Creates the entry with an initial status of SENDING.
+     */
+    public FragmentData(FragmentHandle handle, DrillbitEndpoint endpoint, boolean isLocal) {
+      super();
+      this.status = FragmentStatus.newBuilder().setHandle(handle).setState(FragmentState.SENDING).build();
+      this.endpoint = endpoint;
+      this.isLocal = isLocal;
+    }
+
+    /** Stores the status and stamps the time it was received. */
+    public void setStatus(FragmentStatus status){
+      this.status = status;
+      lastStatusUpdate = System.currentTimeMillis();
+    }
+
+    public FragmentStatus getStatus() {
+      return status;
+    }
+
+    public boolean isLocal() {
+      return isLocal;
+    }
+
+    public long getLastStatusUpdate() {
+      return lastStatusUpdate;
+    }
+
+    public DrillbitEndpoint getEndpoint() {
+      return endpoint;
+    }
+
+
+  }
+
+  /**
+   * Runner listener for the locally executed root fragment; status changes are recorded on this manager.
+   */
+  private class RootFragmentManager extends AbstractFragmentRunnerListener{
+
+    private RootFragmentManager(FragmentContext context, PlanFragment fragment){
+      super(context);
+    }
+
+    @Override
+    protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+      RunningFragmentManager.this.updateStatus(status);
+    }
+
+
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/TunnelManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/TunnelManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/TunnelManager.java
new file mode 100644
index 0000000..ad3534c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/TunnelManager.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.foreman;
+
+import java.util.Map;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitCom;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Holds the tunnels a particular Foreman has obtained so far, keyed by endpoint.
+ */
+public class TunnelManager {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TunnelManager.class);
+
+  private final BitCom com;
+  private Map<DrillbitEndpoint, BitTunnel> tunnels = Maps.newHashMap();
+
+  /**
+   * @param com the communication layer that new tunnels are requested from
+   */
+  public TunnelManager(BitCom com){
+    this.com = com;
+  }
+
+  /**
+   * Returns the tunnel for an endpoint, asking the communication layer for one on first use and remembering it.
+   *
+   * @param ep the endpoint to reach
+   * @return the remembered tunnel when one exists, otherwise the newly obtained one
+   */
+  public BitTunnel get(DrillbitEndpoint ep){
+    BitTunnel known = tunnels.get(ep);
+    if(known != null){
+      return known;
+    }
+
+    // first request for this endpoint: obtain a tunnel and remember it.
+    BitTunnel created = com.getTunnel(ep);
+    tunnels.put(ep, created);
+    return created;
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
new file mode 100644
index 0000000..b4e9308
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import org.apache.drill.exec.work.FragmentRunner;
+
+/**
+ * Handles incoming fragments as they arrive, routing them as appropriate.
+ */
+public interface IncomingFragmentHandler {
+
+  /**
+   * Handle the next incoming fragment batch.
+   *
+   * @param throttle the throttle of the connection the batch arrived on
+   * @param batch the incoming batch
+   * @return True if the fragment has enough incoming data to be able to be run.
+   * @throws FragmentSetupException
+   */
+  boolean handle(ConnectionThrottle throttle, RawFragmentBatch batch) throws FragmentSetupException;
+
+  /**
+   * Get the fragment runner for this incoming fragment.  Note, this can only be requested once.
+   *
+   * @return the runner for the fragment
+   */
+  FragmentRunner getRunnable();
+
+  void cancel();
+
+  boolean isDone();
+
+  FragmentHandle getHandle();
+}
\ No newline at end of file