You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2016/08/08 20:49:52 UTC
[4/4] drill git commit: DRILL-4729: Add support for prepared
statement implementation on server side
DRILL-4729: Add support for prepared statement implementation on server side
+ Add following APIs for Drill Java client
- DrillRpcFuture<CreatePreparedStatementResp> createPreparedStatement(final String query)
- void executePreparedStatement(final PreparedStatementHandle preparedStatementHandle, UserResultsListener resultsListener)
- List<QueryDataBatch> executePreparedStatement(final PreparedStatementHandle preparedStatementHandle) (for testing purpose)
+ Separated out the interface from UserClientConnection. It makes it easy to have wrappers which need to
tap the messages and data going to the actual client.
+ Implement CREATE_PREPARED_STATEMENT and handle RunQuery with PreparedStatement
+ Test changes to support prepared statement as query type
+ Add tests in TestPreparedStatementProvider
this closes #530
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/14f6ec7d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/14f6ec7d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/14f6ec7d
Branch: refs/heads/master
Commit: 14f6ec7dd9b010de6c884431e443eb788ce54339
Parents: ef6e522
Author: vkorukanti <ve...@dremio.com>
Authored: Mon Jun 13 11:20:25 2016 -0700
Committer: vkorukanti <ve...@dremio.com>
Committed: Mon Aug 8 13:47:49 2016 -0700
----------------------------------------------------------------------
.../org/apache/drill/common/types/Types.java | 41 +
.../org/apache/drill/exec/ExecConstants.java | 7 +
.../apache/drill/exec/client/DrillClient.java | 77 +-
.../apache/drill/exec/rpc/user/UserClient.java | 3 +
.../drill/exec/rpc/user/UserRpcConfig.java | 4 +
.../apache/drill/exec/rpc/user/UserServer.java | 99 +-
.../server/options/SystemOptionManager.java | 3 +-
.../apache/drill/exec/work/foreman/Foreman.java | 46 +-
.../work/prepare/PreparedStatementProvider.java | 400 +
.../apache/drill/exec/work/user/UserWorker.java | 7 +
.../java/org/apache/drill/BaseTestQuery.java | 18 +-
.../java/org/apache/drill/DrillTestWrapper.java | 8 +-
.../test/java/org/apache/drill/TestBuilder.java | 37 +-
.../prepare/TestPreparedStatementProvider.java | 222 +
.../org/apache/drill/exec/proto/ExecProtos.java | 502 +-
.../drill/exec/proto/SchemaExecProtos.java | 111 +
.../drill/exec/proto/SchemaUserProtos.java | 988 +-
.../apache/drill/exec/proto/UserBitShared.java | 67 +-
.../org/apache/drill/exec/proto/UserProtos.java | 25966 +++++++++++------
.../exec/proto/beans/ColumnSearchability.java | 55 +
.../exec/proto/beans/ColumnUpdatability.java | 51 +
.../proto/beans/CreatePreparedStatementReq.java | 163 +
.../beans/CreatePreparedStatementResp.java | 211 +
.../drill/exec/proto/beans/GetCatalogsResp.java | 2 +-
.../drill/exec/proto/beans/GetColumnsResp.java | 2 +-
.../drill/exec/proto/beans/GetSchemasResp.java | 2 +-
.../drill/exec/proto/beans/GetTablesResp.java | 2 +-
.../exec/proto/beans/PreparedStatement.java | 199 +
.../proto/beans/PreparedStatementHandle.java | 164 +
.../drill/exec/proto/beans/QueryType.java | 4 +-
.../drill/exec/proto/beans/RequestStatus.java | 8 +-
.../exec/proto/beans/ResultColumnMetadata.java | 559 +
.../apache/drill/exec/proto/beans/RpcType.java | 4 +
.../apache/drill/exec/proto/beans/RunQuery.java | 24 +
.../beans/ServerPreparedStatementState.java | 163 +
.../src/main/protobuf/ExecutionProtos.proto | 9 +
protocol/src/main/protobuf/User.proto | 208 +-
protocol/src/main/protobuf/UserBitShared.proto | 3 +
38 files changed, 20685 insertions(+), 9754 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/common/src/main/java/org/apache/drill/common/types/Types.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index 74b313e..504b876 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -591,4 +591,45 @@ public class Types {
return type != null ? "MajorType[" + TextFormat.shortDebugString(type) + "]" : "null";
}
+ /**
+ * Get the <code>precision</code> of given type.
+ * @param majorType
+ * @return
+ */
+ public static int getPrecision(MajorType majorType) {
+ MinorType type = majorType.getMinorType();
+
+ if (type == MinorType.VARBINARY || type == MinorType.VARCHAR) {
+ return 65536;
+ }
+
+ if (majorType.hasPrecision()) {
+ return majorType.getPrecision();
+ }
+
+ return 0;
+ }
+
+ /**
+ * Get the <code>scale</code> of given type.
+ * @param majorType
+ * @return
+ */
+ public static int getScale(MajorType majorType) {
+ if (majorType.hasScale()) {
+ return majorType.getScale();
+ }
+
+ return 0;
+ }
+
+ /**
+ * Can a column of the given type be used in an ORDER BY clause?
+ * @param type
+ * @return
+ */
+ public static boolean isSortable(MinorType type) {
+ // Currently only map and list columns are not sortable.
+ return type != MinorType.MAP && type != MinorType.LIST;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 64931a2..d6a210a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -330,4 +330,11 @@ public interface ExecConstants {
String CODE_GEN_EXP_IN_METHOD_SIZE = "exec.java.compiler.exp_in_method_size";
LongValidator CODE_GEN_EXP_IN_METHOD_SIZE_VALIDATOR = new LongValidator(CODE_GEN_EXP_IN_METHOD_SIZE, 50);
+ /**
+ * Timeout in milliseconds for a create prepared statement request. If the request exceeds this timeout, the request is timed out.
+ * Default value is 10 seconds (10000 ms).
+ */
+ String CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS = "prepare.statement.create_timeout_ms";
+ OptionValidator CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR =
+ new PositiveLongValidator(CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS, Integer.MAX_VALUE, 10000);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 8063778..e81a4fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -53,6 +53,8 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
@@ -63,9 +65,11 @@ import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
import org.apache.drill.exec.proto.UserProtos.LikeFilter;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
import org.apache.drill.exec.proto.UserProtos.Property;
import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
@@ -83,6 +87,7 @@ import org.apache.drill.exec.rpc.user.UserResultsListener;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractCheckedFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -325,13 +330,22 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
/**
- * Submits a Logical plan for direct execution (bypasses parsing)
+ * Submits a string based query plan for execution and returns the result batches. Supported query types are:
+ * <p><ul>
+ * <li>{@link QueryType#LOGICAL}
+ * <li>{@link QueryType#PHYSICAL}
+ * <li>{@link QueryType#SQL}
+ * </ul>
*
- * @param plan the plan to execute
+ * @param type Query type
+ * @param plan Query to execute
* @return a handle for the query result
* @throws RpcException
*/
public List<QueryDataBatch> runQuery(QueryType type, String plan) throws RpcException {
+ checkArgument(type == QueryType.LOGICAL || type == QueryType.PHYSICAL || type == QueryType.SQL,
+ String.format("Only query types %s, %s and %s are supported in this API",
+ QueryType.LOGICAL, QueryType.PHYSICAL, QueryType.SQL));
final UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build();
final ListHoldingResultsListener listener = new ListHoldingResultsListener(query);
client.submitQuery(listener, query);
@@ -352,7 +366,8 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
/**
- * Run query based on list of fragments that were supposedly produced during query planning phase
+ * Run query based on list of fragments that were supposedly produced during query planning phase. Supported
+ * query type is {@link QueryType#EXECUTION}
* @param type
* @param planFragments
* @param resultsListener
@@ -512,6 +527,62 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
/**
+ * Create a prepared statement for given <code>query</code>.
+ *
+ * @param query
+ * @return
+ */
+ public DrillRpcFuture<CreatePreparedStatementResp> createPreparedStatement(final String query) {
+ final CreatePreparedStatementReq req =
+ CreatePreparedStatementReq.newBuilder()
+ .setSqlQuery(query)
+ .build();
+
+ return client.send(RpcType.CREATE_PREPARED_STATEMENT, req, CreatePreparedStatementResp.class);
+ }
+
+ /**
+ * Execute the given prepared statement.
+ *
+ * @param preparedStatementHandle Prepared statement handle returned in response to
+ * {@link #createPreparedStatement(String)}.
+ * @param resultsListener {@link UserResultsListener} instance for listening for query results.
+ */
+ public void executePreparedStatement(final PreparedStatementHandle preparedStatementHandle,
+ final UserResultsListener resultsListener) {
+ final RunQuery runQuery = newBuilder()
+ .setResultsMode(STREAM_FULL)
+ .setType(QueryType.PREPARED_STATEMENT)
+ .setPreparedStatementHandle(preparedStatementHandle)
+ .build();
+ client.submitQuery(resultsListener, runQuery);
+ }
+
+ /**
+ * Execute the given prepared statement and return the results.
+ *
+ * @param preparedStatementHandle Prepared statement handle returned in response to
+ * {@link #createPreparedStatement(String)}.
+ * @return List of {@link QueryDataBatch}s. It is the responsibility of the caller to release the query data batches.
+ * @throws RpcException
+ */
+ @VisibleForTesting
+ public List<QueryDataBatch> executePreparedStatement(final PreparedStatementHandle preparedStatementHandle)
+ throws RpcException {
+ final RunQuery runQuery = newBuilder()
+ .setResultsMode(STREAM_FULL)
+ .setType(QueryType.PREPARED_STATEMENT)
+ .setPreparedStatementHandle(preparedStatementHandle)
+ .build();
+
+ final ListHoldingResultsListener resultsListener = new ListHoldingResultsListener(runQuery);
+
+ client.submitQuery(resultsListener, runQuery);
+
+ return resultsListener.getResults();
+ }
+
+ /**
* Submits a Logical plan for direct execution (bypasses parsing)
*
* @param plan the plan to execute
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 5106787..c89ed0c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
@@ -117,6 +118,8 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
return GetTablesResp.getDefaultInstance();
case RpcType.COLUMNS_VALUE:
return GetColumnsResp.getDefaultInstance();
+ case RpcType.PREPARED_STATEMENT_VALUE:
+ return CreatePreparedStatementResp.getDefaultInstance();
}
throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 809ac65..c520744 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -26,6 +26,8 @@ import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
@@ -61,6 +63,8 @@ public class UserRpcConfig {
.add(RpcType.GET_SCHEMAS, GetSchemasReq.class, RpcType.SCHEMAS, GetSchemasResp.class) // user to bit
.add(RpcType.GET_TABLES, GetTablesReq.class, RpcType.TABLES, GetTablesResp.class) // user to bit
.add(RpcType.GET_COLUMNS, GetColumnsReq.class, RpcType.COLUMNS, GetColumnsResp.class) // user to bit
+ .add(RpcType.CREATE_PREPARED_STATEMENT, CreatePreparedStatementReq.class,
+ RpcType.PREPARED_STATEMENT, CreatePreparedStatementResp.class) // user to bit
.build();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index adf7ec4..9a15d96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -19,11 +19,14 @@ package org.apache.drill.exec.rpc.user;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
+import java.net.SocketAddress;
import java.util.UUID;
import java.util.concurrent.Executor;
@@ -38,6 +41,7 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
@@ -58,6 +62,7 @@ import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnectionImpl;
import org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
import org.apache.drill.exec.rpc.user.security.UserAuthenticatorFactory;
@@ -66,7 +71,7 @@ import org.apache.drill.exec.work.user.UserWorker;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
-public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnection> {
+public class UserServer extends BasicServer<RpcType, UserClientConnectionImpl> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
final UserWorker worker;
@@ -106,7 +111,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
@Override
- protected void handle(UserClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+ protected void handle(UserClientConnectionImpl connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
ResponseSender responseSender) throws RpcException {
switch (rpcType) {
@@ -180,16 +185,69 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
} catch (final InvalidProtocolBufferException e) {
throw new RpcException("Failure while decoding GetColumnsReq body.", e);
}
+ case RpcType.CREATE_PREPARED_STATEMENT_VALUE:
+ try {
+ final CreatePreparedStatementReq req =
+ CreatePreparedStatementReq.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ worker.submitPreparedStatementWork(connection, req, responseSender);
+ break;
+ } catch (final InvalidProtocolBufferException e) {
+ throw new RpcException("Failure while decoding CreatePreparedStatementReq body.", e);
+ }
default:
throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type. Type was %d.", rpcType));
}
}
- public class UserClientConnection extends RemoteConnection {
+ /**
+ * Interface for getting user session properties and interacting with user connection. Separating this interface from
+ * {@link RemoteConnection} implementation for user connection:
+ * <p><ul>
+ * <li> Connection is passed to Foreman and Screen operators. Passing this interface instead exposes fewer details.
+ * <li> Makes it easy to have wrappers around user connection which can be helpful to tap the messages and data
+ * going to the actual client.
+ * </ul>
+ */
+ public interface UserClientConnection {
+ /**
+ * @return User session object.
+ */
+ UserSession getSession();
+
+ /**
+ * Send query result outcome to client. Outcome is returned through <code>listener</code>
+ * @param listener
+ * @param result
+ */
+ void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result);
+
+ /**
+ * Send query data to client. Outcome is returned through <code>listener</code>
+ * @param listener
+ * @param result
+ */
+ void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result);
+
+ /**
+ * Returns the {@link ChannelFuture} which will be notified when this
+ * channel is closed. This method always returns the same future instance.
+ */
+ ChannelFuture getChannelClosureFuture();
+
+ /**
+ * @return Return the client node address.
+ */
+ SocketAddress getRemoteAddress();
+ }
+
+ /**
+ * {@link RemoteConnection} implementation for user connection. Also implements {@link UserClientConnection}.
+ */
+ public class UserClientConnectionImpl extends RemoteConnection implements UserClientConnection {
private UserSession session;
- public UserClientConnection(SocketChannel channel) {
+ public UserClientConnectionImpl(SocketChannel channel) {
super(channel, "user client");
}
@@ -197,7 +255,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
getChannel().pipeline().remove(BasicServer.TIMEOUT_HANDLER);
}
- void setUser(UserToBitHandshake inbound) throws IOException {
+ void setUser(final UserToBitHandshake inbound) throws IOException {
session = UserSession.Builder.newBuilder()
.withCredentials(inbound.getCredentials())
.withOptionManager(worker.getSystemOptions())
@@ -210,38 +268,47 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
}
+ @Override
public UserSession getSession(){
return session;
}
- public void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result, boolean allowInEventThread){
+ @Override
+ public void sendResult(final RpcOutcomeListener<Ack> listener, final QueryResult result) {
logger.trace("Sending result to client with {}", result);
- send(listener, this, RpcType.QUERY_RESULT, result, Ack.class, allowInEventThread);
- }
-
- public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
- sendData(listener, result, false);
+ send(listener, this, RpcType.QUERY_RESULT, result, Ack.class, true);
}
- public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result, boolean allowInEventThread){
+ @Override
+ public void sendData(final RpcOutcomeListener<Ack> listener, final QueryWritableBatch result) {
logger.trace("Sending data to client with {}", result);
- send(listener, this, RpcType.QUERY_DATA, result.getHeader(), Ack.class, allowInEventThread, result.getBuffers());
+ send(listener, this, RpcType.QUERY_DATA, result.getHeader(), Ack.class, false, result.getBuffers());
}
+
@Override
public BufferAllocator getAllocator() {
return alloc;
}
+ @Override
+ public ChannelFuture getChannelClosureFuture() {
+ return getChannel().closeFuture();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return getChannel().remoteAddress();
+ }
}
@Override
- public UserClientConnection initRemoteConnection(SocketChannel channel) {
+ public UserClientConnectionImpl initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
- return new UserClientConnection(channel);
+ return new UserClientConnectionImpl(channel);
}
@Override
- protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler(final UserClientConnection connection) {
+ protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler(final UserClientConnectionImpl connection) {
return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, UserToBitHandshake.PARSER){
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 3053c85..71d9f0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -145,7 +145,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
ExecConstants.IMPLICIT_SUFFIX_COLUMN_LABEL_VALIDATOR,
ExecConstants.IMPLICIT_FQN_COLUMN_LABEL_VALIDATOR,
ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL_VALIDATOR,
- ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE_VALIDATOR
+ ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE_VALIDATOR,
+ ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR
};
final Map<String, OptionValidator> tmp = new HashMap<>();
for (final OptionValidator validator : validators) {
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 2829ac1..808ba07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -61,10 +61,12 @@ import org.apache.drill.exec.proto.BitControl.InitializeFragments;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
@@ -92,6 +94,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
/**
* Foreman manages all the fragments (local and remote) for a single query where this
@@ -162,7 +165,7 @@ public class Foreman implements Runnable {
this.drillbitContext = drillbitContext;
initiatingClient = connection;
- this.closeFuture = initiatingClient.getChannel().closeFuture();
+ closeFuture = initiatingClient.getChannelClosureFuture();
closeFuture.addListener(closeListener);
queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
@@ -254,11 +257,18 @@ public class Foreman implements Runnable {
parseAndRunPhysicalPlan(queryRequest.getPlan());
break;
case SQL:
- runSQL(queryRequest.getPlan());
+ final String sql = queryRequest.getPlan();
+ // log query id and query text before starting any real work. Also, put
+ // them together such that it is easy to search based on query id
+ logger.info("Query text for query id {}: {}", this.queryIdString, sql);
+ runSQL(sql);
break;
case EXECUTION:
runFragment(queryRequest.getFragmentsList());
break;
+ case PREPARED_STATEMENT:
+ runPreparedStatement(queryRequest.getPreparedStatementHandle());
+ break;
default:
throw new IllegalStateException();
}
@@ -484,7 +494,31 @@ public class Foreman implements Runnable {
logger.debug("Fragments running.");
}
+ /**
+ * Helper method to execute the query in a prepared statement. Current implementation takes the query from the opaque
+ * server info of the <code>preparedStatementHandle</code> and submits it as a new query.
+ *
+ * @param preparedStatementHandle
+ * @throws ExecutionSetupException
+ */
+ private void runPreparedStatement(final PreparedStatementHandle preparedStatementHandle)
+ throws ExecutionSetupException {
+ final ServerPreparedStatementState serverState;
+
+ try {
+ serverState =
+ ServerPreparedStatementState.PARSER.parseFrom(preparedStatementHandle.getServerInfo());
+ } catch (final InvalidProtocolBufferException ex) {
+ throw UserException.parseError(ex)
+ .message("Failed to parse the prepared statement handle. " +
+ "Make sure the handle is same as one returned from create prepared statement call.")
+ .build(logger);
+ }
+ final String sql = serverState.getSqlQuery();
+ logger.info("Prepared statement query for QueryId {} : {}", queryId, sql);
+ runSQL(sql);
+ }
private static void validatePlan(final PhysicalPlan plan) throws ForemanSetupException {
if (plan.getProperties().resultMode != ResultMode.EXEC) {
@@ -734,7 +768,7 @@ public class Foreman implements Runnable {
new Date(System.currentTimeMillis()),
state,
queryContext.getSession().getCredentials().getUserName(),
- initiatingClient.getChannel().remoteAddress());
+ initiatingClient.getRemoteAddress());
queryLogger.info(MAPPER.writeValueAsString(q));
} catch (Exception e) {
logger.error("Failure while recording query information to query log.", e);
@@ -805,7 +839,7 @@ public class Foreman implements Runnable {
*/
try {
// send whatever result we ended up with
- initiatingClient.sendResult(responseListener, resultBuilder.build(), true);
+ initiatingClient.sendResult(responseListener, resultBuilder.build());
} catch(final Exception e) {
addException(e);
logger.warn("Exception sending result to client", resultException);
@@ -970,10 +1004,6 @@ public class Foreman implements Runnable {
}
private void runSQL(final String sql) throws ExecutionSetupException {
- // log query id and query text before starting any real work. Also, put
- // them together such that it is easy to search based on query id
- logger.info("Query text for query id {}: {}", this.queryIdString, sql);
-
final Pointer<String> textPlan = new Pointer<>();
final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
queryManager.setPlanText(textPlan.value);
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
new file mode 100644
index 0000000..982d8a3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.prepare;
+
+import static org.apache.drill.exec.ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.FAILED;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.OK;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.TIMEOUT;
+
+import org.apache.drill.common.exceptions.ErrorHelper;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.proto.UserProtos.ColumnSearchability;
+import org.apache.drill.exec.proto.UserProtos.ColumnUpdatability;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatement;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
+import org.apache.drill.exec.proto.UserProtos.RequestStatus;
+import org.apache.drill.exec.proto.UserProtos.ResultColumnMetadata;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
+import org.apache.drill.exec.work.user.UserWorker;
+import org.joda.time.Period;
+
+import com.google.common.collect.ImmutableMap;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import java.math.BigDecimal;
+import java.net.SocketAddress;
+import java.sql.Date;
+import java.sql.ResultSetMetaData;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Contains worker {@link Runnable} for creating a prepared statement and helper methods.
+ */
+public class PreparedStatementProvider {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PreparedStatementProvider.class);
+
+ /**
+ * Static list of mappings from {@link MinorType} to JDBC ResultSet class name (to be returned through
+ * {@link ResultSetMetaData#getColumnClassName(int)}).
+ */
+ private static final Map<MinorType, String> DRILL_TYPE_TO_JDBC_CLASSNAME = ImmutableMap.<MinorType, String>builder()
+ .put(MinorType.INT, Integer.class.getName())
+ .put(MinorType.BIGINT, Long.class.getName())
+ .put(MinorType.FLOAT4, Float.class.getName())
+ .put(MinorType.FLOAT8, Double.class.getName())
+ .put(MinorType.VARCHAR, String.class.getName())
+ .put(MinorType.BIT, Boolean.class.getName())
+ .put(MinorType.DATE, Date.class.getName())
+ .put(MinorType.DECIMAL9, BigDecimal.class.getName())
+ .put(MinorType.DECIMAL18, BigDecimal.class.getName())
+ .put(MinorType.DECIMAL28SPARSE, BigDecimal.class.getName())
+ .put(MinorType.DECIMAL38SPARSE, BigDecimal.class.getName())
+ .put(MinorType.TIME, Time.class.getName())
+ .put(MinorType.TIMESTAMP, Timestamp.class.getName())
+ .put(MinorType.VARBINARY, byte[].class.getName())
+ .put(MinorType.INTERVALYEAR, Period.class.getName())
+ .put(MinorType.INTERVALDAY, Period.class.getName())
+ .put(MinorType.MAP, Object.class.getName())
+ .put(MinorType.LIST, Object.class.getName())
+ .put(MinorType.UNION, Object.class.getName())
+ .build();
+
+ /**
+ * Runnable that creates a prepared statement for given {@link CreatePreparedStatementReq} and
+ * sends the response at the end.
+ */
+ public static class PreparedStatementWorker implements Runnable {
+ private final UserClientConnection connection;
+ private final UserWorker userWorker;
+ private final ResponseSender responseSender;
+ private final CreatePreparedStatementReq req;
+
+ public PreparedStatementWorker(final UserClientConnection connection, final UserWorker userWorker,
+ final ResponseSender responseSender, final CreatePreparedStatementReq req) {
+ this.connection = connection;
+ this.userWorker = userWorker;
+ this.responseSender = responseSender;
+ this.req = req;
+ }
+
+ @Override
+ public void run() {
+ final CreatePreparedStatementResp.Builder respBuilder = CreatePreparedStatementResp.newBuilder();
+ try {
+ UserClientConnectionWrapper wrapper = new UserClientConnectionWrapper(connection);
+
+ final RunQuery limit0Query =
+ RunQuery.newBuilder()
+ .setType(QueryType.SQL)
+ .setPlan(String.format("SELECT * FROM (%s) LIMIT 0", req.getSqlQuery()))
+ .build();
+
+ final QueryId limit0QueryId = userWorker.submitWork(wrapper, limit0Query);
+
+ final long timeoutMillis =
+ userWorker.getSystemOptions().getOption(CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS).num_val;
+
+ try {
+ if (!wrapper.await(timeoutMillis)) {
+ logger.error("LIMIT 0 query (QueryId: {}) for prepared statement took longer than {} ms. Cancelling.",
+ limit0QueryId, timeoutMillis);
+ userWorker.cancelQuery(limit0QueryId);
+ final String errorMsg = String.format(
+ "LIMIT 0 query (QueryId: %s) for prepared statement took longer than %d ms. " +
+ "Query cancellation requested.\n" +
+ "Retry after changing the option '%s' to a higher value.",
+ limit0QueryId, timeoutMillis, CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS);
+ setErrorHelper(respBuilder, TIMEOUT, null, errorMsg, ErrorType.SYSTEM);
+ return;
+ }
+ } catch (InterruptedException ex) {
+ setErrorHelper(respBuilder, FAILED, ex, "Prepared statement creation interrupted.", ErrorType.SYSTEM);
+ return;
+ }
+
+ if (wrapper.getError() != null) {
+ setErrorHelper(respBuilder, wrapper.getError(), "Failed to get result set schema for prepare statement.");
+ return;
+ }
+
+ final PreparedStatement.Builder prepStmtBuilder = PreparedStatement.newBuilder();
+
+ for (SerializedField field : wrapper.getFields()) {
+ prepStmtBuilder.addColumns(serializeColumn(field));
+ }
+
+ prepStmtBuilder.setServerHandle(
+ PreparedStatementHandle.newBuilder()
+ .setServerInfo(
+ ServerPreparedStatementState.newBuilder()
+ .setSqlQuery(req.getSqlQuery())
+ .build().toByteString()
+ )
+ );
+
+ respBuilder.setStatus(OK);
+ respBuilder.setPreparedStatement(prepStmtBuilder.build());
+ } catch (Throwable e) {
+ setErrorHelper(respBuilder, FAILED, e, "Failed to create prepared statement.", ErrorType.SYSTEM);
+ } finally {
+ responseSender.send(new Response(RpcType.PREPARED_STATEMENT, respBuilder.build()));
+ }
+ }
+ }
+
+  /**
+   * Logs a failure under a newly generated error id and records it in <code>respBuilder</code> as a
+   * {@link DrillPBError} together with the given status.
+   *
+   * @param respBuilder response builder that receives the status and the error
+   * @param status request status to report
+   * @param ex cause of the failure; may be null, in which case no exception is attached or logged
+   * @param message error message used both in the log and in the error
+   * @param errorType type to set on the error
+   */
+  private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final RequestStatus status,
+      final Throwable ex, final String message, final ErrorType errorType) {
+    respBuilder.setStatus(status);
+
+    final String errorId = UUID.randomUUID().toString();
+    final boolean hasCause = ex != null;
+    if (hasCause) {
+      logger.error("{} ErrorId: {}", message, errorId, ex);
+    } else {
+      logger.error("{} ErrorId: {}", message, errorId);
+    }
+
+    final DrillPBError.Builder errorBuilder = DrillPBError.newBuilder();
+    errorBuilder.setErrorType(errorType);
+    errorBuilder.setErrorId(errorId);
+    errorBuilder.setMessage(message);
+    if (hasCause) {
+      errorBuilder.setException(ErrorHelper.getWrapper(ex));
+    }
+
+    respBuilder.setError(errorBuilder.build());
+  }
+
+ /**
+ * Helper method to log error and set given {@link DrillPBError} in <code>respBuilder</code>
+ */
+ private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final DrillPBError error,
+ final String message) {
+ respBuilder.setStatus(FAILED);
+ final String errorId = UUID.randomUUID().toString();
+ logger.error("{} ErrorId: {}", message, errorId);
+
+ respBuilder.setError(error);
+ }
+
+ /**
+ * Decorator around {@link UserClientConnection} to tap the query results for LIMIT 0 query.
+ */
+ private static class UserClientConnectionWrapper implements UserClientConnection {
+ private final UserClientConnection inner;
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ private volatile DrillPBError error;
+ private volatile List<SerializedField> fields;
+
+ UserClientConnectionWrapper(UserClientConnection inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public UserSession getSession() {
+ return inner.getSession();
+ }
+
+ @Override
+ public ChannelFuture getChannelClosureFuture() {
+ return inner.getChannelClosureFuture();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return inner.getRemoteAddress();
+ }
+
+ @Override
+ public void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result) {
+ // Release the wait latch if the query is terminated.
+ final QueryState state = result.getQueryState();
+ if (state == QueryState.FAILED || state == QueryState.CANCELED || state == QueryState.COMPLETED) {
+ if (state == QueryState.FAILED) {
+ error = result.getError(0);
+ }
+ latch.countDown();
+ }
+
+ listener.success(Acks.OK, null);
+ }
+
+ @Override
+ public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result) {
+ // Save the query results schema and release the buffers.
+ if (fields == null) {
+ fields = result.getHeader().getDef().getFieldList();
+ }
+
+ for(ByteBuf buf : result.getBuffers()) {
+ buf.release();
+ }
+
+ listener.success(Acks.OK, null);
+ }
+
+ /**
+ * Wait until the query has completed.
+ * @throws InterruptedException
+ */
+ boolean await(final long timeoutMillis) throws InterruptedException {
+ return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @return Any error returned in query execution.
+ */
+ DrillPBError getError() {
+ return error;
+ }
+
+ /**
+ * @return Schema returned in query result batch.
+ */
+ List<SerializedField> getFields() {
+ return fields;
+ }
+ }
+
+ /**
+ * Serialize the given {@link SerializedField} into a {@link ResultColumnMetadata}.
+ * @param field
+ * @return
+ */
+ private static ResultColumnMetadata serializeColumn(SerializedField field) {
+ final ResultColumnMetadata.Builder builder = ResultColumnMetadata.newBuilder();
+ final MajorType majorType = field.getMajorType();
+ final MinorType minorType = majorType.getMinorType();
+
+ /**
+ * Defaults to "DRILL" as drill has as only one catalog.
+ */
+ builder.setCatalogName(InfoSchemaConstants.IS_CATALOG_NAME);
+
+ /**
+ * Designated column's schema name. Empty string if not applicable. Initial implementation defaults to empty string
+ * as we use LIMIT 0 queries to get the schema and schema info is lost. If we derive the schema from plan, we may
+ * get the right value.
+ */
+ builder.setSchemaName("");
+
+ /**
+ * Designated column's table name. Not set if not applicable. Initial implementation defaults to empty string as
+ * we use LIMIT 0 queries to get the schema and table info is lost. If we derive the table from plan, we may get
+ * the right value.
+ */
+ builder.setTableName("");
+
+ builder.setColumnName(field.getNamePart().getName());
+
+ /**
+ * Column label name for display or print purposes.
+ * Ex. a column named "empName" might be labeled as "Employee Name".
+ * Initial implementation defaults to same value as column name.
+ */
+ builder.setLabel(field.getNamePart().getName());
+
+ /**
+ * Data type in string format. Value is SQL standard type.
+ */
+ builder.setDataType(Types.getSqlTypeName(majorType));
+
+ builder.setIsNullable(majorType.getMode() == DataMode.OPTIONAL);
+
+ /**
+ * For numeric data, this is the maximum precision.
+ * For character data, this is the length in characters.
+ * For datetime datatypes, this is the length in characters of the String representation
+ * (assuming the maximum allowed precision of the fractional seconds component).
+ * For binary data, this is the length in bytes.
+ * For all other types 0 is returned where the column size is not applicable.
+ */
+ builder.setPrecision(Types.getPrecision(field.getMajorType()));
+
+ /**
+ * Column's number of digits to right of the decimal point. 0 is returned for types where the scale is not applicable
+ */
+ builder.setScale(Types.getScale(majorType));
+
+ /**
+ * Indicates whether values in the designated column are signed numbers.
+ */
+ builder.setSigned(Types.isNumericType(majorType));
+
+ /**
+ * Maximum number of characters required to display data from the column. Initial implementation hard coded to 10.
+ */
+ builder.setDisplaySize(10);
+
+ /**
+ * Is the column an aliased column. Initial implementation defaults to true as we derive schema from LIMIT 0 query and
+ * not plan
+ */
+ builder.setIsAliased(true);
+
+ builder.setSearchability(ColumnSearchability.ALL);
+ builder.setUpdatability(ColumnUpdatability.READ_ONLY);
+ builder.setAutoIncrement(false);
+ builder.setCaseSensitivity(false);
+ builder.setSortable(Types.isSortable(minorType));
+
+ /**
+ * Returns the fully-qualified name of the Java class whose instances are manufactured if the method
+ * ResultSet.getObject is called to retrieve a value from the column. Applicable only to JDBC clients.
+ */
+ builder.setClassName(DRILL_TYPE_TO_JDBC_CLASSNAME.get(minorType));
+
+ builder.setIsCurrency(false);
+
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index cc614d2..c1fa7a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
@@ -37,6 +38,7 @@ import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.metadata.MetadataProvider;
+import org.apache.drill.exec.work.prepare.PreparedStatementProvider.PreparedStatementWorker;
public class UserWorker{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class);
@@ -118,4 +120,9 @@ public class UserWorker{
public void submitColumnsMetadataWork(UserSession session, GetColumnsReq req, ResponseSender sender) {
bee.addNewWork(MetadataProvider.columns(session, bee.getContext(), req, sender));
}
+
+ /**
+ * Creates a {@link PreparedStatementWorker} for the given request and hands it to the worker bee via
+ * <code>addNewWork</code>.
+ *
+ * @param connection connection of the client that issued the request
+ * @param req create prepared statement request
+ * @param sender sender passed to the worker for delivering the response
+ */
+ public void submitPreparedStatementWork(final UserClientConnection connection, final CreatePreparedStatementReq req,
+ final ResponseSender sender) {
+ bee.addNewWork(new PreparedStatementWorker(connection, this, sender, req));
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 7ab73dc..3eded52 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.AwaitableUserResultsListener;
@@ -293,9 +294,20 @@ public class BaseTestQuery extends ExecTest {
return testRunAndReturn(QueryType.PHYSICAL, physical);
}
- public static List<QueryDataBatch> testRunAndReturn(QueryType type, String query) throws Exception{
- query = QueryTestUtil.normalizeQuery(query);
- return client.runQuery(type, query);
+  /**
+   * Runs the given query and returns the result batches.
+   *
+   * @param type type of the query
+   * @param query a {@link PreparedStatementHandle} when <code>type</code> is
+   *              {@link QueryType#PREPARED_STATEMENT}, otherwise the query text as a {@link String}
+   * @return result batches of the query
+   * @throws IllegalArgumentException if <code>query</code> is not of the type expected for <code>type</code>
+   */
+  public static List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception{
+    if (type == QueryType.PREPARED_STATEMENT) {
+      // The message names the type that is actually checked (the handle, not the PreparedStatement).
+      Preconditions.checkArgument(query instanceof PreparedStatementHandle,
+          "Expected an instance of PreparedStatementHandle as input query");
+      return testPreparedStatement((PreparedStatementHandle)query);
+    } else {
+      Preconditions.checkArgument(query instanceof String, "Expected a string as input query");
+      final String normalizedQuery = QueryTestUtil.normalizeQuery((String)query);
+      return client.runQuery(type, normalizedQuery);
+    }
+  }
+
+ /**
+ * Executes the prepared statement identified by the given handle through the test client.
+ *
+ * @param handle handle of the prepared statement to execute
+ * @return result batches returned by <code>client.executePreparedStatement</code>
+ */
+ public static List<QueryDataBatch> testPreparedStatement(PreparedStatementHandle handle) throws Exception {
+ return client.executePreparedStatement(handle);
}
public static int testRunAndPrint(final QueryType type, final String query) throws Exception {
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 2a9c03d..9df9139 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -81,8 +81,10 @@ public class DrillTestWrapper {
// for the baseline data. In this case there needs to be a call back into the TestBuilder once we know the type information
// from the test query.
private TestBuilder testBuilder;
- // test query to run
- private String query;
+ /**
+ * Test query to run. Type of object depends on the {@link #queryType}
+ */
+ private Object query;
// The type of query provided
private UserBitShared.QueryType queryType;
// The type of query provided for the baseline
@@ -106,7 +108,7 @@ public class DrillTestWrapper {
private int expectedNumBatches;
- public DrillTestWrapper(TestBuilder testBuilder, BufferAllocator allocator, String query, QueryType queryType,
+ public DrillTestWrapper(TestBuilder testBuilder, BufferAllocator allocator, Object query, QueryType queryType,
String baselineOptionSettingQueries, String testOptionSettingQueries,
QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index b073371..8acf936 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -36,6 +36,8 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
import org.apache.drill.exec.util.JsonStringArrayList;
import org.apache.drill.exec.util.JsonStringHashMap;
import org.apache.drill.exec.util.Text;
@@ -45,8 +47,10 @@ import com.google.common.base.Preconditions;
public class TestBuilder {
- // test query to run
- private String query;
+ /**
+ * Test query to run. Type of object depends on the {@link #queryType}
+ */
+ private Object query;
// the type of query for the test
private UserBitShared.QueryType queryType;
// should the validation enforce ordering
@@ -85,7 +89,7 @@ public class TestBuilder {
reset();
}
- public TestBuilder(BufferAllocator allocator, String query, UserBitShared.QueryType queryType, Boolean ordered,
+ public TestBuilder(BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
int expectedNumBatches) {
@@ -141,6 +145,12 @@ public class TestBuilder {
return sqlQuery(String.format(query, replacements));
}
+ /**
+ * Configures this builder to run a prepared statement: sets the query type to
+ * {@link QueryType#PREPARED_STATEMENT} and stores the handle as the test query.
+ *
+ * @param preparedStatementHandle handle of the prepared statement to run
+ * @return this builder
+ */
+ public TestBuilder preparedStatement(PreparedStatementHandle preparedStatementHandle) {
+ queryType = QueryType.PREPARED_STATEMENT;
+ query = preparedStatementHandle;
+ return this;
+ }
+
public TestBuilder sqlQueryFromFile(String queryFile) throws IOException {
String query = BaseTestQuery.getFile(queryFile);
this.query = query;
@@ -210,7 +220,7 @@ public class TestBuilder {
}
}
- String getValidationQuery() throws Exception {
+ Object getValidationQuery() throws Exception {
throw new RuntimeException("Must provide some kind of baseline, either a baseline file or another query");
}
@@ -354,7 +364,7 @@ public class TestBuilder {
}
// provide a SQL query to validate against
- public BaselineQueryTestBuilder sqlBaselineQuery(String baselineQuery) {
+ public BaselineQueryTestBuilder sqlBaselineQuery(Object baselineQuery) {
return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.SQL, allocator, query, queryType, ordered, approximateEquality,
baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison, expectedNumBatches);
}
@@ -403,7 +413,7 @@ public class TestBuilder {
// that come out of the test query drive interpretation of baseline
private TypeProtos.MajorType[] baselineTypes;
- CSVTestBuilder(String baselineFile, BufferAllocator allocator, String query, UserBitShared.QueryType queryType, Boolean ordered,
+ CSVTestBuilder(String baselineFile, BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
int expectedNumBatches) {
@@ -494,7 +504,7 @@ public class TestBuilder {
public class SchemaTestBuilder extends TestBuilder {
private List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema;
- SchemaTestBuilder(BufferAllocator allocator, String query, UserBitShared.QueryType queryType,
+ SchemaTestBuilder(BufferAllocator allocator, Object query, UserBitShared.QueryType queryType,
String baselineOptionSettingQueries, String testOptionSettingQueries, List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema) {
super(allocator, query, queryType, false, false, null, baselineOptionSettingQueries, testOptionSettingQueries, false, -1);
expectsEmptyResultSet();
@@ -535,7 +545,7 @@ public class TestBuilder {
// path to the baseline file that will be inserted into the validation query
private String baselineFilePath;
- JSONTestBuilder(String baselineFile, BufferAllocator allocator, String query, UserBitShared.QueryType queryType, Boolean ordered,
+ JSONTestBuilder(String baselineFile, BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
int expectedNumBatches) {
@@ -559,11 +569,14 @@ public class TestBuilder {
public class BaselineQueryTestBuilder extends TestBuilder {
- private String baselineQuery;
+ /**
+ * Baseline query. Type of object depends on {@link #baselineQueryType}
+ */
+ private Object baselineQuery;
private UserBitShared.QueryType baselineQueryType;
- BaselineQueryTestBuilder(String baselineQuery, UserBitShared.QueryType baselineQueryType, BufferAllocator allocator,
- String query, UserBitShared.QueryType queryType, Boolean ordered,
+ BaselineQueryTestBuilder(Object baselineQuery, UserBitShared.QueryType baselineQueryType, BufferAllocator allocator,
+ Object query, UserBitShared.QueryType queryType, Boolean ordered,
boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
int expectedNumBatches) {
@@ -574,7 +587,7 @@ public class TestBuilder {
}
@Override
- String getValidationQuery() {
+ Object getValidationQuery() {
return baselineQuery;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/14f6ec7d/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestPreparedStatementProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestPreparedStatementProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestPreparedStatementProvider.java
new file mode 100644
index 0000000..5a78cc9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestPreparedStatementProvider.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.prepare;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.proto.UserProtos.ColumnSearchability;
+import org.apache.drill.exec.proto.UserProtos.ColumnUpdatability;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatement;
+import org.apache.drill.exec.proto.UserProtos.RequestStatus;
+import org.apache.drill.exec.proto.UserProtos.ResultColumnMetadata;
+import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import java.sql.Date;
+import java.util.List;
+
+/**
+ * Tests for creating and executing prepared statements.
+ */
+public class TestPreparedStatementProvider extends BaseTestQuery {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPreparedStatementProvider.class);
+
+ /**
+ * Simple query.
+ * @throws Exception
+ */
+ @Test
+ public void simple() throws Exception {
+ String query = "SELECT * FROM cp.`region.json` ORDER BY region_id LIMIT 1";
+ PreparedStatement preparedStatement = createPrepareStmt(query, false, null);
+
+ List<ExpectedColumnResult> expMetadata = ImmutableList.of(
+ new ExpectedColumnResult("region_id", "BIGINT", true, 0, 0, true, Long.class.getName()),
+ new ExpectedColumnResult("sales_city", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+ new ExpectedColumnResult("sales_state_province", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+ new ExpectedColumnResult("sales_district", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+ new ExpectedColumnResult("sales_region", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+ new ExpectedColumnResult("sales_country", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+ new ExpectedColumnResult("sales_district_id", "BIGINT", true, 0, 0, true, Long.class.getName())
+ );
+
+ verifyMetadata(expMetadata, preparedStatement.getColumnsList());
+
+ testBuilder()
+ .unOrdered()
+ .preparedStatement(preparedStatement.getServerHandle())
+ .baselineColumns("region_id", "sales_city", "sales_state_province", "sales_district",
+ "sales_region", "sales_country", "sales_district_id")
+ .baselineValues(0L, "None", "None", "No District", "No Region", "No Country", 0L)
+ .go();
+ }
+
+ /**
+ * Create a prepared statement for a query that has GROUP BY clause in it
+ */
+ @Test
+ public void groupByQuery() throws Exception {
+ String query = "SELECT sales_city, count(*) as cnt FROM cp.`region.json` " +
+ "GROUP BY sales_city ORDER BY sales_city DESC LIMIT 1";
+ PreparedStatement preparedStatement = createPrepareStmt(query, false, null);
+
+ List<ExpectedColumnResult> expMetadata = ImmutableList.of(
+ new ExpectedColumnResult("sales_city", "CHARACTER VARYING", true, 65536, 0, false, String.class.getName()),
+ new ExpectedColumnResult("cnt", "BIGINT", false, 0, 0, true, Long.class.getName())
+ );
+
+ verifyMetadata(expMetadata, preparedStatement.getColumnsList());
+
+ testBuilder()
+ .unOrdered()
+ .preparedStatement(preparedStatement.getServerHandle())
+ .baselineColumns("sales_city", "cnt")
+ .baselineValues("Yakima", 1L)
+ .go();
+ }
+
+ /**
+ * Create a prepared statement for a query that joins two tables and has ORDER BY clause.
+ */
+ @Test
+ public void joinOrderByQuery() throws Exception {
+ String query = "SELECT l.l_quantity, l.l_shipdate, o.o_custkey FROM cp.`tpch/lineitem.parquet` l JOIN cp.`tpch/orders.parquet` o " +
+ "ON l.l_orderkey = o.o_orderkey LIMIT 2";
+
+ PreparedStatement preparedStatement = createPrepareStmt(query, false, null);
+
+ List<ExpectedColumnResult> expMetadata = ImmutableList.of(
+ new ExpectedColumnResult("l_quantity", "DOUBLE", false, 0, 0, true, Double.class.getName()),
+ new ExpectedColumnResult("l_shipdate", "DATE", false, 0, 0, false, Date.class.getName()),
+ new ExpectedColumnResult("o_custkey", "INTEGER", false, 0, 0, true, Integer.class.getName())
+ );
+
+ verifyMetadata(expMetadata, preparedStatement.getColumnsList());
+ }
+
+ /**
+ * Pass an invalid query to the create prepare statement request and expect a parser failure.
+ * @throws Exception
+ */
+ @Test
+ public void invalidQueryParserError() throws Exception {
+ createPrepareStmt("BLAH BLAH", true, ErrorType.PARSE);
+ }
+
+ /**
+ * Pass an invalid query to the create prepare statement request and expect a validation failure.
+ * @throws Exception
+ */
+ @Test
+ public void invalidQueryValidationError() throws Exception {
+ createPrepareStmt("SELECT * sdflkgdh", true, ErrorType.PARSE /** Drill returns incorrect error for parse error*/);
+ }
+
+ /* Helper method which creates a prepared statement for given query. */
+ private static PreparedStatement createPrepareStmt(String query, boolean expectFailure, ErrorType errorType) throws Exception {
+ CreatePreparedStatementResp resp = client.createPreparedStatement(query).get();
+
+ assertEquals(expectFailure ? RequestStatus.FAILED : RequestStatus.OK, resp.getStatus());
+
+ if (expectFailure) {
+ assertEquals(errorType, resp.getError().getErrorType());
+ } else {
+ logger.error("Prepared statement creation failed: {}", resp.getError().getMessage());
+ }
+
+ return resp.getPreparedStatement();
+ }
+
+ private static class ExpectedColumnResult {
+ final String columnName;
+ final String type;
+ final boolean nullable;
+ final int precision;
+ final int scale;
+ final boolean signed;
+ final String className;
+
+ ExpectedColumnResult(String columnName, String type, boolean nullable, int precision, int scale, boolean signed,
+ String className) {
+ this.columnName = columnName;
+ this.type = type;
+ this.nullable = nullable;
+ this.precision = precision;
+ this.scale = scale;
+ this.signed = signed;
+ this.className = className;
+ }
+
+ boolean isEqualsTo(ResultColumnMetadata result) {
+ return
+ result.getCatalogName().equals(InfoSchemaConstants.IS_CATALOG_NAME) &&
+ result.getSchemaName().isEmpty() &&
+ result.getTableName().isEmpty() &&
+ result.getColumnName().equals(columnName) &&
+ result.getLabel().equals(columnName) &&
+ result.getDataType().equals(type) &&
+ result.getIsNullable() == nullable &&
+ result.getPrecision() == precision &&
+ result.getScale() == scale &&
+ result.getSigned() == signed &&
+ result.getDisplaySize() == 10 &&
+ result.getClassName().equals(className) &&
+ result.getSearchability() == ColumnSearchability.ALL &&
+ result.getAutoIncrement() == false &&
+ result.getCaseSensitivity() == false &&
+ result.getUpdatability() == ColumnUpdatability.READ_ONLY &&
+ result.getIsAliased() == true &&
+ result.getIsCurrency() == false;
+ }
+
+ @Override
+ public String toString() {
+ return "ExpectedColumnResult[" +
+ "columnName='" + columnName + '\'' +
+ ", type='" + type + '\'' +
+ ", nullable=" + nullable +
+ ", precision=" + precision +
+ ", scale=" + scale +
+ ", signed=" + signed +
+ ", className='" + className + '\'' +
+ ']';
+ }
+ }
+
+ private static void verifyMetadata(List<ExpectedColumnResult> expMetadata,
+ List<ResultColumnMetadata> actMetadata) {
+ assertEquals(expMetadata.size(), actMetadata.size());
+
+ for(ExpectedColumnResult exp : expMetadata) {
+ boolean found = false;
+ for(ResultColumnMetadata act : actMetadata) {
+ found = exp.isEqualsTo(act);
+ if (found) {
+ break;
+ }
+ }
+ assertTrue("Failed to find the expected column metadata: " + exp, found);
+ }
+ }
+}