You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2016/01/11 07:44:50 UTC

[2/3] drill git commit: DRILL-4238: Add a custom RPC interface on the Control channel for extensible communication between bits.

DRILL-4238: Add a custom RPC interface on the Control channel for extensible communication between bits.

This closes #313.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/67d5cc6d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/67d5cc6d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/67d5cc6d

Branch: refs/heads/master
Commit: 67d5cc6df0ca394dd26ff5c0a2b0510779bad949
Parents: 467c405
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Jan 2 14:55:21 2016 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Jan 10 22:41:50 2016 -0800

----------------------------------------------------------------------
 .../exec/rpc/control/ControlRpcConfig.java      |   2 +
 .../drill/exec/rpc/control/ControlTunnel.java   | 174 +++++-
 .../drill/exec/rpc/control/Controller.java      |  63 ++
 .../drill/exec/rpc/control/ControllerImpl.java  |  13 +
 .../exec/rpc/control/CustomHandlerRegistry.java | 124 ++++
 .../rpc/control/DefaultInstanceHandler.java     |   4 +-
 .../exec/work/batch/ControlMessageHandler.java  |  14 +
 .../exec/rpc/control/TestCustomTunnel.java      | 138 +++++
 .../org/apache/drill/exec/proto/BitControl.java | 599 +++++++++++++++++--
 .../drill/exec/proto/SchemaBitControl.java      | 119 ++++
 .../drill/exec/proto/beans/CustomMessage.java   | 186 ++++++
 protocol/src/main/protobuf/BitControl.proto     |   7 +
 12 files changed, 1397 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index 44046c9..ec09a98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Executor;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
+import org.apache.drill.exec.proto.BitControl.CustomMessage;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.InitializeFragments;
@@ -50,6 +51,7 @@ public class ControlRpcConfig {
         .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
         .add(RpcType.REQ_UNPAUSE_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_CUSTOM, CustomMessage.class, RpcType.RESP_CUSTOM, CustomMessage.class)
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index b90912d..ff8be1d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,6 +17,13 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.drill.exec.proto.BitControl.CustomMessage;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.InitializeFragments;
@@ -26,12 +33,16 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.FutureBitCommand;
 import org.apache.drill.exec.rpc.ListeningCommand;
+import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
 
 public class ControlTunnel {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlTunnel.class);
@@ -176,4 +187,165 @@ public class ControlTunnel {
       connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, Ack.class);
     }
   }
