You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2016/01/11 07:44:50 UTC
[2/3] drill git commit: DRILL-4238: Add a custom RPC interface on the
Control channel for extensible communication between bits.
DRILL-4238: Add a custom RPC interface on the Control channel for extensible communication between bits.
This closes #313.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/67d5cc6d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/67d5cc6d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/67d5cc6d
Branch: refs/heads/master
Commit: 67d5cc6df0ca394dd26ff5c0a2b0510779bad949
Parents: 467c405
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Jan 2 14:55:21 2016 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Jan 10 22:41:50 2016 -0800
----------------------------------------------------------------------
.../exec/rpc/control/ControlRpcConfig.java | 2 +
.../drill/exec/rpc/control/ControlTunnel.java | 174 +++++-
.../drill/exec/rpc/control/Controller.java | 63 ++
.../drill/exec/rpc/control/ControllerImpl.java | 13 +
.../exec/rpc/control/CustomHandlerRegistry.java | 124 ++++
.../rpc/control/DefaultInstanceHandler.java | 4 +-
.../exec/work/batch/ControlMessageHandler.java | 14 +
.../exec/rpc/control/TestCustomTunnel.java | 138 +++++
.../org/apache/drill/exec/proto/BitControl.java | 599 +++++++++++++++++--
.../drill/exec/proto/SchemaBitControl.java | 119 ++++
.../drill/exec/proto/beans/CustomMessage.java | 186 ++++++
protocol/src/main/protobuf/BitControl.proto | 7 +
12 files changed, 1397 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index 44046c9..ec09a98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Executor;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
+import org.apache.drill.exec.proto.BitControl.CustomMessage;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.InitializeFragments;
@@ -50,6 +51,7 @@ public class ControlRpcConfig {
.add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
.add(RpcType.REQ_UNPAUSE_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_CUSTOM, CustomMessage.class, RpcType.RESP_CUSTOM, CustomMessage.class)
.build();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index b90912d..ff8be1d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,6 +17,13 @@
*/
package org.apache.drill.exec.rpc.control;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.drill.exec.proto.BitControl.CustomMessage;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.InitializeFragments;
@@ -26,12 +33,16 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.FutureBitCommand;
import org.apache.drill.exec.rpc.ListeningCommand;
+import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
public class ControlTunnel {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlTunnel.class);
@@ -176,4 +187,165 @@ public class ControlTunnel {
connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, Ack.class);
}
}
+
+ public <SEND extends Message, RECEIVE extends Message> CustomTunnel<SEND, RECEIVE> getCustomTunnel(
+ int messageTypeId, Class<SEND> clazz, Parser<RECEIVE> parser) {
+ return new CustomTunnel<SEND, RECEIVE>(messageTypeId, parser);
+ }
+
+ private static class CustomMessageSender extends ListeningCommand<CustomMessage, ControlConnection> {
+
+ private CustomMessage message;
+ private ByteBuf[] dataBodies;
+
+ public CustomMessageSender(RpcOutcomeListener<CustomMessage> listener, CustomMessage message, ByteBuf[] dataBodies) {
+ super(listener);
+ this.message = message;
+ this.dataBodies = dataBodies;
+ }
+
+ @Override
+ public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies);
+ }
+
+ }
+
+ private static class SyncCustomMessageSender extends FutureBitCommand<CustomMessage, ControlConnection> {
+
+ private CustomMessage message;
+ private ByteBuf[] dataBodies;
+
+ public SyncCustomMessageSender(CustomMessage message, ByteBuf[] dataBodies) {
+ super();
+ this.message = message;
+ this.dataBodies = dataBodies;
+ }
+
+ @Override
+ public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies);
+ }
+ }
+
+ /**
+ * A class used to return a synchronous future when doing custom rpc messages.
+ * @param <RECEIVE>
+ * The type of message that will be returned.
+ */
+ public class CustomFuture<RECEIVE> {
+
+ private Parser<RECEIVE> parser;
+ private DrillRpcFuture<CustomMessage> future;
+
+ public CustomFuture(Parser<RECEIVE> parser, DrillRpcFuture<CustomMessage> future) {
+ super();
+ this.parser = parser;
+ this.future = future;
+ }
+
+ public RECEIVE get() throws RpcException, InvalidProtocolBufferException {
+ CustomMessage message = future.checkedGet();
+ return parser.parseFrom(message.getMessage());
+ }
+
+ public RECEIVE get(long timeout, TimeUnit unit) throws RpcException, TimeoutException,
+ InvalidProtocolBufferException {
+ CustomMessage message = future.checkedGet(timeout, unit);
+ return parser.parseFrom(message.getMessage());
+ }
+
+ public DrillBuf getBuffer() throws RpcException {
+ return (DrillBuf) future.getBuffer();
+ }
+
+ }
+
+ /**
+ * A special tunnel that can be used for custom types of messages. Its lifecycle is tied to the underlying
+ * ControlTunnel.
+ * @param <SEND>
+ * The type of message the control tunnel will be able to send.
+ * @param <RECEIVE>
+ * The expected response the control tunnel expects to receive.
+ */
+ public class CustomTunnel<SEND extends Message, RECEIVE extends Message> {
+ private int messageTypeId;
+ private Parser<RECEIVE> parser;
+
+ private CustomTunnel(int messageTypeId, Parser<RECEIVE> parser) {
+ super();
+ this.messageTypeId = messageTypeId;
+ this.parser = parser;
+ }
+
+ /**
+ * Send a message and receive a future for monitoring the outcome.
+ * @param messageToSend
+ * The structured message to send.
+ * @param dataBodies
+ * One or more optional unstructured messages to append to the structure message.
+ * @return The CustomFuture that can be used to wait for the response.
+ */
+ public CustomFuture<RECEIVE> send(SEND messageToSend, ByteBuf... dataBodies) {
+ final CustomMessage customMessage = CustomMessage.newBuilder()
+ .setMessage(messageToSend.toByteString())
+ .setType(messageTypeId)
+ .build();
+ final SyncCustomMessageSender b = new SyncCustomMessageSender(customMessage, dataBodies);
+ manager.runCommand(b);
+ DrillRpcFuture<CustomMessage> innerFuture = b.getFuture();
+ return new CustomFuture<RECEIVE>(parser, innerFuture);
+ }
+
+ /**
+ * Send a message using a custom listener.
+ * @param listener
+ * The listener to inform of the outcome of the sent message.
+ * @param messageToSend
+ * The structured message to send.
+ * @param dataBodies
+ * One or more optional unstructured messages to append to the structure message.
+ */
+ public void send(RpcOutcomeListener<RECEIVE> listener, SEND messageToSend, ByteBuf... dataBodies) {
+ final CustomMessage customMessage = CustomMessage.newBuilder()
+ .setMessage(messageToSend.toByteString())
+ .setType(messageTypeId)
+ .build();
+ manager.runCommand(new CustomMessageSender(new CustomTunnelListener(listener), customMessage, dataBodies));
+ }
+
+ private class CustomTunnelListener implements RpcOutcomeListener<CustomMessage> {
+ final RpcOutcomeListener<RECEIVE> innerListener;
+
+ public CustomTunnelListener(RpcOutcomeListener<RECEIVE> innerListener) {
+ super();
+ this.innerListener = innerListener;
+ }
+
+ @Override
+ public void failed(RpcException ex) {
+ innerListener.failed(ex);
+ }
+
+ @Override
+ public void success(CustomMessage value, ByteBuf buffer) {
+ try {
+ RECEIVE message = parser.parseFrom(value.getMessage());
+ innerListener.success(message, buffer);
+ } catch (InvalidProtocolBufferException e) {
+ innerListener.failed(new RpcException("Failure while parsing message locally.", e));
+ }
+
+ }
+
+ @Override
+ public void interrupted(InterruptedException e) {
+ innerListener.interrupted(e);
+ }
+
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
index af60ff0..94df739 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
@@ -17,10 +17,17 @@
*/
package org.apache.drill.exec.rpc.control;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
import java.io.Closeable;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.UserRpcException;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
/**
* Service that allows one Drillbit to communicate with another. Internally manages whether each particular bit is a
@@ -41,5 +48,61 @@ public interface Controller extends Closeable {
public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException;
+ /**
+ * Register a new handler for custom message types. Should be done before any messages. This is threadsafe as this
+ * method manages locking internally.
+ *
+ * @param messageTypeId
+ * The type of message id to handle. This corresponds to the CustomMessage.type field. Note that only a
+ * single handler for a particular type of message can be registered within a particular Drillbit.
+ * @param handler
+ * The handler that should be used to handle this type of message.
+ * @param parser
+ * The parser used to handle the types of messages the handler above handles.
+ */
+ public <REQUEST extends MessageLite, RESPONSE extends MessageLite> void registerCustomHandler(int messageTypeId,
+ CustomMessageHandler<REQUEST, RESPONSE> handler, Parser<REQUEST> parser);
+
+ /**
+ * Defines how the Controller should handle custom messages. Implementations need to be threadsafe.
+ * @param <REQUEST>
+ * The type of request message.
+ * @param <RESPONSE>
+ * The type of the response message.
+ */
+ public interface CustomMessageHandler<REQUEST extends MessageLite, RESPONSE extends MessageLite> {
+
+ /**
+ * Handle an incoming message.
+ * @param pBody
+ * The protobuf body message object of type REQUEST that was sent.
+ * @param dBody
+ * An optional byte body that was sent along with the structured message.
+ * @return The response that should be sent to the message sender.
+ * @throws UserRpcException
+ * throw this exception if there is an RPC failure that should be communicated to the sender.
+ */
+ public CustomResponse<RESPONSE> onMessage(REQUEST pBody, DrillBuf dBody) throws UserRpcException;
+ }
+
+ /**
+ * A simple interface that describes the nature of the response to the custom incoming message.
+ *
+ * @param <RESPONSE>
+ * The type of message that the respopnse contains. Must be a protobuf message type.
+ */
+ public interface CustomResponse<RESPONSE extends MessageLite> {
+
+ /**
+ * The structured portion of the response.
+ * @return A protobuf message of type RESPONSE
+ */
+ public RESPONSE getMessage();
+ /**
+ * The optional unstructured portion of the message.
+ * @return null or one or more unstructured bodies.
+ */
+ public ByteBuf[] getBodies();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index 631d479..0564cca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -24,6 +24,8 @@ import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import com.google.common.io.Closeables;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
/**
* Manages communication tunnels between nodes.
@@ -36,6 +38,7 @@ public class ControllerImpl implements Controller {
private final BootStrapContext context;
private final ConnectionManagerRegistry connectionRegistry;
private final boolean allowPortHunting;
+ private final CustomHandlerRegistry handlerRegistry;
public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, boolean allowPortHunting) {
super();
@@ -43,6 +46,7 @@ public class ControllerImpl implements Controller {
this.context = context;
this.connectionRegistry = new ConnectionManagerRegistry(handler, context);
this.allowPortHunting = allowPortHunting;
+ this.handlerRegistry = handler.getHandlerRegistry();
}
@Override
@@ -52,6 +56,7 @@ public class ControllerImpl implements Controller {
port = server.bind(port, allowPortHunting);
DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(port).build();
connectionRegistry.setEndpoint(completeEndpoint);
+ handlerRegistry.setEndpoint(completeEndpoint);
return completeEndpoint;
}
@@ -60,6 +65,13 @@ public class ControllerImpl implements Controller {
return new ControlTunnel(endpoint, connectionRegistry.getConnectionManager(endpoint));
}
+
+ @Override
+ public <REQUEST extends MessageLite, RESPONSE extends MessageLite> void registerCustomHandler(int messageTypeId,
+ CustomMessageHandler<REQUEST, RESPONSE> handler, Parser<REQUEST> parser) {
+ handlerRegistry.registerCustomHandler(messageTypeId, handler, parser);
+ }
+
public void close() {
Closeables.closeQuietly(server);
for (ControlConnectionManager bt : connectionRegistry) {
@@ -67,4 +79,5 @@ public class ControllerImpl implements Controller {
}
}
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
new file mode 100644
index 0000000..c328cd8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.common.concurrent.AutoCloseableLock;
+import org.apache.drill.exec.proto.BitControl.CustomMessage;
+import org.apache.drill.exec.proto.BitControl.RpcType;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.UserRpcException;
+import org.apache.drill.exec.rpc.control.Controller.CustomMessageHandler;
+import org.apache.drill.exec.rpc.control.Controller.CustomResponse;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public class CustomHandlerRegistry {
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CustomHandlerRegistry.class);
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final AutoCloseableLock read = new AutoCloseableLock(readWriteLock.readLock());
+ private final AutoCloseableLock write = new AutoCloseableLock(readWriteLock.writeLock());
+ private final IntObjectOpenHashMap<ParsingHandler<?>> handlers = new IntObjectOpenHashMap<>();
+ private volatile DrillbitEndpoint endpoint;
+
+ public CustomHandlerRegistry() {
+ }
+
+ public void setEndpoint(DrillbitEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public <SEND extends MessageLite> void registerCustomHandler(int messageTypeId,
+ CustomMessageHandler<SEND, ?> handler,
+ Parser<SEND> parser) {
+ Preconditions.checkNotNull(handler);
+ Preconditions.checkNotNull(parser);
+ try (AutoCloseableLock lock = write.open()) {
+ ParsingHandler<?> parsingHandler = handlers.get(messageTypeId);
+ if (parsingHandler != null) {
+ throw new IllegalStateException(String.format(
+ "Only one handler can be registered for a given custom message type. You tried to register a handler for "
+ + "the %d message type but one had already been registered.",
+ messageTypeId));
+ }
+
+ parsingHandler = new ParsingHandler<SEND>(handler, parser);
+ handlers.put(messageTypeId, parsingHandler);
+ }
+ }
+
+ public Response handle(CustomMessage message, DrillBuf dBody) throws RpcException {
+ final ParsingHandler<?> handler;
+ try (AutoCloseableLock lock = read.open()) {
+ handler = handlers.get(message.getType());
+ }
+
+ if (handler == null) {
+ throw new UserRpcException(
+ endpoint, "Unable to handle message.",
+ new IllegalStateException(String.format(
+ "Unable to handle message. The message type provided [%d] did not have a registered handler.",
+ message.getType())));
+ }
+ final CustomResponse<?> customResponse = handler.onMessage(message.getMessage(), dBody);
+ final CustomMessage responseMessage = CustomMessage.newBuilder()
+ .setMessage(customResponse.getMessage().toByteString())
+ .setType(message.getType())
+ .build();
+ // make sure we don't pass in a null array.
+ final ByteBuf[] dBodies = customResponse.getBodies() == null ? new DrillBuf[0] : customResponse.getBodies();
+ return new Response(RpcType.RESP_CUSTOM, responseMessage, dBodies);
+
+ }
+
+ private class ParsingHandler<SEND extends MessageLite> {
+ private final CustomMessageHandler<SEND, ?> handler;
+ private final Parser<SEND> parser;
+
+ public ParsingHandler(CustomMessageHandler<SEND, ?> handler, Parser<SEND> parser) {
+ super();
+ this.handler = handler;
+ this.parser = parser;
+ }
+
+ public CustomResponse<?> onMessage(ByteString pBody, DrillBuf dBody) throws UserRpcException {
+
+ try {
+ final SEND message = parser.parseFrom(pBody);
+ return handler.onMessage(message, dBody);
+
+ } catch (InvalidProtocolBufferException e) {
+ throw new UserRpcException(endpoint, "Failure parsing message.", e);
+ }
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
index 10fe343..7065201 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc.control;
import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
import org.apache.drill.exec.proto.BitControl.BitStatus;
+import org.apache.drill.exec.proto.BitControl.CustomMessage;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -46,7 +47,8 @@ public class DefaultInstanceHandler {
return BitStatus.getDefaultInstance();
case RpcType.RESP_QUERY_STATUS_VALUE:
return QueryProfile.getDefaultInstance();
-
+ case RpcType.RESP_CUSTOM_VALUE:
+ return CustomMessage.getDefaultInstance();
default:
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 1c0eb80..77c069b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -19,8 +19,10 @@ package org.apache.drill.exec.work.batch;
import static org.apache.drill.exec.rpc.RpcBus.get;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.BitControl.CustomMessage;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.InitializeFragments;
@@ -39,6 +41,7 @@ import org.apache.drill.exec.rpc.UserRpcException;
import org.apache.drill.exec.rpc.control.ControlConnection;
import org.apache.drill.exec.rpc.control.ControlRpcConfig;
import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.rpc.control.CustomHandlerRegistry;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.Foreman;
@@ -50,6 +53,7 @@ import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
public class ControlMessageHandler {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
private final WorkerBee bee;
+ private final CustomHandlerRegistry handlerRegistry = new CustomHandlerRegistry();
public ControlMessageHandler(final WorkerBee bee) {
this.bee = bee;
@@ -69,6 +73,11 @@ public class ControlMessageHandler {
return ControlRpcConfig.OK;
}
+ case RpcType.REQ_CUSTOM_VALUE: {
+ final CustomMessage customMessage = get(pBody, CustomMessage.PARSER);
+ return handlerRegistry.handle(customMessage, (DrillBuf) dBody);
+ }
+
case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
receivingFragmentFinished(finishedReceiver);
@@ -228,4 +237,9 @@ public class ControlMessageHandler {
return Acks.OK;
}
+
+ public CustomHandlerRegistry getHandlerRegistry() {
+ return handlerRegistry;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
new file mode 100644
index 0000000..2008a48
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.internal.ThreadLocalRandom;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.UserRpcException;
+import org.apache.drill.exec.rpc.control.ControlTunnel.CustomFuture;
+import org.apache.drill.exec.rpc.control.ControlTunnel.CustomTunnel;
+import org.apache.drill.exec.rpc.control.Controller.CustomMessageHandler;
+import org.apache.drill.exec.rpc.control.Controller.CustomResponse;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.junit.Test;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class TestCustomTunnel extends BaseTestQuery {
+
+ private final QueryId expectedId = QueryId
+ .newBuilder()
+ .setPart1(ThreadLocalRandom.current().nextLong())
+ .setPart2(ThreadLocalRandom.current().nextLong())
+ .build();
+
+ private final ByteBuf buf1;
+ private final byte[] expected;
+
+ public TestCustomTunnel() {
+ buf1 = UnpooledByteBufAllocator.DEFAULT.buffer(1024);
+ Random r = new Random();
+ this.expected = new byte[1024];
+ r.nextBytes(expected);
+ buf1.writeBytes(expected);
+ }
+
+ @Test
+ public void ensureRoundTrip() throws RpcException, InvalidProtocolBufferException {
+
+ final DrillbitContext context = getDrillbitContext();
+ final TestCustomMessageHandler handler = new TestCustomMessageHandler(context.getEndpoint(), false);
+ context.getController().registerCustomHandler(1001, handler, DrillbitEndpoint.PARSER);
+ final ControlTunnel loopbackTunnel = context.getController().getTunnel(context.getEndpoint());
+ final CustomTunnel<DrillbitEndpoint, QueryId> tunnel = loopbackTunnel.getCustomTunnel(1001, DrillbitEndpoint.class,
+ QueryId.PARSER);
+ CustomFuture<QueryId> future = tunnel.send(context.getEndpoint());
+ assertEquals(expectedId, future.get());
+ }
+
+ @Test
+ public void ensureRoundTripBytes() throws RpcException, InvalidProtocolBufferException {
+ final DrillbitContext context = getDrillbitContext();
+ final TestCustomMessageHandler handler = new TestCustomMessageHandler(context.getEndpoint(), true);
+ context.getController().registerCustomHandler(1002, handler, DrillbitEndpoint.PARSER);
+ final ControlTunnel loopbackTunnel = context.getController().getTunnel(context.getEndpoint());
+ final CustomTunnel<DrillbitEndpoint, QueryId> tunnel = loopbackTunnel.getCustomTunnel(1002, DrillbitEndpoint.class,
+ QueryId.PARSER);
+ buf1.retain();
+ CustomFuture<QueryId> future = tunnel.send(context.getEndpoint(), buf1);
+ assertEquals(expectedId, future.get());
+ byte[] actual = new byte[1024];
+ future.getBuffer().getBytes(0, actual);
+ future.getBuffer().release();
+ assertTrue(Arrays.equals(expected, actual));
+ }
+
+ private class TestCustomMessageHandler implements CustomMessageHandler<DrillbitEndpoint, QueryId> {
+ private DrillbitEndpoint expectedValue;
+ private final boolean returnBytes;
+
+ public TestCustomMessageHandler(DrillbitEndpoint expectedValue, boolean returnBytes) {
+ super();
+ this.expectedValue = expectedValue;
+ this.returnBytes = returnBytes;
+ }
+
+ @Override
+ public CustomResponse<QueryId> onMessage(DrillbitEndpoint pBody, DrillBuf dBody) throws UserRpcException {
+
+ if (!expectedValue.equals(pBody)) {
+ throw new UserRpcException(expectedValue, "Invalid expected downstream value.", new IllegalStateException());
+ }
+
+ if (returnBytes) {
+ byte[] actual = new byte[1024];
+ dBody.getBytes(0, actual);
+ if (!Arrays.equals(expected, actual)) {
+ throw new UserRpcException(expectedValue, "Invalid expected downstream value.", new IllegalStateException());
+ }
+ }
+
+ return new CustomResponse<QueryId>() {
+
+ @Override
+ public QueryId getMessage() {
+ return expectedId;
+ }
+
+ @Override
+ public ByteBuf[] getBodies() {
+ if (returnBytes) {
+ buf1.retain();
+ return new ByteBuf[] { buf1 };
+ } else {
+ return null;
+ }
+ }
+
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
index 493036b..b16934d 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
@@ -99,25 +99,33 @@ public final class BitControl {
*/
REQ_UNPAUSE_FRAGMENT(10, 16),
/**
+ * <code>REQ_CUSTOM = 17;</code>
+ */
+ REQ_CUSTOM(11, 17),
+ /**
* <code>RESP_FRAGMENT_HANDLE = 11;</code>
*
* <pre>
* bit responses
* </pre>
*/
- RESP_FRAGMENT_HANDLE(11, 11),
+ RESP_FRAGMENT_HANDLE(12, 11),
/**
* <code>RESP_FRAGMENT_STATUS = 12;</code>
*/
- RESP_FRAGMENT_STATUS(12, 12),
+ RESP_FRAGMENT_STATUS(13, 12),
/**
* <code>RESP_BIT_STATUS = 13;</code>
*/
- RESP_BIT_STATUS(13, 13),
+ RESP_BIT_STATUS(14, 13),
/**
* <code>RESP_QUERY_STATUS = 14;</code>
*/
- RESP_QUERY_STATUS(14, 14),
+ RESP_QUERY_STATUS(15, 14),
+ /**
+ * <code>RESP_CUSTOM = 18;</code>
+ */
+ RESP_CUSTOM(16, 18),
;
/**
@@ -185,6 +193,10 @@ public final class BitControl {
*/
public static final int REQ_UNPAUSE_FRAGMENT_VALUE = 16;
/**
+ * <code>REQ_CUSTOM = 17;</code>
+ */
+ public static final int REQ_CUSTOM_VALUE = 17;
+ /**
* <code>RESP_FRAGMENT_HANDLE = 11;</code>
*
* <pre>
@@ -204,6 +216,10 @@ public final class BitControl {
* <code>RESP_QUERY_STATUS = 14;</code>
*/
public static final int RESP_QUERY_STATUS_VALUE = 14;
+ /**
+ * <code>RESP_CUSTOM = 18;</code>
+ */
+ public static final int RESP_CUSTOM_VALUE = 18;
public final int getNumber() { return value; }
@@ -221,10 +237,12 @@ public final class BitControl {
case 10: return REQ_QUERY_STATUS;
case 15: return REQ_QUERY_CANCEL;
case 16: return REQ_UNPAUSE_FRAGMENT;
+ case 17: return REQ_CUSTOM;
case 11: return RESP_FRAGMENT_HANDLE;
case 12: return RESP_FRAGMENT_STATUS;
case 13: return RESP_BIT_STATUS;
case 14: return RESP_QUERY_STATUS;
+ case 18: return RESP_CUSTOM;
default: return null;
}
}
@@ -3001,6 +3019,486 @@ public final class BitControl {
// @@protoc_insertion_point(class_scope:exec.bit.control.InitializeFragments)
}
+ public interface CustomMessageOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // optional int32 type = 1;
+ /**
+ * <code>optional int32 type = 1;</code>
+ */
+ boolean hasType();
+ /**
+ * <code>optional int32 type = 1;</code>
+ */
+ int getType();
+
+ // optional bytes message = 2;
+ /**
+ * <code>optional bytes message = 2;</code>
+ */
+ boolean hasMessage();
+ /**
+ * <code>optional bytes message = 2;</code>
+ */
+ com.google.protobuf.ByteString getMessage();
+ }
+ /**
+ * Protobuf type {@code exec.bit.control.CustomMessage}
+ */
+ public static final class CustomMessage extends
+ com.google.protobuf.GeneratedMessage
+ implements CustomMessageOrBuilder {
+ // Use CustomMessage.newBuilder() to construct.
+ private CustomMessage(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private CustomMessage(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final CustomMessage defaultInstance;
+ public static CustomMessage getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public CustomMessage getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private CustomMessage(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 8: {
+ bitField0_ |= 0x00000001;
+ type_ = input.readInt32();
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000002;
+ message_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.drill.exec.proto.BitControl.CustomMessage.class, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser<CustomMessage> PARSER =
+ new com.google.protobuf.AbstractParser<CustomMessage>() {
+ public CustomMessage parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new CustomMessage(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser<CustomMessage> getParserForType() {
+ return PARSER;
+ }
+
+ private int bitField0_;
+ // optional int32 type = 1;
+ public static final int TYPE_FIELD_NUMBER = 1;
+ private int type_;
+ /**
+ * <code>optional int32 type = 1;</code>
+ */
+ public boolean hasType() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional int32 type = 1;</code>
+ */
+ public int getType() {
+ return type_;
+ }
+
+ // optional bytes message = 2;
+ public static final int MESSAGE_FIELD_NUMBER = 2;
+ private com.google.protobuf.ByteString message_;
+ /**
+ * <code>optional bytes message = 2;</code>
+ */
+ public boolean hasMessage() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional bytes message = 2;</code>
+ */
+ public com.google.protobuf.ByteString getMessage() {
+ return message_;
+ }
+
+ private void initFields() {
+ type_ = 0;
+ message_ = com.google.protobuf.ByteString.EMPTY;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeInt32(1, type_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, message_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(1, type_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, message_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static org.apache.drill.exec.proto.BitControl.CustomMessage parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static org.apache.drill.exec.proto.BitControl.CustomMessage parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.drill.exec.proto.BitControl.CustomMessage prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code exec.bit.control.CustomMessage}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.drill.exec.proto.BitControl.CustomMessageOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.drill.exec.proto.BitControl.CustomMessage.class, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder.class);
+ }
+
+ // Construct using org.apache.drill.exec.proto.BitControl.CustomMessage.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ type_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ message_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_descriptor;
+ }
+
+ public org.apache.drill.exec.proto.BitControl.CustomMessage getDefaultInstanceForType() {
+ return org.apache.drill.exec.proto.BitControl.CustomMessage.getDefaultInstance();
+ }
+
+ public org.apache.drill.exec.proto.BitControl.CustomMessage build() {
+ org.apache.drill.exec.proto.BitControl.CustomMessage result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public org.apache.drill.exec.proto.BitControl.CustomMessage buildPartial() {
+ org.apache.drill.exec.proto.BitControl.CustomMessage result = new org.apache.drill.exec.proto.BitControl.CustomMessage(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.type_ = type_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.message_ = message_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.drill.exec.proto.BitControl.CustomMessage) {
+ return mergeFrom((org.apache.drill.exec.proto.BitControl.CustomMessage)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.drill.exec.proto.BitControl.CustomMessage other) {
+ if (other == org.apache.drill.exec.proto.BitControl.CustomMessage.getDefaultInstance()) return this;
+ if (other.hasType()) {
+ setType(other.getType());
+ }
+ if (other.hasMessage()) {
+ setMessage(other.getMessage());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ org.apache.drill.exec.proto.BitControl.CustomMessage parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (org.apache.drill.exec.proto.BitControl.CustomMessage) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // optional int32 type = 1;
+ private int type_ ;
+ /**
+ * <code>optional int32 type = 1;</code>
+ */
+ public boolean hasType() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional int32 type = 1;</code>
+ */
+ public int getType() {
+ return type_;
+ }
+ /**
+ * <code>optional int32 type = 1;</code>
+ */
+ public Builder setType(int value) {
+ bitField0_ |= 0x00000001;
+ type_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 type = 1;</code>
+ */
+ public Builder clearType() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ type_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // optional bytes message = 2;
+ private com.google.protobuf.ByteString message_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes message = 2;</code>
+ */
+ public boolean hasMessage() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional bytes message = 2;</code>
+ */
+ public com.google.protobuf.ByteString getMessage() {
+ return message_;
+ }
+ /**
+ * <code>optional bytes message = 2;</code>
+ */
+ public Builder setMessage(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ message_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes message = 2;</code>
+ */
+ public Builder clearMessage() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ message_ = getDefaultInstance().getMessage();
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:exec.bit.control.CustomMessage)
+ }
+
+ static {
+ defaultInstance = new CustomMessage(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:exec.bit.control.CustomMessage)
+ }
+
public interface PlanFragmentOrBuilder
extends com.google.protobuf.MessageOrBuilder {
@@ -8440,6 +8938,11 @@ public final class BitControl {
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_exec_bit_control_InitializeFragments_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_exec_bit_control_CustomMessage_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_exec_bit_control_CustomMessage_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
internal_static_exec_bit_control_PlanFragment_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -8485,41 +8988,43 @@ public final class BitControl {
"red.MinorFragmentProfile\022(\n\006handle\030\002 \001(\013",
"2\030.exec.bit.FragmentHandle\"G\n\023Initialize" +
"Fragments\0220\n\010fragment\030\001 \003(\0132\036.exec.bit.c" +
- "ontrol.PlanFragment\"\374\003\n\014PlanFragment\022(\n\006" +
- "handle\030\001 \001(\0132\030.exec.bit.FragmentHandle\022\024" +
- "\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_cost\030\005 \001(\002\022\021" +
- "\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_cost\030\007 \001(\002\022\025" +
- "\n\rfragment_json\030\010 \001(\t\022\025\n\rleaf_fragment\030\t" +
- " \001(\010\022*\n\nassignment\030\n \001(\0132\026.exec.Drillbit" +
- "Endpoint\022\'\n\007foreman\030\013 \001(\0132\026.exec.Drillbi" +
- "tEndpoint\022\035\n\013mem_initial\030\014 \001(\003:\01020000000",
- "\022\033\n\007mem_max\030\r \001(\003:\n2000000000\0221\n\013credent" +
- "ials\030\016 \001(\0132\034.exec.shared.UserCredentials" +
- "\022\024\n\014options_json\030\017 \001(\t\022:\n\007context\030\020 \001(\0132" +
- ").exec.bit.control.QueryContextInformati" +
- "on\022.\n\tcollector\030\021 \003(\0132\033.exec.bit.control" +
- ".Collector\"\210\001\n\tCollector\022\"\n\032opposite_maj" +
- "or_fragment_id\030\001 \001(\005\022#\n\027incoming_minor_f" +
- "ragment\030\002 \003(\005B\002\020\001\022\035\n\025supports_out_of_ord" +
- "er\030\003 \001(\010\022\023\n\013is_spooling\030\004 \001(\010\"c\n\027QueryCo" +
- "ntextInformation\022\030\n\020query_start_time\030\001 \001",
- "(\003\022\021\n\ttime_zone\030\002 \001(\005\022\033\n\023default_schema_" +
- "name\030\003 \001(\t\"f\n\017WorkQueueStatus\022(\n\010endpoin" +
- "t\030\001 \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014queue" +
- "_length\030\002 \001(\005\022\023\n\013report_time\030\003 \001(\003\"h\n\020Fi" +
- "nishedReceiver\022*\n\010receiver\030\001 \001(\0132\030.exec." +
- "bit.FragmentHandle\022(\n\006sender\030\002 \001(\0132\030.exe" +
- "c.bit.FragmentHandle*\323\002\n\007RpcType\022\r\n\tHAND" +
- "SHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\034\n\030REQ_INI" +
- "TIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_CANCEL_FRAGME" +
- "NT\020\006\022\031\n\025REQ_RECEIVER_FINISHED\020\007\022\027\n\023REQ_F",
- "RAGMENT_STATUS\020\010\022\022\n\016REQ_BIT_STATUS\020\t\022\024\n\020" +
- "REQ_QUERY_STATUS\020\n\022\024\n\020REQ_QUERY_CANCEL\020\017" +
- "\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022\030\n\024RESP_FRAGM" +
- "ENT_HANDLE\020\013\022\030\n\024RESP_FRAGMENT_STATUS\020\014\022\023" +
- "\n\017RESP_BIT_STATUS\020\r\022\025\n\021RESP_QUERY_STATUS" +
- "\020\016B+\n\033org.apache.drill.exec.protoB\nBitCo" +
- "ntrolH\001"
+ "ontrol.PlanFragment\".\n\rCustomMessage\022\014\n\004" +
+ "type\030\001 \001(\005\022\017\n\007message\030\002 \001(\014\"\374\003\n\014PlanFrag" +
+ "ment\022(\n\006handle\030\001 \001(\0132\030.exec.bit.Fragment" +
+ "Handle\022\024\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_cost" +
+ "\030\005 \001(\002\022\021\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_cost" +
+ "\030\007 \001(\002\022\025\n\rfragment_json\030\010 \001(\t\022\025\n\rleaf_fr" +
+ "agment\030\t \001(\010\022*\n\nassignment\030\n \001(\0132\026.exec." +
+ "DrillbitEndpoint\022\'\n\007foreman\030\013 \001(\0132\026.exec",
+ ".DrillbitEndpoint\022\035\n\013mem_initial\030\014 \001(\003:\010" +
+ "20000000\022\033\n\007mem_max\030\r \001(\003:\n2000000000\0221\n" +
+ "\013credentials\030\016 \001(\0132\034.exec.shared.UserCre" +
+ "dentials\022\024\n\014options_json\030\017 \001(\t\022:\n\007contex" +
+ "t\030\020 \001(\0132).exec.bit.control.QueryContextI" +
+ "nformation\022.\n\tcollector\030\021 \003(\0132\033.exec.bit" +
+ ".control.Collector\"\210\001\n\tCollector\022\"\n\032oppo" +
+ "site_major_fragment_id\030\001 \001(\005\022#\n\027incoming" +
+ "_minor_fragment\030\002 \003(\005B\002\020\001\022\035\n\025supports_ou" +
+ "t_of_order\030\003 \001(\010\022\023\n\013is_spooling\030\004 \001(\010\"c\n",
+ "\027QueryContextInformation\022\030\n\020query_start_" +
+ "time\030\001 \001(\003\022\021\n\ttime_zone\030\002 \001(\005\022\033\n\023default" +
+ "_schema_name\030\003 \001(\t\"f\n\017WorkQueueStatus\022(\n" +
+ "\010endpoint\030\001 \001(\0132\026.exec.DrillbitEndpoint\022" +
+ "\024\n\014queue_length\030\002 \001(\005\022\023\n\013report_time\030\003 \001" +
+ "(\003\"h\n\020FinishedReceiver\022*\n\010receiver\030\001 \001(\013" +
+ "2\030.exec.bit.FragmentHandle\022(\n\006sender\030\002 \001" +
+ "(\0132\030.exec.bit.FragmentHandle*\364\002\n\007RpcType" +
+ "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\034\n" +
+ "\030REQ_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_CANCE",
+ "L_FRAGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISHED\020\007\022" +
+ "\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT_STAT" +
+ "US\020\t\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_QUERY_" +
+ "CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022\016\n\nRE" +
+ "Q_CUSTOM\020\021\022\030\n\024RESP_FRAGMENT_HANDLE\020\013\022\030\n\024" +
+ "RESP_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BIT_STATU" +
+ "S\020\r\022\025\n\021RESP_QUERY_STATUS\020\016\022\017\n\013RESP_CUSTO" +
+ "M\020\022B+\n\033org.apache.drill.exec.protoB\nBitC" +
+ "ontrolH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -8550,32 +9055,38 @@ public final class BitControl {
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_bit_control_InitializeFragments_descriptor,
new java.lang.String[] { "Fragment", });
- internal_static_exec_bit_control_PlanFragment_descriptor =
+ internal_static_exec_bit_control_CustomMessage_descriptor =
getDescriptor().getMessageTypes().get(4);
+ internal_static_exec_bit_control_CustomMessage_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_exec_bit_control_CustomMessage_descriptor,
+ new java.lang.String[] { "Type", "Message", });
+ internal_static_exec_bit_control_PlanFragment_descriptor =
+ getDescriptor().getMessageTypes().get(5);
internal_static_exec_bit_control_PlanFragment_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_bit_control_PlanFragment_descriptor,
new java.lang.String[] { "Handle", "NetworkCost", "CpuCost", "DiskCost", "MemoryCost", "FragmentJson", "LeafFragment", "Assignment", "Foreman", "MemInitial", "MemMax", "Credentials", "OptionsJson", "Context", "Collector", });
internal_static_exec_bit_control_Collector_descriptor =
- getDescriptor().getMessageTypes().get(5);
+ getDescriptor().getMessageTypes().get(6);
internal_static_exec_bit_control_Collector_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_bit_control_Collector_descriptor,
new java.lang.String[] { "OppositeMajorFragmentId", "IncomingMinorFragment", "SupportsOutOfOrder", "IsSpooling", });
internal_static_exec_bit_control_QueryContextInformation_descriptor =
- getDescriptor().getMessageTypes().get(6);
+ getDescriptor().getMessageTypes().get(7);
internal_static_exec_bit_control_QueryContextInformation_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_bit_control_QueryContextInformation_descriptor,
new java.lang.String[] { "QueryStartTime", "TimeZone", "DefaultSchemaName", });
internal_static_exec_bit_control_WorkQueueStatus_descriptor =
- getDescriptor().getMessageTypes().get(7);
+ getDescriptor().getMessageTypes().get(8);
internal_static_exec_bit_control_WorkQueueStatus_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_bit_control_WorkQueueStatus_descriptor,
new java.lang.String[] { "Endpoint", "QueueLength", "ReportTime", });
internal_static_exec_bit_control_FinishedReceiver_descriptor =
- getDescriptor().getMessageTypes().get(8);
+ getDescriptor().getMessageTypes().get(9);
internal_static_exec_bit_control_FinishedReceiver_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_bit_control_FinishedReceiver_descriptor,
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
index a4088c9..0a9b90d 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
@@ -499,6 +499,125 @@ public final class SchemaBitControl
}
}
+ public static final class CustomMessage
+ {
+ public static final org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.MessageSchema WRITE =
+ new org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.MessageSchema();
+ public static final org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.BuilderSchema MERGE =
+ new org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.BuilderSchema();
+
+ public static class MessageSchema implements com.dyuproject.protostuff.Schema<org.apache.drill.exec.proto.BitControl.CustomMessage>
+ {
+ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.exec.proto.BitControl.CustomMessage message) throws java.io.IOException
+ {
+ if(message.hasType())
+ output.writeInt32(1, message.getType(), false);
+ if(message.hasMessage())
+ output.writeByteArray(2, message.getMessage().toByteArray(), false);
+
+ }
+ public boolean isInitialized(org.apache.drill.exec.proto.BitControl.CustomMessage message)
+ {
+ return message.isInitialized();
+ }
+ public java.lang.String getFieldName(int number)
+ {
+ return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldName(number);
+ }
+ public int getFieldNumber(java.lang.String name)
+ {
+ return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldNumber(name);
+ }
+ public java.lang.Class<org.apache.drill.exec.proto.BitControl.CustomMessage> typeClass()
+ {
+ return org.apache.drill.exec.proto.BitControl.CustomMessage.class;
+ }
+ public java.lang.String messageName()
+ {
+ return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getSimpleName();
+ }
+ public java.lang.String messageFullName()
+ {
+ return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getName();
+ }
+ //unused
+ public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.exec.proto.BitControl.CustomMessage message) throws java.io.IOException {}
+ public org.apache.drill.exec.proto.BitControl.CustomMessage newMessage() { return null; }
+ }
+ public static class BuilderSchema implements com.dyuproject.protostuff.Schema<org.apache.drill.exec.proto.BitControl.CustomMessage.Builder>
+ {
+ public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder builder) throws java.io.IOException
+ {
+ for(int number = input.readFieldNumber(this);; number = input.readFieldNumber(this))
+ {
+ switch(number)
+ {
+ case 0:
+ return;
+ case 1:
+ builder.setType(input.readInt32());
+ break;
+ case 2:
+ builder.setMessage(com.google.protobuf.ByteString.copyFrom(input.readByteArray()));
+ break;
+ default:
+ input.handleUnknownField(number, this);
+ }
+ }
+ }
+ public boolean isInitialized(org.apache.drill.exec.proto.BitControl.CustomMessage.Builder builder)
+ {
+ return builder.isInitialized();
+ }
+ public org.apache.drill.exec.proto.BitControl.CustomMessage.Builder newMessage()
+ {
+ return org.apache.drill.exec.proto.BitControl.CustomMessage.newBuilder();
+ }
+ public java.lang.String getFieldName(int number)
+ {
+ return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldName(number);
+ }
+ public int getFieldNumber(java.lang.String name)
+ {
+ return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldNumber(name);
+ }
+ public java.lang.Class<org.apache.drill.exec.proto.BitControl.CustomMessage.Builder> typeClass()
+ {
+ return org.apache.drill.exec.proto.BitControl.CustomMessage.Builder.class;
+ }
+ public java.lang.String messageName()
+ {
+ return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getSimpleName();
+ }
+ public java.lang.String messageFullName()
+ {
+ return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getName();
+ }
+ //unused
+ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder builder) throws java.io.IOException {}
+ }
+ public static java.lang.String getFieldName(int number)
+ {
+ switch(number)
+ {
+ case 1: return "type";
+ case 2: return "message";
+ default: return null;
+ }
+ }
+ public static int getFieldNumber(java.lang.String name)
+ {
+ java.lang.Integer number = fieldMap.get(name);
+ return number == null ? 0 : number.intValue();
+ }
+ private static final java.util.HashMap<java.lang.String,java.lang.Integer> fieldMap = new java.util.HashMap<java.lang.String,java.lang.Integer>();
+ static
+ {
+ fieldMap.put("type", 1);
+ fieldMap.put("message", 2);
+ }
+ }
+
public static final class PlanFragment
{
public static final org.apache.drill.exec.proto.SchemaBitControl.PlanFragment.MessageSchema WRITE =
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java
new file mode 100644
index 0000000..af9195e
--- /dev/null
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Generated by http://code.google.com/p/protostuff/ ... DO NOT EDIT!
+// Generated from protobuf
+
+package org.apache.drill.exec.proto.beans;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import com.dyuproject.protostuff.ByteString;
+import com.dyuproject.protostuff.GraphIOUtil;
+import com.dyuproject.protostuff.Input;
+import com.dyuproject.protostuff.Message;
+import com.dyuproject.protostuff.Output;
+import com.dyuproject.protostuff.Schema;
+
+public final class CustomMessage implements Externalizable, Message<CustomMessage>, Schema<CustomMessage>
+{
+
+ public static Schema<CustomMessage> getSchema()
+ {
+ return DEFAULT_INSTANCE;
+ }
+
+ public static CustomMessage getDefaultInstance()
+ {
+ return DEFAULT_INSTANCE;
+ }
+
+ static final CustomMessage DEFAULT_INSTANCE = new CustomMessage();
+
+
+ private int type;
+ private ByteString message;
+
+ public CustomMessage()
+ {
+
+ }
+
+ // getters and setters
+
+ // type
+
+ public int getType()
+ {
+ return type;
+ }
+
+ public CustomMessage setType(int type)
+ {
+ this.type = type;
+ return this;
+ }
+
+ // message
+
+ public ByteString getMessage()
+ {
+ return message;
+ }
+
+ public CustomMessage setMessage(ByteString message)
+ {
+ this.message = message;
+ return this;
+ }
+
+ // java serialization
+
+ public void readExternal(ObjectInput in) throws IOException
+ {
+ GraphIOUtil.mergeDelimitedFrom(in, this, this);
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ GraphIOUtil.writeDelimitedTo(out, this, this);
+ }
+
+ // message method
+
+ public Schema<CustomMessage> cachedSchema()
+ {
+ return DEFAULT_INSTANCE;
+ }
+
+ // schema methods
+
+ public CustomMessage newMessage()
+ {
+ return new CustomMessage();
+ }
+
+ public Class<CustomMessage> typeClass()
+ {
+ return CustomMessage.class;
+ }
+
+ public String messageName()
+ {
+ return CustomMessage.class.getSimpleName();
+ }
+
+ public String messageFullName()
+ {
+ return CustomMessage.class.getName();
+ }
+
+ public boolean isInitialized(CustomMessage message)
+ {
+ return true;
+ }
+
+ public void mergeFrom(Input input, CustomMessage message) throws IOException
+ {
+ for(int number = input.readFieldNumber(this);; number = input.readFieldNumber(this))
+ {
+ switch(number)
+ {
+ case 0:
+ return;
+ case 1:
+ message.type = input.readInt32();
+ break;
+ case 2:
+ message.message = input.readBytes();
+ break;
+ default:
+ input.handleUnknownField(number, this);
+ }
+ }
+ }
+
+
+ public void writeTo(Output output, CustomMessage message) throws IOException
+ {
+ if(message.type != 0)
+ output.writeInt32(1, message.type, false);
+
+ if(message.message != null)
+ output.writeBytes(2, message.message, false);
+ }
+
+ public String getFieldName(int number)
+ {
+ switch(number)
+ {
+ case 1: return "type";
+ case 2: return "message";
+ default: return null;
+ }
+ }
+
+ public int getFieldNumber(String name)
+ {
+ final Integer number = __fieldMap.get(name);
+ return number == null ? 0 : number.intValue();
+ }
+
+ private static final java.util.HashMap<String,Integer> __fieldMap = new java.util.HashMap<String,Integer>();
+ static
+ {
+ __fieldMap.put("type", 1);
+ __fieldMap.put("message", 2);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/protobuf/BitControl.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/BitControl.proto b/protocol/src/main/protobuf/BitControl.proto
index 6d1b529..ca441f7 100644
--- a/protocol/src/main/protobuf/BitControl.proto
+++ b/protocol/src/main/protobuf/BitControl.proto
@@ -25,12 +25,14 @@ enum RpcType {
REQ_QUERY_STATUS = 10;
REQ_QUERY_CANCEL = 15;
REQ_UNPAUSE_FRAGMENT = 16; // send a resume message for a fragment, returns Ack
+ REQ_CUSTOM = 17;
// bit responses
RESP_FRAGMENT_HANDLE = 11;
RESP_FRAGMENT_STATUS = 12;
RESP_BIT_STATUS = 13;
RESP_QUERY_STATUS = 14;
+ RESP_CUSTOM = 18;
}
message BitControlHandshake{
@@ -52,6 +54,11 @@ message InitializeFragments {
repeated PlanFragment fragment = 1;
}
+message CustomMessage {
+ optional int32 type = 1;
+ optional bytes message = 2;
+}
+
message PlanFragment {
optional FragmentHandle handle = 1;
optional float network_cost = 4;