You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2016/08/08 20:49:52 UTC

[4/4] drill git commit: DRILL-4729: Add support for prepared statement implementation on server side

DRILL-4729: Add support for prepared statement implementation on server side

+ Add following APIs for Drill Java client
  - DrillRpcFuture<CreatePreparedStatementResp> createPreparedStatement(final String query)
  - void executePreparedStatement(final PreparedStatement preparedStatement, UserResultsListener resultsListener)
  - List<QueryDataBatch> executePreparedStatement(final PreparedStatement preparedStatement) (for testing purpose)

+ Separated out the interface from UserClientConnection. It makes it easy to have wrappers which need to
  tap the messages and data going to the actual client.

+ Implement CREATE_PREPARED_STATEMENT and handle RunQuery with PreparedStatement

+ Test changes to support prepared statement as query type

+ Add tests in TestPreparedStatementProvider

this closes #530


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/14f6ec7d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/14f6ec7d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/14f6ec7d

Branch: refs/heads/master
Commit: 14f6ec7dd9b010de6c884431e443eb788ce54339
Parents: ef6e522
Author: vkorukanti <ve...@dremio.com>
Authored: Mon Jun 13 11:20:25 2016 -0700
Committer: vkorukanti <ve...@dremio.com>
Committed: Mon Aug 8 13:47:49 2016 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/types/Types.java    |    41 +
 .../org/apache/drill/exec/ExecConstants.java    |     7 +
 .../apache/drill/exec/client/DrillClient.java   |    77 +-
 .../apache/drill/exec/rpc/user/UserClient.java  |     3 +
 .../drill/exec/rpc/user/UserRpcConfig.java      |     4 +
 .../apache/drill/exec/rpc/user/UserServer.java  |    99 +-
 .../server/options/SystemOptionManager.java     |     3 +-
 .../apache/drill/exec/work/foreman/Foreman.java |    46 +-
 .../work/prepare/PreparedStatementProvider.java |   400 +
 .../apache/drill/exec/work/user/UserWorker.java |     7 +
 .../java/org/apache/drill/BaseTestQuery.java    |    18 +-
 .../java/org/apache/drill/DrillTestWrapper.java |     8 +-
 .../test/java/org/apache/drill/TestBuilder.java |    37 +-
 .../prepare/TestPreparedStatementProvider.java  |   222 +
 .../org/apache/drill/exec/proto/ExecProtos.java |   502 +-
 .../drill/exec/proto/SchemaExecProtos.java      |   111 +
 .../drill/exec/proto/SchemaUserProtos.java      |   988 +-
 .../apache/drill/exec/proto/UserBitShared.java  |    67 +-
 .../org/apache/drill/exec/proto/UserProtos.java | 25966 +++++++++++------
 .../exec/proto/beans/ColumnSearchability.java   |    55 +
 .../exec/proto/beans/ColumnUpdatability.java    |    51 +
 .../proto/beans/CreatePreparedStatementReq.java |   163 +
 .../beans/CreatePreparedStatementResp.java      |   211 +
 .../drill/exec/proto/beans/GetCatalogsResp.java |     2 +-
 .../drill/exec/proto/beans/GetColumnsResp.java  |     2 +-
 .../drill/exec/proto/beans/GetSchemasResp.java  |     2 +-
 .../drill/exec/proto/beans/GetTablesResp.java   |     2 +-
 .../exec/proto/beans/PreparedStatement.java     |   199 +
 .../proto/beans/PreparedStatementHandle.java    |   164 +
 .../drill/exec/proto/beans/QueryType.java       |     4 +-
 .../drill/exec/proto/beans/RequestStatus.java   |     8 +-
 .../exec/proto/beans/ResultColumnMetadata.java  |   559 +
 .../apache/drill/exec/proto/beans/RpcType.java  |     4 +
 .../apache/drill/exec/proto/beans/RunQuery.java |    24 +
 .../beans/ServerPreparedStatementState.java     |   163 +
 .../src/main/protobuf/ExecutionProtos.proto     |     9 +
 protocol/src/main/protobuf/User.proto           |   208 +-
 protocol/src/main/protobuf/UserBitShared.proto  |     3 +
 38 files changed, 20685 insertions(+), 9754 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/common/src/main/java/org/apache/drill/common/types/Types.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index 74b313e..504b876 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -591,4 +591,45 @@ public class Types {
     return type != null ? "MajorType[" + TextFormat.shortDebugString(type) + "]" : "null";
   }
 
