You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:31 UTC
[10/53] [abbrv] Update typing system. Update RPC system. Add
Fragmenting Implementation. Working single node. Distributed failing due to
threading issues.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
new file mode 100644
index 0000000..73980f9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
@@ -0,0 +1,168 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.RpcBus;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+
+public class BitConnection extends RemoteConnection{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnection.class);
+
+ private final RpcBus<RpcType, BitConnection> bus;
+ private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
+ private final ListenerPool listeners;
+
+ private final AvailabilityListener listener;
+ private volatile DrillbitEndpoint endpoint;
+ private volatile boolean active = false;
+ private final UUID id;
+
+ public BitConnection(AvailabilityListener listener, Channel channel, RpcBus<RpcType, BitConnection> bus, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners){
+ super(channel);
+ this.bus = bus;
+ this.registry = registry;
+ // we use a local listener pool unless a global one is provided.
+ this.listeners = listeners != null ? listeners : new ListenerPool(2);
+ this.listener = listener;
+ this.id = UUID.randomUUID();
+ }
+
+ protected DrillbitEndpoint getEndpoint() {
+ return endpoint;
+ }
+
+ public ListenerPool getListenerPool(){
+ return listeners;
+ }
+
+ protected void setEndpoint(DrillbitEndpoint endpoint) {
+ Preconditions.checkNotNull(endpoint);
+ Preconditions.checkArgument(this.endpoint == null);
+
+ this.endpoint = endpoint;
+ BitServer.logger.debug("Adding new endpoint to available BitServer connections. Endpoint: {}.", endpoint);
+ synchronized(this){
+ BitConnection c = registry.putIfAbsent(endpoint, this);
+
+ if(c != null){ // the registry already has a connection like this
+
+ // give the awaiting future an alternative connection.
+ if(listener != null){
+ listener.isAvailable(c);
+ }
+
+ // shut this down if this is a client as it won't be available in the registry.
+ // otherwise we'll leave as, possibly allowing to bit coms to use different tunnels to talk to each other. This shouldn't cause a problem.
+ logger.debug("Shutting down connection to {} since the registry already has an active connection that endpoint.", endpoint);
+ shutdownIfClient();
+
+ }
+ active = true;
+ if(listener != null) listener.isAvailable(this);
+ }
+ }
+
+ public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch){
+ return bus.send(this, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
+ }
+
+ public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment){
+ return bus.send(this, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
+ }
+
+ public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
+ return bus.send(this, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+ }
+
+ public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
+ return bus.send(this, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
+ }
+
+ public void disable(){
+ active = false;
+ }
+
+ public boolean isActive(){
+ return active;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((id == null) ? 0 : id.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ BitConnection other = (BitConnection) obj;
+ if (id == null) {
+ if (other.id != null) return false;
+ } else if (!id.equals(other.id)) return false;
+ return true;
+ }
+
+ public GenericFutureListener<ChannelFuture> getCloseHandler(GenericFutureListener<ChannelFuture> parent){
+ return new CloseHandler(this, parent);
+ }
+
+ private class CloseHandler implements GenericFutureListener<ChannelFuture>{
+ private BitConnection connection;
+ private GenericFutureListener<ChannelFuture> parent;
+
+ public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
+ super();
+ this.connection = connection;
+ this.parent = parent;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if(connection.getEndpoint() != null) registry.remove(connection.getEndpoint(), connection);
+ parent.operationComplete(future);
+ }
+
+ }
+
+ public void shutdownIfClient(){
+ if(bus.isClient()) Closeables.closeQuietly(bus);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
new file mode 100644
index 0000000..0160d24
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
@@ -0,0 +1,80 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcException;
+
+import com.google.common.util.concurrent.CheckedFuture;
+
+public class BitConnectionManager {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnectionManager.class);
+
+ private final int maxAttempts;
+ private final BitComImpl com;
+ private final DrillbitEndpoint endpoint;
+ private final AtomicReference<BitConnection> connection;
+ private final AtomicReference<CheckedFuture<BitConnection, RpcException>> future;
+
+ BitConnectionManager(DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection, CheckedFuture<BitConnection, RpcException> future, int maxAttempts) {
+ assert endpoint != null && endpoint.getAddress() != null && endpoint.getBitPort() > 0;
+ this.com = com;
+ this.connection = new AtomicReference<BitConnection>(connection);
+ this.future = new AtomicReference<CheckedFuture<BitConnection, RpcException>>(future);
+ this.endpoint = endpoint;
+ this.maxAttempts = maxAttempts;
+ }
+
+ BitConnection getConnection(int attempt) throws RpcException{
+ BitConnection con = connection.get();
+
+ if(con != null){
+ if(con.isActive()) return con;
+ connection.compareAndSet(con, null);
+ }
+
+ CheckedFuture<BitConnection, RpcException> fut = future.get();
+
+ if(fut != null){
+ try{
+ return fut.checkedGet();
+ }catch(RpcException ex){
+ future.compareAndSet(fut, null);
+ if(attempt < maxAttempts){
+ return getConnection(attempt + 1);
+ }else{
+ throw ex;
+ }
+ }
+ }
+
+ // no checked future, let's make one.
+ fut = com.getConnectionAsync(endpoint);
+ future.compareAndSet(null, fut);
+ return getConnection(attempt);
+
+ }
+
+ public DrillbitEndpoint getEndpoint() {
+ return endpoint;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitRpcConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitRpcConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitRpcConfig.java
new file mode 100644
index 0000000..32fd4f9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitRpcConfig.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentRecordBatch;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConfig;
+
+/**
+ * Constants describing the bit-to-bit rpc protocol: the request/response type mapping, the protocol version and a
+ * shared OK response.
+ */
+public class BitRpcConfig {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitRpcConfig.class);
+
+  /** Maps each request type and message class to its response type and message class. Final so it cannot be reassigned. */
+  public static final RpcConfig MAPPING = RpcConfig.newBuilder("BIT-RPC-MAPPING") //
+      .add(RpcType.HANDSHAKE, BitHandshake.class, RpcType.HANDSHAKE, BitHandshake.class)
+      .add(RpcType.REQ_INIATILIZE_FRAGMENT, PlanFragment.class, RpcType.ACK, Ack.class)
+      .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+      .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
+      .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
+      .build();
+
+  /** Version of the bit rpc protocol. Final so it cannot be reassigned. */
+  public static final int RPC_VERSION = 2;
+
+  /** Shared response of type ACK carrying {@code Acks.OK}. */
+  public static final Response OK = new Response(RpcType.ACK, Acks.OK);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
index e17b25c..88ac6cc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
@@ -18,47 +18,76 @@
package org.apache.drill.exec.rpc.bit;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
+import org.apache.drill.exec.proto.ExecProtos.BitStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.rpc.BasicServer;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
import com.google.protobuf.MessageLite;
-public class BitServer extends BasicServer<RpcType>{
+public class BitServer extends BasicServer<RpcType, BitConnection>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServer.class);
- private final DrillbitContext context;
private final BitComHandler handler;
+ private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
+ private final ListenerPool listeners;
- public BitServer(BitComHandler handler, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, DrillbitContext context) {
- super(alloc, eventLoopGroup);
- this.context = context;
+ public BitServer(BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
+ super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
this.handler = handler;
+ this.registry = registry;
+ this.listeners = listeners;
}
@Override
- protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
- return handler.getResponseDefaultInstance(rpcType);
+ public MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+ return BitComDefaultInstanceHandler.getResponseDefaultInstance(rpcType);
}
@Override
- protected Response handle(SocketChannel ch, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
- return handler.handle(context, rpcType, pBody, dBody);
+ protected Response handle(BitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ return handler.handle(connection, rpcType, pBody, dBody);
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch) {
-
- return super.getCloseHandler(ch);
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection connection) {
+ return connection.getCloseHandler(super.getCloseHandler(connection));
+ }
+
+  @Override
+  public BitConnection initRemoteConnection(Channel channel) {
+    // Connections created here are given no availability listener; they share this server's registry and listener pool.
+    final AvailabilityListener noListener = null;
+    return new BitConnection(noListener, channel, this, registry, listeners);
+  }
+
+
+  /**
+   * Creates the handler that answers an incoming {@link BitHandshake}.
+   *
+   * <p>The handshake is rejected with an {@link RpcException} when the peer's rpc version differs from
+   * {@link BitRpcConfig#RPC_VERSION}; otherwise a handshake carrying this server's rpc version is returned.
+   */
+  @Override
+  protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler() {
+    return new ServerHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.PARSER){
+
+      @Override
+      public MessageLite getHandshakeResponse(BitHandshake inbound) throws Exception {
+        logger.debug("Handling handshake from other bit. {}", inbound);
+        final int actualVersion = inbound.getRpcVersion();
+        if(actualVersion != BitRpcConfig.RPC_VERSION){
+          // "Expected" is the local protocol version, "actual" is what the peer sent.
+          throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", BitRpcConfig.RPC_VERSION, actualVersion));
+        }
+        return BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build();
+      }
+
+    };
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
index 02991ad..652fa52 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
@@ -17,47 +17,222 @@
******************************************************************************/
package org.apache.drill.exec.rpc.bit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import com.google.common.io.Closeables;
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
/**
- * Interface provided for communication between two bits. Provided by both a server and a client implementation.
+ * Interface provided for communication between two bits. Underlying connection may be server or client based. Resilient
+ * to connection loss. Right now, this has to jump through some hoops and bridge futures between the connection creation
+ * and action. A better approach should be done.
*/
public class BitTunnel {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitTunnel.class);
- final RpcBus<?> bus;
+ private static final int MAX_ATTEMPTS = 3;
- public BitTunnel(RpcBus<?> bus){
- this.bus = bus;
- }
+ private final BitConnectionManager manager;
+ private final Executor exec;
- public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, RecordBatch batch){
- return null;
+
+ public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection) {
+ this.manager = new BitConnectionManager(endpoint, com, connection, null, MAX_ATTEMPTS);
+ this.exec = exec;
}
-
- public DrillRpcFuture<FragmentHandle> sendFragment(FragmentContext context, PlanFragment fragment){
- return null;
+
+ public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com,
+ CheckedFuture<BitConnection, RpcException> future) {
+ this.manager = new BitConnectionManager(endpoint, com, (BitConnection) null, future, MAX_ATTEMPTS);
+ this.exec = exec;
}
- public DrillRpcFuture<Ack> cancelFragment(FragmentContext context, FragmentHandle handle){
- return null;
+ public DrillbitEndpoint getEndpoint(){
+ return manager.getEndpoint();
}
-
- public DrillRpcFuture<FragmentStatus> getFragmentStatus(FragmentContext context, FragmentHandle handle){
- return null;
+
+  /** Hands the command to the tunnel's executor and returns that same command as the caller's future. */
+  private <T> DrillRpcFuture<T> submit(BitCommand<T> cmd) {
+    exec.execute(cmd);
+    return cmd;
+  }
+
+  /** Submits a {@link SendBatch} command for the given batch and context and returns it as the future for the Ack. */
+  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
+    final SendBatch command = new SendBatch(batch, context);
+    return submit(command);
+  }
+
+  /** Submits a {@link SendFragment} command for the given plan fragment and returns it as the future for the Ack. */
+  public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment) {
+    final SendFragment command = new SendFragment(fragment);
+    return submit(command);
+  }
+
+  /** Submits a {@link CancelFragment} command for the given handle and returns it as the future for the Ack. */
+  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle) {
+    final CancelFragment command = new CancelFragment(handle);
+    return submit(command);
}
+
+  /** Submits a {@link SendFragmentStatus} command for the given status and returns it as the future for the Ack. */
+  public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
+    final SendFragmentStatus command = new SendFragmentStatus(status);
+    return submit(command);
+  }
+
+  /** Command that sends a record batch, together with its fragment context, over the connection it is given. */
+  public class SendBatch extends BitCommand<Ack> {
+    final FragmentWritableBatch batch;
+    final FragmentContext context;
+
+    public SendBatch(FragmentWritableBatch batch, FragmentContext context) {
+      this.context = context;
+      this.batch = batch;
+    }
+
+    @Override
+    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
+      logger.debug("Sending record batch. {}", batch);
+      final CheckedFuture<Ack, RpcException> pending = connection.sendRecordBatch(context, batch);
+      return pending;
+    }
+
+  }
+
+  /** Command that sends a fragment status over the connection it is given. */
+  public class SendFragmentStatus extends BitCommand<Ack> {
+    final FragmentStatus status;
+
+    public SendFragmentStatus(FragmentStatus status) {
+      this.status = status;
+    }
+
+    @Override
+    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
+      final CheckedFuture<Ack, RpcException> pending = connection.sendFragmentStatus(status);
+      return pending;
+    }
+  }
+
+  /** Command that asks the remote side, over the connection it is given, to cancel a fragment. */
+  public class CancelFragment extends BitCommand<Ack> {
+    final FragmentHandle handle;
+
+    public CancelFragment(FragmentHandle handle) {
+      this.handle = handle;
+    }
+
+    @Override
+    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
+      final CheckedFuture<Ack, RpcException> pending = connection.cancelFragment(handle);
+      return pending;
+    }
+
+  }
+
+  /** Command that sends a plan fragment over the connection it is given. */
+  public class SendFragment extends BitCommand<Ack> {
+    final PlanFragment fragment;
+
+    public SendFragment(PlanFragment fragment) {
+      this.fragment = fragment;
+    }
+
+    @Override
+    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
+      final CheckedFuture<Ack, RpcException> pending = connection.sendFragment(fragment);
+      return pending;
+    }
+
+  }
+
+
- public void shutdownIfClient(){
- if(bus.isClient()) Closeables.closeQuietly(bus);
+
+  /**
+   * Base class of the commands this tunnel runs on its executor. A command is both the {@link Runnable} that
+   * performs the rpc call and the future handed back to the caller: when run, it obtains a connection from the
+   * connection manager, issues the call and bridges the call's outcome into its own future.
+   */
+  private abstract class BitCommand<T> extends AbstractCheckedFuture<T, RpcException> implements Runnable, DrillRpcFuture<T> {
+
+    /** Registers a listener that is invoked, on the completing thread, with this command's result or failure. */
+    public void addLightListener(RpcOutcomeListener<T> outcomeListener){
+      this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
+    }
+
+    public BitCommand() {
+      super(SettableFuture.<T> create());
+    }
+
+    /** Issues the actual rpc call on the given connection and returns the future tracking it. */
+    public abstract CheckedFuture<T, RpcException> doRpcCall(BitConnection connection);
+
+    /** Returns the delegate as the SettableFuture that was passed to the superclass in the constructor. */
+    private SettableFuture<T> settable() {
+      return (SettableFuture<T>) delegate();
+    }
+
+    public final void run() {
+
+      try {
+
+        BitConnection connection = manager.getConnection(0);
+        assert connection != null : "The connection manager should never return a null connection. Worse case, it should throw an exception.";
+        CheckedFuture<T, RpcException> rpc = doRpcCall(connection);
+        rpc.addListener(new FutureBridge<T>(settable(), rpc), MoreExecutors.sameThreadExecutor());
+      } catch (Exception ex) {
+        // Catch unchecked failures as well as RpcException: this runs on an executor, and an exception that
+        // escaped here would leave the future uncompleted and its waiters blocked forever.
+        settable().setException(ex);
+      }
+
+    }
+
+    /**
+     * Converts a failure into an {@link RpcException}: an {@link ExecutionException} is unwrapped to its cause
+     * when it has one, an RpcException is returned as is, and anything else is wrapped.
+     */
+    @Override
+    protected RpcException mapException(Exception e) {
+      Throwable t = e;
+      // only unwrap when there is a cause, so a null is never passed on as the failure.
+      if (e instanceof ExecutionException && e.getCause() != null) {
+        t = e.getCause();
+      }
+      if (t instanceof RpcException) return (RpcException) t;
+      return new RpcException(t);
+    }
+
+    /** Adapts an {@link RpcOutcomeListener} to the Runnable form expected by {@code addListener}. */
+    public class RpcOutcomeListenerWrapper implements Runnable{
+      final RpcOutcomeListener<T> inner;
+
+      public RpcOutcomeListenerWrapper(RpcOutcomeListener<T> inner) {
+        this.inner = inner;
+      }
+
+      @Override
+      public void run() {
+        try{
+          inner.success(BitCommand.this.checkedGet());
+        }catch(RpcException e){
+          inner.failed(e);
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "BitCommand ["+this.getClass().getSimpleName()+"]";
+    }
+
+
+
+  }
+
+  /** Copies the outcome of a completed {@link CheckedFuture} (value or RpcException) into a {@link SettableFuture}. */
+  private class FutureBridge<T> implements Runnable {
+    final SettableFuture<T> out;
+    final CheckedFuture<T, RpcException> in;
+
+    public FutureBridge(SettableFuture<T> target, CheckedFuture<T, RpcException> source) {
+      this.in = source;
+      this.out = target;
+    }
+
+    @Override
+    public void run() {
+      final T value;
+      try {
+        value = in.checkedGet();
+      } catch (RpcException ex) {
+        out.setException(ex);
+        return;
+      }
+      out.set(value);
+    }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
new file mode 100644
index 0000000..8f299d2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
@@ -0,0 +1,56 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.work.foreman.FragmentStatusListener;
+import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
+
+/**
+ * Holds at most one {@link FragmentStatusListener} per {@link FragmentHandle} and forwards incoming fragment status
+ * messages to the listener registered for the message's handle.
+ */
+public class ListenerPool {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListenerPool.class);
+
+  private final ConcurrentMap<FragmentHandle, FragmentStatusListener> listeners;
+
+  /**
+   * @param par concurrency level passed to the backing {@link ConcurrentHashMap}
+   */
+  public ListenerPool(int par){
+    this.listeners = new ConcurrentHashMap<FragmentHandle, FragmentStatusListener>(16, 0.75f, par);
+  }
+
+  /** Removes the listener registered for the handle, if there is one. */
+  public void removeFragmentStatusListener(FragmentHandle handle) throws RpcException{
+    listeners.remove(handle);
+  }
+
+  /**
+   * Registers a listener for the handle.
+   *
+   * @throws RpcException when a listener is already registered for that handle
+   */
+  public void addFragmentStatusListener(FragmentHandle handle, FragmentStatusListener listener) throws RpcException{
+    final FragmentStatusListener previous = listeners.putIfAbsent(handle, listener);
+    if(previous == null) return;
+    throw new RpcException("Failure. The provided handle already exists in the listener pool. You need to remove one listener before adding another.");
+  }
+
+  /** Passes the status to the listener registered for its handle; logs and drops it when there is none. */
+  public void status(FragmentStatus status){
+    final FragmentStatusListener target = listeners.get(status.getHandle());
+    if(target != null){
+      target.statusUpdate(status);
+      return;
+    }
+    logger.info("A fragment message arrived but there was no registered listener for that message.");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
new file mode 100644
index 0000000..3df88b7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+
+/**
+ * Pairs a {@link QueryResult} header with the {@link ByteBuf} carrying its data; the buffer may be null.
+ */
+public class QueryResultBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultBatch.class);
+
+  private final QueryResult header;
+  private final ByteBuf data;
+
+  /**
+   * @param header the query result header
+   * @param data the data buffer, or null when there is none
+   */
+  public QueryResultBatch(QueryResult header, ByteBuf data) {
+    this.data = data;
+    this.header = header;
+  }
+
+  /** Returns the header given at construction. */
+  public QueryResult getHeader() {
+    return header;
+  }
+
+  /** Returns the data buffer given at construction, possibly null. */
+  public ByteBuf getData() {
+    return data;
+  }
+
+
+  /** Returns true when a data buffer was supplied. */
+  public boolean hasData(){
+    return !(data == null);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 0088522..5d2e799 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -20,57 +20,194 @@ package org.apache.drill.exec.rpc.user;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
-import org.apache.drill.exec.proto.UserProtos.QueryHandle;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
-import org.apache.drill.exec.rpc.BasicClient;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
+import org.apache.drill.exec.rpc.BasicClientWithConnection;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
import com.google.protobuf.MessageLite;
-public class UserClient extends BasicClient<RpcType> {
+public class UserClient extends BasicClientWithConnection<RpcType> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
-
+
+ private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+
+ /** Creates a user client that uses the RPC type mapping declared in UserRpcConfig.MAPPING. */
public UserClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
- super(alloc, eventLoopGroup);
+ super(UserRpcConfig.MAPPING, alloc, eventLoopGroup);
}
+ /**
+ * Sends a RUN_QUERY request and attaches a SubmissionListener that wires the supplied results listener
+ * to the QueryId once the server responds (or reports the submission failure to it).
+ *
+ * @param query the query to run
+ * @param resultsListener receives result batches and submission failures for this query
+ * @return the listener's own future, obtained via getFuture()
+ */
+ public Future<Void> submitQuery(RunQuery query, UserResultsListener resultsListener) throws RpcException {
+ this.send(RpcType.RUN_QUERY, query, QueryId.class).addLightListener(new SubmissionListener(resultsListener));
+ return resultsListener.getFuture();
+ }
- public DrillRpcFuture<QueryHandle> submitQuery(RunQuery query) throws RpcException {
- return this.send(RpcType.RUN_QUERY, query, QueryHandle.class, null);
+ /**
+ * Connects to the given endpoint's address and user port, sending a UserToBitHandshake that carries
+ * UserRpcConfig.RPC_VERSION.
+ *
+ * @return the handshake message returned by connectAsClient
+ */
+ public BitToUserHandshake connect(DrillbitEndpoint endpoint) throws RpcException, InterruptedException{
+ return this.connectAsClient(RpcType.HANDSHAKE, UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getUserPort(), BitToUserHandshake.class);
}
- public DrillRpcFuture<QueryHandle> submitQuery(RunQuery query, ByteBuf data) throws RpcException {
- return this.send(RpcType.RUN_QUERY, query, QueryHandle.class, data);
+  /**
+   * Placeholder listener that queues result batches which arrive before the real listener has been
+   * registered for a query. Once {@link #transferTo(UserResultsListener)} has been called, newly arriving
+   * batches are forwarded straight to the target listener instead of being queued.
+   */
+  private class BufferingListener extends UserResultsListener {
+
+    private final ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+    // Arrivals share the read lock; the hand-over takes the write lock so no batch can slip in mid-transfer.
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private volatile UserResultsListener output;
+
+    /**
+     * Replays every buffered batch to {@code l} and makes {@code l} the target of all later arrivals.
+     *
+     * @param l the listener that takes over from this buffer
+     * @return the is-last-chunk flag of the final buffered batch, or false if nothing was buffered
+     */
+    public boolean transferTo(UserResultsListener l) {
+      lock.writeLock().lock();
+      try {
+        output = l;
+        boolean last = false;
+        for (QueryResultBatch r : results) {
+          l.resultArrived(r);
+          last = r.getHeader().getIsLastChunk();
+        }
+        if (future.isDone()) {
+          l.set();
+        }
+        return last;
+      } finally {
+        // Always release: otherwise every subsequent resultArrived() blocks forever on the read lock.
+        lock.writeLock().unlock();
+      }
+    }
+
+    /** Queues the batch, or forwards it to the transfer target if one has already been installed. */
+    @Override
+    public void resultArrived(QueryResultBatch result) {
+      logger.debug("Result arrvied.");
+      lock.readLock().lock();
+      try {
+        if (output == null) {
+          this.results.add(result);
+        } else {
+          output.resultArrived(result);
+        }
+
+      } finally {
+        lock.readLock().unlock();
+      }
+
+    }
+
+    /** Not supported: a buffering listener is only created after results have started to arrive. */
+    @Override
+    public void submissionFailed(RpcException ex) {
+      throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
+    }
+
+  }
+
+  /**
+   * Outcome listener for a RUN_QUERY request. On success it registers the user's results listener under
+   * the returned QueryId; on failure it reports the exception to that listener.
+   */
+  private class SubmissionListener extends RpcOutcomeListener<QueryId> {
+    private UserResultsListener listener;
+
+    public SubmissionListener(UserResultsListener listener) {
+      super();
+      this.listener = listener;
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      listener.submissionFailed(ex);
+    }
+
+    @Override
+    public void success(QueryId queryId) {
+      logger.debug("Received QueryId {} succesfully. Adding listener {}", queryId, listener);
+      UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
+
+      // We need to deal with the situation where we already received results by the time we got the query id
+      // back. In that case, we transfer the buffered results over to the real listener; the buffering listener
+      // locks out reception of additional results during the transition.
+      if (oldListener != null) {
+        logger.debug("Unable to place user results listener, buffering listener was already in place.");
+        if (oldListener instanceof BufferingListener) {
+          boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
+          if (all) {
+            // The last response was already buffered, so simply drop the map entry. The map is keyed by
+            // QueryId, so the removal must name the key; the value guard avoids removing a newer entry.
+            resultsListener.remove(queryId, oldListener);
+          } else {
+            boolean replaced = resultsListener.replace(queryId, oldListener, listener);
+            if (!replaced) throw new IllegalStateException();
+          }
+        } else {
+          throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
+        }
+      }
+
+    }
+
   }
@Override
protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+ // Maps an RPC type number to the default protobuf instance for that message type.
+ // Types not listed here fall through to the RpcException below.
- switch(rpcType){
+ switch (rpcType) {
case RpcType.ACK_VALUE:
return Ack.getDefaultInstance();
case RpcType.HANDSHAKE_VALUE:
return BitToUserHandshake.getDefaultInstance();
case RpcType.QUERY_HANDLE_VALUE:
- return QueryHandle.getDefaultInstance();
+ return QueryId.getDefaultInstance();
case RpcType.QUERY_RESULT_VALUE:
return QueryResult.getDefaultInstance();
}
throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
}
+ /**
+ * Handles messages sent from the server to this client. Only QUERY_RESULT is accepted: the batch is
+ * delivered to the listener registered for its QueryId and acknowledged with an ACK response.
+ *
+ * @throws RpcException for any other RPC type
+ */
+ protected Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ switch (rpcType) {
+ case RpcType.QUERY_RESULT_VALUE:
+ final QueryResult result = get(pBody, QueryResult.PARSER);
+ final QueryResultBatch batch = new QueryResultBatch(result, dBody);
+ UserResultsListener l = resultsListener.get(result.getQueryId());
+// logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
+ if (l != null) {
+// logger.debug("Results listener available, using existing.");
+ l.resultArrived(batch);
+ // The last chunk ends the query: drop the map entry and complete the listener's future.
+ if (result.getIsLastChunk()) {
+ resultsListener.remove(result.getQueryId(), l);
+ l.set();
+ }
+ } else {
+ logger.debug("Results listener not available, creating a buffering listener.");
+ // manage race condition where we start getting results before we receive the queryid back.
+ BufferingListener bl = new BufferingListener();
+ l = resultsListener.putIfAbsent(result.getQueryId(), bl);
+ // A non-null return means another listener was registered concurrently; deliver to that one instead.
+ // NOTE(review): this branch does not check getIsLastChunk(), unlike the branch above - confirm intended.
+ if (l != null) {
+ l.resultArrived(batch);
+ } else {
+ bl.resultArrived(batch);
+ }
+ }
+
+ return new Response(RpcType.ACK, Ack.getDefaultInstance());
+ default:
+ throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
+ }
+
+ }
@Override
- protected Response handle(SocketChannel ch, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
- logger.debug("Received a server > client message of type " + rpcType);
- return new Response(RpcType.ACK, Ack.getDefaultInstance(), null);
+  // Builds the client-side handshake handler, which rejects a server whose RPC version differs from ours.
+  protected ClientHandshakeHandler<BitToUserHandshake> getHandshakeHandler() {
+    return new ClientHandshakeHandler<BitToUserHandshake>(RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER) {
+
+      @Override
+      protected void validateHandshake(BitToUserHandshake inbound) throws Exception {
+        logger.debug("Handling handshake from bit to user. {}", inbound);
+        if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) {
+          // "Expected" is the version this client speaks; "actual" is what the remote side reported.
+          throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.",
+              UserRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
+        }
+      }
+
+    };
   }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
new file mode 100644
index 0000000..3ce14f0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import java.util.concurrent.Future;
+
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.rpc.RpcException;
+
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * Base class for receivers of query results. Subclasses implement the two callbacks; the embedded future
+ * is completed through set().
+ */
+public abstract class UserResultsListener {
+ // Package-visible: read directly by other listeners in this package.
+ SettableFuture<Void> future = SettableFuture.create();
+
+ /** Completes the future with a null value. */
+ final void set(){
+ future.set(null);
+ }
+
+ /** @return the future completed by set(). */
+ Future<Void> getFuture(){
+ return future;
+ }
+
+ /** Called when the query submission itself failed. */
+ public abstract void submissionFailed(RpcException ex);
+ /** Called for each result batch that arrives for the query. */
+ public abstract void resultArrived(QueryResultBatch result);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
new file mode 100644
index 0000000..893e432
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
+import org.apache.drill.exec.rpc.RpcConfig;
+
+/**
+ * Shared constants for the user <-> bit RPC protocol: the request/response type mapping and the
+ * protocol version exchanged during the handshake.
+ */
+public class UserRpcConfig {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcConfig.class);
+
+  /** Request type -> response type mapping; final so no caller can swap it out at runtime. */
+  public static final RpcConfig MAPPING = RpcConfig.newBuilder("USER-RPC-MAPPING") //
+      .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) //user to bit.
+      .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) //user to bit
+      .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) //bit to user
+      .build();
+
+  /** Protocol version that both handshake handlers compare against. */
+  public static final int RPC_VERSION = 1;
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index cccaa55..406afc4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -19,31 +19,36 @@ package org.apache.drill.exec.rpc.user;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
-import org.apache.drill.exec.proto.UserProtos.QueryHandle;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.RequestResults;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
import org.apache.drill.exec.rpc.BasicServer;
import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.user.UserWorker;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
-public class UserServer extends BasicServer<RpcType> {
+public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnection> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
-
- final DrillbitContext context;
-
- public UserServer(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, DrillbitContext context) {
- super(alloc, eventLoopGroup);
- this.context = context;
+
+ final UserWorker worker;
+
+ /**
+ * Creates a user-facing server that uses the UserRpcConfig.MAPPING type mapping and keeps the given
+ * worker for handling incoming requests.
+ */
+ public UserServer(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, UserWorker worker) {
+ super(UserRpcConfig.MAPPING, alloc, eventLoopGroup);
+ this.worker = worker;
}
@Override
@@ -55,36 +60,70 @@ public class UserServer extends BasicServer<RpcType> {
default:
throw new UnsupportedOperationException();
}
-
}
- public DrillRpcFuture<QueryResult> sendResult(RunQuery query, ByteBuf data) throws RpcException {
- return this.send(RpcType.QUERY_RESULT, query, QueryResult.class, data);
- }
-
-
@Override
- protected Response handle(SocketChannel channel, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ protected Response handle(UserClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
+ throws RpcException {
+ // Dispatches a client request by RPC type: RUN_QUERY and REQUEST_RESULTS bodies are decoded and passed
+ // to the worker; protobuf decoding failures are rethrown as RpcException.
switch (rpcType) {
case RpcType.HANDSHAKE_VALUE:
-// logger.debug("Received handshake, responding in kind.");
- return new Response(RpcType.HANDSHAKE, BitToUserHandshake.getDefaultInstance(), null);
-
+ // logger.debug("Received handshake, responding in kind.");
+ return new Response(RpcType.HANDSHAKE, BitToUserHandshake.getDefaultInstance());
+
case RpcType.RUN_QUERY_VALUE:
-// logger.debug("Received query to run. Returning query handle.");
- return new Response(RpcType.QUERY_HANDLE, QueryHandle.newBuilder().setQueryId(1).build(), null);
-
+ // logger.debug("Received query to run. Returning query handle.");
+ try {
+ RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ return new Response(RpcType.QUERY_HANDLE, worker.submitWork(connection, query));
+ } catch (InvalidProtocolBufferException e) {
+ throw new RpcException("Failure while decoding RunQuery body.", e);
+ }
+
case RpcType.REQUEST_RESULTS_VALUE:
-// logger.debug("Received results requests. Returning empty query result.");
- return new Response(RpcType.QUERY_RESULT, QueryResult.getDefaultInstance(), null);
-
+ // logger.debug("Received results requests. Returning empty query result.");
+ try {
+ RequestResults req = RequestResults.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ return new Response(RpcType.QUERY_RESULT, worker.getResult(connection, req));
+ } catch (InvalidProtocolBufferException e) {
+ throw new RpcException("Failure while decoding RequestResults body.", e);
+ }
+
default:
throw new UnsupportedOperationException();
}
}
+
+ /**
+ * Server-side handle for one connected user client; lets callers push query results to that client.
+ */
+ public class UserClientConnection extends RemoteConnection {
+ public UserClientConnection(Channel channel) {
+ super(channel);
+ }
+
+ /**
+ * Sends a QUERY_RESULT message built from the batch's header and buffers over this connection.
+ *
+ * @return a future for the client's Ack
+ */
+ public DrillRpcFuture<Ack> sendResult(QueryWritableBatch result){
+ return send(this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
+ }
+
+ }
+
+ /** Wraps the given channel in a new UserClientConnection. */
+ @Override
+ public UserClientConnection initRemoteConnection(Channel channel) {
+ return new UserClientConnection(channel);
+ }
-
+  /**
+   * Builds the server-side handshake handler. A client whose RPC version differs from
+   * UserRpcConfig.RPC_VERSION is rejected with an RpcException; otherwise the server answers with its
+   * own version.
+   */
+  @Override
+  protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler() {
+    return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, UserToBitHandshake.PARSER){
+
+      @Override
+      public MessageLite getHandshakeResponse(UserToBitHandshake inbound) throws Exception {
+        logger.debug("Handling handshake from user to bit. {}", inbound);
+        if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) {
+          // "Expected" is the version this server speaks; "actual" is what the client reported.
+          throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.",
+              UserRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
+        }
+        return BitToUserHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build();
+      }
+    };
+  }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
new file mode 100644
index 0000000..3c4d9af
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -0,0 +1,68 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.io.Closeable;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+
+import com.yammer.metrics.MetricRegistry;
+
+/**
+ * Holds the resources created from a DrillConfig at startup: the Netty event loop group, the metrics
+ * registry and the buffer allocator. Closing the context shuts down the loop group and the allocator.
+ */
+public class BootStrapContext implements Closeable{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BootStrapContext.class);
+
+ private final DrillConfig config;
+ private final NioEventLoopGroup loop;
+ private final MetricRegistry metrics;
+ private final BufferAllocator allocator;
+
+ /**
+ * Creates the event loop group (4 threads, named with the "BitServer-" prefix), the metrics registry
+ * named from ExecConstants.METRICS_CONTEXT_NAME, and the allocator for the given configuration.
+ */
+ public BootStrapContext(DrillConfig config) {
+ super();
+ this.config = config;
+ this.loop = new NioEventLoopGroup(4, new NamedThreadFactory("BitServer-"));
+ this.metrics = new MetricRegistry(config.getString(ExecConstants.METRICS_CONTEXT_NAME));
+ this.allocator = BufferAllocator.getAllocator(config);
+ }
+
+ /** @return the configuration this context was built from. */
+ public DrillConfig getConfig() {
+ return config;
+ }
+
+ /** @return the event loop group created by this context. */
+ public NioEventLoopGroup getBitLoopGroup() {
+ return loop;
+ }
+
+ /** @return the metrics registry created by this context. */
+ public MetricRegistry getMetrics() {
+ return metrics;
+ }
+
+ /** @return the buffer allocator created by this context. */
+ public BufferAllocator getAllocator() {
+ return allocator;
+ }
+
+ /** Shuts down the event loop group, then closes the allocator. */
+ public void close(){
+ loop.shutdown();
+ allocator.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index c33afce..7d745e1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -17,10 +17,9 @@
******************************************************************************/
package org.apache.drill.exec.server;
-import java.net.InetAddress;
+import java.io.Closeable;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.cache.HazelCache;
@@ -28,15 +27,16 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.service.ServiceEngine;
+import org.apache.drill.exec.work.WorkManager;
import com.google.common.io.Closeables;
/**
* Starts, tracks and stops all the required services for a Drillbit daemon to work.
*/
-public class Drillbit {
+public class Drillbit implements Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
public static Drillbit start(StartupOptions options) throws DrillbitStartupException {
@@ -47,7 +47,7 @@ public class Drillbit {
Drillbit bit;
try {
logger.debug("Setting up Drillbit.");
- bit = new Drillbit(config);
+ bit = new Drillbit(config, null);
} catch (Exception ex) {
throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
}
@@ -65,35 +65,37 @@ public class Drillbit {
start(options);
}
- private final DrillbitContext context;
- final BufferAllocator pool;
final ClusterCoordinator coord;
final ServiceEngine engine;
final DistributedCache cache;
- final DrillConfig config;
- private RegistrationHandle handle;
-
- public Drillbit(DrillConfig config) throws Exception {
- final DrillbitContext context = new DrillbitContext(config, this);
- Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
- this.context = context;
- this.pool = BufferAllocator.getAllocator(context);
- this.coord = new ZKClusterCoordinator(config);
- this.engine = new ServiceEngine(context);
- this.cache = new HazelCache(context.getConfig());
- this.config = config;
+ final WorkManager manager;
+ final BootStrapContext context;
+
+ private volatile RegistrationHandle handle;
+
+ /**
+ * Builds the Drillbit's services.
+ *
+ * @param config the configuration to build from
+ * @param serviceSet if non-null, supplies the coordinator and cache; if null, a ZKClusterCoordinator and
+ * a HazelCache are created from the config and a shutdown hook is registered
+ */
+ public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
+ if(serviceSet != null){
+ this.context = new BootStrapContext(config);
+ this.manager = new WorkManager(context);
+ this.coord = serviceSet.getCoordinator();
+ this.engine = new ServiceEngine(manager.getBitComWorker(), manager.getUserWorker(), context);
+ this.cache = serviceSet.getCache();
+ }else{
+ Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
+ this.context = new BootStrapContext(config);
+ this.manager = new WorkManager(context);
+ this.coord = new ZKClusterCoordinator(config);
+ this.engine = new ServiceEngine(manager.getBitComWorker(), manager.getUserWorker(), context);
+ this.cache = new HazelCache(config);
+ }
}
public void run() throws Exception {
+ // Startup order: coordinator, service engine (which yields this bit's endpoint), cache, work manager,
+ // and finally registration of the endpoint with the coordinator.
+ // NOTE(review): the 10000 passed to coord.start() looks like a timeout in milliseconds - confirm.
- coord.start();
- engine.start();
- DrillbitEndpoint md = DrillbitEndpoint.newBuilder()
- .setAddress(InetAddress.getLocalHost().getHostAddress())
- .setBitPort(engine.getBitPort())
- .setUserPort(engine.getUserPort())
- .build();
+ coord.start(10000);
+ DrillbitEndpoint md = engine.start();
+ cache.run();
+ manager.start(md, cache, engine.getBitCom(), coord);
handle = coord.register(md);
- cache.run(md);
}
public void close() {
@@ -107,7 +109,8 @@ public class Drillbit {
Closeables.closeQuietly(engine);
Closeables.closeQuietly(coord);
- Closeables.closeQuietly(pool);
+ Closeables.closeQuietly(manager);
+ Closeables.closeQuietly(context);
logger.info("Shutdown completed.");
}
@@ -123,5 +126,11 @@ public class Drillbit {
}
}
+ /** @return the cluster coordinator this Drillbit was constructed with or created. */
+ public ClusterCoordinator getCoordinator(){
+ return coord;
+ }
+ /** @return the DrillbitContext held by the work manager. */
+ public DrillbitContext getContext(){
+ return this.manager.getContext();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index b08b070..d5aaab2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -23,42 +23,60 @@ import java.util.Collection;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.bit.BitCom;
import org.apache.drill.exec.store.StorageEngine;
+import com.google.common.base.Preconditions;
import com.yammer.metrics.MetricRegistry;
public class DrillbitContext {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
+
+ private BootStrapContext context;
+
+ private PhysicalPlanReader reader;
+ private final ClusterCoordinator coord;
+ private final BitCom com;
+ private final DistributedCache cache;
+ private final DrillbitEndpoint endpoint;
- private final DrillConfig config;
- private final Drillbit underlyingBit;
- private final NioEventLoopGroup loop;
- private final MetricRegistry metrics;
-
- public DrillbitContext(DrillConfig config, Drillbit underlyingBit) {
+  /**
+   * Creates the context shared by the services of a running Drillbit.
+   *
+   * @param endpoint the endpoint describing this Drillbit; also handed to the plan reader
+   * @param context the bootstrap resources (config, allocator, event loop group, metrics)
+   * @param coord the cluster coordinator used to list available endpoints
+   * @param com the bit-to-bit communication layer
+   * @param cache the distributed cache
+   * @throws NullPointerException if any argument is null
+   */
+  public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, BitCom com, DistributedCache cache) {
     super();
-    this.config = config;
-    this.underlyingBit = underlyingBit;
-    this.loop = new NioEventLoopGroup(1, new NamedThreadFactory("BitServer-"));
-    this.metrics = new MetricRegistry(config.getString(ExecConstants.METRICS_CONTEXT_NAME));
+    // Check each argument exactly once.
+    Preconditions.checkNotNull(endpoint);
+    Preconditions.checkNotNull(context);
+    Preconditions.checkNotNull(coord);
+    Preconditions.checkNotNull(com);
+    Preconditions.checkNotNull(cache);
+
+    this.context = context;
+    this.coord = coord;
+    this.com = com;
+    this.cache = cache;
+    this.endpoint = endpoint;
+    this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint);
+  }
+
+ /** @return the endpoint supplied at construction. */
+ public DrillbitEndpoint getEndpoint(){
+ return endpoint;
}
+ /** @return the configuration held by the bootstrap context. */
public DrillConfig getConfig() {
- return config;
+ return context.getConfig();
}
+ /** @return the endpoints the cluster coordinator currently reports as available. */
public Collection<DrillbitEndpoint> getBits(){
- return underlyingBit.coord.getAvailableEndpoints();
+ return coord.getAvailableEndpoints();
}
+ /** @return the buffer allocator held by the bootstrap context. */
public BufferAllocator getAllocator(){
- return underlyingBit.pool;
+ return context.getAllocator();
}
public StorageEngine getStorageEngine(StorageEngineConfig config){
@@ -66,19 +84,23 @@ public class DrillbitContext {
}
+ /** @return the event loop group held by the bootstrap context. */
public NioEventLoopGroup getBitLoopGroup(){
- return loop;
+ return context.getBitLoopGroup();
}
+ /** @return the bit communication layer supplied at construction. */
public BitCom getBitCom(){
- return underlyingBit.engine.getBitCom();
+ return com;
}
+ /** @return the metrics registry held by the bootstrap context. */
public MetricRegistry getMetrics(){
- return metrics;
+ return context.getMetrics();
}
+ /** @return the distributed cache supplied at construction. */
public DistributedCache getCache(){
- return underlyingBit.cache;
+ return cache;
+ }
+
+ /** @return the physical plan reader created in the constructor. */
+ public PhysicalPlanReader getPlanReader(){
+ return reader;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
new file mode 100644
index 0000000..0337a68
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -0,0 +1,59 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.LocalCache;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.LocalClusterCoordinator;
+
+/**
+ * Holds a {@link DistributedCache} together with a {@link ClusterCoordinator} and closes
+ * both of them when the set itself is closed.
+ */
+public class RemoteServiceSet implements Closeable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
+
+  private final DistributedCache cache;
+  private final ClusterCoordinator coordinator;
+
+  /**
+   * @param cache The cache handed back by {@link #getCache()}.
+   * @param coordinator The coordinator handed back by {@link #getCoordinator()}.
+   */
+  public RemoteServiceSet(DistributedCache cache, ClusterCoordinator coordinator) {
+    super();
+    this.cache = cache;
+    this.coordinator = coordinator;
+  }
+
+  /**
+   * @return The cache supplied at construction.
+   */
+  public DistributedCache getCache() {
+    return cache;
+  }
+
+  /**
+   * @return The coordinator supplied at construction.
+   */
+  public ClusterCoordinator getCoordinator() {
+    return coordinator;
+  }
+
+
+  /**
+   * Closes the cache first and then the coordinator.
+   * @throws IOException If closing either service fails.
+   */
+  @Override
+  public void close() throws IOException {
+    // The coordinator is closed in a finally block so that a failure while closing
+    // the cache does not leave the coordinator open.
+    try {
+      cache.close();
+    } finally {
+      coordinator.close();
+    }
+  }
+
+  /**
+   * Creates a service set backed by a new {@link LocalCache} and a new
+   * {@link LocalClusterCoordinator}.
+   * @return The newly created service set.
+   */
+  public static RemoteServiceSet getLocalServiceSet(){
+    return new RemoteServiceSet(new LocalCache(), new LocalClusterCoordinator());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index 5d83bdb..d6d3b9c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -17,49 +17,48 @@
******************************************************************************/
package org.apache.drill.exec.service;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.Closeable;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.bit.BitCom;
import org.apache.drill.exec.rpc.bit.BitComImpl;
import org.apache.drill.exec.rpc.user.UserServer;
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
+import org.apache.drill.exec.work.user.UserWorker;
import com.google.common.io.Closeables;
public class ServiceEngine implements Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ServiceEngine.class);
- UserServer userServer;
- BitComImpl bitCom;
- int userPort;
- int bitPort;
- DrillbitContext context;
+ private final UserServer userServer;
+ private final BitCom bitCom;
+ private final DrillConfig config;
- public ServiceEngine(DrillbitContext context){
- this.context = context;
- ByteBufAllocator allocator = context.getAllocator().getUnderlyingAllocator();
- this.userServer = new UserServer(allocator, new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), context);
- this.bitCom = new BitComImpl(context);
+ public ServiceEngine(BitComHandler bitComWorker, UserWorker userWorker, BootStrapContext context){
+ this.userServer = new UserServer(context.getAllocator().getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), userWorker);
+ this.bitCom = new BitComImpl(context, bitComWorker);
+ this.config = context.getConfig();
}
- public void start() throws DrillbitStartupException, InterruptedException{
- userPort = userServer.bind(context.getConfig().getInt(ExecConstants.INITIAL_USER_PORT));
- bitPort = bitCom.start();
- }
-
- public int getBitPort(){
- return bitPort;
- }
-
- public int getUserPort(){
- return userPort;
+ public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{
+ int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT));
+ int bitPort = bitCom.start();
+ return DrillbitEndpoint.newBuilder()
+ .setAddress(InetAddress.getLocalHost().getHostAddress())
+ .setBitPort(bitPort)
+ .setUserPort(userPort)
+ .build();
}
public BitCom getBitCom(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
index d89b431..80704fa 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
@@ -23,8 +23,8 @@ import java.util.Collections;
import java.util.List;
import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index 9fc4165..67c84ed 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -17,9 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.store;
-import org.apache.drill.exec.exception.ExecutionSetupException;
-import org.apache.drill.exec.ops.OutputMutator;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.physical.impl.OutputMutator;
public interface RecordReader {
@@ -35,7 +34,7 @@ public interface RecordReader {
* mutating the set of schema values for that particular record.
* @throws ExecutionSetupException
*/
- public abstract void setup(BatchSchema expectedSchema, OutputMutator output) throws ExecutionSetupException;
+ public abstract void setup(OutputMutator output) throws ExecutionSetupException;
/**
* Increment record reader forward, writing into the provided output batch.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
index 67ea5b6..4884b7a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
@@ -22,8 +22,8 @@ import java.util.Collection;
import java.util.List;
import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/util/AtomicState.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/util/AtomicState.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/util/AtomicState.java
new file mode 100644
index 0000000..d2e8b8f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/util/AtomicState.java
@@ -0,0 +1,58 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.util;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+
+import com.google.protobuf.Internal.EnumLite;
+
+/**
+ * Thin holder for a state value of an EnumLite type. The state is kept as the enum's number
+ * inside an AtomicInteger and mapped back to T by the subclass.
+ * @param <T> The EnumLite type used for the state.
+ */
+public abstract class AtomicState<T extends EnumLite> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AtomicState.class);
+
+  private final AtomicInteger current;
+
+  /**
+   * Creates a holder whose state starts out as the given value.
+   * @param initial The starting state.
+   */
+  public AtomicState(T initial){
+    this.current = new AtomicInteger(initial.getNumber());
+  }
+
+  /**
+   * Maps a stored number back to the matching T value.
+   * @param i The number currently held.
+   * @return The T value for that number.
+   */
+  protected abstract T getStateFromNumber(int i);
+
+  /**
+   * Moves from one state to another with a single compare-and-set.
+   * @param oldState The state the holder is expected to be in.
+   * @param newState The state to switch to.
+   * @return true if the holder was in oldState and has been switched, false otherwise.
+   */
+  public boolean updateState(T oldState, T newState){
+    final int expected = oldState.getNumber();
+    final int desired = newState.getNumber();
+    return current.compareAndSet(expected, desired);
+  }
+
+  /**
+   * @return The T value for the number currently held.
+   */
+  public T getState(){
+    final int number = current.get();
+    return getStateFromNumber(number);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
new file mode 100644
index 0000000..0e8edd5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
@@ -0,0 +1,109 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+import org.apache.drill.exec.work.foreman.ErrorHelper;
+
+/**
+ * Base {@link FragmentRunnerListener} that turns every state change or failure into a
+ * {@link FragmentStatus} and hands it to {@link #statusChange(FragmentHandle, FragmentStatus)}.
+ * Subclasses override the per-state hooks or statusChange to act on the status.
+ */
+public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFragmentRunnerListener.class);
+
+  private final FragmentContext context;
+  private volatile long startNanos;
+  // Set once the RUNNING state has been observed; until then startNanos holds no start time.
+  private volatile boolean started;
+
+  /**
+   * @param context The fragment context the reported statuses are built from.
+   */
+  public AbstractFragmentRunnerListener(FragmentContext context) {
+    super();
+    this.context = context;
+  }
+
+  /**
+   * Builds a status for the given state, filled with the context's metrics, handle and
+   * allocated memory plus the time elapsed since the RUNNING state was observed.
+   * @param state The state to report.
+   * @return The populated builder.
+   */
+  private FragmentStatus.Builder getBuilder(FragmentState state){
+    FragmentStatus.Builder status = FragmentStatus.newBuilder();
+    context.addMetricsToStatus(status);
+    status.setState(state);
+    // System.nanoTime() has an arbitrary origin, so subtracting an unset start time would
+    // report a meaningless duration. Report zero until the fragment has started running.
+    status.setRunningTime(started ? System.nanoTime() - startNanos : 0);
+    status.setHandle(context.getHandle());
+    status.setMemoryUse(context.getAllocator().getAllocatedMemory());
+    return status;
+  }
+
+  /**
+   * Dispatches a state change to the hook for that state. FAILED and SENDING are ignored here.
+   * @param handle The handle passed on to the hook.
+   * @param newState The state that was entered.
+   */
+  @Override
+  public void stateChanged(FragmentHandle handle, FragmentState newState) {
+    if(newState == FragmentState.RUNNING){
+      // Record the start before the status is built so the RUNNING status itself is
+      // measured from this start time.
+      this.startNanos = System.nanoTime();
+      this.started = true;
+    }
+    FragmentStatus.Builder status = getBuilder(newState);
+
+    switch(newState){
+    case AWAITING_ALLOCATION:
+      awaitingAllocation(handle, status);
+      break;
+    case CANCELLED:
+      cancelled(handle, status);
+      break;
+    case FAILED:
+      // no op since fail should have also been called.
+      break;
+    case FINISHED:
+      finished(handle, status);
+      break;
+    case RUNNING:
+      running(handle, status);
+      break;
+    case SENDING:
+      // no op.
+      break;
+    default:
+      break;
+
+    }
+  }
+
+  /** Hook for AWAITING_ALLOCATION; by default builds the status and forwards it to statusChange. */
+  protected void awaitingAllocation(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
+    statusChange(handle, statusBuilder.build());
+  }
+
+  /** Hook for RUNNING; by default builds the status and forwards it to statusChange. */
+  protected void running(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
+    statusChange(handle, statusBuilder.build());
+  }
+
+  /** Hook for CANCELLED; by default builds the status and forwards it to statusChange. */
+  protected void cancelled(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
+    statusChange(handle, statusBuilder.build());
+  }
+
+  /** Hook for FINISHED; by default builds the status and forwards it to statusChange. */
+  protected void finished(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
+    statusChange(handle, statusBuilder.build());
+  }
+
+  /**
+   * Receives every status produced by this listener. Does nothing by default.
+   * @param handle The handle the status was reported for.
+   * @param status The built status.
+   */
+  protected void statusChange(FragmentHandle handle, FragmentStatus status){
+
+  }
+
+  /**
+   * Builds a FAILED status carrying the error returned by ErrorHelper.logAndConvertError and
+   * passes it to {@link #fail(FragmentHandle, FragmentStatus.Builder)}.
+   * @param handle The handle passed on to the hook.
+   * @param message The failure message given to ErrorHelper.
+   * @param excep The failure cause given to ErrorHelper.
+   */
+  @Override
+  public final void fail(FragmentHandle handle, String message, Throwable excep) {
+    FragmentStatus.Builder status = getBuilder(FragmentState.FAILED);
+    status.setError(ErrorHelper.logAndConvertError(context.getIdentity(), message, excep, logger));
+    fail(handle, status);
+  }
+
+  /** Hook for failures; by default builds the status and forwards it to statusChange. */
+  protected void fail(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
+    statusChange(handle, statusBuilder.build());
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java
new file mode 100644
index 0000000..3c7ef04
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+/**
+ * Exposes a single {@link #cancel()} operation. What cancelling entails is left to the
+ * implementation.
+ */
+public interface CancelableQuery {
+  /** Requests cancellation. */
+  void cancel();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
new file mode 100644
index 0000000..f6a9786
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+/**
+ * RpcOutcomeListener that additionally carries the endpoint and an arbitrary value it was
+ * created with, so subclasses can refer to them when handling the outcome.
+ * @param <RET> The type handled by the underlying RpcOutcomeListener.
+ * @param <V> The type of the value carried alongside the endpoint.
+ */
+public abstract class EndpointListener<RET, V> extends RpcOutcomeListener<RET>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointListener.class);
+
+  protected final DrillbitEndpoint endpoint;
+  protected final V value;
+
+  /**
+   * @param endpoint The endpoint to carry.
+   * @param value The value to carry.
+   */
+  public EndpointListener(DrillbitEndpoint endpoint, V value) {
+    this.endpoint = endpoint;
+    this.value = value;
+  }
+
+  /**
+   * @return The value supplied at construction.
+   */
+  protected V getValue() {
+    return this.value;
+  }
+
+  /**
+   * @return The endpoint supplied at construction.
+   */
+  protected DrillbitEndpoint getEndpoint() {
+    return this.endpoint;
+  }
+
+}