You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2016/08/07 19:51:14 UTC
[5/5] drill git commit: DRILL-4728: Add support for new metadata
fetch APIs
DRILL-4728: Add support for new metadata fetch APIs
+ Protobuf messages
- GetCatalogsReq -> GetCatalogsResp
- GetSchemasReq -> GetSchemasResp
- GetTablesReq -> GetTablesResp
- GetColumnsReq -> GetColumnsResp
+ Java Drill client changes
+ Server side changes to handle the metadata API calls
- Provide a self contained `Runnable` implementation for each metadata API
that process the requests and sends the response to client
- In `UserWorker` override the `handle` method that takes the `ResponseSender` and
send the response from the `handle` method instead of returning it.
- Add a method for each new API to UserWorker to submit the metadata work.
- Add a method `addNewWork(Runnable runnable)` to `WorkerBee` to submit a generic
`Runnable` to `ExecutorService`.
- Move out couple of methods from `QueryContext` into a separate interface
`SchemaConfigInfoProvider` to enable instantiating Schema trees without the
full `QueryContext`
+ New protobuf messages increased the `jdbc-all.jar` size. Up the limit to 21MB.
this closes #527
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ef6e522c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ef6e522c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ef6e522c
Branch: refs/heads/master
Commit: ef6e522c9cba816110aa43ff6bccedf29a901236
Parents: 4bd67a6
Author: vkorukanti <ve...@dremio.com>
Authored: Thu Jun 9 16:03:06 2016 -0700
Committer: vkorukanti <ve...@dremio.com>
Committed: Sun Aug 7 12:47:00 2016 -0700
----------------------------------------------------------------------
.../drill/common/exceptions/ErrorHelper.java | 4 +-
.../apache/drill/exec/client/DrillClient.java | 101 +
.../org/apache/drill/exec/ops/QueryContext.java | 57 +-
.../apache/drill/exec/rpc/user/UserClient.java | 16 +-
.../drill/exec/rpc/user/UserRpcConfig.java | 12 +
.../apache/drill/exec/rpc/user/UserServer.java | 54 +-
.../apache/drill/exec/store/SchemaConfig.java | 50 +-
.../drill/exec/store/SchemaTreeProvider.java | 106 +
.../drill/exec/store/pojo/PojoRecordReader.java | 28 +-
.../org/apache/drill/exec/work/WorkManager.java | 8 +
.../exec/work/metadata/MetadataProvider.java | 486 +
.../apache/drill/exec/work/user/UserWorker.java | 25 +-
.../work/metadata/TestMetadataProvider.java | 308 +
exec/jdbc-all/pom.xml | 2 +-
.../drill/exec/proto/SchemaUserProtos.java | 1752 +++
.../org/apache/drill/exec/proto/UserProtos.java | 13537 ++++++++++++++++-
.../drill/exec/proto/beans/CatalogMetadata.java | 207 +
.../drill/exec/proto/beans/ColumnMetadata.java | 493 +
.../drill/exec/proto/beans/GetCatalogsReq.java | 165 +
.../drill/exec/proto/beans/GetCatalogsResp.java | 221 +
.../drill/exec/proto/beans/GetColumnsReq.java | 237 +
.../drill/exec/proto/beans/GetColumnsResp.java | 221 +
.../drill/exec/proto/beans/GetSchemasReq.java | 189 +
.../drill/exec/proto/beans/GetSchemasResp.java | 221 +
.../drill/exec/proto/beans/GetTablesReq.java | 213 +
.../drill/exec/proto/beans/GetTablesResp.java | 221 +
.../drill/exec/proto/beans/LikeFilter.java | 185 +
.../drill/exec/proto/beans/RequestStatus.java | 51 +
.../apache/drill/exec/proto/beans/RpcType.java | 16 +
.../drill/exec/proto/beans/SchemaMetadata.java | 251 +
.../drill/exec/proto/beans/TableMetadata.java | 229 +
protocol/src/main/protobuf/User.proto | 149 +-
32 files changed, 19635 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
index 0aa5a1b..9b2097d 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
@@ -25,7 +25,7 @@ import java.util.regex.Pattern;
/**
* Utility class that handles error message generation from protobuf error objects.
*/
-class ErrorHelper {
+public class ErrorHelper {
private final static Pattern IGNORE= Pattern.compile("^(sun|com\\.sun|java).*");
@@ -96,7 +96,7 @@ class ErrorHelper {
return sb.toString();
}
- static ExceptionWrapper getWrapper(Throwable ex) {
+ public static ExceptionWrapper getWrapper(Throwable ex) {
return getWrapperBuilder(ex).build();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 11abbcc..8063778 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -53,7 +53,16 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasReq;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
+import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
+import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
+import org.apache.drill.exec.proto.UserProtos.LikeFilter;
import org.apache.drill.exec.proto.UserProtos.Property;
import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -411,6 +420,98 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
/**
+ * Get the list of catalogs in <code>INFORMATION_SCHEMA.CATALOGS</code> table satisfying the given filters.
+ *
+ * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
+ * @return
+ */
+ public DrillRpcFuture<GetCatalogsResp> getCatalogs(LikeFilter catalogNameFilter) {
+ final GetCatalogsReq.Builder reqBuilder = GetCatalogsReq.newBuilder();
+ if (catalogNameFilter != null) {
+ reqBuilder.setCatalogNameFilter(catalogNameFilter);
+ }
+
+ return client.send(RpcType.GET_CATALOGS, reqBuilder.build(), GetCatalogsResp.class);
+ }
+
+ /**
+ * Get the list of schemas in <code>INFORMATION_SCHEMA.SCHEMATA</code> table satisfying the given filters.
+ *
+ * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
+ * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
+ * @return
+ */
+ public DrillRpcFuture<GetSchemasResp> getSchemas(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter) {
+ final GetSchemasReq.Builder reqBuilder = GetSchemasReq.newBuilder();
+ if (catalogNameFilter != null) {
+ reqBuilder.setCatalogNameFilter(catalogNameFilter);
+ }
+
+ if (schemaNameFilter != null) {
+ reqBuilder.setSchameNameFilter(schemaNameFilter);
+ }
+
+ return client.send(RpcType.GET_SCHEMAS, reqBuilder.build(), GetSchemasResp.class);
+ }
+
+ /**
+ * Get the list of tables in <code>INFORMATION_SCHEMA.TABLES</code> table satisfying the given filters.
+ *
+ * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
+ * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
+ * @param tableNameFilter Filter in <code>table name</code>. Pass null to apply no filter.
+ * @return
+ */
+ public DrillRpcFuture<GetTablesResp> getTables(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter,
+ LikeFilter tableNameFilter) {
+ final GetTablesReq.Builder reqBuilder = GetTablesReq.newBuilder();
+ if (catalogNameFilter != null) {
+ reqBuilder.setCatalogNameFilter(catalogNameFilter);
+ }
+
+ if (schemaNameFilter != null) {
+ reqBuilder.setSchameNameFilter(schemaNameFilter);
+ }
+
+ if (tableNameFilter != null) {
+ reqBuilder.setTableNameFilter(tableNameFilter);
+ }
+
+ return client.send(RpcType.GET_TABLES, reqBuilder.build(), GetTablesResp.class);
+ }
+
+ /**
+ * Get the list of columns in <code>INFORMATION_SCHEMA.COLUMNS</code> table satisfying the given filters.
+ *
+ * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
+ * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
+ * @param tableNameFilter Filter in <code>table name</code>. Pass null to apply no filter.
+ * @param columnNameFilter Filter in <code>column name</code>. Pass null to apply no filter.
+ * @return
+ */
+ public DrillRpcFuture<GetColumnsResp> getColumns(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter,
+ LikeFilter tableNameFilter, LikeFilter columnNameFilter) {
+ final GetColumnsReq.Builder reqBuilder = GetColumnsReq.newBuilder();
+ if (catalogNameFilter != null) {
+ reqBuilder.setCatalogNameFilter(catalogNameFilter);
+ }
+
+ if (schemaNameFilter != null) {
+ reqBuilder.setSchameNameFilter(schemaNameFilter);
+ }
+
+ if (tableNameFilter != null) {
+ reqBuilder.setTableNameFilter(tableNameFilter);
+ }
+
+ if (columnNameFilter != null) {
+ reqBuilder.setColumnNameFilter(columnNameFilter);
+ }
+
+ return client.send(RpcType.GET_COLUMNS, reqBuilder.build(), GetColumnsResp.class);
+ }
+
+ /**
* Submits a Logical plan for direct execution (bypasses parsing)
*
* @param plan the plan to execute
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 3ce0633..44e33cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -19,16 +19,13 @@ package org.apache.drill.exec.ops;
import io.netty.buffer.DrillBuf;
-import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import org.apache.calcite.jdbc.SimpleCalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.config.LogicalPlanPersistence;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -41,21 +38,22 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.server.options.QueryOptionManager;
-import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.PartitionExplorer;
import org.apache.drill.exec.store.PartitionExplorerImpl;
import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaConfig.SchemaConfigInfoProvider;
+import org.apache.drill.exec.store.SchemaTreeProvider;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.testing.ExecutionControls;
-import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.util.Utilities;
import com.google.common.collect.Lists;
// TODO - consider re-name to PlanningContext, as the query execution context actually appears
// in fragment contexts
-public class QueryContext implements AutoCloseable, OptimizerRulesContext {
+public class QueryContext implements AutoCloseable, OptimizerRulesContext, SchemaConfigInfoProvider {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
private final DrillbitContext drillbitContext;
@@ -70,8 +68,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
private final ContextInformation contextInformation;
private final QueryContextInformation queryContextInfo;
private final ViewExpansionContext viewExpansionContext;
-
- private final List<SchemaPlus> schemaTreesToClose;
+ private final SchemaTreeProvider schemaTreeProvider;
/*
* Flag to indicate if close has been called, after calling close the first
@@ -97,7 +94,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
plannerSettings.getPlanningMemoryLimit());
bufferManager = new BufferManagerImpl(this.allocator);
viewExpansionContext = new ViewExpansionContext(this);
- schemaTreesToClose = Lists.newArrayList();
+ schemaTreeProvider = new SchemaTreeProvider(drillbitContext);
}
@Override
@@ -146,9 +143,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
* @return Root of the schema tree.
*/
public SchemaPlus getRootSchema(final String userName) {
- final String schemaUser = isImpersonationEnabled() ? userName : ImpersonationUtil.getProcessUserName();
- final SchemaConfig schemaConfig = SchemaConfig.newBuilder(schemaUser, this).build();
- return getRootSchema(schemaConfig);
+ return schemaTreeProvider.createRootSchema(userName, this);
}
/**
@@ -157,17 +152,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
* @return
*/
public SchemaPlus getRootSchema(SchemaConfig schemaConfig) {
- try {
- final SchemaPlus rootSchema = SimpleCalciteSchema.createRootSchema(false);
- drillbitContext.getSchemaFactory().registerSchemas(schemaConfig, rootSchema);
- schemaTreesToClose.add(rootSchema);
- return rootSchema;
- } catch(IOException e) {
- // We can't proceed further without a schema, throw a runtime exception.
- final String errMsg = String.format("Failed to create schema tree: %s", e.getMessage());
- logger.error(errMsg, e);
- throw new DrillRuntimeException(errMsg, e);
- }
+ return schemaTreeProvider.createRootSchema(schemaConfig);
}
/**
@@ -211,10 +196,16 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
return drillbitContext.getFunctionImplementationRegistry();
}
+ @Override
public ViewExpansionContext getViewExpansionContext() {
return viewExpansionContext;
}
+ @Override
+ public OptionValue getOption(String optionKey) {
+ return getOptions().getOption(optionKey);
+ }
+
public boolean isImpersonationEnabled() {
return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
}
@@ -256,28 +247,12 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
// allocator from the toClose list.
toClose.add(bufferManager);
toClose.add(allocator);
+ toClose.add(schemaTreeProvider);
- for(SchemaPlus tree : schemaTreesToClose) {
- addSchemasToCloseList(tree, toClose);
- }
-
- AutoCloseables.close(toClose.toArray(new AutoCloseable[0]));
+ AutoCloseables.close(toClose);
}
} finally {
closed = true;
}
}
-
- private void addSchemasToCloseList(final SchemaPlus tree, final List<AutoCloseable> toClose) {
- for(String subSchemaName : tree.getSubSchemaNames()) {
- addSchemasToCloseList(tree.getSubSchema(subSchemaName), toClose);
- }
-
- try {
- AbstractSchema drillSchemaImpl = tree.unwrap(AbstractSchema.class);
- toClose.add(drillSchemaImpl);
- } catch (ClassCastException e) {
- // Ignore as the SchemaPlus is not an implementation of Drill schema.
- }
- }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 86abaca..5106787 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -31,7 +31,11 @@ import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
+import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -99,12 +103,20 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
return BitToUserHandshake.getDefaultInstance();
case RpcType.QUERY_HANDLE_VALUE:
return QueryId.getDefaultInstance();
- case RpcType.QUERY_RESULT_VALUE:
- return QueryResult.getDefaultInstance();
+ case RpcType.QUERY_RESULT_VALUE:
+ return QueryResult.getDefaultInstance();
case RpcType.QUERY_DATA_VALUE:
return QueryData.getDefaultInstance();
case RpcType.QUERY_PLAN_FRAGMENTS_VALUE:
return QueryPlanFragments.getDefaultInstance();
+ case RpcType.CATALOGS_VALUE:
+ return GetCatalogsResp.getDefaultInstance();
+ case RpcType.SCHEMAS_VALUE:
+ return GetSchemasResp.getDefaultInstance();
+ case RpcType.TABLES_VALUE:
+ return GetTablesResp.getDefaultInstance();
+ case RpcType.COLUMNS_VALUE:
+ return GetColumnsResp.getDefaultInstance();
}
throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index f0cbb22..809ac65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -26,7 +26,15 @@ import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasReq;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
+import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
+import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -49,6 +57,10 @@ public class UserRpcConfig {
.add(RpcType.RESUME_PAUSED_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
.add(RpcType.GET_QUERY_PLAN_FRAGMENTS, GetQueryPlanFragments.class,
RpcType.QUERY_PLAN_FRAGMENTS, QueryPlanFragments.class) // user to bit
+ .add(RpcType.GET_CATALOGS, GetCatalogsReq.class, RpcType.CATALOGS, GetCatalogsResp.class) // user to bit
+ .add(RpcType.GET_SCHEMAS, GetSchemasReq.class, RpcType.SCHEMAS, GetSchemasResp.class) // user to bit
+ .add(RpcType.GET_TABLES, GetTablesReq.class, RpcType.TABLES, GetTablesResp.class) // user to bit
+ .add(RpcType.GET_COLUMNS, GetColumnsReq.class, RpcType.COLUMNS, GetColumnsResp.class) // user to bit
.build();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 09bc5c8..adf7ec4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -38,7 +38,11 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasReq;
+import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
import org.apache.drill.exec.proto.UserProtos.Property;
import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -51,6 +55,7 @@ import org.apache.drill.exec.rpc.OutboundRpcMessage;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
@@ -101,8 +106,8 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
@Override
- protected Response handle(UserClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
- throws RpcException {
+ protected void handle(UserClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+ ResponseSender responseSender) throws RpcException {
switch (rpcType) {
case RpcType.RUN_QUERY_VALUE:
@@ -110,7 +115,8 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
try {
final RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
final QueryId queryId = worker.submitWork(connection, query);
- return new Response(RpcType.QUERY_HANDLE, queryId);
+ responseSender.send(new Response(RpcType.QUERY_HANDLE, queryId));
+ break;
} catch (InvalidProtocolBufferException e) {
throw new RpcException("Failure while decoding RunQuery body.", e);
}
@@ -119,7 +125,8 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
try {
final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
final Ack ack = worker.cancelQuery(queryId);
- return new Response(RpcType.ACK, ack);
+ responseSender.send(new Response(RpcType.ACK, ack));
+ break;
} catch (InvalidProtocolBufferException e) {
throw new RpcException("Failure while decoding QueryId body.", e);
}
@@ -128,21 +135,54 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
try {
final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
final Ack ack = worker.resumeQuery(queryId);
- return new Response(RpcType.ACK, ack);
+ responseSender.send(new Response(RpcType.ACK, ack));
+ break;
} catch (final InvalidProtocolBufferException e) {
throw new RpcException("Failure while decoding QueryId body.", e);
}
case RpcType.GET_QUERY_PLAN_FRAGMENTS_VALUE:
try {
final GetQueryPlanFragments req = GetQueryPlanFragments.PARSER.parseFrom(new ByteBufInputStream(pBody));
- return new Response(RpcType.QUERY_PLAN_FRAGMENTS, worker.getQueryPlan(connection, req));
+ responseSender.send(new Response(RpcType.QUERY_PLAN_FRAGMENTS, worker.getQueryPlan(connection, req)));
+ break;
} catch(final InvalidProtocolBufferException e) {
throw new RpcException("Failure while decoding GetQueryPlanFragments body.", e);
}
+ case RpcType.GET_CATALOGS_VALUE:
+ try {
+ final GetCatalogsReq req = GetCatalogsReq.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ worker.submitCatalogMetadataWork(connection.getSession(), req, responseSender);
+ break;
+ } catch (final InvalidProtocolBufferException e) {
+ throw new RpcException("Failure while decoding GetCatalogsReq body.", e);
+ }
+ case RpcType.GET_SCHEMAS_VALUE:
+ try {
+ final GetSchemasReq req = GetSchemasReq.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ worker.submitSchemasMetadataWork(connection.getSession(), req, responseSender);
+ break;
+ } catch (final InvalidProtocolBufferException e) {
+ throw new RpcException("Failure while decoding GetSchemasReq body.", e);
+ }
+ case RpcType.GET_TABLES_VALUE:
+ try {
+ final GetTablesReq req = GetTablesReq.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ worker.submitTablesMetadataWork(connection.getSession(), req, responseSender);
+ break;
+ } catch (final InvalidProtocolBufferException e) {
+ throw new RpcException("Failure while decoding GetTablesReq body.", e);
+ }
+ case RpcType.GET_COLUMNS_VALUE:
+ try {
+ final GetColumnsReq req = GetColumnsReq.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ worker.submitColumnsMetadataWork(connection.getSession(), req, responseSender);
+ break;
+ } catch (final InvalidProtocolBufferException e) {
+ throw new RpcException("Failure while decoding GetColumnsReq body.", e);
+ }
default:
throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type. Type was %d.", rpcType));
}
-
}
public class UserClientConnection extends RemoteConnection {
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
index 0297945..3e8f1c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
@@ -19,7 +19,8 @@ package org.apache.drill.exec.store;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import org.apache.drill.exec.ops.QueryContext;
+
+import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.exec.ops.ViewExpansionContext;
import org.apache.drill.exec.server.options.OptionValue;
@@ -28,29 +29,35 @@ import org.apache.drill.exec.server.options.OptionValue;
*/
public class SchemaConfig {
private final String userName;
- private final QueryContext queryContext;
+ private final SchemaConfigInfoProvider provider;
private final boolean ignoreAuthErrors;
- private SchemaConfig(final String userName, final QueryContext queryContext, final boolean ignoreAuthErrors) {
+ private SchemaConfig(final String userName, final SchemaConfigInfoProvider provider, final boolean ignoreAuthErrors) {
this.userName = userName;
- this.queryContext = queryContext;
+ this.provider = provider;
this.ignoreAuthErrors = ignoreAuthErrors;
}
- public static Builder newBuilder(final String userName, final QueryContext queryContext) {
+ /**
+ * Create new builder.
+ * @param userName Name of the user accessing the storage sources.
+ * @param provider Implementation {@link SchemaConfigInfoProvider}
+ * @return
+ */
+ public static Builder newBuilder(final String userName, final SchemaConfigInfoProvider provider) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(userName), "A valid userName is expected");
- Preconditions.checkNotNull(queryContext, "Non-null QueryContext is expected");
- return new Builder(userName, queryContext);
+ Preconditions.checkNotNull(provider, "Non-null SchemaConfigInfoProvider is expected");
+ return new Builder(userName, provider);
}
public static class Builder {
final String userName;
- final QueryContext queryContext;
+ final SchemaConfigInfoProvider provider;
boolean ignoreAuthErrors;
- private Builder(final String userName, final QueryContext queryContext) {
+ private Builder(final String userName, final SchemaConfigInfoProvider provider) {
this.userName = userName;
- this.queryContext = queryContext;
+ this.provider = provider;
}
public Builder setIgnoreAuthErrors(boolean ignoreAuthErrors) {
@@ -59,16 +66,12 @@ public class SchemaConfig {
}
public SchemaConfig build() {
- return new SchemaConfig(userName, queryContext, ignoreAuthErrors);
+ return new SchemaConfig(userName, provider, ignoreAuthErrors);
}
}
- public QueryContext getQueryContext() {
- return queryContext;
- }
-
/**
- * @return User whom to impersonate as while {@link net.hydromatic.optiq.SchemaPlus} instances
+ * @return User whom to impersonate as while creating {@link SchemaPlus} instances
* interact with the underlying storage.
*/
public String getUserName() {
@@ -76,7 +79,7 @@ public class SchemaConfig {
}
/**
- * @return Should ignore if authorization errors are reported while {@link net.hydromatic.optiq.SchemaPlus}
+ * @return Should ignore if authorization errors are reported while {@link SchemaPlus}
* instances interact with the underlying storage.
*/
public boolean getIgnoreAuthErrors() {
@@ -84,10 +87,19 @@ public class SchemaConfig {
}
public OptionValue getOption(String optionKey) {
- return queryContext.getOptions().getOption(optionKey);
+ return provider.getOption(optionKey);
}
public ViewExpansionContext getViewExpansionContext() {
- return queryContext.getViewExpansionContext();
+ return provider.getViewExpansionContext();
+ }
+
+ /**
+ * Interface to implement to provide required info for {@link org.apache.drill.exec.store.SchemaConfig}
+ */
+ public interface SchemaConfigInfoProvider {
+ ViewExpansionContext getViewExpansionContext();
+
+ OptionValue getOption(String optionKey);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
new file mode 100644
index 0000000..d05cc43
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.jdbc.SimpleCalciteSchema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.SchemaConfig.SchemaConfigInfoProvider;
+import org.apache.drill.exec.util.ImpersonationUtil;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Class which creates new schema trees. It keeps track of newly created schema trees and closes them safely as
+ * part of {@link #close()}.
+ */
+public class SchemaTreeProvider implements AutoCloseable {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaTreeProvider.class);
+
+ private final DrillbitContext dContext;
+ private final List<SchemaPlus> schemaTreesToClose;
+ private final boolean isImpersonationEnabled;
+
+ public SchemaTreeProvider(final DrillbitContext dContext) {
+ this.dContext = dContext;
+ schemaTreesToClose = Lists.newArrayList();
+ isImpersonationEnabled = dContext.getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+ }
+
+ /**
+ * Return root schema with schema owner as the given user.
+ *
+ * @param userName Name of the user who is accessing the storage sources.
+ * @param provider {@link SchemaConfigInfoProvider} instance
+ * @return Root of the schema tree.
+ */
+ public SchemaPlus createRootSchema(final String userName, final SchemaConfigInfoProvider provider) {
+ final String schemaUser = isImpersonationEnabled ? userName : ImpersonationUtil.getProcessUserName();
+ final SchemaConfig schemaConfig = SchemaConfig.newBuilder(schemaUser, provider).build();
+ return createRootSchema(schemaConfig);
+ }
+
+ /**
+ * Create and return a SchemaTree with given <i>schemaConfig</i>.
+ * @param schemaConfig
+ * @return
+ */
+ public SchemaPlus createRootSchema(SchemaConfig schemaConfig) {
+ try {
+ final SchemaPlus rootSchema = SimpleCalciteSchema.createRootSchema(false);
+ dContext.getSchemaFactory().registerSchemas(schemaConfig, rootSchema);
+ schemaTreesToClose.add(rootSchema);
+ return rootSchema;
+ } catch(IOException e) {
+ // We can't proceed further without a schema, throw a runtime exception.
+ throw UserException
+ .resourceError(e)
+ .message("Failed to create schema tree.")
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ List<AutoCloseable> toClose = Lists.newArrayList();
+ for(SchemaPlus tree : schemaTreesToClose) {
+ addSchemasToCloseList(tree, toClose);
+ }
+
+ AutoCloseables.close(toClose);
+ }
+
+ private static void addSchemasToCloseList(final SchemaPlus tree, final List<AutoCloseable> toClose) {
+ for(String subSchemaName : tree.getSubSchemaNames()) {
+ addSchemasToCloseList(tree.getSubSchema(subSchemaName), toClose);
+ }
+
+ try {
+ AbstractSchema drillSchemaImpl = tree.unwrap(AbstractSchema.class);
+ toClose.add(drillSchemaImpl);
+ } catch (ClassCastException e) {
+ // Ignore as the SchemaPlus is not an implementation of Drill schema.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 7feb303..baf07a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.pojo.Writers.BitWriter;
import org.apache.drill.exec.store.pojo.Writers.DoubleWriter;
@@ -47,24 +46,30 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-public class PojoRecordReader<T> extends AbstractRecordReader {
+public class PojoRecordReader<T> extends AbstractRecordReader implements Iterable<T> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(PojoRecordReader.class);
- public final int forJsonIgnore = 1;
-
private final Class<T> pojoClass;
- private final Iterator<T> iterator;
+ private final List<T> pojoObjects;
private PojoWriter[] writers;
private boolean doCurrent;
private T currentPojo;
private OperatorContext operatorContext;
+ private Iterator<T> currentIterator;
+
+ /**
+ * TODO: Cleanup the callers to pass the List of POJO objects directly rather than iterator.
+ * @param pojoClass
+ * @param iterator
+ */
public PojoRecordReader(Class<T> pojoClass, Iterator<T> iterator) {
this.pojoClass = pojoClass;
- this.iterator = iterator;
+ this.pojoObjects = ImmutableList.copyOf(iterator);
}
@Override
@@ -118,7 +123,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
throw new ExecutionSetupException("Failure while setting up schema for PojoRecordReader.", e);
}
-
+ currentIterator = pojoObjects.iterator();
}
@Override
@@ -146,11 +151,11 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
try {
int i =0;
- while (doCurrent || iterator.hasNext()) {
+ while (doCurrent || currentIterator.hasNext()) {
if (doCurrent) {
doCurrent = false;
} else {
- currentPojo = iterator.next();
+ currentPojo = currentIterator.next();
}
if (!allocated) {
@@ -174,6 +179,11 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
}
@Override
+ public Iterator<T> iterator() {
+ return pojoObjects.iterator();
+ }
+
+ @Override
public void close() {
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index e910150..ee11592 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -203,6 +203,14 @@ public class WorkManager implements AutoCloseable {
}
/**
+ * Add a self contained runnable work to executor service.
+ * @param runnable
+ */
+ public void addNewWork(final Runnable runnable) {
+ executor.execute(runnable);
+ }
+
+ /**
* Remove the given Foreman from the running query list.
*
* <p>The running query list is a bit of a misnomer, because it doesn't
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
new file mode 100644
index 0000000..aca54b3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.metadata;
+
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
+import static org.apache.drill.exec.store.ischema.InfoSchemaTableType.CATALOGS;
+import static org.apache.drill.exec.store.ischema.InfoSchemaTableType.COLUMNS;
+import static org.apache.drill.exec.store.ischema.InfoSchemaTableType.SCHEMATA;
+import static org.apache.drill.exec.store.ischema.InfoSchemaTableType.TABLES;
+
+import java.util.UUID;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.exceptions.ErrorHelper;
+import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.proto.UserProtos.CatalogMetadata;
+import org.apache.drill.exec.proto.UserProtos.ColumnMetadata;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasReq;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
+import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
+import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
+import org.apache.drill.exec.proto.UserProtos.LikeFilter;
+import org.apache.drill.exec.proto.UserProtos.RequestStatus;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.SchemaMetadata;
+import org.apache.drill.exec.proto.UserProtos.TableMetadata;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.SchemaConfig.SchemaConfigInfoProvider;
+import org.apache.drill.exec.store.SchemaTreeProvider;
+import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter.ConstantExprNode;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter.ExprNode;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter.FieldExprNode;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter.FunctionExprNode;
+import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
+import org.apache.drill.exec.store.ischema.Records.Catalog;
+import org.apache.drill.exec.store.ischema.Records.Column;
+import org.apache.drill.exec.store.ischema.Records.Schema;
+import org.apache.drill.exec.store.ischema.Records.Table;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains worker {@link Runnable} classes for providing the metadata and related helper methods.
+ */
+public class MetadataProvider {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MetadataProvider.class);
+
+ private static final String LIKE_FUNCTION = "like";
+ private static final String AND_FUNCTION = "booleanand";
+ private static final String OR_FUNCTION = "booleanor";
+
+ /**
+ * @return Runnable that fetches the catalog metadata for given {@link GetCatalogsReq} and sends response at the end.
+ */
+ public static Runnable catalogs(final UserSession session, final DrillbitContext dContext,
+ final GetCatalogsReq req, final ResponseSender responseSender) {
+ return new CatalogsProvider(session, dContext, req, responseSender);
+ }
+
+ /**
+ * @return Runnable that fetches the schema metadata for given {@link GetSchemasReq} and sends response at the end.
+ */
+ public static Runnable schemas(final UserSession session, final DrillbitContext dContext,
+ final GetSchemasReq req, final ResponseSender responseSender) {
+ return new SchemasProvider(session, dContext, req, responseSender);
+ }
+
+ /**
+ * @return Runnable that fetches the table metadata for given {@link GetTablesReq} and sends response at the end.
+ */
+ public static Runnable tables(final UserSession session, final DrillbitContext dContext,
+ final GetTablesReq req, final ResponseSender responseSender) {
+ return new TablesProvider(session, dContext, req, responseSender);
+ }
+
+ /**
+ * @return Runnable that fetches the column metadata for given {@link GetColumnsReq} and sends response at the end.
+ */
+ public static Runnable columns(final UserSession session, final DrillbitContext dContext,
+ final GetColumnsReq req, final ResponseSender responseSender) {
+ return new ColumnsProvider(session, dContext, req, responseSender);
+ }
+
+ /**
+ * Super class for all metadata provider runnable classes.
+ */
+ private abstract static class MetadataRunnable implements Runnable {
+ protected final UserSession session;
+ private final ResponseSender responseSender;
+ private final DrillbitContext dContext;
+
+ private MetadataRunnable(final UserSession session, final DrillbitContext dContext,
+ final ResponseSender responseSender) {
+ this.session = Preconditions.checkNotNull(session);
+ this.dContext = Preconditions.checkNotNull(dContext);
+ this.responseSender = Preconditions.checkNotNull(responseSender);
+ }
+
+ @Override
+ public void run() {
+ try(SchemaTreeProvider schemaTreeProvider = new SchemaTreeProvider(dContext)) {
+ responseSender.send(runInternal(session, schemaTreeProvider));
+ } catch (final Throwable error) {
+ logger.error("Unhandled metadata provider error", error);
+ }
+ }
+
+ /**
+ * @return A {@link Response} message. Response must be returned in any case.
+ */
+ protected abstract Response runInternal(UserSession session, SchemaTreeProvider schemaProvider);
+ }
+
+ /**
+ * Runnable that fetches the catalog metadata for given {@link GetCatalogsReq} and sends response at the end.
+ */
+ private static class CatalogsProvider extends MetadataRunnable {
+ private final GetCatalogsReq req;
+
+ public CatalogsProvider(final UserSession session, final DrillbitContext dContext,
+ final GetCatalogsReq req, final ResponseSender responseSender) {
+ super(session, dContext, responseSender);
+ this.req = Preconditions.checkNotNull(req);
+ }
+
+ @Override
+ protected Response runInternal(final UserSession session, final SchemaTreeProvider schemaProvider) {
+ final GetCatalogsResp.Builder respBuilder = GetCatalogsResp.newBuilder();
+
+ final InfoSchemaFilter filter = createInfoSchemaFilter(
+ req.hasCatalogNameFilter() ? req.getCatalogNameFilter() : null, null, null, null);
+
+ try {
+ final PojoRecordReader<Catalog> records =
+ (PojoRecordReader<Catalog>) getPojoRecordReader(CATALOGS, filter, schemaProvider, session);
+
+ for(Catalog c : records) {
+ final CatalogMetadata.Builder catBuilder = CatalogMetadata.newBuilder();
+ catBuilder.setCatalogName(c.CATALOG_NAME);
+ catBuilder.setDescription(c.CATALOG_DESCRIPTION);
+ catBuilder.setConnect(c.CATALOG_CONNECT);
+
+ respBuilder.addCatalogs(catBuilder.build());
+ }
+
+ respBuilder.setStatus(RequestStatus.OK);
+ } catch (Throwable e) {
+ respBuilder.setStatus(RequestStatus.FAILED);
+ respBuilder.setError(createPBError("get catalogs", e));
+ } finally {
+ return new Response(RpcType.CATALOGS, respBuilder.build());
+ }
+ }
+ }
+
+ private static class SchemasProvider extends MetadataRunnable {
+ private final GetSchemasReq req;
+
+ private SchemasProvider(final UserSession session, final DrillbitContext dContext,
+ final GetSchemasReq req, final ResponseSender responseSender) {
+ super(session, dContext, responseSender);
+ this.req = Preconditions.checkNotNull(req);
+ }
+
+ @Override
+ protected Response runInternal(final UserSession session, final SchemaTreeProvider schemaProvider) {
+ final GetSchemasResp.Builder respBuilder = GetSchemasResp.newBuilder();
+
+ final InfoSchemaFilter filter = createInfoSchemaFilter(
+ req.hasCatalogNameFilter() ? req.getCatalogNameFilter() : null,
+ req.hasSchameNameFilter() ? req.getSchameNameFilter() : null,
+ null, null);
+
+ try {
+ final PojoRecordReader<Schema> records = (PojoRecordReader<Schema>)
+ getPojoRecordReader(SCHEMATA, filter, schemaProvider, session);
+
+ for(Schema s : records) {
+ final SchemaMetadata.Builder schemaBuilder = SchemaMetadata.newBuilder();
+ schemaBuilder.setCatalogName(s.CATALOG_NAME);
+ schemaBuilder.setSchemaName(s.SCHEMA_NAME);
+ schemaBuilder.setOwner(s.SCHEMA_OWNER);
+ schemaBuilder.setType(s.TYPE);
+ schemaBuilder.setMutable(s.IS_MUTABLE);
+
+ respBuilder.addSchemas(schemaBuilder.build());
+ }
+
+ respBuilder.setStatus(RequestStatus.OK);
+ } catch (Throwable e) {
+ respBuilder.setStatus(RequestStatus.FAILED);
+ respBuilder.setError(createPBError("get schemas", e));
+ } finally {
+ return new Response(RpcType.SCHEMAS, respBuilder.build());
+ }
+ }
+ }
+
+ private static class TablesProvider extends MetadataRunnable {
+ private final GetTablesReq req;
+
+ private TablesProvider(final UserSession session, final DrillbitContext dContext,
+ final GetTablesReq req, final ResponseSender responseSender) {
+ super(session, dContext, responseSender);
+ this.req = Preconditions.checkNotNull(req);
+ }
+
+ @Override
+ protected Response runInternal(final UserSession session, final SchemaTreeProvider schemaProvider) {
+ final GetTablesResp.Builder respBuilder = GetTablesResp.newBuilder();
+
+ final InfoSchemaFilter filter = createInfoSchemaFilter(
+ req.hasCatalogNameFilter() ? req.getCatalogNameFilter() : null,
+ req.hasSchameNameFilter() ? req.getSchameNameFilter() : null,
+ req.hasTableNameFilter() ? req.getTableNameFilter() : null,
+ null);
+
+ try {
+ final PojoRecordReader<Table> records =
+ (PojoRecordReader<Table>)getPojoRecordReader(TABLES, filter, schemaProvider, session);
+
+ for(Table t : records) {
+ final TableMetadata.Builder tableBuilder = TableMetadata.newBuilder();
+ tableBuilder.setCatalogName(t.TABLE_CATALOG);
+ tableBuilder.setSchemaName(t.TABLE_SCHEMA);
+ tableBuilder.setTableName(t.TABLE_NAME);
+ tableBuilder.setType(t.TABLE_TYPE);
+
+ respBuilder.addTables(tableBuilder.build());
+ }
+
+ respBuilder.setStatus(RequestStatus.OK);
+ } catch (Throwable e) {
+ respBuilder.setStatus(RequestStatus.FAILED);
+ respBuilder.setError(createPBError("get tables", e));
+ } finally {
+ return new Response(RpcType.TABLES, respBuilder.build());
+ }
+ }
+ }
+
+ private static class ColumnsProvider extends MetadataRunnable {
+ private final GetColumnsReq req;
+
+ private ColumnsProvider(final UserSession session, final DrillbitContext dContext,
+ final GetColumnsReq req, final ResponseSender responseSender) {
+ super(session, dContext, responseSender);
+ this.req = Preconditions.checkNotNull(req);
+ }
+
+ @Override
+ protected Response runInternal(final UserSession session, final SchemaTreeProvider schemaProvider) {
+ final GetColumnsResp.Builder respBuilder = GetColumnsResp.newBuilder();
+
+ final InfoSchemaFilter filter = createInfoSchemaFilter(
+ req.hasCatalogNameFilter() ? req.getCatalogNameFilter() : null,
+ req.hasSchameNameFilter() ? req.getSchameNameFilter() : null,
+ req.hasTableNameFilter() ? req.getTableNameFilter() : null,
+ req.hasColumnNameFilter() ? req.getColumnNameFilter() : null
+ );
+
+ try {
+ final PojoRecordReader<Column> records =
+ (PojoRecordReader<Column>)getPojoRecordReader(COLUMNS, filter, schemaProvider, session);
+
+ for(Column c : records) {
+ final ColumnMetadata.Builder columnBuilder = ColumnMetadata.newBuilder();
+ columnBuilder.setCatalogName(c.TABLE_CATALOG);
+ columnBuilder.setSchemaName(c.TABLE_SCHEMA);
+ columnBuilder.setTableName(c.TABLE_NAME);
+ columnBuilder.setColumnName(c.COLUMN_NAME);
+ columnBuilder.setOrdinalPosition(c.ORDINAL_POSITION);
+ if (c.COLUMN_DEFAULT != null) {
+ columnBuilder.setDefaultValue(c.COLUMN_DEFAULT);
+ }
+
+ if ("YES".equalsIgnoreCase(c.IS_NULLABLE)) {
+ columnBuilder.setIsNullable(true);
+ } else {
+ columnBuilder.setIsNullable(false);
+ }
+ columnBuilder.setDataType(c.DATA_TYPE);
+ if (c.CHARACTER_MAXIMUM_LENGTH != null) {
+ columnBuilder.setCharMaxLength(c.CHARACTER_MAXIMUM_LENGTH);
+ }
+
+ if (c.CHARACTER_OCTET_LENGTH != null) {
+ columnBuilder.setCharOctetLength(c.CHARACTER_OCTET_LENGTH);
+ }
+
+ if (c.NUMERIC_PRECISION != null) {
+ columnBuilder.setNumericPrecision(c.NUMERIC_PRECISION);
+ }
+
+ if (c.NUMERIC_PRECISION_RADIX != null) {
+ columnBuilder.setNumericPrecisionRadix(c.NUMERIC_PRECISION_RADIX);
+ }
+
+ if (c.DATETIME_PRECISION != null) {
+ columnBuilder.setDateTimePrecision(c.DATETIME_PRECISION);
+ }
+
+ if (c.INTERVAL_TYPE != null) {
+ columnBuilder.setIntervalType(c.INTERVAL_TYPE);
+ }
+
+ if (c.INTERVAL_PRECISION != null) {
+ columnBuilder.setIntervalPrecision(c.INTERVAL_PRECISION);
+ }
+
+ respBuilder.addColumns(columnBuilder.build());
+ }
+
+ respBuilder.setStatus(RequestStatus.OK);
+ } catch (Exception e) {
+ respBuilder.setStatus(RequestStatus.FAILED);
+ respBuilder.setError(createPBError("get columns", e));
+ } finally {
+ return new Response(RpcType.COLUMNS, respBuilder.build());
+ }
+ }
+ }
+
+ /**
+ * Helper method to create a {@link InfoSchemaFilter} that combines the given filters with an AND.
+ * @param catalogNameFilter Optional filter on <code>catalog name</code>
+ * @param schemaNameFilter Optional filter on <code>schema name</code>
+ * @param tableNameFilter Optional filter on <code>table name</code>
+ * @param columnNameFilter Optional filter on <code>column name</code>
+ * @return
+ */
+ private static InfoSchemaFilter createInfoSchemaFilter(final LikeFilter catalogNameFilter,
+ final LikeFilter schemaNameFilter, final LikeFilter tableNameFilter, final LikeFilter columnNameFilter) {
+
+ FunctionExprNode exprNode = createLikeFunctionExprNode(CATS_COL_CATALOG_NAME, catalogNameFilter);
+
+ exprNode = combineFunctions(AND_FUNCTION,
+ exprNode,
+ combineFunctions(OR_FUNCTION,
+ createLikeFunctionExprNode(SHRD_COL_TABLE_SCHEMA, schemaNameFilter),
+ createLikeFunctionExprNode(SCHS_COL_SCHEMA_NAME, schemaNameFilter)
+ )
+ );
+
+ exprNode = combineFunctions(AND_FUNCTION,
+ exprNode,
+ createLikeFunctionExprNode(SHRD_COL_TABLE_NAME, tableNameFilter)
+ );
+
+ exprNode = combineFunctions(AND_FUNCTION,
+ exprNode,
+ createLikeFunctionExprNode(InfoSchemaConstants.COLS_COL_COLUMN_NAME, columnNameFilter)
+ );
+
+ return exprNode != null ? new InfoSchemaFilter(exprNode) : null;
+ }
+
+ /**
+ * Helper method to create {@link FunctionExprNode} from {@link LikeFilter}.
+ * @param fieldName Name of the filed on which the like expression is applied.
+ * @param likeFilter
+ * @return {@link FunctionExprNode} for given arguments. Null if the <code>likeFilter</code> is null.
+ */
+ private static FunctionExprNode createLikeFunctionExprNode(String fieldName, LikeFilter likeFilter) {
+ if (likeFilter == null) {
+ return null;
+ }
+
+ return new FunctionExprNode(LIKE_FUNCTION,
+ likeFilter.hasEscape() ?
+ ImmutableList.of(
+ new FieldExprNode(fieldName),
+ new ConstantExprNode(likeFilter.getRegex()),
+ new ConstantExprNode(likeFilter.getEscape())) :
+ ImmutableList.of(
+ new FieldExprNode(fieldName),
+ new ConstantExprNode(likeFilter.getRegex()))
+ );
+ }
+
+ /**
+ * Helper method to combine two {@link FunctionExprNode}s with a given <code>functionName</code>. If one of them is
+ * null, other one is returned as it is.
+ */
+ private static FunctionExprNode combineFunctions(final String functionName,
+ final FunctionExprNode func1, final FunctionExprNode func2) {
+ if (func1 == null) {
+ return func2;
+ }
+
+ if (func2 == null) {
+ return func1;
+ }
+
+ return new FunctionExprNode(functionName, ImmutableList.<ExprNode>of(func1, func2));
+ }
+
+ /**
+ * Helper method to create a {@link PojoRecordReader} for given arguments.
+ * @param tableType
+ * @param filter
+ * @param provider
+ * @param userSession
+ * @return
+ */
+ private static PojoRecordReader getPojoRecordReader(final InfoSchemaTableType tableType, final InfoSchemaFilter filter,
+ final SchemaTreeProvider provider, final UserSession userSession) {
+ final SchemaPlus rootSchema =
+ provider.createRootSchema(userSession.getCredentials().getUserName(), newSchemaConfigInfoProvider(userSession));
+ return tableType.getRecordReader(rootSchema, filter, userSession.getOptions());
+ }
+
+ /**
+ * Helper method to create a {@link SchemaConfigInfoProvider} instance for metadata purposes.
+ * @param session
+ * @return
+ */
+ private static SchemaConfigInfoProvider newSchemaConfigInfoProvider(final UserSession session) {
+ return new SchemaConfigInfoProvider() {
+ @Override
+ public ViewExpansionContext getViewExpansionContext() {
+ // Metadata APIs don't expect to expand the views.
+ throw new UnsupportedOperationException("View expansion context is not supported");
+ }
+
+ @Override
+ public OptionValue getOption(String optionKey) {
+ return session.getOptions().getOption(optionKey);
+ }
+ };
+ }
+
+ /**
+ * Helper method to create {@link DrillPBError} for client response message.
+ * @param failedFunction Brief description of the failed function.
+ * @param ex Exception thrown
+ * @return
+ */
+ private static DrillPBError createPBError(final String failedFunction, final Throwable ex) {
+ final String errorId = UUID.randomUUID().toString();
+ logger.error("Failed to {}. ErrorId: {}", failedFunction, errorId, ex);
+
+ final DrillPBError.Builder builder = DrillPBError.newBuilder();
+ builder.setErrorType(ErrorType.SYSTEM); // Metadata requests shouldn't cause any user errors
+ builder.setErrorId(errorId);
+ if (ex.getMessage() != null) {
+ builder.setMessage(ex.getMessage());
+ }
+
+ builder.setException(ErrorHelper.getWrapper(ex));
+
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 27126d3..cc614d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -21,16 +21,22 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasReq;
+import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.rpc.user.UserSession.QueryCountIncrementer;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.metadata.MetadataProvider;
public class UserWorker{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class);
@@ -44,7 +50,6 @@ public class UserWorker{
};
public UserWorker(WorkerBee bee) {
- super();
this.bee = bee;
}
@@ -52,7 +57,7 @@ public class UserWorker{
* Helper method to generate QueryId
* @return generated QueryId
*/
- private QueryId queryIdGenerator() {
+ private static QueryId queryIdGenerator() {
ThreadLocalRandom r = ThreadLocalRandom.current();
// create a new queryid where the first four bytes are a growing time (each new value comes earlier in sequence). Last 12 bytes are random.
@@ -97,4 +102,20 @@ public class UserWorker{
final QueryPlanFragments qPlanFragments = new PlanSplitter().planFragments(bee.getContext(), queryId, req, connection);
return qPlanFragments;
}
+
+ public void submitCatalogMetadataWork(UserSession session, GetCatalogsReq req, ResponseSender sender) {
+ bee.addNewWork(MetadataProvider.catalogs(session, bee.getContext(), req, sender));
+ }
+
+ public void submitSchemasMetadataWork(UserSession session, GetSchemasReq req, ResponseSender sender) {
+ bee.addNewWork(MetadataProvider.schemas(session, bee.getContext(), req, sender));
+ }
+
+ public void submitTablesMetadataWork(UserSession session, GetTablesReq req, ResponseSender sender) {
+ bee.addNewWork(MetadataProvider.tables(session, bee.getContext(), req, sender));
+ }
+
+ public void submitColumnsMetadataWork(UserSession session, GetColumnsReq req, ResponseSender sender) {
+ bee.addNewWork(MetadataProvider.columns(session, bee.getContext(), req, sender));
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
new file mode 100644
index 0000000..bd2cdfb
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.metadata;
+
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_CONNECT;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCR;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.proto.UserProtos.CatalogMetadata;
+import org.apache.drill.exec.proto.UserProtos.ColumnMetadata;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
+import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
+import org.apache.drill.exec.proto.UserProtos.LikeFilter;
+import org.apache.drill.exec.proto.UserProtos.RequestStatus;
+import org.apache.drill.exec.proto.UserProtos.SchemaMetadata;
+import org.apache.drill.exec.proto.UserProtos.TableMetadata;
+
+import org.junit.Test;
+
+/**
+ * Tests for metadata provider APIs.
+ */
+public class TestMetadataProvider extends BaseTestQuery {
+
+ @Test
+ public void catalogs() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.CATALOGS"); // SQL equivalent
+
+ GetCatalogsResp resp = client.getCatalogs(null).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<CatalogMetadata> catalogs = resp.getCatalogsList();
+ assertEquals(1, catalogs.size());
+
+ CatalogMetadata c = catalogs.get(0);
+ assertEquals(IS_CATALOG_NAME, c.getCatalogName());
+ assertEquals(IS_CATALOG_DESCR, c.getDescription());
+ assertEquals(IS_CATALOG_CONNECT, c.getConnect());
+ }
+
+ @Test
+ public void catalogsWithFilter() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.CATALOGS " +
+ // "WHERE CATALOG_NAME LIKE '%DRI%' ESCAPE '\\'"); // SQL equivalent
+ GetCatalogsResp resp =
+ client.getCatalogs(LikeFilter.newBuilder().setRegex("%DRI%").setEscape("\\").build()).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<CatalogMetadata> catalogs = resp.getCatalogsList();
+ assertEquals(1, catalogs.size());
+
+ CatalogMetadata c = catalogs.get(0);
+ assertEquals(IS_CATALOG_NAME, c.getCatalogName());
+ assertEquals(IS_CATALOG_DESCR, c.getDescription());
+ assertEquals(IS_CATALOG_CONNECT, c.getConnect());
+ }
+
+ @Test
+ public void catalogsWithFilterNegative() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.CATALOGS
+ // WHERE CATALOG_NAME LIKE '%DRIj\\\\hgjh%' ESCAPE '\\'"); // SQL equivalent
+
+ GetCatalogsResp resp =
+ client.getCatalogs(LikeFilter.newBuilder().setRegex("%DRIj\\%hgjh%").setEscape("\\").build()).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<CatalogMetadata> catalogs = resp.getCatalogsList();
+ assertEquals(0, catalogs.size());
+ }
+
+ @Test
+ public void schemas() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.SCHEMATA"); // SQL equivalent
+
+ GetSchemasResp resp = client.getSchemas(null, null).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<SchemaMetadata> schemas = resp.getSchemasList();
+ assertEquals(9, schemas.size());
+
+ verifySchema("INFORMATION_SCHEMA", schemas);
+ verifySchema("cp.default", schemas);
+ verifySchema("dfs.default", schemas);
+ verifySchema("dfs.root", schemas);
+ verifySchema("dfs.tmp", schemas);
+ verifySchema("dfs_test.default", schemas);
+ verifySchema("dfs_test.home", schemas);
+ verifySchema("dfs_test.tmp", schemas);
+ verifySchema("sys", schemas);
+ }
+
+ @Test
+ public void schemasWithSchemaNameFilter() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%y%'"); // SQL equivalent
+
+ GetSchemasResp resp = client.getSchemas(null, LikeFilter.newBuilder().setRegex("%y%").build()).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<SchemaMetadata> schemas = resp.getSchemasList();
+ assertEquals(1, schemas.size());
+
+ verifySchema("sys", schemas);
+ }
+
+ @Test
+ public void schemasWithCatalogNameFilterAndSchemaNameFilter() throws Exception {
+
+ // test("SELECT * FROM INFORMATION_SCHEMA.SCHEMATA " +
+ // "WHERE CATALOG_NAME LIKE '%RI%' AND SCHEMA_NAME LIKE '%y%'"); // SQL equivalent
+
+ GetSchemasResp resp = client.getSchemas(
+ LikeFilter.newBuilder().setRegex("%RI%").build(),
+ LikeFilter.newBuilder().setRegex("%dfs_test%").build()).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<SchemaMetadata> schemas = resp.getSchemasList();
+ assertEquals(3, schemas.size());
+
+ verifySchema("dfs_test.default", schemas);
+ verifySchema("dfs_test.home", schemas);
+ verifySchema("dfs_test.tmp", schemas);
+ }
+
+ @Test
+ public void tables() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.`TABLES`"); // SQL equivalent
+
+ GetTablesResp resp = client.getTables(null, null, null).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<TableMetadata> tables = resp.getTablesList();
+ assertEquals(11, tables.size());
+
+ verifyTable("INFORMATION_SCHEMA", "CATALOGS", tables);
+ verifyTable("INFORMATION_SCHEMA", "COLUMNS", tables);
+ verifyTable("INFORMATION_SCHEMA", "SCHEMATA", tables);
+ verifyTable("INFORMATION_SCHEMA", "TABLES", tables);
+ verifyTable("INFORMATION_SCHEMA", "VIEWS", tables);
+ verifyTable("sys", "boot", tables);
+ verifyTable("sys", "drillbits", tables);
+ verifyTable("sys", "memory", tables);
+ verifyTable("sys", "options", tables);
+ verifyTable("sys", "threads", tables);
+ verifyTable("sys", "version", tables);
+ }
+
+ @Test
+ public void tablesWithTableNameFilter() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE '%o%'"); // SQL equivalent
+
+ GetTablesResp resp = client.getTables(null, null,
+ LikeFilter.newBuilder().setRegex("%o%").build()).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<TableMetadata> tables = resp.getTablesList();
+ assertEquals(4, tables.size());
+
+ verifyTable("sys", "boot", tables);
+ verifyTable("sys", "memory", tables);
+ verifyTable("sys", "options", tables);
+ verifyTable("sys", "version", tables);
+ }
+
+ @Test
+ public void tablesWithTableNameFilterAndSchemaNameFilter() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` " +
+ // "WHERE TABLE_SCHEMA LIKE '%N\\_S%' ESCAPE '\\' AND TABLE_NAME LIKE '%o%'"); // SQL equivalent
+
+ GetTablesResp resp = client.getTables(null,
+ LikeFilter.newBuilder().setRegex("%N\\_S%").setEscape("\\").build(),
+ LikeFilter.newBuilder().setRegex("%o%").build()).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<TableMetadata> tables = resp.getTablesList();
+ assertEquals(0, tables.size());
+ }
+
+ @Test
+ public void columns() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.COLUMNS"); // SQL equivalent
+
+ GetColumnsResp resp = client.getColumns(null, null, null, null).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<ColumnMetadata> columns = resp.getColumnsList();
+ assertEquals(70, columns.size());
+ // too many records to verify the output.
+ }
+
+ @Test
+ public void columnsWithColumnNameFilter() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE COLUMN_NAME LIKE '%\\_p%' ESCAPE '\\'"); // SQL equivalent
+
+ GetColumnsResp resp = client.getColumns(null, null, null,
+ LikeFilter.newBuilder().setRegex("%\\_p%").setEscape("\\").build()).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<ColumnMetadata> columns = resp.getColumnsList();
+ assertEquals(5, columns.size());
+
+ verifyColumn("sys", "drillbits", "user_port", columns);
+ verifyColumn("sys", "drillbits", "control_port", columns);
+ verifyColumn("sys", "drillbits", "data_port", columns);
+ verifyColumn("sys", "memory", "user_port", columns);
+ verifyColumn("sys", "threads", "user_port", columns);
+ }
+
+ @Test
+ public void columnsWithColumnNameFilterAndTableNameFilter() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.COLUMNS
+ // WHERE TABLE_NAME LIKE '%bits' AND COLUMN_NAME LIKE '%\\_p%' ESCAPE '\\'"); // SQL equivalent
+
+ GetColumnsResp resp = client.getColumns(null, null,
+ LikeFilter.newBuilder().setRegex("%bits").build(),
+ LikeFilter.newBuilder().setRegex("%\\_p%").setEscape("\\").build()).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<ColumnMetadata> columns = resp.getColumnsList();
+ assertEquals(3, columns.size());
+
+ verifyColumn("sys", "drillbits", "user_port", columns);
+ verifyColumn("sys", "drillbits", "control_port", columns);
+ verifyColumn("sys", "drillbits", "data_port", columns);
+ }
+
+ @Test
+ public void columnsWithAllSupportedFilters() throws Exception {
+ // test("SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE " +
+ // "TABLE_CATALOG LIKE '%ILL' AND TABLE_SCHEMA LIKE 'sys' AND " +
+ // "TABLE_NAME LIKE '%bits' AND COLUMN_NAME LIKE '%\\_p%' ESCAPE '\\'"); // SQL equivalent
+
+ GetColumnsResp resp = client.getColumns(
+ LikeFilter.newBuilder().setRegex("%ILL").build(),
+ LikeFilter.newBuilder().setRegex("sys").build(),
+ LikeFilter.newBuilder().setRegex("%bits").build(),
+ LikeFilter.newBuilder().setRegex("%\\_p%").setEscape("\\").build()).get();
+
+ assertEquals(RequestStatus.OK, resp.getStatus());
+ List<ColumnMetadata> columns = resp.getColumnsList();
+ assertEquals(3, columns.size());
+
+ verifyColumn("sys", "drillbits", "user_port", columns);
+ verifyColumn("sys", "drillbits", "control_port", columns);
+ verifyColumn("sys", "drillbits", "data_port", columns);
+ }
+
+ /** Helper method to verify schema contents */
+ private static void verifySchema(String schemaName, List<SchemaMetadata> schemas) {
+ for(SchemaMetadata schema : schemas) {
+ if (schemaName.equals(schema.getSchemaName())) {
+ assertEquals(IS_CATALOG_NAME, schema.getCatalogName());
+ return;
+ }
+ }
+
+ fail("Failed to find schema '" + schemaName + "' in results: " + schemas);
+ }
+
+ /** Helper method to verify table contents */
+ private static void verifyTable(String schemaName, String tableName, List<TableMetadata> tables) {
+
+ for(TableMetadata table : tables) {
+ if (tableName.equals(table.getTableName()) && schemaName.equals(table.getSchemaName())) {
+ assertEquals(IS_CATALOG_NAME, table.getCatalogName());
+ return;
+ }
+ }
+
+ fail(String.format("Failed to find table '%s.%s' in results: %s", schemaName, tableName, tables));
+ }
+
+ /** Helper method to verify column contents */
+ private static void verifyColumn(String schemaName, String tableName, String columnName,
+ List<ColumnMetadata> columns) {
+
+ for(ColumnMetadata column : columns) {
+ if (schemaName.equals(column.getSchemaName()) && tableName.equals(column.getTableName()) &&
+ columnName.equals(column.getColumnName())) {
+ assertEquals(IS_CATALOG_NAME, column.getCatalogName());
+ return;
+ }
+ }
+
+ fail(String.format("Failed to find column '%s.%s.%s' in results: %s", schemaName, tableName, columnName, columns));
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/jdbc-all/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index fe6e5cd..49dbb3e 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -442,7 +442,7 @@
This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
</message>
- <maxsize>20000000</maxsize>
+ <maxsize>21000000</maxsize>
<minsize>15000000</minsize>
<files>
<file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>