+  /**
+   * Get the <code>precision</code> of given type.
+   * @param majorType
+   * @return
+   */
+  public static int getPrecision(MajorType majorType) {
+    MinorType type = majorType.getMinorType();
+
+    if (type == MinorType.VARBINARY || type == MinorType.VARCHAR) {
+      return 65536;
+    }
+
+    if (majorType.hasPrecision()) {
+      return majorType.getPrecision();
+    }
+
+    return 0;
+  }
+
+  /**
+   * Get the <code>scale</code> of given type.
+   * @param majorType
+   * @return
+   */
+  public static int getScale(MajorType majorType) {
+    if (majorType.hasScale()) {
+      return majorType.getScale();
+    }
+
+    return 0;
+  }
+
+  /**
+   * Is the given type column be used in ORDER BY clause?
+   * @param type
+   * @return
+   */
+  public static boolean isSortable(MinorType type) {
+    // Currently only map and list columns are not sortable.
+    return type != MinorType.MAP && type != MinorType.LIST;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 64931a2..d6a210a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -330,4 +330,11 @@ public interface ExecConstants {
   String CODE_GEN_EXP_IN_METHOD_SIZE = "exec.java.compiler.exp_in_method_size";
   LongValidator CODE_GEN_EXP_IN_METHOD_SIZE_VALIDATOR = new LongValidator(CODE_GEN_EXP_IN_METHOD_SIZE, 50);
 
+  /**
+   * Timeout for create prepare statement request. If the request exceeds this timeout, then request is timed out.
+   * Default value is 10mins.
+   */
+  String CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS = "prepare.statement.create_timeout_ms";
+  OptionValidator CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR =
+      new PositiveLongValidator(CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS, Integer.MAX_VALUE, 10000);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 8063778..e81a4fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -53,6 +53,8 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
 import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
 import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
 import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
@@ -63,9 +65,11 @@ import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
 import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
 import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
 import org.apache.drill.exec.proto.UserProtos.LikeFilter;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
 import org.apache.drill.exec.proto.UserProtos.Property;
 import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
@@ -83,6 +87,7 @@ import org.apache.drill.exec.rpc.user.UserResultsListener;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.AbstractCheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -325,13 +330,22 @@ public class DrillClient implements Closeable, ConnectionThrottle {
   }
 
   /**
-   * Submits a Logical plan for direct execution (bypasses parsing)
+   * Submits a string based query plan for execution and returns the result batches. Supported query types are:
+   * <p><ul>
+   *  <li>{@link QueryType#LOGICAL}
+   *  <li>{@link QueryType#PHYSICAL}
+   *  <li>{@link QueryType#SQL}
+   * </ul>
    *
-   * @param plan the plan to execute
+   * @param type Query type
+   * @param plan Query to execute
    * @return a handle for the query result
    * @throws RpcException
    */
   public List<QueryDataBatch> runQuery(QueryType type, String plan) throws RpcException {
+    checkArgument(type == QueryType.LOGICAL || type == QueryType.PHYSICAL || type == QueryType.SQL,
+        String.format("Only query types %s, %s and %s are supported in this API",
+            QueryType.LOGICAL, QueryType.PHYSICAL, QueryType.SQL));
     final UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build();
     final ListHoldingResultsListener listener = new ListHoldingResultsListener(query);
     client.submitQuery(listener, query);
@@ -352,7 +366,8 @@ public class DrillClient implements Closeable, ConnectionThrottle {
   }
 
   /**
-   * Run query based on list of fragments that were supposedly produced during query planning phase
+   * Run query based on list of fragments that were supposedly produced during query planning phase. Supported
+   * query type is {@link QueryType#EXECUTION}
    * @param type
    * @param planFragments
    * @param resultsListener
@@ -512,6 +527,62 @@ public class DrillClient implements Closeable, ConnectionThrottle {
   }
 
   /**
+   * Create a prepared statement for given <code>query</code>.
+   *
+   * @param query
+   * @return
+   */
+  public DrillRpcFuture<CreatePreparedStatementResp> createPreparedStatement(final String query) {
+    final CreatePreparedStatementReq req =
+        CreatePreparedStatementReq.newBuilder()
+            .setSqlQuery(query)
+            .build();
+
+    return client.send(RpcType.CREATE_PREPARED_STATEMENT, req, CreatePreparedStatementResp.class);
+  }
+
+  /**
+   * Execute the given prepared statement.
+   *
+   * @param preparedStatementHandle Prepared statement handle returned in response to
+   *                                {@link #createPreparedStatement(String)}.
+   * @param resultsListener {@link UserResultsListener} instance for listening for query results.
+   */
+  public void executePreparedStatement(final PreparedStatementHandle preparedStatementHandle,
+      final UserResultsListener resultsListener) {
+    final RunQuery runQuery = newBuilder()
+        .setResultsMode(STREAM_FULL)
+        .setType(QueryType.PREPARED_STATEMENT)
+        .setPreparedStatementHandle(preparedStatementHandle)
+        .build();
+    client.submitQuery(resultsListener, runQuery);
+  }
+
+  /**
+   * Execute the given prepared statement and return the results.
+   *
+   * @param preparedStatementHandle Prepared statement handle returned in response to
+   *                                {@link #createPreparedStatement(String)}.
+   * @return List of {@link QueryDataBatch}s. It is responsibility of the caller to release query data batches.
+   * @throws RpcException
+   */
+  @VisibleForTesting
+  public List<QueryDataBatch> executePreparedStatement(final PreparedStatementHandle preparedStatementHandle)
+      throws RpcException {
+    final RunQuery runQuery = newBuilder()
+        .setResultsMode(STREAM_FULL)
+        .setType(QueryType.PREPARED_STATEMENT)
+        .setPreparedStatementHandle(preparedStatementHandle)
+        .build();
+
+    final ListHoldingResultsListener resultsListener = new ListHoldingResultsListener(runQuery);
+
+    client.submitQuery(resultsListener, runQuery);
+
+    return resultsListener.getResults();
+  }
+
+  /**
    * Submits a Logical plan for direct execution (bypasses parsing)
    *
    * @param  plan  the plan to execute

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 5106787..c89ed0c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
 import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
 import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
 import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
@@ -117,6 +118,8 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
       return GetTablesResp.getDefaultInstance();
     case RpcType.COLUMNS_VALUE:
       return GetColumnsResp.getDefaultInstance();
+    case RpcType.PREPARED_STATEMENT_VALUE:
+      return CreatePreparedStatementResp.getDefaultInstance();
     }
     throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 809ac65..c520744 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -26,6 +26,8 @@ import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
 import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
 import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
 import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
@@ -61,6 +63,8 @@ public class UserRpcConfig {
         .add(RpcType.GET_SCHEMAS, GetSchemasReq.class, RpcType.SCHEMAS, GetSchemasResp.class) // user to bit
         .add(RpcType.GET_TABLES, GetTablesReq.class, RpcType.TABLES, GetTablesResp.class) // user to bit
         .add(RpcType.GET_COLUMNS, GetColumnsReq.class, RpcType.COLUMNS, GetColumnsResp.class) // user to bit
+        .add(RpcType.CREATE_PREPARED_STATEMENT, CreatePreparedStatementReq.class,
+            RpcType.PREPARED_STATEMENT, CreatePreparedStatementResp.class) // user to bit
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index adf7ec4..9a15d96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -19,11 +19,14 @@ package org.apache.drill.exec.rpc.user;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 
@@ -38,6 +41,7 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
 import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
 import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
 import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
@@ -58,6 +62,7 @@ import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnectionImpl;
 import org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
 import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
 import org.apache.drill.exec.rpc.user.security.UserAuthenticatorFactory;
@@ -66,7 +71,7 @@ import org.apache.drill.exec.work.user.UserWorker;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
 
-public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnection> {
+public class UserServer extends BasicServer<RpcType, UserClientConnectionImpl> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
 
   final UserWorker worker;
@@ -106,7 +111,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
   }
 
   @Override
-  protected void handle(UserClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+  protected void handle(UserClientConnectionImpl connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
       ResponseSender responseSender) throws RpcException {
     switch (rpcType) {
 
@@ -180,16 +185,69 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       } catch (final InvalidProtocolBufferException e) {
         throw new RpcException("Failure while decoding GetColumnsReq body.", e);
       }
+    case RpcType.CREATE_PREPARED_STATEMENT_VALUE:
+      try {
+        final CreatePreparedStatementReq req =
+            CreatePreparedStatementReq.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        worker.submitPreparedStatementWork(connection, req, responseSender);
+        break;
+      } catch (final InvalidProtocolBufferException e) {
+        throw new RpcException("Failure while decoding CreatePreparedStatementReq body.", e);
+      }
     default:
       throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type.  Type was %d.", rpcType));
     }
   }
 
-  public class UserClientConnection extends RemoteConnection {
+  /**
+   * Interface for getting user session properties and interacting with user connection. Separating this interface from
+   * {@link RemoteConnection} implementation for user connection:
+   * <p><ul>
+   *   <li> Connection is passed to Foreman and Screen operators. Instead passing this interface exposes few details.
+   *   <li> Makes it easy to have wrappers around user connection which can be helpful to tap the messages and data
+   *        going to the actual client.
+   * </ul>
+   */
+  public interface UserClientConnection {
+    /**
+     * @return User session object.
+     */
+    UserSession getSession();
+
+    /**
+     * Send query result outcome to client. Outcome is returned through <code>listener</code>
+     * @param listener
+     * @param result
+     */
+    void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result);
+
+    /**
+     * Send query data to client. Outcome is returned through <code>listener</code>
+     * @param listener
+     * @param result
+     */
+    void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result);
+
+    /**
+     * Returns the {@link ChannelFuture} which will be notified when this
+     * channel is closed.  This method always returns the same future instance.
+     */
+    ChannelFuture getChannelClosureFuture();
+
+    /**
+     * @return Return the client node address.
+     */
+    SocketAddress getRemoteAddress();
+  }
+
+  /**
+   * {@link RemoteConnection} implementation for user connection. Also implements {@link UserClientConnection}.
+   */
+  public class UserClientConnectionImpl extends RemoteConnection implements UserClientConnection {
 
     private UserSession session;
 
-    public UserClientConnection(SocketChannel channel) {
+    public UserClientConnectionImpl(SocketChannel channel) {
       super(channel, "user client");
     }
 
@@ -197,7 +255,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       getChannel().pipeline().remove(BasicServer.TIMEOUT_HANDLER);
     }
 
-    void setUser(UserToBitHandshake inbound) throws IOException {
+    void setUser(final UserToBitHandshake inbound) throws IOException {
       session = UserSession.Builder.newBuilder()
           .withCredentials(inbound.getCredentials())
           .withOptionManager(worker.getSystemOptions())
@@ -210,38 +268,47 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       }
     }
 
+    @Override
     public UserSession getSession(){
       return session;
     }
 
-    public void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result, boolean allowInEventThread){
+    @Override
+    public void sendResult(final RpcOutcomeListener<Ack> listener, final QueryResult result) {
       logger.trace("Sending result to client with {}", result);
-      send(listener, this, RpcType.QUERY_RESULT, result, Ack.class, allowInEventThread);
-    }
-
-    public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
-      sendData(listener, result, false);
+      send(listener, this, RpcType.QUERY_RESULT, result, Ack.class, true);
     }
 
-    public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result, boolean allowInEventThread){
+    @Override
+    public void sendData(final RpcOutcomeListener<Ack> listener, final QueryWritableBatch result) {
       logger.trace("Sending data to client with {}", result);
-      send(listener, this, RpcType.QUERY_DATA, result.getHeader(), Ack.class, allowInEventThread, result.getBuffers());
+      send(listener, this, RpcType.QUERY_DATA, result.getHeader(), Ack.class, false, result.getBuffers());
     }
+
     @Override
     public BufferAllocator getAllocator() {
       return alloc;
     }
 
+    @Override
+    public ChannelFuture getChannelClosureFuture() {
+      return getChannel().closeFuture();
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() {
+      return getChannel().remoteAddress();
+    }
   }
 
   @Override
-  public UserClientConnection initRemoteConnection(SocketChannel channel) {
+  public UserClientConnectionImpl initRemoteConnection(SocketChannel channel) {
     super.initRemoteConnection(channel);
-    return new UserClientConnection(channel);
+    return new UserClientConnectionImpl(channel);
   }
 
   @Override
-  protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler(final UserClientConnection connection) {
+  protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler(final UserClientConnectionImpl connection) {
 
     return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, UserToBitHandshake.PARSER){
 

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 3053c85..71d9f0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -145,7 +145,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       ExecConstants.IMPLICIT_SUFFIX_COLUMN_LABEL_VALIDATOR,
       ExecConstants.IMPLICIT_FQN_COLUMN_LABEL_VALIDATOR,
       ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL_VALIDATOR,
-      ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE_VALIDATOR
+      ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE_VALIDATOR,
+      ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR
     };
     final Map<String, OptionValidator> tmp = new HashMap<>();
     for (final OptionValidator validator : validators) {

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 2829ac1..808ba07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -61,10 +61,12 @@ import org.apache.drill.exec.proto.BitControl.InitializeFragments;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
@@ -92,6 +94,7 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * Foreman manages all the fragments (local and remote) for a single query where this
@@ -162,7 +165,7 @@ public class Foreman implements Runnable {
     this.drillbitContext = drillbitContext;
 
     initiatingClient = connection;
-    this.closeFuture = initiatingClient.getChannel().closeFuture();
+    closeFuture = initiatingClient.getChannelClosureFuture();
     closeFuture.addListener(closeListener);
 
     queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
@@ -254,11 +257,18 @@ public class Foreman implements Runnable {
         parseAndRunPhysicalPlan(queryRequest.getPlan());
         break;
       case SQL:
-        runSQL(queryRequest.getPlan());
+        final String sql = queryRequest.getPlan();
+        // log query id and query text before starting any real work. Also, put
+        // them together such that it is easy to search based on query id
+        logger.info("Query text for query id {}: {}", this.queryIdString, sql);
+        runSQL(sql);
         break;
       case EXECUTION:
         runFragment(queryRequest.getFragmentsList());
         break;
+      case PREPARED_STATEMENT:
+        runPreparedStatement(queryRequest.getPreparedStatementHandle());
+        break;
       default:
         throw new IllegalStateException();
       }
@@ -484,7 +494,31 @@ public class Foreman implements Runnable {
     logger.debug("Fragments running.");
   }
 
+  /**
+   * Helper method to execute the query in prepared statement. Current implementation takes the query from opaque
+   * object of the <code>preparedStatement</code> and submits as a new query.
+   *
+   * @param preparedStatementHandle
+   * @throws ExecutionSetupException
+   */
+  private void runPreparedStatement(final PreparedStatementHandle preparedStatementHandle)
+      throws ExecutionSetupException {
+    final ServerPreparedStatementState serverState;
+
+    try {
+      serverState =
+          ServerPreparedStatementState.PARSER.parseFrom(preparedStatementHandle.getServerInfo());
+    } catch (final InvalidProtocolBufferException ex) {
+      throw UserException.parseError(ex)
+          .message("Failed to parse the prepared statement handle. " +
+              "Make sure the handle is same as one returned from create prepared statement call.")
+          .build(logger);
+    }
 
+    final String sql = serverState.getSqlQuery();
+    logger.info("Prepared statement query for QueryId {} : {}", queryId, sql);
+    runSQL(sql);
+  }
 
   private static void validatePlan(final PhysicalPlan plan) throws ForemanSetupException {
     if (plan.getProperties().resultMode != ResultMode.EXEC) {
@@ -734,7 +768,7 @@ public class Foreman implements Runnable {
             new Date(System.currentTimeMillis()),
             state,
             queryContext.getSession().getCredentials().getUserName(),
-            initiatingClient.getChannel().remoteAddress());
+            initiatingClient.getRemoteAddress());
         queryLogger.info(MAPPER.writeValueAsString(q));
       } catch (Exception e) {
         logger.error("Failure while recording query information to query log.", e);
@@ -805,7 +839,7 @@ public class Foreman implements Runnable {
        */
       try {
         // send whatever result we ended up with
-        initiatingClient.sendResult(responseListener, resultBuilder.build(), true);
+        initiatingClient.sendResult(responseListener, resultBuilder.build());
       } catch(final Exception e) {
         addException(e);
         logger.warn("Exception sending result to client", resultException);
@@ -970,10 +1004,6 @@ public class Foreman implements Runnable {
   }
 
   private void runSQL(final String sql) throws ExecutionSetupException {
-    // log query id and query text before starting any real work. Also, put
-    // them together such that it is easy to search based on query id
-    logger.info("Query text for query id {}: {}", this.queryIdString, sql);
-
     final Pointer<String> textPlan = new Pointer<>();
     final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
     queryManager.setPlanText(textPlan.value);

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
new file mode 100644
index 0000000..982d8a3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.prepare;
+
+import static org.apache.drill.exec.ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.FAILED;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.OK;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.TIMEOUT;
+
+import org.apache.drill.common.exceptions.ErrorHelper;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.proto.UserProtos.ColumnSearchability;
+import org.apache.drill.exec.proto.UserProtos.ColumnUpdatability;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatement;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
+import org.apache.drill.exec.proto.UserProtos.RequestStatus;
+import org.apache.drill.exec.proto.UserProtos.ResultColumnMetadata;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
+import org.apache.drill.exec.work.user.UserWorker;
+import org.joda.time.Period;
+
+import com.google.common.collect.ImmutableMap;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import java.math.BigDecimal;
+import java.net.SocketAddress;
+import java.sql.Date;
+import java.sql.ResultSetMetaData;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Contains worker {@link Runnable} for creating a prepared statement and helper methods.
+ */
+public class PreparedStatementProvider {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PreparedStatementProvider.class);
+
+  /**
+   * Static list of mappings from {@link MinorType} to JDBC ResultSet class name (to be returned through
+   * {@link ResultSetMetaData#getColumnClassName(int)}.
+   */
+  private static final Map<MinorType, String> DRILL_TYPE_TO_JDBC_CLASSNAME = ImmutableMap.<MinorType, String>builder()
+      .put(MinorType.INT, Integer.class.getName())
+      .put(MinorType.BIGINT, Long.class.getName())
+      .put(MinorType.FLOAT4, Float.class.getName())
+      .put(MinorType.FLOAT8, Double.class.getName())
+      .put(MinorType.VARCHAR, String.class.getName())
+      .put(MinorType.BIT, Boolean.class.getName())
+      .put(MinorType.DATE, Date.class.getName())
+      .put(MinorType.DECIMAL9, BigDecimal.class.getName())
+      .put(MinorType.DECIMAL18, BigDecimal.class.getName())
+      .put(MinorType.DECIMAL28SPARSE, BigDecimal.class.getName())
+      .put(MinorType.DECIMAL38SPARSE, BigDecimal.class.getName())
+      .put(MinorType.TIME, Time.class.getName())
+      .put(MinorType.TIMESTAMP, Timestamp.class.getName())
+      .put(MinorType.VARBINARY, byte[].class.getName())
+      .put(MinorType.INTERVALYEAR, Period.class.getName())
+      .put(MinorType.INTERVALDAY, Period.class.getName())
+      .put(MinorType.MAP, Object.class.getName())
+      .put(MinorType.LIST, Object.class.getName())
+      .put(MinorType.UNION, Object.class.getName())
+      .build();
+
+  /**
+   * Runnable that creates a prepared statement for given {@link CreatePreparedStatementReq} and
+   * sends the response at the end.
+   */
+  public static class PreparedStatementWorker implements Runnable {
+    private final UserClientConnection connection;
+    private final UserWorker userWorker;
+    private final ResponseSender responseSender;
+    private final CreatePreparedStatementReq req;
+
+    public PreparedStatementWorker(final UserClientConnection connection, final UserWorker userWorker,
+        final ResponseSender responseSender, final CreatePreparedStatementReq req) {
+      this.connection = connection;
+      this.userWorker = userWorker;
+      this.responseSender = responseSender;
+      this.req = req;
+    }
+
+    @Override
+    public void run() {
+      final CreatePreparedStatementResp.Builder respBuilder = CreatePreparedStatementResp.newBuilder();
+      try {
+        UserClientConnectionWrapper wrapper = new UserClientConnectionWrapper(connection);
+
+        final RunQuery limit0Query =
+            RunQuery.newBuilder()
+                .setType(QueryType.SQL)
+                .setPlan(String.format("SELECT * FROM (%s) LIMIT 0", req.getSqlQuery()))
+                .build();
+
+        final QueryId limit0QueryId = userWorker.submitWork(wrapper, limit0Query);
+
+        final long timeoutMillis =
+            userWorker.getSystemOptions().getOption(CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS).num_val;
+
+        try {
+          if (!wrapper.await(timeoutMillis)) {
+            logger.error("LIMIT 0 query (QueryId: {}) for prepared statement took longer than {} ms. Cancelling.",
+                limit0QueryId, timeoutMillis);
+            userWorker.cancelQuery(limit0QueryId);
+            final String errorMsg = String.format(
+                "LIMIT 0 query (QueryId: %s) for prepared statement took longer than %d ms. " +
+                    "Query cancellation requested.\n" +
+                    "Retry after changing the option '%s' to a higher value.",
+                limit0QueryId, timeoutMillis, CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS);
+            setErrorHelper(respBuilder, TIMEOUT, null, errorMsg, ErrorType.SYSTEM);
+            return;
+          }
+        } catch (InterruptedException ex) {
+          setErrorHelper(respBuilder, FAILED, ex, "Prepared statement creation interrupted.", ErrorType.SYSTEM);
+          return;
+        }
+
+        if (wrapper.getError() != null) {
+          setErrorHelper(respBuilder, wrapper.getError(), "Failed to get result set schema for prepare statement.");
+          return;
+        }
+
+        final PreparedStatement.Builder prepStmtBuilder = PreparedStatement.newBuilder();
+
+        for (SerializedField field : wrapper.getFields()) {
+          prepStmtBuilder.addColumns(serializeColumn(field));
+        }
+
+        prepStmtBuilder.setServerHandle(
+            PreparedStatementHandle.newBuilder()
+                .setServerInfo(
+                    ServerPreparedStatementState.newBuilder()
+                        .setSqlQuery(req.getSqlQuery())
+                        .build().toByteString()
+                )
+        );
+
+        respBuilder.setStatus(OK);
+        respBuilder.setPreparedStatement(prepStmtBuilder.build());
+      } catch (Throwable e) {
+        setErrorHelper(respBuilder, FAILED, e, "Failed to create prepared statement.", ErrorType.SYSTEM);
+      } finally {
+        responseSender.send(new Response(RpcType.PREPARED_STATEMENT, respBuilder.build()));
+      }
+    }
+  }
+
+  /**
+   * Helper method to create {@link DrillPBError} and set it in <code>respBuilder</code>
+   */
+  private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final RequestStatus status,
+      final Throwable ex, final String message, final ErrorType errorType) {
+    respBuilder.setStatus(status);
+    final String errorId = UUID.randomUUID().toString();
+    if (ex != null) {
+      logger.error("{} ErrorId: {}", message, errorId, ex);
+    } else {
+      logger.error("{} ErrorId: {}", message, errorId);
+    }
+
+    final DrillPBError.Builder builder = DrillPBError.newBuilder();
+    builder.setErrorType(errorType);
+    builder.setErrorId(errorId);
+    builder.setMessage(message);
+
+    if (ex != null) {
+      builder.setException(ErrorHelper.getWrapper(ex));
+    }
+
+    respBuilder.setError(builder.build());
+  }
+
+  /**
+   * Helper method to log error and set given {@link DrillPBError} in <code>respBuilder</code>
+   */
+  private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final DrillPBError error,
+      final String message) {
+    respBuilder.setStatus(FAILED);
+    final String errorId = UUID.randomUUID().toString();
+    logger.error("{} ErrorId: {}", message, errorId);
+
+    respBuilder.setError(error);
+  }
+
+  /**
+   * Decorator around {@link UserClientConnection} to tap the query results for LIMIT 0 query.
+   */
+  private static class UserClientConnectionWrapper implements UserClientConnection {
+    private final UserClientConnection inner;
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    private volatile DrillPBError error;
+    private volatile List<SerializedField> fields;
+
+    UserClientConnectionWrapper(UserClientConnection inner) {
+      this.inner = inner;
+    }
+
+    @Override
+    public UserSession getSession() {
+      return inner.getSession();
+    }
+
+    @Override
+    public ChannelFuture getChannelClosureFuture() {
+      return inner.getChannelClosureFuture();
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() {
+      return inner.getRemoteAddress();
+    }
+
+    @Override
+    public void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result) {
+      // Release the wait latch if the query is terminated.
+      final QueryState state = result.getQueryState();
+      if (state == QueryState.FAILED || state  == QueryState.CANCELED || state == QueryState.COMPLETED) {
+        if (state == QueryState.FAILED) {
+          error = result.getError(0);
+        }
+        latch.countDown();
+      }
+
+      listener.success(Acks.OK, null);
+    }
+
+    @Override
+    public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result) {
+      // Save the query results schema and release the buffers.
+      if (fields == null) {
+        fields = result.getHeader().getDef().getFieldList();
+      }
+
+      for(ByteBuf buf : result.getBuffers()) {
+        buf.release();
+      }
+
+      listener.success(Acks.OK, null);
+    }
+
+    /**
+     * Wait until the query has completed.
+     * @throws InterruptedException
+     */
+    boolean await(final long timeoutMillis) throws InterruptedException {
+      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @return Any error returned in query execution.
+     */
+    DrillPBError getError() {
+      return error;
+    }
+
+    /**
+     * @return Schema returned in query result batch.
+     */
+    List<SerializedField> getFields() {
+      return fields;
+    }
+  }
+
+  /**
+   * Serialize the given {@link SerializedField} into a {@link ResultColumnMetadata}.
+   * @param field
+   * @return
+   */
+  private static ResultColumnMetadata serializeColumn(SerializedField field) {
+    final ResultColumnMetadata.Builder builder = ResultColumnMetadata.newBuilder();
+    final MajorType majorType = field.getMajorType();
+    final MinorType minorType = majorType.getMinorType();
+
+    /**
+     * Defaults to "DRILL" as drill has as only one catalog.
+     */
+    builder.setCatalogName(InfoSchemaConstants.IS_CATALOG_NAME);
+
+    /**
+     * Designated column's schema name. Empty string if not applicable. Initial implementation defaults to empty string
+     * as we use LIMIT 0 queries to get the schema and schema info is lost. If we derive the schema from plan, we may
+     * get the right value.
+     */
+    builder.setSchemaName("");
+
+    /**
+     * Designated column's table name. Not set if not applicable. Initial implementation defaults to empty string as
+     * we use LIMIT 0 queries to get the schema and table info is lost. If we derive the table from plan, we may get
+     * the right value.
+     */
+    builder.setTableName("");
+
+    builder.setColumnName(field.getNamePart().getName());
+
+    /**
+     * Column label name for display or print purposes.
+     * Ex. a column named "empName" might be labeled as "Employee Name".
+     * Initial implementation defaults to same value as column name.
+     */
+    builder.setLabel(field.getNamePart().getName());
+
+    /**
+     * Data type in string format. Value is SQL standard type.
+     */
+    builder.setDataType(Types.getSqlTypeName(majorType));
+
+    builder.setIsNullable(majorType.getMode() == DataMode.OPTIONAL);
+
+    /**
+     * For numeric data, this is the maximum precision.
+     * For character data, this is the length in characters.
+     * For datetime datatypes, this is the length in characters of the String representation
+     *    (assuming the maximum allowed precision of the fractional seconds component).
+     * For binary data, this is the length in bytes.
+     * For all other types 0 is returned where the column size is not applicable.
+     */
+    builder.setPrecision(Types.getPrecision(field.getMajorType()));
+
+    /**
+     * Column's number of digits to right of the decimal point. 0 is returned for types where the scale is not applicable
+     */
+    builder.setScale(Types.getScale(majorType));
+
+    /**
+     * Indicates whether values in the designated column are signed numbers.
+     */
+    builder.setSigned(Types.isNumericType(majorType));
+
+    /**
+     * Maximum number of characters required to display data from the column. Initial implementation hard coded to 10.
+     */
+    builder.setDisplaySize(10);
+
+    /**
+     * Is the column an aliased column. Initial implementation defaults to true as we derive schema from LIMIT 0 query and
+     * not plan
+     */
+    builder.setIsAliased(true);
+
+    builder.setSearchability(ColumnSearchability.ALL);
+    builder.setUpdatability(ColumnUpdatability.READ_ONLY);
+    builder.setAutoIncrement(false);
+    builder.setCaseSensitivity(false);
+    builder.setSortable(Types.isSortable(minorType));
+
+    /**
+     * Returns the fully-qualified name of the Java class whose instances are manufactured if the method
+     * ResultSet.getObject is called to retrieve a value from the column. Applicable only to JDBC clients.
+     */
+    builder.setClassName(DRILL_TYPE_TO_JDBC_CLASSNAME.get(minorType));
+
+    builder.setIsCurrency(false);
+
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index cc614d2..c1fa7a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
 import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
 import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
 import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
@@ -37,6 +38,7 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.metadata.MetadataProvider;
+import org.apache.drill.exec.work.prepare.PreparedStatementProvider.PreparedStatementWorker;
 
 public class UserWorker{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class);
@@ -118,4 +120,9 @@ public class UserWorker{
   public void submitColumnsMetadataWork(UserSession session, GetColumnsReq req, ResponseSender sender) {
     bee.addNewWork(MetadataProvider.columns(session, bee.getContext(), req, sender));
   }
+
+  public void submitPreparedStatementWork(final UserClientConnection connection, final CreatePreparedStatementReq req,
+      final ResponseSender sender) {
+    bee.addNewWork(new PreparedStatementWorker(connection, this, sender, req));
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 7ab73dc..3eded52 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.AwaitableUserResultsListener;
@@ -293,9 +294,20 @@ public class BaseTestQuery extends ExecTest {
     return testRunAndReturn(QueryType.PHYSICAL, physical);
   }
 
-  public static List<QueryDataBatch>  testRunAndReturn(QueryType type, String query) throws Exception{
-    query = QueryTestUtil.normalizeQuery(query);
-    return client.runQuery(type, query);
+  public static List<QueryDataBatch>  testRunAndReturn(QueryType type, Object query) throws Exception{
+    if (type == QueryType.PREPARED_STATEMENT) {
+      Preconditions.checkArgument(query instanceof PreparedStatementHandle,
+          "Expected an instance of PreparedStatement as input query");
+      return testPreparedStatement((PreparedStatementHandle)query);
+    } else {
+      Preconditions.checkArgument(query instanceof String, "Expected a string as input query");
+      query = QueryTestUtil.normalizeQuery((String)query);
+      return client.runQuery(type, (String)query);
+    }
+  }
+
+  public static List<QueryDataBatch> testPreparedStatement(PreparedStatementHandle handle) throws Exception {
+    return client.executePreparedStatement(handle);
   }
 
   public static int testRunAndPrint(final QueryType type, final String query) throws Exception {

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 2a9c03d..9df9139 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -81,8 +81,10 @@ public class DrillTestWrapper {
   // for the baseline data. In this case there needs to be a call back into the TestBuilder once we know the type information
   // from the test query.
   private TestBuilder testBuilder;
-  // test query to run
-  private String query;
+  /**
+   * Test query to run. Type of object depends on the {@link #queryType}
+   */
+  private Object query;
   // The type of query provided
   private UserBitShared.QueryType queryType;
   // The type of query provided for the baseline
@@ -106,7 +108,7 @@ public class DrillTestWrapper {
 
   private int expectedNumBatches;
 
-  public DrillTestWrapper(TestBuilder testBuilder, BufferAllocator allocator, String query, QueryType queryType,
+  public DrillTestWrapper(TestBuilder testBuilder, BufferAllocator allocator, Object query, QueryType queryType,
                           String baselineOptionSettingQueries, String testOptionSettingQueries,
                           QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
                           List<Map<String, Object>> baselineRecords, int expectedNumBatches) {

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index b073371..8acf936 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -36,6 +36,8 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.util.JsonStringHashMap;
 import org.apache.drill.exec.util.Text;
@@ -45,8 +47,10 @@ import com.google.common.base.Preconditions;
 
 public class TestBuilder {
 
-  // test query to run
-  private String query;
+  /**
+   * Test query to rung. Type of object depends on the {@link #queryType}
+   */
+  private Object query;
   // the type of query for the test
   private UserBitShared.QueryType queryType;
   // should the validation enforce ordering
@@ -85,7 +89,7 @@ public class TestBuilder {
     reset();
   }
 
-  public TestBuilder(BufferAllocator allocator, String query, UserBitShared.QueryType queryType, Boolean ordered,
+  public TestBuilder(BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
                      boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                      String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                      int expectedNumBatches) {
@@ -141,6 +145,12 @@ public class TestBuilder {
     return sqlQuery(String.format(query, replacements));
   }
 
+  public TestBuilder preparedStatement(PreparedStatementHandle preparedStatementHandle) {
+    queryType = QueryType.PREPARED_STATEMENT;
+    query = preparedStatementHandle;
+    return this;
+  }
+
   public TestBuilder sqlQueryFromFile(String queryFile) throws IOException {
     String query = BaseTestQuery.getFile(queryFile);
     this.query = query;
@@ -210,7 +220,7 @@ public class TestBuilder {
     }
   }
 
-  String getValidationQuery() throws Exception {
+  Object getValidationQuery() throws Exception {
     throw new RuntimeException("Must provide some kind of baseline, either a baseline file or another query");
   }
 
@@ -354,7 +364,7 @@ public class TestBuilder {
   }
 
   // provide a SQL query to validate against
-  public BaselineQueryTestBuilder sqlBaselineQuery(String baselineQuery) {
+  public BaselineQueryTestBuilder sqlBaselineQuery(Object baselineQuery) {
     return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.SQL, allocator, query, queryType, ordered, approximateEquality,
         baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison, expectedNumBatches);
   }
@@ -403,7 +413,7 @@ public class TestBuilder {
     // that come out of the test query drive interpretation of baseline
     private TypeProtos.MajorType[] baselineTypes;
 
-    CSVTestBuilder(String baselineFile, BufferAllocator allocator, String query, UserBitShared.QueryType queryType, Boolean ordered,
+    CSVTestBuilder(String baselineFile, BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
                    boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                    String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                    int expectedNumBatches) {
@@ -494,7 +504,7 @@ public class TestBuilder {
 
   public class SchemaTestBuilder extends TestBuilder {
     private List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema;
-    SchemaTestBuilder(BufferAllocator allocator, String query, UserBitShared.QueryType queryType,
+    SchemaTestBuilder(BufferAllocator allocator, Object query, UserBitShared.QueryType queryType,
         String baselineOptionSettingQueries, String testOptionSettingQueries, List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema) {
       super(allocator, query, queryType, false, false, null, baselineOptionSettingQueries, testOptionSettingQueries, false, -1);
       expectsEmptyResultSet();
@@ -535,7 +545,7 @@ public class TestBuilder {
     // path to the baseline file that will be inserted into the validation query
     private String baselineFilePath;
 
-    JSONTestBuilder(String baselineFile, BufferAllocator allocator, String query, UserBitShared.QueryType queryType, Boolean ordered,
+    JSONTestBuilder(String baselineFile, BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
                     boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                     String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                     int expectedNumBatches) {
@@ -559,11 +569,14 @@ public class TestBuilder {
 
   public class BaselineQueryTestBuilder extends TestBuilder {
 
-    private String baselineQuery;
+    /**
+     * Baseline query. Type of object depends on {@link #baselineQueryType}
+     */
+    private Object baselineQuery;
     private UserBitShared.QueryType baselineQueryType;
 
-    BaselineQueryTestBuilder(String baselineQuery, UserBitShared.QueryType baselineQueryType, BufferAllocator allocator,
-                             String query, UserBitShared.QueryType queryType, Boolean ordered,
+    BaselineQueryTestBuilder(Object baselineQuery, UserBitShared.QueryType baselineQueryType, BufferAllocator allocator,
+                             Object query, UserBitShared.QueryType queryType, Boolean ordered,
                              boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                              String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                              int expectedNumBatches) {
@@ -574,7 +587,7 @@ public class TestBuilder {
     }
 
     @Override
-    String getValidationQuery() {
+    Object getValidationQuery() {
       return baselineQuery;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestPreparedStatementProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestPreparedStatementProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestPreparedStatementProvider.java
new file mode 100644
index 0000000..5a78cc9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestPreparedStatementProvider.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.prepare;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.proto.UserProtos.ColumnSearchability;
+import org.apache.drill.exec.proto.UserProtos.ColumnUpdatability;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatement;
+import org.apache.drill.exec.proto.UserProtos.RequestStatus;
+import org.apache.drill.exec.proto.UserProtos.ResultColumnMetadata;
+import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import java.sql.Date;
+import java.util.List;
+
+/**
+ * Tests for creating and executing prepared statements.
+ */
+public class TestPreparedStatementProvider extends BaseTestQuery {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPreparedStatementProvider.class);
+
+  /**
+   * Simple query.
+   * @throws Exception
+   */
+  @Test
+  public void simple() throws Exception {
+    String query = "SELECT * FROM cp.`region.json` ORDER BY region_id LIMIT 1";
+    PreparedStatement preparedStatement = createPrepareStmt(query, false, null);
+
+    List<ExpectedColumnResult> expMetadata = ImmutableList.of(
+        new ExpectedColumnResult("region_id", "BIGINT", true, 0, 0, true, Long.class.getName()),
+        new ExpectedColumnResult("sales_city", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+        new ExpectedColumnResult("sales_state_province", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+        new ExpectedColumnResult("sales_district", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+        new ExpectedColumnResult("sales_region", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+        new ExpectedColumnResult("sales_country", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+        new ExpectedColumnResult("sales_district_id", "BIGINT", true, 0, 0, true, Long.class.getName())
+    );
+
+    verifyMetadata(expMetadata, preparedStatement.getColumnsList());
+
+    testBuilder()
+        .unOrdered()
+        .preparedStatement(preparedStatement.getServerHandle())
+        .baselineColumns("region_id", "sales_city", "sales_state_province", "sales_district",
+            "sales_region", "sales_country", "sales_district_id")
+        .baselineValues(0L, "None", "None", "No District", "No Region", "No Country", 0L)
+        .go();
+  }
+
+  /**
+   * Create a prepared statement for a query that has GROUP BY clause in it
+   */
+  @Test
+  public void groupByQuery() throws Exception {
+    String query = "SELECT sales_city, count(*) as cnt FROM cp.`region.json` " +
+        "GROUP BY sales_city ORDER BY sales_city DESC LIMIT 1";
+    PreparedStatement preparedStatement = createPrepareStmt(query, false, null);
+
+    List<ExpectedColumnResult> expMetadata = ImmutableList.of(
+        new ExpectedColumnResult("sales_city", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+        new ExpectedColumnResult("cnt", "BIGINT", false, 0, 0, true, Long.class.getName())
+    );
+
+    verifyMetadata(expMetadata, preparedStatement.getColumnsList());
+
+    testBuilder()
+        .unOrdered()
+        .preparedStatement(preparedStatement.getServerHandle())
+        .baselineColumns("sales_city", "cnt")
+        .baselineValues("Yakima", 1L)
+        .go();
+  }
+
+  /**
+   * Create a prepared statement for a query that joins two tables and has ORDER BY clause.
+   */
+  @Test
+  public void joinOrderByQuery() throws Exception {
+    String query = "SELECT l.l_quantity, l.l_shipdate, o.o_custkey FROM cp.`tpch/lineitem.parquet` l JOIN cp.`tpch/orders.parquet` o " +
+        "ON l.l_orderkey = o.o_orderkey LIMIT 2";
+
+    PreparedStatement preparedStatement = createPrepareStmt(query, false, null);
+
+    List<ExpectedColumnResult> expMetadata = ImmutableList.of(
+        new ExpectedColumnResult("l_quantity", "DOUBLE", false, 0, 0, true, Double.class.getName()),
+        new ExpectedColumnResult("l_shipdate", "DATE", false, 0, 0, false, Date.class.getName()),
+        new ExpectedColumnResult("o_custkey", "INTEGER", false, 0, 0, true, Integer.class.getName())
+    );
+
+    verifyMetadata(expMetadata, preparedStatement.getColumnsList());
+  }
+
+  /**
+   * Pass an invalid query to the create prepare statement request and expect a parser failure.
+   * @throws Exception
+   */
+  @Test
+  public void invalidQueryParserError() throws Exception {
+    createPrepareStmt("BLAH BLAH", true, ErrorType.PARSE);
+  }
+
+  /**
+   * Pass an invalid query to the create prepare statement request and expect a validation failure.
+   * @throws Exception
+   */
+  @Test
+  public void invalidQueryValidationError() throws Exception {
+    createPrepareStmt("SELECT * sdflkgdh", true, ErrorType.PARSE /** Drill returns incorrect error for parse error*/);
+  }
+
+  /* Helper method which creates a prepared statement for given query. */
+  private static PreparedStatement createPrepareStmt(String query, boolean expectFailure, ErrorType errorType) throws Exception {
+    CreatePreparedStatementResp resp = client.createPreparedStatement(query).get();
+
+    assertEquals(expectFailure ? RequestStatus.FAILED : RequestStatus.OK, resp.getStatus());
+
+    if (expectFailure) {
+      assertEquals(errorType, resp.getError().getErrorType());
+    } else {
+      logger.error("Prepared statement creation failed: {}", resp.getError().getMessage());
+    }
+
+    return resp.getPreparedStatement();
+  }
+
+  private static class ExpectedColumnResult {
+    final String columnName;
+    final String type;
+    final boolean nullable;
+    final int precision;
+    final int scale;
+    final boolean signed;
+    final String className;
+
+    ExpectedColumnResult(String columnName, String type, boolean nullable, int precision, int scale, boolean signed,
+        String className) {
+      this.columnName = columnName;
+      this.type = type;
+      this.nullable = nullable;
+      this.precision = precision;
+      this.scale = scale;
+      this.signed = signed;
+      this.className = className;
+    }
+
+    boolean isEqualsTo(ResultColumnMetadata result) {
+      return
+          result.getCatalogName().equals(InfoSchemaConstants.IS_CATALOG_NAME) &&
+          result.getSchemaName().isEmpty() &&
+          result.getTableName().isEmpty() &&
+          result.getColumnName().equals(columnName) &&
+          result.getLabel().equals(columnName) &&
+          result.getDataType().equals(type) &&
+          result.getIsNullable() == nullable &&
+          result.getPrecision() == precision &&
+          result.getScale() == scale &&
+          result.getSigned() == signed &&
+          result.getDisplaySize() == 10 &&
+          result.getClassName().equals(className) &&
+          result.getSearchability() == ColumnSearchability.ALL &&
+          result.getAutoIncrement() == false &&
+          result.getCaseSensitivity() == false &&
+          result.getUpdatability() == ColumnUpdatability.READ_ONLY &&
+          result.getIsAliased() == true &&
+          result.getIsCurrency() == false;
+    }
+
+    @Override
+    public String toString() {
+      return "ExpectedColumnResult[" +
+          "columnName='" + columnName + '\'' +
+          ", type='" + type + '\'' +
+          ", nullable=" + nullable +
+          ", precision=" + precision +
+          ", scale=" + scale +
+          ", signed=" + signed +
+          ", className='" + className + '\'' +
+          ']';
+    }
+  }
+
+  private static void verifyMetadata(List<ExpectedColumnResult> expMetadata,
+      List<ResultColumnMetadata> actMetadata) {
+    assertEquals(expMetadata.size(), actMetadata.size());
+
+    for(ExpectedColumnResult exp : expMetadata) {
+      boolean found = false;
+      for(ResultColumnMetadata act : actMetadata) {
+        found = exp.isEqualsTo(act);
+        if (found) {
+          break;
+        }
+      }
+      assertTrue("Failed to find the expected column metadata: " + exp, found);
+    }
+  }
+}