+
+  public <SEND extends Message, RECEIVE extends Message> CustomTunnel<SEND, RECEIVE> getCustomTunnel(
+      int messageTypeId, Class<SEND> clazz, Parser<RECEIVE> parser) {
+    return new CustomTunnel<SEND, RECEIVE>(messageTypeId, parser);
+  }
+
+  private static class CustomMessageSender extends ListeningCommand<CustomMessage, ControlConnection> {
+
+    private CustomMessage message;
+    private ByteBuf[] dataBodies;
+
+    public CustomMessageSender(RpcOutcomeListener<CustomMessage> listener, CustomMessage message, ByteBuf[] dataBodies) {
+      super(listener);
+      this.message = message;
+      this.dataBodies = dataBodies;
+    }
+
+    @Override
+    public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies);
+    }
+
+  }
+
+  private static class SyncCustomMessageSender extends FutureBitCommand<CustomMessage, ControlConnection> {
+
+    private CustomMessage message;
+    private ByteBuf[] dataBodies;
+
+    public SyncCustomMessageSender(CustomMessage message, ByteBuf[] dataBodies) {
+      super();
+      this.message = message;
+      this.dataBodies = dataBodies;
+    }
+
+    @Override
+    public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies);
+    }
+  }
+
+  /**
+   * A class used to return a synchronous future when doing custom rpc messages.
+   * @param <RECEIVE>
+   *          The type of message that will be returned.
+   */
+  public class CustomFuture<RECEIVE> {
+
+    private Parser<RECEIVE> parser;
+    private DrillRpcFuture<CustomMessage> future;
+
+    public CustomFuture(Parser<RECEIVE> parser, DrillRpcFuture<CustomMessage> future) {
+      super();
+      this.parser = parser;
+      this.future = future;
+    }
+
+    public RECEIVE get() throws RpcException, InvalidProtocolBufferException {
+      CustomMessage message = future.checkedGet();
+      return parser.parseFrom(message.getMessage());
+    }
+
+    public RECEIVE get(long timeout, TimeUnit unit) throws RpcException, TimeoutException,
+        InvalidProtocolBufferException {
+      CustomMessage message = future.checkedGet(timeout, unit);
+      return parser.parseFrom(message.getMessage());
+    }
+
+    public DrillBuf getBuffer() throws RpcException {
+      return (DrillBuf) future.getBuffer();
+    }
+
+  }
+
+  /**
+   * A special tunnel that can be used for custom types of messages. Its lifecycle is tied to the underlying
+   * ControlTunnel.
+   * @param <SEND>
+   *          The type of message the control tunnel will be able to send.
+   * @param <RECEIVE>
+   *          The expected response the control tunnel expects to receive.
+   */
+  public class CustomTunnel<SEND extends Message, RECEIVE extends Message> {
+    private int messageTypeId;
+    private Parser<RECEIVE> parser;
+
+    private CustomTunnel(int messageTypeId, Parser<RECEIVE> parser) {
+      super();
+      this.messageTypeId = messageTypeId;
+      this.parser = parser;
+    }
+
+    /**
+     * Send a message and receive a future for monitoring the outcome.
+     * @param messageToSend
+     *          The structured message to send.
+     * @param dataBodies
+     *          One or more optional unstructured messages to append to the structured message.
+     * @return The CustomFuture that can be used to wait for the response.
+     */
+    public CustomFuture<RECEIVE> send(SEND messageToSend, ByteBuf... dataBodies) {
+      final CustomMessage customMessage = CustomMessage.newBuilder()
+          .setMessage(messageToSend.toByteString())
+          .setType(messageTypeId)
+          .build();
+      final SyncCustomMessageSender b = new SyncCustomMessageSender(customMessage, dataBodies);
+      manager.runCommand(b);
+      DrillRpcFuture<CustomMessage> innerFuture = b.getFuture();
+      return new CustomFuture<RECEIVE>(parser, innerFuture);
+    }
+
+    /**
+     * Send a message using a custom listener.
+     * @param listener
+     *          The listener to inform of the outcome of the sent message.
+     * @param messageToSend
+     *          The structured message to send.
+     * @param dataBodies
+     *          One or more optional unstructured messages to append to the structured message.
+     */
+    public void send(RpcOutcomeListener<RECEIVE> listener, SEND messageToSend, ByteBuf... dataBodies) {
+      final CustomMessage customMessage = CustomMessage.newBuilder()
+          .setMessage(messageToSend.toByteString())
+          .setType(messageTypeId)
+          .build();
+      manager.runCommand(new CustomMessageSender(new CustomTunnelListener(listener), customMessage, dataBodies));
+    }
+
+    private class CustomTunnelListener implements RpcOutcomeListener<CustomMessage> {
+      final RpcOutcomeListener<RECEIVE> innerListener;
+
+      public CustomTunnelListener(RpcOutcomeListener<RECEIVE> innerListener) {
+        super();
+        this.innerListener = innerListener;
+      }
+
+      @Override
+      public void failed(RpcException ex) {
+        innerListener.failed(ex);
+      }
+
+      @Override
+      public void success(CustomMessage value, ByteBuf buffer) {
+        try {
+          RECEIVE message = parser.parseFrom(value.getMessage());
+          innerListener.success(message, buffer);
+        } catch (InvalidProtocolBufferException e) {
+          innerListener.failed(new RpcException("Failure while parsing message locally.", e));
+        }
+
+      }
+
+      @Override
+      public void interrupted(InterruptedException e) {
+        innerListener.interrupted(e);
+      }
+
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
index af60ff0..94df739 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
@@ -17,10 +17,17 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
 import java.io.Closeable;
 
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.UserRpcException;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
 
 /**
  * Service that allows one Drillbit to communicate with another. Internally manages whether each particular bit is a
@@ -41,5 +48,61 @@ public interface Controller extends Closeable {
 
   public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException;
 
+  /**
+   * Register a new handler for custom message types. Should be done before any such messages are received. This is
+   * threadsafe as this method manages locking internally.
+   *
+   * @param messageTypeId
+   *          The type of message id to handle. This corresponds to the CustomMessage.type field. Note that only a
+   *          single handler for a particular type of message can be registered within a particular Drillbit.
+   * @param handler
+   *          The handler that should be used to handle this type of message.
+   * @param parser
+   *          The parser used to handle the types of messages the handler above handles.
+   */
+  public <REQUEST extends MessageLite, RESPONSE extends MessageLite> void registerCustomHandler(int messageTypeId,
+      CustomMessageHandler<REQUEST, RESPONSE> handler, Parser<REQUEST> parser);
+
+  /**
+   * Defines how the Controller should handle custom messages. Implementations need to be threadsafe.
+   * @param <REQUEST>
+   *          The type of request message.
+   * @param <RESPONSE>
+   *          The type of the response message.
+   */
+  public interface CustomMessageHandler<REQUEST extends MessageLite, RESPONSE extends MessageLite> {
+
+    /**
+     * Handle an incoming message.
+     * @param pBody
+     *          The protobuf body message object of type REQUEST that was sent.
+     * @param dBody
+     *          An optional byte body that was sent along with the structured message.
+     * @return The response that should be sent to the message sender.
+     * @throws UserRpcException
+     *           throw this exception if there is an RPC failure that should be communicated to the sender.
+     */
+    public CustomResponse<RESPONSE> onMessage(REQUEST pBody, DrillBuf dBody) throws UserRpcException;
+  }
+
+  /**
+   * A simple interface that describes the nature of the response to the custom incoming message.
+   *
+   * @param <RESPONSE>
+   *          The type of message that the response contains. Must be a protobuf message type.
+   */
+  public interface CustomResponse<RESPONSE extends MessageLite> {
+
+    /**
+     * The structured portion of the response.
+     * @return A protobuf message of type RESPONSE
+     */
+    public RESPONSE getMessage();
 
+    /**
+     * The optional unstructured portion of the message.
+     * @return null or one or more unstructured bodies.
+     */
+    public ByteBuf[] getBodies();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index 631d479..0564cca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -24,6 +24,8 @@ import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 
 import com.google.common.io.Closeables;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
 
 /**
  * Manages communication tunnels between nodes.
@@ -36,6 +38,7 @@ public class ControllerImpl implements Controller {
   private final BootStrapContext context;
   private final ConnectionManagerRegistry connectionRegistry;
   private final boolean allowPortHunting;
+  private final CustomHandlerRegistry handlerRegistry;
 
   public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, boolean allowPortHunting) {
     super();
@@ -43,6 +46,7 @@ public class ControllerImpl implements Controller {
     this.context = context;
     this.connectionRegistry = new ConnectionManagerRegistry(handler, context);
     this.allowPortHunting = allowPortHunting;
+    this.handlerRegistry = handler.getHandlerRegistry();
   }
 
   @Override
@@ -52,6 +56,7 @@ public class ControllerImpl implements Controller {
     port = server.bind(port, allowPortHunting);
     DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(port).build();
     connectionRegistry.setEndpoint(completeEndpoint);
+    handlerRegistry.setEndpoint(completeEndpoint);
     return completeEndpoint;
   }
 
@@ -60,6 +65,13 @@ public class ControllerImpl implements Controller {
     return new ControlTunnel(endpoint, connectionRegistry.getConnectionManager(endpoint));
   }
 
+
+  @Override
+  public <REQUEST extends MessageLite, RESPONSE extends MessageLite> void registerCustomHandler(int messageTypeId,
+      CustomMessageHandler<REQUEST, RESPONSE> handler, Parser<REQUEST> parser) {
+    handlerRegistry.registerCustomHandler(messageTypeId, handler, parser);
+  }
+
   public void close() {
     Closeables.closeQuietly(server);
     for (ControlConnectionManager bt : connectionRegistry) {
@@ -67,4 +79,5 @@ public class ControllerImpl implements Controller {
     }
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
new file mode 100644
index 0000000..c328cd8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.common.concurrent.AutoCloseableLock;
+import org.apache.drill.exec.proto.BitControl.CustomMessage;
+import org.apache.drill.exec.proto.BitControl.RpcType;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.UserRpcException;
+import org.apache.drill.exec.rpc.control.Controller.CustomMessageHandler;
+import org.apache.drill.exec.rpc.control.Controller.CustomResponse;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public class CustomHandlerRegistry {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CustomHandlerRegistry.class);
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final AutoCloseableLock read = new AutoCloseableLock(readWriteLock.readLock());
+  private final AutoCloseableLock write = new AutoCloseableLock(readWriteLock.writeLock());
+  private final IntObjectOpenHashMap<ParsingHandler<?>> handlers = new IntObjectOpenHashMap<>();
+  private volatile DrillbitEndpoint endpoint;
+
+  public CustomHandlerRegistry() {
+  }
+
+  public void setEndpoint(DrillbitEndpoint endpoint) {
+    this.endpoint = endpoint;
+  }
+
+  public <SEND extends MessageLite> void registerCustomHandler(int messageTypeId,
+      CustomMessageHandler<SEND, ?> handler,
+      Parser<SEND> parser) {
+    Preconditions.checkNotNull(handler);
+    Preconditions.checkNotNull(parser);
+    try (AutoCloseableLock lock = write.open()) {
+      ParsingHandler<?> parsingHandler = handlers.get(messageTypeId);
+      if (parsingHandler != null) {
+        throw new IllegalStateException(String.format(
+            "Only one handler can be registered for a given custom message type. You tried to register a handler for "
+                + "the %d message type but one had already been registered.",
+            messageTypeId));
+      }
+
+      parsingHandler = new ParsingHandler<SEND>(handler, parser);
+      handlers.put(messageTypeId, parsingHandler);
+    }
+  }
+
+  /** Routes a custom message to the handler registered for its type; throws UserRpcException if there is none. */
+  public Response handle(CustomMessage message, DrillBuf dBody) throws RpcException {
+    final ParsingHandler<?> handler;
+    try (AutoCloseableLock lock = read.open()) {
+      handler = handlers.get(message.getType());
+    }
+    if (handler == null) {
+      throw new UserRpcException(
+          endpoint, "Unable to handle message.",
+          new IllegalStateException(String.format(
+              "Unable to handle message. The message type provided [%d] did not have a registered handler.",
+              message.getType())));
+    }
+    final CustomResponse<?> customResponse = handler.onMessage(message.getMessage(), dBody);
+    final CustomMessage responseMessage = CustomMessage.newBuilder()
+        .setMessage(customResponse.getMessage().toByteString())
+        .setType(message.getType())
+        .build();
+    // Call getBodies() exactly once: implementations may retain buffers on every call. Never pass a null array.
+    final ByteBuf[] bodies = customResponse.getBodies();
+    final ByteBuf[] dBodies = bodies == null ? new DrillBuf[0] : bodies;
+    return new Response(RpcType.RESP_CUSTOM, responseMessage, dBodies);
+  }
+
+  private class ParsingHandler<SEND extends MessageLite> {
+    private final CustomMessageHandler<SEND, ?> handler;
+    private final Parser<SEND> parser;
+
+    public ParsingHandler(CustomMessageHandler<SEND, ?> handler, Parser<SEND> parser) {
+      super();
+      this.handler = handler;
+      this.parser = parser;
+    }
+
+    public CustomResponse<?> onMessage(ByteString pBody, DrillBuf dBody) throws UserRpcException {
+
+      try {
+        final SEND message = parser.parseFrom(pBody);
+        return handler.onMessage(message, dBody);
+
+      } catch (InvalidProtocolBufferException e) {
+        throw new UserRpcException(endpoint, "Failure parsing message.", e);
+      }
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
index 10fe343..7065201 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc.control;
 
 import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
 import org.apache.drill.exec.proto.BitControl.BitStatus;
+import org.apache.drill.exec.proto.BitControl.CustomMessage;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -46,7 +47,8 @@ public class DefaultInstanceHandler {
       return BitStatus.getDefaultInstance();
     case RpcType.RESP_QUERY_STATUS_VALUE:
       return QueryProfile.getDefaultInstance();
-
+    case RpcType.RESP_CUSTOM_VALUE:
+      return CustomMessage.getDefaultInstance();
     default:
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 1c0eb80..77c069b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -19,8 +19,10 @@ package org.apache.drill.exec.work.batch;
 
 import static org.apache.drill.exec.rpc.RpcBus.get;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.BitControl.CustomMessage;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.InitializeFragments;
@@ -39,6 +41,7 @@ import org.apache.drill.exec.rpc.UserRpcException;
 import org.apache.drill.exec.rpc.control.ControlConnection;
 import org.apache.drill.exec.rpc.control.ControlRpcConfig;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.rpc.control.CustomHandlerRegistry;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.foreman.Foreman;
@@ -50,6 +53,7 @@ import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
 public class ControlMessageHandler {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
   private final WorkerBee bee;
+  private final CustomHandlerRegistry handlerRegistry = new CustomHandlerRegistry();
 
   public ControlMessageHandler(final WorkerBee bee) {
     this.bee = bee;
@@ -69,6 +73,11 @@ public class ControlMessageHandler {
       return ControlRpcConfig.OK;
     }
 
+    case RpcType.REQ_CUSTOM_VALUE: {
+      final CustomMessage customMessage = get(pBody, CustomMessage.PARSER);
+      return handlerRegistry.handle(customMessage, (DrillBuf) dBody);
+    }
+
     case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
       final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
       receivingFragmentFinished(finishedReceiver);
@@ -228,4 +237,9 @@ public class ControlMessageHandler {
 
     return Acks.OK;
   }
+
+  public CustomHandlerRegistry getHandlerRegistry() {
+    return handlerRegistry;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
new file mode 100644
index 0000000..2008a48
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.internal.ThreadLocalRandom;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.UserRpcException;
+import org.apache.drill.exec.rpc.control.ControlTunnel.CustomFuture;
+import org.apache.drill.exec.rpc.control.ControlTunnel.CustomTunnel;
+import org.apache.drill.exec.rpc.control.Controller.CustomMessageHandler;
+import org.apache.drill.exec.rpc.control.Controller.CustomResponse;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.junit.Test;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class TestCustomTunnel extends BaseTestQuery {
+
+  private final QueryId expectedId = QueryId
+      .newBuilder()
+      .setPart1(ThreadLocalRandom.current().nextLong())
+      .setPart2(ThreadLocalRandom.current().nextLong())
+      .build();
+
+  private final ByteBuf buf1;
+  private final byte[] expected;
+
+  public TestCustomTunnel() {
+    buf1 = UnpooledByteBufAllocator.DEFAULT.buffer(1024);
+    Random r = new Random();
+    this.expected = new byte[1024];
+    r.nextBytes(expected);
+    buf1.writeBytes(expected);
+  }
+
+  @Test
+  public void ensureRoundTrip() throws RpcException, InvalidProtocolBufferException {
+
+    final DrillbitContext context = getDrillbitContext();
+    final TestCustomMessageHandler handler = new TestCustomMessageHandler(context.getEndpoint(), false);
+    context.getController().registerCustomHandler(1001, handler, DrillbitEndpoint.PARSER);
+    final ControlTunnel loopbackTunnel = context.getController().getTunnel(context.getEndpoint());
+    final CustomTunnel<DrillbitEndpoint, QueryId> tunnel = loopbackTunnel.getCustomTunnel(1001, DrillbitEndpoint.class,
+        QueryId.PARSER);
+    CustomFuture<QueryId> future = tunnel.send(context.getEndpoint());
+    assertEquals(expectedId, future.get());
+  }
+
+  @Test
+  public void ensureRoundTripBytes() throws RpcException, InvalidProtocolBufferException {
+    final DrillbitContext context = getDrillbitContext();
+    final TestCustomMessageHandler handler = new TestCustomMessageHandler(context.getEndpoint(), true);
+    context.getController().registerCustomHandler(1002, handler, DrillbitEndpoint.PARSER);
+    final ControlTunnel loopbackTunnel = context.getController().getTunnel(context.getEndpoint());
+    final CustomTunnel<DrillbitEndpoint, QueryId> tunnel = loopbackTunnel.getCustomTunnel(1002, DrillbitEndpoint.class,
+        QueryId.PARSER);
+    buf1.retain();
+    CustomFuture<QueryId> future = tunnel.send(context.getEndpoint(), buf1);
+    assertEquals(expectedId, future.get());
+    byte[] actual = new byte[1024];
+    future.getBuffer().getBytes(0, actual);
+    future.getBuffer().release();
+    assertTrue(Arrays.equals(expected, actual));
+  }
+
+  private class TestCustomMessageHandler implements CustomMessageHandler<DrillbitEndpoint, QueryId> {
+    private DrillbitEndpoint expectedValue;
+    private final boolean returnBytes;
+
+    public TestCustomMessageHandler(DrillbitEndpoint expectedValue, boolean returnBytes) {
+      super();
+      this.expectedValue = expectedValue;
+      this.returnBytes = returnBytes;
+    }
+
+    @Override
+    public CustomResponse<QueryId> onMessage(DrillbitEndpoint pBody, DrillBuf dBody) throws UserRpcException {
+
+      if (!expectedValue.equals(pBody)) {
+        throw new UserRpcException(expectedValue, "Invalid expected downstream value.", new IllegalStateException());
+      }
+
+      if (returnBytes) {
+        byte[] actual = new byte[1024];
+        dBody.getBytes(0, actual);
+        if (!Arrays.equals(expected, actual)) {
+          throw new UserRpcException(expectedValue, "Invalid expected downstream value.", new IllegalStateException());
+        }
+      }
+
+      return new CustomResponse<QueryId>() {
+
+        @Override
+        public QueryId getMessage() {
+          return expectedId;
+        }
+
+        @Override
+        public ByteBuf[] getBodies() {
+          if (returnBytes) {
+            buf1.retain();
+            return new ByteBuf[] { buf1 };
+          } else {
+            return null;
+          }
+        }
+
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
index 493036b..b16934d 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
@@ -99,25 +99,33 @@ public final class BitControl {
      */
     REQ_UNPAUSE_FRAGMENT(10, 16),
     /**
+     * <code>REQ_CUSTOM = 17;</code>
+     */
+    REQ_CUSTOM(11, 17),
+    /**
      * <code>RESP_FRAGMENT_HANDLE = 11;</code>
      *
      * <pre>
      * bit responses
      * </pre>
      */
-    RESP_FRAGMENT_HANDLE(11, 11),
+    RESP_FRAGMENT_HANDLE(12, 11),
     /**
      * <code>RESP_FRAGMENT_STATUS = 12;</code>
      */
-    RESP_FRAGMENT_STATUS(12, 12),
+    RESP_FRAGMENT_STATUS(13, 12),
     /**
      * <code>RESP_BIT_STATUS = 13;</code>
      */
-    RESP_BIT_STATUS(13, 13),
+    RESP_BIT_STATUS(14, 13),
     /**
      * <code>RESP_QUERY_STATUS = 14;</code>
      */
-    RESP_QUERY_STATUS(14, 14),
+    RESP_QUERY_STATUS(15, 14),
+    /**
+     * <code>RESP_CUSTOM = 18;</code>
+     */
+    RESP_CUSTOM(16, 18),
     ;
 
     /**
@@ -185,6 +193,10 @@ public final class BitControl {
      */
     public static final int REQ_UNPAUSE_FRAGMENT_VALUE = 16;
     /**
+     * <code>REQ_CUSTOM = 17;</code>
+     */
+    public static final int REQ_CUSTOM_VALUE = 17;
+    /**
      * <code>RESP_FRAGMENT_HANDLE = 11;</code>
      *
      * <pre>
@@ -204,6 +216,10 @@ public final class BitControl {
      * <code>RESP_QUERY_STATUS = 14;</code>
      */
     public static final int RESP_QUERY_STATUS_VALUE = 14;
+    /**
+     * <code>RESP_CUSTOM = 18;</code>
+     */
+    public static final int RESP_CUSTOM_VALUE = 18;
 
 
     public final int getNumber() { return value; }
@@ -221,10 +237,12 @@ public final class BitControl {
         case 10: return REQ_QUERY_STATUS;
         case 15: return REQ_QUERY_CANCEL;
         case 16: return REQ_UNPAUSE_FRAGMENT;
+        case 17: return REQ_CUSTOM;
         case 11: return RESP_FRAGMENT_HANDLE;
         case 12: return RESP_FRAGMENT_STATUS;
         case 13: return RESP_BIT_STATUS;
         case 14: return RESP_QUERY_STATUS;
+        case 18: return RESP_CUSTOM;
         default: return null;
       }
     }
@@ -3001,6 +3019,486 @@ public final class BitControl {
     // @@protoc_insertion_point(class_scope:exec.bit.control.InitializeFragments)
   }
 
+  public interface CustomMessageOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int32 type = 1;
+    /**
+     * <code>optional int32 type = 1;</code>
+     */
+    boolean hasType();
+    /**
+     * <code>optional int32 type = 1;</code>
+     */
+    int getType();
+
+    // optional bytes message = 2;
+    /**
+     * <code>optional bytes message = 2;</code>
+     */
+    boolean hasMessage();
+    /**
+     * <code>optional bytes message = 2;</code>
+     */
+    com.google.protobuf.ByteString getMessage();
+  }
+  /**
+   * Protobuf type {@code exec.bit.control.CustomMessage}
+   */
+  public static final class CustomMessage extends
+      com.google.protobuf.GeneratedMessage
+      implements CustomMessageOrBuilder {
+    // Use CustomMessage.newBuilder() to construct.
+    private CustomMessage(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private CustomMessage(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final CustomMessage defaultInstance;
+    public static CustomMessage getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public CustomMessage getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private CustomMessage( // Parsing constructor: builds a message by reading tagged fields from input until the stream ends.
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields(); // start from defaults: type_ = 0, message_ = ByteString.EMPTY
+      int mutable_bitField0_ = 0; // never read in this constructor
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder(); // collects fields this class does not recognize
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0: // readTag() yields 0 when there is no more input
+              done = true;
+              break;
+            default: { // unrecognized tag: keep it as an unknown field, or stop if it cannot be parsed as one
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: { // tag 8 = (field 1 << 3) | wire type 0 (varint): optional int32 type
+              bitField0_ |= 0x00000001; // mark type as present
+              type_ = input.readInt32();
+              break;
+            }
+            case 18: { // tag 18 = (field 2 << 3) | wire type 2 (length-delimited): optional bytes message
+              bitField0_ |= 0x00000002; // mark message as present
+              message_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this); // attach the partially parsed message to the exception
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this); // NOTE(review): only the message text is kept; the original IOException is not chained as the cause
+      } finally {
+        this.unknownFields = unknownFields.build(); // runs on success and on failure, so the final field is always assigned
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.drill.exec.proto.BitControl.CustomMessage.class, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<CustomMessage> PARSER =
+        new com.google.protobuf.AbstractParser<CustomMessage>() {
+      public CustomMessage parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new CustomMessage(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<CustomMessage> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional int32 type = 1;
+    public static final int TYPE_FIELD_NUMBER = 1;
+    private int type_;
+    /**
+     * <code>optional int32 type = 1;</code>
+     */
+    public boolean hasType() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int32 type = 1;</code>
+     */
+    public int getType() {
+      return type_;
+    }
+
+    // optional bytes message = 2;
+    public static final int MESSAGE_FIELD_NUMBER = 2;
+    private com.google.protobuf.ByteString message_;
+    /**
+     * <code>optional bytes message = 2;</code>
+     */
+    public boolean hasMessage() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional bytes message = 2;</code>
+     */
+    public com.google.protobuf.ByteString getMessage() {
+      return message_;
+    }
+
+    private void initFields() {
+      type_ = 0;
+      message_ = com.google.protobuf.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output) // Writes the fields that are set, followed by any unknown fields, to output.
+                        throws java.io.IOException {
+      getSerializedSize(); // return value discarded; the call populates memoizedSerializedSize
+      if (((bitField0_ & 0x00000001) == 0x00000001)) { // type (field 1) is written only when its has-bit is set
+        output.writeInt32(1, type_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) { // message (field 2) is written only when its has-bit is set
+        output.writeBytes(2, message_);
+      }
+      getUnknownFields().writeTo(output); // unknown fields are emitted after the known ones
+    }
+
+    private int memoizedSerializedSize = -1; // cached result of getSerializedSize(); -1 means not computed yet
+    public int getSerializedSize() { // Returns the encoded size in bytes, computing it once and caching the result.
+      int size = memoizedSerializedSize;
+      if (size != -1) return size; // already computed
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) { // count type (field 1) only if present
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(1, type_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) { // count message (field 2) only if present
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, message_);
+      }
+      size += getUnknownFields().getSerializedSize(); // unknown fields are serialized too, so they count toward the size
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.drill.exec.proto.BitControl.CustomMessage prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code exec.bit.control.CustomMessage}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.drill.exec.proto.BitControl.CustomMessageOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.drill.exec.proto.BitControl.CustomMessage.class, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder.class);
+      }
+
+      // Construct using org.apache.drill.exec.proto.BitControl.CustomMessage.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        type_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        message_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_descriptor;
+      }
+
+      public org.apache.drill.exec.proto.BitControl.CustomMessage getDefaultInstanceForType() {
+        return org.apache.drill.exec.proto.BitControl.CustomMessage.getDefaultInstance();
+      }
+
+      public org.apache.drill.exec.proto.BitControl.CustomMessage build() {
+        org.apache.drill.exec.proto.BitControl.CustomMessage result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.drill.exec.proto.BitControl.CustomMessage buildPartial() { // Creates a message from the builder's current state without checking isInitialized().
+        org.apache.drill.exec.proto.BitControl.CustomMessage result = new org.apache.drill.exec.proto.BitControl.CustomMessage(this);
+        int from_bitField0_ = bitField0_; // has-bits as recorded in the builder
+        int to_bitField0_ = 0; // has-bits to store in the built message
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001; // carry over "type is set"
+        }
+        result.type_ = type_; // the value is copied regardless of the has-bit
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002; // carry over "message is set"
+        }
+        result.message_ = message_; // the value is copied regardless of the has-bit
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.drill.exec.proto.BitControl.CustomMessage) {
+          return mergeFrom((org.apache.drill.exec.proto.BitControl.CustomMessage)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.drill.exec.proto.BitControl.CustomMessage other) {
+        if (other == org.apache.drill.exec.proto.BitControl.CustomMessage.getDefaultInstance()) return this;
+        if (other.hasType()) {
+          setType(other.getType());
+        }
+        if (other.hasMessage()) {
+          setMessage(other.getMessage());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom( // Parses a CustomMessage from input and merges its fields into this builder.
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.drill.exec.proto.BitControl.CustomMessage parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.drill.exec.proto.BitControl.CustomMessage) e.getUnfinishedMessage(); // recover whatever was parsed before the failure
+          throw e; // the finally block below still runs before the exception propagates
+        } finally {
+          if (parsedMessage != null) { // merge the full or partial result; null when nothing could be recovered
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional int32 type = 1;
+      private int type_ ;
+      /**
+       * <code>optional int32 type = 1;</code>
+       */
+      public boolean hasType() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional int32 type = 1;</code>
+       */
+      public int getType() {
+        return type_;
+      }
+      /**
+       * <code>optional int32 type = 1;</code>
+       */
+      public Builder setType(int value) {
+        bitField0_ |= 0x00000001;
+        type_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 type = 1;</code>
+       */
+      public Builder clearType() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        type_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // optional bytes message = 2;
+      private com.google.protobuf.ByteString message_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes message = 2;</code>
+       */
+      public boolean hasMessage() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional bytes message = 2;</code>
+       */
+      public com.google.protobuf.ByteString getMessage() {
+        return message_;
+      }
+      /**
+       * <code>optional bytes message = 2;</code>
+       */
+      public Builder setMessage(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        message_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes message = 2;</code>
+       */
+      public Builder clearMessage() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        message_ = getDefaultInstance().getMessage();
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:exec.bit.control.CustomMessage)
+    }
+
+    static {
+      defaultInstance = new CustomMessage(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:exec.bit.control.CustomMessage)
+  }
+
   public interface PlanFragmentOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
@@ -8440,6 +8938,11 @@ public final class BitControl {
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_exec_bit_control_InitializeFragments_fieldAccessorTable;
   private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_exec_bit_control_CustomMessage_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_exec_bit_control_CustomMessage_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
     internal_static_exec_bit_control_PlanFragment_descriptor;
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -8485,41 +8988,43 @@ public final class BitControl {
       "red.MinorFragmentProfile\022(\n\006handle\030\002 \001(\013",
       "2\030.exec.bit.FragmentHandle\"G\n\023Initialize" +
       "Fragments\0220\n\010fragment\030\001 \003(\0132\036.exec.bit.c" +
-      "ontrol.PlanFragment\"\374\003\n\014PlanFragment\022(\n\006" +
-      "handle\030\001 \001(\0132\030.exec.bit.FragmentHandle\022\024" +
-      "\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_cost\030\005 \001(\002\022\021" +
-      "\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_cost\030\007 \001(\002\022\025" +
-      "\n\rfragment_json\030\010 \001(\t\022\025\n\rleaf_fragment\030\t" +
-      " \001(\010\022*\n\nassignment\030\n \001(\0132\026.exec.Drillbit" +
-      "Endpoint\022\'\n\007foreman\030\013 \001(\0132\026.exec.Drillbi" +
-      "tEndpoint\022\035\n\013mem_initial\030\014 \001(\003:\01020000000",
-      "\022\033\n\007mem_max\030\r \001(\003:\n2000000000\0221\n\013credent" +
-      "ials\030\016 \001(\0132\034.exec.shared.UserCredentials" +
-      "\022\024\n\014options_json\030\017 \001(\t\022:\n\007context\030\020 \001(\0132" +
-      ").exec.bit.control.QueryContextInformati" +
-      "on\022.\n\tcollector\030\021 \003(\0132\033.exec.bit.control" +
-      ".Collector\"\210\001\n\tCollector\022\"\n\032opposite_maj" +
-      "or_fragment_id\030\001 \001(\005\022#\n\027incoming_minor_f" +
-      "ragment\030\002 \003(\005B\002\020\001\022\035\n\025supports_out_of_ord" +
-      "er\030\003 \001(\010\022\023\n\013is_spooling\030\004 \001(\010\"c\n\027QueryCo" +
-      "ntextInformation\022\030\n\020query_start_time\030\001 \001",
-      "(\003\022\021\n\ttime_zone\030\002 \001(\005\022\033\n\023default_schema_" +
-      "name\030\003 \001(\t\"f\n\017WorkQueueStatus\022(\n\010endpoin" +
-      "t\030\001 \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014queue" +
-      "_length\030\002 \001(\005\022\023\n\013report_time\030\003 \001(\003\"h\n\020Fi" +
-      "nishedReceiver\022*\n\010receiver\030\001 \001(\0132\030.exec." +
-      "bit.FragmentHandle\022(\n\006sender\030\002 \001(\0132\030.exe" +
-      "c.bit.FragmentHandle*\323\002\n\007RpcType\022\r\n\tHAND" +
-      "SHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\034\n\030REQ_INI" +
-      "TIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_CANCEL_FRAGME" +
-      "NT\020\006\022\031\n\025REQ_RECEIVER_FINISHED\020\007\022\027\n\023REQ_F",
-      "RAGMENT_STATUS\020\010\022\022\n\016REQ_BIT_STATUS\020\t\022\024\n\020" +
-      "REQ_QUERY_STATUS\020\n\022\024\n\020REQ_QUERY_CANCEL\020\017" +
-      "\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022\030\n\024RESP_FRAGM" +
-      "ENT_HANDLE\020\013\022\030\n\024RESP_FRAGMENT_STATUS\020\014\022\023" +
-      "\n\017RESP_BIT_STATUS\020\r\022\025\n\021RESP_QUERY_STATUS" +
-      "\020\016B+\n\033org.apache.drill.exec.protoB\nBitCo" +
-      "ntrolH\001"
+      "ontrol.PlanFragment\".\n\rCustomMessage\022\014\n\004" +
+      "type\030\001 \001(\005\022\017\n\007message\030\002 \001(\014\"\374\003\n\014PlanFrag" +
+      "ment\022(\n\006handle\030\001 \001(\0132\030.exec.bit.Fragment" +
+      "Handle\022\024\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_cost" +
+      "\030\005 \001(\002\022\021\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_cost" +
+      "\030\007 \001(\002\022\025\n\rfragment_json\030\010 \001(\t\022\025\n\rleaf_fr" +
+      "agment\030\t \001(\010\022*\n\nassignment\030\n \001(\0132\026.exec." +
+      "DrillbitEndpoint\022\'\n\007foreman\030\013 \001(\0132\026.exec",
+      ".DrillbitEndpoint\022\035\n\013mem_initial\030\014 \001(\003:\010" +
+      "20000000\022\033\n\007mem_max\030\r \001(\003:\n2000000000\0221\n" +
+      "\013credentials\030\016 \001(\0132\034.exec.shared.UserCre" +
+      "dentials\022\024\n\014options_json\030\017 \001(\t\022:\n\007contex" +
+      "t\030\020 \001(\0132).exec.bit.control.QueryContextI" +
+      "nformation\022.\n\tcollector\030\021 \003(\0132\033.exec.bit" +
+      ".control.Collector\"\210\001\n\tCollector\022\"\n\032oppo" +
+      "site_major_fragment_id\030\001 \001(\005\022#\n\027incoming" +
+      "_minor_fragment\030\002 \003(\005B\002\020\001\022\035\n\025supports_ou" +
+      "t_of_order\030\003 \001(\010\022\023\n\013is_spooling\030\004 \001(\010\"c\n",
+      "\027QueryContextInformation\022\030\n\020query_start_" +
+      "time\030\001 \001(\003\022\021\n\ttime_zone\030\002 \001(\005\022\033\n\023default" +
+      "_schema_name\030\003 \001(\t\"f\n\017WorkQueueStatus\022(\n" +
+      "\010endpoint\030\001 \001(\0132\026.exec.DrillbitEndpoint\022" +
+      "\024\n\014queue_length\030\002 \001(\005\022\023\n\013report_time\030\003 \001" +
+      "(\003\"h\n\020FinishedReceiver\022*\n\010receiver\030\001 \001(\013" +
+      "2\030.exec.bit.FragmentHandle\022(\n\006sender\030\002 \001" +
+      "(\0132\030.exec.bit.FragmentHandle*\364\002\n\007RpcType" +
+      "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\034\n" +
+      "\030REQ_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_CANCE",
+      "L_FRAGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISHED\020\007\022" +
+      "\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT_STAT" +
+      "US\020\t\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_QUERY_" +
+      "CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022\016\n\nRE" +
+      "Q_CUSTOM\020\021\022\030\n\024RESP_FRAGMENT_HANDLE\020\013\022\030\n\024" +
+      "RESP_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BIT_STATU" +
+      "S\020\r\022\025\n\021RESP_QUERY_STATUS\020\016\022\017\n\013RESP_CUSTO" +
+      "M\020\022B+\n\033org.apache.drill.exec.protoB\nBitC" +
+      "ontrolH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -8550,32 +9055,38 @@ public final class BitControl {
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_InitializeFragments_descriptor,
               new java.lang.String[] { "Fragment", });
-          internal_static_exec_bit_control_PlanFragment_descriptor =
+          internal_static_exec_bit_control_CustomMessage_descriptor =
             getDescriptor().getMessageTypes().get(4);
+          internal_static_exec_bit_control_CustomMessage_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_exec_bit_control_CustomMessage_descriptor,
+              new java.lang.String[] { "Type", "Message", });
+          internal_static_exec_bit_control_PlanFragment_descriptor =
+            getDescriptor().getMessageTypes().get(5);
           internal_static_exec_bit_control_PlanFragment_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_PlanFragment_descriptor,
               new java.lang.String[] { "Handle", "NetworkCost", "CpuCost", "DiskCost", "MemoryCost", "FragmentJson", "LeafFragment", "Assignment", "Foreman", "MemInitial", "MemMax", "Credentials", "OptionsJson", "Context", "Collector", });
           internal_static_exec_bit_control_Collector_descriptor =
-            getDescriptor().getMessageTypes().get(5);
+            getDescriptor().getMessageTypes().get(6);
           internal_static_exec_bit_control_Collector_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_Collector_descriptor,
               new java.lang.String[] { "OppositeMajorFragmentId", "IncomingMinorFragment", "SupportsOutOfOrder", "IsSpooling", });
           internal_static_exec_bit_control_QueryContextInformation_descriptor =
-            getDescriptor().getMessageTypes().get(6);
+            getDescriptor().getMessageTypes().get(7);
           internal_static_exec_bit_control_QueryContextInformation_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_QueryContextInformation_descriptor,
               new java.lang.String[] { "QueryStartTime", "TimeZone", "DefaultSchemaName", });
           internal_static_exec_bit_control_WorkQueueStatus_descriptor =
-            getDescriptor().getMessageTypes().get(7);
+            getDescriptor().getMessageTypes().get(8);
           internal_static_exec_bit_control_WorkQueueStatus_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_WorkQueueStatus_descriptor,
               new java.lang.String[] { "Endpoint", "QueueLength", "ReportTime", });
           internal_static_exec_bit_control_FinishedReceiver_descriptor =
-            getDescriptor().getMessageTypes().get(8);
+            getDescriptor().getMessageTypes().get(9);
           internal_static_exec_bit_control_FinishedReceiver_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_FinishedReceiver_descriptor,

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
index a4088c9..0a9b90d 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
@@ -499,6 +499,125 @@ public final class SchemaBitControl
         }
     }
 
+    public static final class CustomMessage // protostuff schemas for BitControl.CustomMessage: WRITE serializes messages, MERGE fills builders
+    {
+        public static final org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.MessageSchema WRITE =
+            new org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.MessageSchema();
+        public static final org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.BuilderSchema MERGE =
+            new org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.BuilderSchema();
+        
+        public static class MessageSchema implements com.dyuproject.protostuff.Schema<org.apache.drill.exec.proto.BitControl.CustomMessage>
+        {
+            public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.exec.proto.BitControl.CustomMessage message) throws java.io.IOException
+            {
+                if(message.hasType()) // only fields that are set on the message are written
+                    output.writeInt32(1, message.getType(), false);
+                if(message.hasMessage())
+                    output.writeByteArray(2, message.getMessage().toByteArray(), false);
+
+            }
+            public boolean isInitialized(org.apache.drill.exec.proto.BitControl.CustomMessage message)
+            {
+                return message.isInitialized();
+            }
+            public java.lang.String getFieldName(int number)
+            {
+                return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldName(number);
+            }
+            public int getFieldNumber(java.lang.String name)
+            {
+                return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldNumber(name);
+            }
+            public java.lang.Class<org.apache.drill.exec.proto.BitControl.CustomMessage> typeClass()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.class;
+            }
+            public java.lang.String messageName()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getSimpleName();
+            }
+            public java.lang.String messageFullName()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getName();
+            }
+            // unused: this schema only writes, so mergeFrom is a no-op and newMessage returns null
+            public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.exec.proto.BitControl.CustomMessage message) throws java.io.IOException {}
+            public org.apache.drill.exec.proto.BitControl.CustomMessage newMessage() { return null; }
+        }
+        public static class BuilderSchema implements com.dyuproject.protostuff.Schema<org.apache.drill.exec.proto.BitControl.CustomMessage.Builder>
+        {
+            public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder builder) throws java.io.IOException
+            {
+                for(int number = input.readFieldNumber(this);; number = input.readFieldNumber(this))
+                {
+                    switch(number)
+                    {
+                        case 0: // field number 0 is the only exit from the read loop
+                            return;
+                        case 1: // "type"
+                            builder.setType(input.readInt32());
+                            break;
+                        case 2: // "message"
+                            builder.setMessage(com.google.protobuf.ByteString.copyFrom(input.readByteArray()));
+                            break;
+                        default:
+                            input.handleUnknownField(number, this);
+                    }
+                }
+            }
+            public boolean isInitialized(org.apache.drill.exec.proto.BitControl.CustomMessage.Builder builder)
+            {
+                return builder.isInitialized();
+            }
+            public org.apache.drill.exec.proto.BitControl.CustomMessage.Builder newMessage()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.newBuilder();
+            }
+            public java.lang.String getFieldName(int number)
+            {
+                return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldName(number);
+            }
+            public int getFieldNumber(java.lang.String name)
+            {
+                return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldNumber(name);
+            }
+            public java.lang.Class<org.apache.drill.exec.proto.BitControl.CustomMessage.Builder> typeClass()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.Builder.class;
+            }
+            public java.lang.String messageName()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getSimpleName();
+            }
+            public java.lang.String messageFullName()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getName();
+            }
+            // unused: this schema only merges input into a builder, so writeTo is a no-op
+            public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder builder) throws java.io.IOException {}
+        }
+        public static java.lang.String getFieldName(int number)
+        {
+            switch(number)
+            {
+                case 1: return "type";
+                case 2: return "message";
+                default: return null; // unknown field number
+            }
+        }
+        public static int getFieldNumber(java.lang.String name)
+        {
+            java.lang.Integer number = fieldMap.get(name);
+            return number == null ? 0 : number.intValue(); // 0 for names not in fieldMap
+        }
+        private static final java.util.HashMap<java.lang.String,java.lang.Integer> fieldMap = new java.util.HashMap<java.lang.String,java.lang.Integer>();
+        static
+        {
+            fieldMap.put("type", 1);
+            fieldMap.put("message", 2);
+        }
+    }
+
     public static final class PlanFragment
     {
         public static final org.apache.drill.exec.proto.SchemaBitControl.PlanFragment.MessageSchema WRITE =

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java
new file mode 100644
index 0000000..af9195e
--- /dev/null
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Generated by http://code.google.com/p/protostuff/ ... DO NOT EDIT!
+// Generated from protobuf
+
+package org.apache.drill.exec.proto.beans;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import com.dyuproject.protostuff.ByteString;
+import com.dyuproject.protostuff.GraphIOUtil;
+import com.dyuproject.protostuff.Input;
+import com.dyuproject.protostuff.Message;
+import com.dyuproject.protostuff.Output;
+import com.dyuproject.protostuff.Schema;
+
+public final class CustomMessage implements Externalizable, Message<CustomMessage>, Schema<CustomMessage>
+{
+
+    public static Schema<CustomMessage> getSchema()
+    {
+        return DEFAULT_INSTANCE;
+    }
+
+    public static CustomMessage getDefaultInstance()
+    {
+        return DEFAULT_INSTANCE;
+    }
+
+    static final CustomMessage DEFAULT_INSTANCE = new CustomMessage(); // shared instance; also handed out as the schema
+
+    
+    private int type;
+    private ByteString message;
+
+    public CustomMessage()
+    {
+        
+    }
+
+    // getters and setters (setters return this so calls can be chained)
+
+    // type
+
+    public int getType()
+    {
+        return type;
+    }
+
+    public CustomMessage setType(int type)
+    {
+        this.type = type;
+        return this;
+    }
+
+    // message
+
+    public ByteString getMessage()
+    {
+        return message;
+    }
+
+    public CustomMessage setMessage(ByteString message)
+    {
+        this.message = message;
+        return this;
+    }
+
+    // java serialization: delegates to GraphIOUtil, passing this object as both the message and its schema
+
+    public void readExternal(ObjectInput in) throws IOException
+    {
+        GraphIOUtil.mergeDelimitedFrom(in, this, this);
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException
+    {
+        GraphIOUtil.writeDelimitedTo(out, this, this);
+    }
+
+    // message method: every instance reports the shared DEFAULT_INSTANCE as its schema
+
+    public Schema<CustomMessage> cachedSchema()
+    {
+        return DEFAULT_INSTANCE;
+    }
+
+    // schema methods (this class implements Schema for its own type)
+
+    public CustomMessage newMessage()
+    {
+        return new CustomMessage();
+    }
+
+    public Class<CustomMessage> typeClass()
+    {
+        return CustomMessage.class;
+    }
+
+    public String messageName()
+    {
+        return CustomMessage.class.getSimpleName();
+    }
+
+    public String messageFullName()
+    {
+        return CustomMessage.class.getName();
+    }
+
+    public boolean isInitialized(CustomMessage message)
+    {
+        return true; // no field is checked here; any instance counts as initialized
+    }
+
+    public void mergeFrom(Input input, CustomMessage message) throws IOException
+    {
+        for(int number = input.readFieldNumber(this);; number = input.readFieldNumber(this))
+        {
+            switch(number)
+            {
+                case 0: // field number 0 is the only exit from the read loop
+                    return;
+                case 1: // "type"
+                    message.type = input.readInt32();
+                    break;
+                case 2: // "message"
+                    message.message = input.readBytes();
+                    break;
+                default:
+                    input.handleUnknownField(number, this);
+            }   
+        }
+    }
+
+
+    public void writeTo(Output output, CustomMessage message) throws IOException
+    {
+        if(message.type != 0) // a type of 0 is never written
+            output.writeInt32(1, message.type, false);
+
+        if(message.message != null) // a null payload is never written
+            output.writeBytes(2, message.message, false);
+    }
+
+    public String getFieldName(int number)
+    {
+        switch(number)
+        {
+            case 1: return "type";
+            case 2: return "message";
+            default: return null; // unknown field number
+        }
+    }
+
+    public int getFieldNumber(String name)
+    {
+        final Integer number = __fieldMap.get(name);
+        return number == null ? 0 : number.intValue(); // 0 for names not in __fieldMap
+    }
+
+    private static final java.util.HashMap<String,Integer> __fieldMap = new java.util.HashMap<String,Integer>();
+    static
+    {
+        __fieldMap.put("type", 1);
+        __fieldMap.put("message", 2);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/protobuf/BitControl.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/BitControl.proto b/protocol/src/main/protobuf/BitControl.proto
index 6d1b529..ca441f7 100644
--- a/protocol/src/main/protobuf/BitControl.proto
+++ b/protocol/src/main/protobuf/BitControl.proto
@@ -25,12 +25,14 @@ enum RpcType {
   REQ_QUERY_STATUS = 10;
   REQ_QUERY_CANCEL = 15;
   REQ_UNPAUSE_FRAGMENT = 16; // send a resume message for a fragment, returns Ack
+  REQ_CUSTOM = 17;
 
   // bit responses
   RESP_FRAGMENT_HANDLE = 11;
   RESP_FRAGMENT_STATUS = 12;
   RESP_BIT_STATUS = 13;
   RESP_QUERY_STATUS = 14;
+  RESP_CUSTOM = 18;
 }
 
 message BitControlHandshake{
@@ -52,6 +54,11 @@ message InitializeFragments {
   repeated PlanFragment fragment = 1;
 }
 
+message CustomMessage {
+  optional int32 type = 1; // NOTE(review): presumably selects which registered custom handler gets the message - confirm
+  optional bytes message = 2; // raw payload bytes; their format is not defined in this file
+}
+
 message PlanFragment {
   optional FragmentHandle handle = 1;
   optional float network_cost = 4;