Posted to commits@drill.apache.org by ve...@apache.org on 2016/08/07 19:51:14 UTC

[5/5] drill git commit: DRILL-4728: Add support for new metadata fetch APIs

DRILL-4728: Add support for new metadata fetch APIs

+ Protobuf messages (see the builder sketch after this list)
   - GetCatalogsReq -> GetCatalogsResp
   - GetSchemasReq -> GetSchemasResp
   - GetTablesReq -> GetTablesResp
   - GetColumnsReq -> GetColumnsResp
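
  An illustrative sketch (not part of the commit) of building one of these
  requests via the generated protobuf builders. The `setSchameNameFilter`
  spelling matches the generated method name visible in the diff below, and
  `LikeFilter`'s `regex`/`escape` fields match the accessors used in
  `MetadataProvider`:

    LikeFilter schemaFilter = LikeFilter.newBuilder()
        .setRegex("dfs.%")  // LIKE pattern
        .setEscape("\\")    // optional escape character
        .build();

    GetTablesReq req = GetTablesReq.newBuilder()
        .setSchameNameFilter(schemaFilter)  // generated name follows the proto field spelling
        .setTableNameFilter(LikeFilter.newBuilder().setRegex("%").build())
        .build();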

+ Java Drill client changes: new `DrillClient` methods (`getCatalogs`, `getSchemas`,
  `getTables`, `getColumns`) that return `DrillRpcFuture`s
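
  A hedged usage sketch of the new calls (assumes `DrillRpcFuture` offers a
  blocking `checkedGet()`, which this diff does not show; exception handling
  elided):

    DrillClient client = new DrillClient();
    client.connect();

    // Pass null for "no filter" on the catalog name.
    DrillRpcFuture<GetCatalogsResp> future = client.getCatalogs(null);
    GetCatalogsResp resp = future.checkedGet();  // assumed blocking get

    if (resp.getStatus() == RequestStatus.OK) {
      for (CatalogMetadata catalog : resp.getCatalogsList()) {
        System.out.println(catalog.getCatalogName());
      }
    }
    client.close();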

+ Server side changes to handle the metadata API calls (an illustrative dispatch
  sketch follows this list)
  - Provide a self-contained `Runnable` implementation for each metadata API
    that processes the request and sends the response to the client
  - In `UserServer`, override the `handle` method that takes a `ResponseSender` and
    send the response through the sender instead of returning it.
  - Add a method to `UserWorker` for each new API to submit the metadata work.
  - Add a method `addNewWork(Runnable runnable)` to `WorkerBee` to submit a generic
    `Runnable` to the `ExecutorService`.
  - Move a couple of methods out of `QueryContext` into a separate interface,
    `SchemaConfigInfoProvider`, to enable instantiating schema trees without the
    full `QueryContext`
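
  Illustrative end-to-end dispatch for one of the calls. The `UserServer` case
  is taken from the diff below; the `UserWorker` body is an assumed shape, as
  that hunk is not included in this excerpt:

    // UserServer.handle(): parse the request and hand it off; the response is
    // sent through the ResponseSender instead of being returned.
    case RpcType.GET_CATALOGS_VALUE:
      final GetCatalogsReq req = GetCatalogsReq.PARSER.parseFrom(new ByteBufInputStream(pBody));
      worker.submitCatalogMetadataWork(connection.getSession(), req, responseSender);
      break;

    // UserWorker (assumed shape): wrap the request in a self-contained
    // Runnable and queue it on the executor via WorkerBee.addNewWork().
    public void submitCatalogMetadataWork(UserSession session, GetCatalogsReq req, ResponseSender sender) {
      bee.addNewWork(MetadataProvider.catalogs(session, bee.getContext(), req, sender));
    }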

+ New protobuf messages increased the `jdbc-all.jar` size. Up the limit to 21MB.

this closes #527


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ef6e522c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ef6e522c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ef6e522c

Branch: refs/heads/master
Commit: ef6e522c9cba816110aa43ff6bccedf29a901236
Parents: 4bd67a6
Author: vkorukanti <ve...@dremio.com>
Authored: Thu Jun 9 16:03:06 2016 -0700
Committer: vkorukanti <ve...@dremio.com>
Committed: Sun Aug 7 12:47:00 2016 -0700

----------------------------------------------------------------------
 .../drill/common/exceptions/ErrorHelper.java    |     4 +-
 .../apache/drill/exec/client/DrillClient.java   |   101 +
 .../org/apache/drill/exec/ops/QueryContext.java |    57 +-
 .../apache/drill/exec/rpc/user/UserClient.java  |    16 +-
 .../drill/exec/rpc/user/UserRpcConfig.java      |    12 +
 .../apache/drill/exec/rpc/user/UserServer.java  |    54 +-
 .../apache/drill/exec/store/SchemaConfig.java   |    50 +-
 .../drill/exec/store/SchemaTreeProvider.java    |   106 +
 .../drill/exec/store/pojo/PojoRecordReader.java |    28 +-
 .../org/apache/drill/exec/work/WorkManager.java |     8 +
 .../exec/work/metadata/MetadataProvider.java    |   486 +
 .../apache/drill/exec/work/user/UserWorker.java |    25 +-
 .../work/metadata/TestMetadataProvider.java     |   308 +
 exec/jdbc-all/pom.xml                           |     2 +-
 .../drill/exec/proto/SchemaUserProtos.java      |  1752 +++
 .../org/apache/drill/exec/proto/UserProtos.java | 13537 ++++++++++++++++-
 .../drill/exec/proto/beans/CatalogMetadata.java |   207 +
 .../drill/exec/proto/beans/ColumnMetadata.java  |   493 +
 .../drill/exec/proto/beans/GetCatalogsReq.java  |   165 +
 .../drill/exec/proto/beans/GetCatalogsResp.java |   221 +
 .../drill/exec/proto/beans/GetColumnsReq.java   |   237 +
 .../drill/exec/proto/beans/GetColumnsResp.java  |   221 +
 .../drill/exec/proto/beans/GetSchemasReq.java   |   189 +
 .../drill/exec/proto/beans/GetSchemasResp.java  |   221 +
 .../drill/exec/proto/beans/GetTablesReq.java    |   213 +
 .../drill/exec/proto/beans/GetTablesResp.java   |   221 +
 .../drill/exec/proto/beans/LikeFilter.java      |   185 +
 .../drill/exec/proto/beans/RequestStatus.java   |    51 +
 .../apache/drill/exec/proto/beans/RpcType.java  |    16 +
 .../drill/exec/proto/beans/SchemaMetadata.java  |   251 +
 .../drill/exec/proto/beans/TableMetadata.java   |   229 +
 protocol/src/main/protobuf/User.proto           |   149 +-
 32 files changed, 19635 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
index 0aa5a1b..9b2097d 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
@@ -25,7 +25,7 @@ import java.util.regex.Pattern;
 /**
  * Utility class that handles error message generation from protobuf error objects.
  */
-class ErrorHelper {
+public class ErrorHelper {
 
   private final static Pattern IGNORE= Pattern.compile("^(sun|com\\.sun|java).*");
 
@@ -96,7 +96,7 @@ class ErrorHelper {
     return sb.toString();
   }
 
-  static ExceptionWrapper getWrapper(Throwable ex) {
+  public static ExceptionWrapper getWrapper(Throwable ex) {
     return getWrapperBuilder(ex).build();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 11abbcc..8063778 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -53,7 +53,16 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
 import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasReq;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
+import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
+import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
+import org.apache.drill.exec.proto.UserProtos.LikeFilter;
 import org.apache.drill.exec.proto.UserProtos.Property;
 import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -411,6 +420,98 @@ public class DrillClient implements Closeable, ConnectionThrottle {
   }
 
   /**
+   * Get the list of catalogs in the <code>INFORMATION_SCHEMA.CATALOGS</code> table satisfying the given filters.
+   *
+   * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
+   * @return
+   */
+  public DrillRpcFuture<GetCatalogsResp> getCatalogs(LikeFilter catalogNameFilter) {
+    final GetCatalogsReq.Builder reqBuilder = GetCatalogsReq.newBuilder();
+    if (catalogNameFilter != null) {
+      reqBuilder.setCatalogNameFilter(catalogNameFilter);
+    }
+
+    return client.send(RpcType.GET_CATALOGS, reqBuilder.build(), GetCatalogsResp.class);
+  }
+
+  /**
+   * Get the list of schemas in the <code>INFORMATION_SCHEMA.SCHEMATA</code> table satisfying the given filters.
+   *
+   * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
+   * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
+   * @return
+   */
+  public DrillRpcFuture<GetSchemasResp> getSchemas(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter) {
+    final GetSchemasReq.Builder reqBuilder = GetSchemasReq.newBuilder();
+    if (catalogNameFilter != null) {
+      reqBuilder.setCatalogNameFilter(catalogNameFilter);
+    }
+
+    if (schemaNameFilter != null) {
+      reqBuilder.setSchameNameFilter(schemaNameFilter);
+    }
+
+    return client.send(RpcType.GET_SCHEMAS, reqBuilder.build(), GetSchemasResp.class);
+  }
+
+  /**
+   * Get the list of tables in the <code>INFORMATION_SCHEMA.TABLES</code> table satisfying the given filters.
+   *
+   * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
+   * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
+   * @param tableNameFilter Filter on <code>table name</code>. Pass null to apply no filter.
+   * @return
+   */
+  public DrillRpcFuture<GetTablesResp> getTables(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter,
+      LikeFilter tableNameFilter) {
+    final GetTablesReq.Builder reqBuilder = GetTablesReq.newBuilder();
+    if (catalogNameFilter != null) {
+      reqBuilder.setCatalogNameFilter(catalogNameFilter);
+    }
+
+    if (schemaNameFilter != null) {
+      reqBuilder.setSchameNameFilter(schemaNameFilter);
+    }
+
+    if (tableNameFilter != null) {
+      reqBuilder.setTableNameFilter(tableNameFilter);
+    }
+
+    return client.send(RpcType.GET_TABLES, reqBuilder.build(), GetTablesResp.class);
+  }
+
+  /**
+   * Get the list of columns in the <code>INFORMATION_SCHEMA.COLUMNS</code> table satisfying the given filters.
+   *
+   * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
+   * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
+   * @param tableNameFilter Filter on <code>table name</code>. Pass null to apply no filter.
+   * @param columnNameFilter Filter on <code>column name</code>. Pass null to apply no filter.
+   * @return
+   */
+  public DrillRpcFuture<GetColumnsResp> getColumns(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter,
+      LikeFilter tableNameFilter, LikeFilter columnNameFilter) {
+    final GetColumnsReq.Builder reqBuilder = GetColumnsReq.newBuilder();
+    if (catalogNameFilter != null) {
+      reqBuilder.setCatalogNameFilter(catalogNameFilter);
+    }
+
+    if (schemaNameFilter != null) {
+      reqBuilder.setSchameNameFilter(schemaNameFilter);
+    }
+
+    if (tableNameFilter != null) {
+      reqBuilder.setTableNameFilter(tableNameFilter);
+    }
+
+    if (columnNameFilter != null) {
+      reqBuilder.setColumnNameFilter(columnNameFilter);
+    }
+
+    return client.send(RpcType.GET_COLUMNS, reqBuilder.build(), GetColumnsResp.class);
+  }
+
+  /**
    * Submits a Logical plan for direct execution (bypasses parsing)
    *
    * @param  plan  the plan to execute

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 3ce0633..44e33cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -19,16 +19,13 @@ package org.apache.drill.exec.ops;
 
 import io.netty.buffer.DrillBuf;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.calcite.jdbc.SimpleCalciteSchema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.LogicalPlanPersistence;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -41,21 +38,22 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.server.options.QueryOptionManager;
-import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionExplorer;
 import org.apache.drill.exec.store.PartitionExplorerImpl;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaConfig.SchemaConfigInfoProvider;
+import org.apache.drill.exec.store.SchemaTreeProvider;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.testing.ExecutionControls;
-import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.util.Utilities;
 
 import com.google.common.collect.Lists;
 
 // TODO - consider re-name to PlanningContext, as the query execution context actually appears
 // in fragment contexts
-public class QueryContext implements AutoCloseable, OptimizerRulesContext {
+public class QueryContext implements AutoCloseable, OptimizerRulesContext, SchemaConfigInfoProvider {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
 
   private final DrillbitContext drillbitContext;
@@ -70,8 +68,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
   private final ContextInformation contextInformation;
   private final QueryContextInformation queryContextInfo;
   private final ViewExpansionContext viewExpansionContext;
-
-  private final List<SchemaPlus> schemaTreesToClose;
+  private final SchemaTreeProvider schemaTreeProvider;
 
   /*
    * Flag to indicate if close has been called, after calling close the first
@@ -97,7 +94,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
         plannerSettings.getPlanningMemoryLimit());
     bufferManager = new BufferManagerImpl(this.allocator);
     viewExpansionContext = new ViewExpansionContext(this);
-    schemaTreesToClose = Lists.newArrayList();
+    schemaTreeProvider = new SchemaTreeProvider(drillbitContext);
   }
 
   @Override
@@ -146,9 +143,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
    * @return Root of the schema tree.
    */
   public SchemaPlus getRootSchema(final String userName) {
-    final String schemaUser = isImpersonationEnabled() ? userName : ImpersonationUtil.getProcessUserName();
-    final SchemaConfig schemaConfig = SchemaConfig.newBuilder(schemaUser, this).build();
-    return getRootSchema(schemaConfig);
+    return schemaTreeProvider.createRootSchema(userName, this);
   }
 
   /**
@@ -157,17 +152,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
    * @return
    */
   public SchemaPlus getRootSchema(SchemaConfig schemaConfig) {
-    try {
-      final SchemaPlus rootSchema = SimpleCalciteSchema.createRootSchema(false);
-      drillbitContext.getSchemaFactory().registerSchemas(schemaConfig, rootSchema);
-      schemaTreesToClose.add(rootSchema);
-      return rootSchema;
-    } catch(IOException e) {
-      // We can't proceed further without a schema, throw a runtime exception.
-      final String errMsg = String.format("Failed to create schema tree: %s", e.getMessage());
-      logger.error(errMsg, e);
-      throw new DrillRuntimeException(errMsg, e);
-    }
+    return schemaTreeProvider.createRootSchema(schemaConfig);
   }
 
   /**
@@ -211,10 +196,16 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
     return drillbitContext.getFunctionImplementationRegistry();
   }
 
+  @Override
   public ViewExpansionContext getViewExpansionContext() {
     return viewExpansionContext;
   }
 
+  @Override
+  public OptionValue getOption(String optionKey) {
+    return getOptions().getOption(optionKey);
+  }
+
   public boolean isImpersonationEnabled() {
      return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
   }
@@ -256,28 +247,12 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
         // allocator from the toClose list.
         toClose.add(bufferManager);
         toClose.add(allocator);
+        toClose.add(schemaTreeProvider);
 
-        for(SchemaPlus tree : schemaTreesToClose) {
-          addSchemasToCloseList(tree, toClose);
-        }
-
-        AutoCloseables.close(toClose.toArray(new AutoCloseable[0]));
+        AutoCloseables.close(toClose);
       }
     } finally {
       closed = true;
     }
   }
-
-  private void addSchemasToCloseList(final SchemaPlus tree, final List<AutoCloseable> toClose) {
-    for(String subSchemaName : tree.getSubSchemaNames()) {
-      addSchemasToCloseList(tree.getSubSchema(subSchemaName), toClose);
-    }
-
-    try {
-      AbstractSchema drillSchemaImpl =  tree.unwrap(AbstractSchema.class);
-      toClose.add(drillSchemaImpl);
-    } catch (ClassCastException e) {
-      // Ignore as the SchemaPlus is not an implementation of Drill schema.
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 86abaca..5106787 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -31,7 +31,11 @@ import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
 import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
+import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
 import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
 import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -99,12 +103,20 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
       return BitToUserHandshake.getDefaultInstance();
     case RpcType.QUERY_HANDLE_VALUE:
       return QueryId.getDefaultInstance();
-      case RpcType.QUERY_RESULT_VALUE:
-        return QueryResult.getDefaultInstance();
+    case RpcType.QUERY_RESULT_VALUE:
+      return QueryResult.getDefaultInstance();
     case RpcType.QUERY_DATA_VALUE:
       return QueryData.getDefaultInstance();
     case RpcType.QUERY_PLAN_FRAGMENTS_VALUE:
       return QueryPlanFragments.getDefaultInstance();
+    case RpcType.CATALOGS_VALUE:
+      return GetCatalogsResp.getDefaultInstance();
+    case RpcType.SCHEMAS_VALUE:
+      return GetSchemasResp.getDefaultInstance();
+    case RpcType.TABLES_VALUE:
+      return GetTablesResp.getDefaultInstance();
+    case RpcType.COLUMNS_VALUE:
+      return GetColumnsResp.getDefaultInstance();
     }
     throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index f0cbb22..809ac65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -26,7 +26,15 @@ import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
 import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasReq;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
+import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
+import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
 import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -49,6 +57,10 @@ public class UserRpcConfig {
         .add(RpcType.RESUME_PAUSED_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
         .add(RpcType.GET_QUERY_PLAN_FRAGMENTS, GetQueryPlanFragments.class,
           RpcType.QUERY_PLAN_FRAGMENTS, QueryPlanFragments.class) // user to bit
+        .add(RpcType.GET_CATALOGS, GetCatalogsReq.class, RpcType.CATALOGS, GetCatalogsResp.class) // user to bit
+        .add(RpcType.GET_SCHEMAS, GetSchemasReq.class, RpcType.SCHEMAS, GetSchemasResp.class) // user to bit
+        .add(RpcType.GET_TABLES, GetTablesReq.class, RpcType.TABLES, GetTablesResp.class) // user to bit
+        .add(RpcType.GET_COLUMNS, GetColumnsReq.class, RpcType.COLUMNS, GetColumnsResp.class) // user to bit
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 09bc5c8..adf7ec4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -38,7 +38,11 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
 import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasReq;
+import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
 import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
 import org.apache.drill.exec.proto.UserProtos.Property;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -51,6 +55,7 @@ import org.apache.drill.exec.rpc.OutboundRpcMessage;
 import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
@@ -101,8 +106,8 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
   }
 
   @Override
-  protected Response handle(UserClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
-      throws RpcException {
+  protected void handle(UserClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+      ResponseSender responseSender) throws RpcException {
     switch (rpcType) {
 
     case RpcType.RUN_QUERY_VALUE:
@@ -110,7 +115,8 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       try {
         final RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
         final QueryId queryId = worker.submitWork(connection, query);
-        return new Response(RpcType.QUERY_HANDLE, queryId);
+        responseSender.send(new Response(RpcType.QUERY_HANDLE, queryId));
+        break;
       } catch (InvalidProtocolBufferException e) {
         throw new RpcException("Failure while decoding RunQuery body.", e);
       }
@@ -119,7 +125,8 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       try {
         final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
         final Ack ack = worker.cancelQuery(queryId);
-        return new Response(RpcType.ACK, ack);
+        responseSender.send(new Response(RpcType.ACK, ack));
+        break;
       } catch (InvalidProtocolBufferException e) {
         throw new RpcException("Failure while decoding QueryId body.", e);
       }
@@ -128,21 +135,54 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       try {
         final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
         final Ack ack = worker.resumeQuery(queryId);
-        return new Response(RpcType.ACK, ack);
+        responseSender.send(new Response(RpcType.ACK, ack));
+        break;
       } catch (final InvalidProtocolBufferException e) {
         throw new RpcException("Failure while decoding QueryId body.", e);
       }
     case RpcType.GET_QUERY_PLAN_FRAGMENTS_VALUE:
       try {
         final GetQueryPlanFragments req = GetQueryPlanFragments.PARSER.parseFrom(new ByteBufInputStream(pBody));
-        return new Response(RpcType.QUERY_PLAN_FRAGMENTS, worker.getQueryPlan(connection, req));
+        responseSender.send(new Response(RpcType.QUERY_PLAN_FRAGMENTS, worker.getQueryPlan(connection, req)));
+        break;
       } catch(final InvalidProtocolBufferException e) {
         throw new RpcException("Failure while decoding GetQueryPlanFragments body.", e);
       }
+    case RpcType.GET_CATALOGS_VALUE:
+      try {
+        final GetCatalogsReq req = GetCatalogsReq.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        worker.submitCatalogMetadataWork(connection.getSession(), req, responseSender);
+        break;
+      } catch (final InvalidProtocolBufferException e) {
+        throw new RpcException("Failure while decoding GetCatalogsReq body.", e);
+      }
+    case RpcType.GET_SCHEMAS_VALUE:
+      try {
+        final GetSchemasReq req = GetSchemasReq.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        worker.submitSchemasMetadataWork(connection.getSession(), req, responseSender);
+        break;
+      } catch (final InvalidProtocolBufferException e) {
+        throw new RpcException("Failure while decoding GetSchemasReq body.", e);
+      }
+    case RpcType.GET_TABLES_VALUE:
+      try {
+        final GetTablesReq req = GetTablesReq.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        worker.submitTablesMetadataWork(connection.getSession(), req, responseSender);
+        break;
+      } catch (final InvalidProtocolBufferException e) {
+        throw new RpcException("Failure while decoding GetTablesReq body.", e);
+      }
+    case RpcType.GET_COLUMNS_VALUE:
+      try {
+        final GetColumnsReq req = GetColumnsReq.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        worker.submitColumnsMetadataWork(connection.getSession(), req, responseSender);
+        break;
+      } catch (final InvalidProtocolBufferException e) {
+        throw new RpcException("Failure while decoding GetColumnsReq body.", e);
+      }
     default:
       throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type.  Type was %d.", rpcType));
     }
-
   }
 
   public class UserClientConnection extends RemoteConnection {

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
index 0297945..3e8f1c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
@@ -19,7 +19,8 @@ package org.apache.drill.exec.store;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import org.apache.drill.exec.ops.QueryContext;
+
+import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.exec.ops.ViewExpansionContext;
 import org.apache.drill.exec.server.options.OptionValue;
 
@@ -28,29 +29,35 @@ import org.apache.drill.exec.server.options.OptionValue;
  */
 public class SchemaConfig {
   private final String userName;
-  private final QueryContext queryContext;
+  private final SchemaConfigInfoProvider provider;
   private final boolean ignoreAuthErrors;
 
-  private SchemaConfig(final String userName, final QueryContext queryContext, final boolean ignoreAuthErrors) {
+  private SchemaConfig(final String userName, final SchemaConfigInfoProvider provider, final boolean ignoreAuthErrors) {
     this.userName = userName;
-    this.queryContext = queryContext;
+    this.provider = provider;
     this.ignoreAuthErrors = ignoreAuthErrors;
   }
 
-  public static Builder newBuilder(final String userName, final QueryContext queryContext) {
+  /**
+   * Create a new builder.
+   * @param userName Name of the user accessing the storage sources.
+   * @param provider Implementation of {@link SchemaConfigInfoProvider}
+   * @return
+   */
+  public static Builder newBuilder(final String userName, final SchemaConfigInfoProvider provider) {
     Preconditions.checkArgument(!Strings.isNullOrEmpty(userName), "A valid userName is expected");
-    Preconditions.checkNotNull(queryContext, "Non-null QueryContext is expected");
-    return new Builder(userName, queryContext);
+    Preconditions.checkNotNull(provider, "Non-null SchemaConfigInfoProvider is expected");
+    return new Builder(userName, provider);
   }
 
   public static class Builder {
     final String userName;
-    final QueryContext queryContext;
+    final SchemaConfigInfoProvider provider;
     boolean ignoreAuthErrors;
 
-    private Builder(final String userName, final QueryContext queryContext) {
+    private Builder(final String userName, final SchemaConfigInfoProvider provider) {
       this.userName = userName;
-      this.queryContext = queryContext;
+      this.provider = provider;
     }
 
     public Builder setIgnoreAuthErrors(boolean ignoreAuthErrors) {
@@ -59,16 +66,12 @@ public class SchemaConfig {
     }
 
     public SchemaConfig build() {
-      return new SchemaConfig(userName, queryContext, ignoreAuthErrors);
+      return new SchemaConfig(userName, provider, ignoreAuthErrors);
     }
   }
 
-  public QueryContext getQueryContext() {
-    return queryContext;
-  }
-
   /**
-   * @return User whom to impersonate as while {@link net.hydromatic.optiq.SchemaPlus} instances
+   * @return User to impersonate while {@link SchemaPlus} instances
    * interact with the underlying storage.
    */
   public String getUserName() {
@@ -76,7 +79,7 @@ public class SchemaConfig {
   }
 
   /**
-   * @return Should ignore if authorization errors are reported while {@link net.hydromatic.optiq.SchemaPlus}
+   * @return Whether to ignore authorization errors reported while {@link SchemaPlus}
    * instances interact with the underlying storage.
    */
   public boolean getIgnoreAuthErrors() {
@@ -84,10 +87,19 @@ public class SchemaConfig {
   }
 
   public OptionValue getOption(String optionKey) {
-    return queryContext.getOptions().getOption(optionKey);
+    return provider.getOption(optionKey);
   }
 
   public ViewExpansionContext getViewExpansionContext() {
-    return queryContext.getViewExpansionContext();
+    return provider.getViewExpansionContext();
+  }
+
+  /**
+   * Interface whose implementations provide the info required by {@link org.apache.drill.exec.store.SchemaConfig}
+   */
+  public interface SchemaConfigInfoProvider {
+    ViewExpansionContext getViewExpansionContext();
+
+    OptionValue getOption(String optionKey);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
new file mode 100644
index 0000000..d05cc43
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.calcite.jdbc.SimpleCalciteSchema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.SchemaConfig.SchemaConfigInfoProvider;
+import org.apache.drill.exec.util.ImpersonationUtil;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Class which creates new schema trees. It keeps track of newly created schema trees and closes them safely as
+ * part of {@link #close()}.
+ */
+public class SchemaTreeProvider implements AutoCloseable {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaTreeProvider.class);
+
+  private final DrillbitContext dContext;
+  private final List<SchemaPlus> schemaTreesToClose;
+  private final boolean isImpersonationEnabled;
+
+  public SchemaTreeProvider(final DrillbitContext dContext) {
+    this.dContext = dContext;
+    schemaTreesToClose = Lists.newArrayList();
+    isImpersonationEnabled = dContext.getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+  }
+
+  /**
+   * Return root schema with schema owner as the given user.
+   *
+   * @param userName Name of the user who is accessing the storage sources.
+   * @param provider {@link SchemaConfigInfoProvider} instance
+   * @return Root of the schema tree.
+   */
+  public SchemaPlus createRootSchema(final String userName, final SchemaConfigInfoProvider provider) {
+    final String schemaUser = isImpersonationEnabled ? userName : ImpersonationUtil.getProcessUserName();
+    final SchemaConfig schemaConfig = SchemaConfig.newBuilder(schemaUser, provider).build();
+    return createRootSchema(schemaConfig);
+  }
+
+  /**
+   * Create and return a schema tree for the given <i>schemaConfig</i>.
+   * @param schemaConfig
+   * @return
+   */
+  public SchemaPlus createRootSchema(SchemaConfig schemaConfig) {
+    try {
+      final SchemaPlus rootSchema = SimpleCalciteSchema.createRootSchema(false);
+      dContext.getSchemaFactory().registerSchemas(schemaConfig, rootSchema);
+      schemaTreesToClose.add(rootSchema);
+      return rootSchema;
+    } catch(IOException e) {
+      // We can't proceed further without a schema, throw a runtime exception.
+      throw UserException
+          .resourceError(e)
+          .message("Failed to create schema tree.")
+          .build(logger);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    List<AutoCloseable> toClose = Lists.newArrayList();
+    for(SchemaPlus tree : schemaTreesToClose) {
+      addSchemasToCloseList(tree, toClose);
+    }
+
+    AutoCloseables.close(toClose);
+  }
+
+  private static void addSchemasToCloseList(final SchemaPlus tree, final List<AutoCloseable> toClose) {
+    for(String subSchemaName : tree.getSubSchemaNames()) {
+      addSchemasToCloseList(tree.getSubSchema(subSchemaName), toClose);
+    }
+
+    try {
+      AbstractSchema drillSchemaImpl =  tree.unwrap(AbstractSchema.class);
+      toClose.add(drillSchemaImpl);
+    } catch (ClassCastException e) {
+      // Ignore as the SchemaPlus is not an implementation of Drill schema.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 7feb303..baf07a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.pojo.Writers.BitWriter;
 import org.apache.drill.exec.store.pojo.Writers.DoubleWriter;
@@ -47,24 +46,30 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
-public class PojoRecordReader<T> extends AbstractRecordReader {
+public class PojoRecordReader<T> extends AbstractRecordReader implements Iterable<T> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(PojoRecordReader.class);
 
-  public final int forJsonIgnore = 1;
-
   private final Class<T> pojoClass;
-  private final Iterator<T> iterator;
+  private final List<T> pojoObjects;
   private PojoWriter[] writers;
   private boolean doCurrent;
   private T currentPojo;
   private OperatorContext operatorContext;
 
+  private Iterator<T> currentIterator;
+
+  /**
+   * TODO: Clean up the callers to pass the List of POJO objects directly rather than an iterator.
+   * @param pojoClass
+   * @param iterator
+   */
   public PojoRecordReader(Class<T> pojoClass, Iterator<T> iterator) {
     this.pojoClass = pojoClass;
-    this.iterator = iterator;
+    this.pojoObjects = ImmutableList.copyOf(iterator);
   }
 
   @Override
@@ -118,7 +123,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
       throw new ExecutionSetupException("Failure while setting up schema for PojoRecordReader.", e);
     }
 
-
+    currentIterator = pojoObjects.iterator();
   }
 
   @Override
@@ -146,11 +151,11 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
     injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
     try {
       int i =0;
-      while (doCurrent || iterator.hasNext()) {
+      while (doCurrent || currentIterator.hasNext()) {
         if (doCurrent) {
           doCurrent = false;
         } else {
-          currentPojo = iterator.next();
+          currentPojo = currentIterator.next();
         }
 
         if (!allocated) {
@@ -174,6 +179,11 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
   }
 
   @Override
+  public Iterator<T> iterator() {
+    return pojoObjects.iterator();
+  }
+
+  @Override
   public void close() {
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index e910150..ee11592 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -203,6 +203,14 @@ public class WorkManager implements AutoCloseable {
     }
 
     /**
+     * Add a self-contained unit of runnable work to the executor service.
+     * @param runnable
+     */
+    public void addNewWork(final Runnable runnable) {
+      executor.execute(runnable);
+    }
+
+    /**
      * Remove the given Foreman from the running query list.
      *
      * <p>The running query list is a bit of a misnomer, because it doesn't

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
new file mode 100644
index 0000000..aca54b3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.metadata;
+
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
+import static org.apache.drill.exec.store.ischema.InfoSchemaTableType.CATALOGS;
+import static org.apache.drill.exec.store.ischema.InfoSchemaTableType.COLUMNS;
+import static org.apache.drill.exec.store.ischema.InfoSchemaTableType.SCHEMATA;
+import static org.apache.drill.exec.store.ischema.InfoSchemaTableType.TABLES;
+
+import java.util.UUID;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.exceptions.ErrorHelper;
+import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.proto.UserProtos.CatalogMetadata;
+import org.apache.drill.exec.proto.UserProtos.ColumnMetadata;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasReq;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
+import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
+import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
+import org.apache.drill.exec.proto.UserProtos.LikeFilter;
+import org.apache.drill.exec.proto.UserProtos.RequestStatus;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.SchemaMetadata;
+import org.apache.drill.exec.proto.UserProtos.TableMetadata;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.SchemaConfig.SchemaConfigInfoProvider;
+import org.apache.drill.exec.store.SchemaTreeProvider;
+import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter.ConstantExprNode;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter.ExprNode;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter.FieldExprNode;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter.FunctionExprNode;
+import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
+import org.apache.drill.exec.store.ischema.Records.Catalog;
+import org.apache.drill.exec.store.ischema.Records.Column;
+import org.apache.drill.exec.store.ischema.Records.Schema;
+import org.apache.drill.exec.store.ischema.Records.Table;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the worker {@link Runnable} classes that provide metadata, plus related helper methods.
+ */
+public class MetadataProvider {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MetadataProvider.class);
+
+  private static final String LIKE_FUNCTION = "like";
+  private static final String AND_FUNCTION = "booleanand";
+  private static final String OR_FUNCTION = "booleanor";
+
+  /**
+   * @return Runnable that fetches the catalog metadata for the given {@link GetCatalogsReq} and sends the response at the end.
+   */
+  public static Runnable catalogs(final UserSession session, final DrillbitContext dContext,
+      final GetCatalogsReq req, final ResponseSender responseSender) {
+    return new CatalogsProvider(session, dContext, req, responseSender);
+  }
+
+  /**
+   * @return Runnable that fetches the schema metadata for the given {@link GetSchemasReq} and sends the response at the end.
+   */
+  public static Runnable schemas(final UserSession session, final DrillbitContext dContext,
+      final GetSchemasReq req, final ResponseSender responseSender) {
+    return new SchemasProvider(session, dContext, req, responseSender);
+  }
+
+  /**
+   * @return Runnable that fetches the table metadata for the given {@link GetTablesReq} and sends the response at the end.
+   */
+  public static Runnable tables(final UserSession session, final DrillbitContext dContext,
+      final GetTablesReq req, final ResponseSender responseSender) {
+    return new TablesProvider(session, dContext, req, responseSender);
+  }
+
+  /**
+   * @return Runnable that fetches the column metadata for the given {@link GetColumnsReq} and sends the response at the end.
+   */
+  public static Runnable columns(final UserSession session, final DrillbitContext dContext,
+      final GetColumnsReq req, final ResponseSender responseSender) {
+    return new ColumnsProvider(session, dContext, req, responseSender);
+  }
+
+  /**
+   * Superclass for all metadata provider runnables.
+   */
+  private abstract static class MetadataRunnable implements Runnable {
+    protected final UserSession session;
+    private final ResponseSender responseSender;
+    private final DrillbitContext dContext;
+
+    private MetadataRunnable(final UserSession session, final DrillbitContext dContext,
+        final ResponseSender responseSender) {
+      this.session = Preconditions.checkNotNull(session);
+      this.dContext = Preconditions.checkNotNull(dContext);
+      this.responseSender = Preconditions.checkNotNull(responseSender);
+    }
+
+    @Override
+    public void run() {
+      try(SchemaTreeProvider schemaTreeProvider = new SchemaTreeProvider(dContext)) {
+        responseSender.send(runInternal(session, schemaTreeProvider));
+      } catch (final Throwable error) {
+        logger.error("Unhandled metadata provider error", error);
+      }
+    }
+
+    /**
+     * @return A {@link Response} message. Response must be returned in any case.
+     */
+    protected abstract Response runInternal(UserSession session, SchemaTreeProvider schemaProvider);
+  }
+
+  /**
+   * Runnable that fetches the catalog metadata for the given {@link GetCatalogsReq} and sends the response at the end.
+   */
+  private static class CatalogsProvider extends MetadataRunnable {
+    private final GetCatalogsReq req;
+
+    public CatalogsProvider(final UserSession session, final DrillbitContext dContext,
+        final GetCatalogsReq req, final ResponseSender responseSender) {
+      super(session, dContext, responseSender);
+      this.req = Preconditions.checkNotNull(req);
+    }
+
+    @Override
+    protected Response runInternal(final UserSession session, final SchemaTreeProvider schemaProvider) {
+      final GetCatalogsResp.Builder respBuilder = GetCatalogsResp.newBuilder();
+
+      final InfoSchemaFilter filter = createInfoSchemaFilter(
+          req.hasCatalogNameFilter() ? req.getCatalogNameFilter() : null, null, null, null);
+
+      try {
+        final PojoRecordReader<Catalog> records =
+            (PojoRecordReader<Catalog>) getPojoRecordReader(CATALOGS, filter, schemaProvider, session);
+
+        for(Catalog c : records) {
+          final CatalogMetadata.Builder catBuilder = CatalogMetadata.newBuilder();
+          catBuilder.setCatalogName(c.CATALOG_NAME);
+          catBuilder.setDescription(c.CATALOG_DESCRIPTION);
+          catBuilder.setConnect(c.CATALOG_CONNECT);
+
+          respBuilder.addCatalogs(catBuilder.build());
+        }
+
+        respBuilder.setStatus(RequestStatus.OK);
+      } catch (Throwable e) {
+        respBuilder.setStatus(RequestStatus.FAILED);
+        respBuilder.setError(createPBError("get catalogs", e));
+      } finally {
+        return new Response(RpcType.CATALOGS, respBuilder.build());
+      }
+    }
+  }
+
+  private static class SchemasProvider extends MetadataRunnable {
+    private final GetSchemasReq req;
+
+    private SchemasProvider(final UserSession session, final DrillbitContext dContext,
+        final GetSchemasReq req, final ResponseSender responseSender) {
+      super(session, dContext, responseSender);
+      this.req = Preconditions.checkNotNull(req);
+    }
+
+    @Override
+    protected Response runInternal(final UserSession session, final SchemaTreeProvider schemaProvider) {
+      final GetSchemasResp.Builder respBuilder = GetSchemasResp.newBuilder();
+
+      final InfoSchemaFilter filter = createInfoSchemaFilter(
+          req.hasCatalogNameFilter() ? req.getCatalogNameFilter() : null,
+          req.hasSchameNameFilter() ? req.getSchameNameFilter() : null,
+          null, null);
+
+      try {
+        final PojoRecordReader<Schema> records = (PojoRecordReader<Schema>)
+            getPojoRecordReader(SCHEMATA, filter, schemaProvider, session);
+
+        for(Schema s : records) {
+          final SchemaMetadata.Builder schemaBuilder = SchemaMetadata.newBuilder();
+          schemaBuilder.setCatalogName(s.CATALOG_NAME);
+          schemaBuilder.setSchemaName(s.SCHEMA_NAME);
+          schemaBuilder.setOwner(s.SCHEMA_OWNER);
+          schemaBuilder.setType(s.TYPE);
+          schemaBuilder.setMutable(s.IS_MUTABLE);
+
+          respBuilder.addSchemas(schemaBuilder.build());
+        }
+
+        respBuilder.setStatus(RequestStatus.OK);
+      } catch (Throwable e) {
+        respBuilder.setStatus(RequestStatus.FAILED);
+        respBuilder.setError(createPBError("get schemas", e));
+      } finally {
+        return new Response(RpcType.SCHEMAS, respBuilder.build());
+      }
+    }
+  }
+
+  private static class TablesProvider extends MetadataRunnable {
+    private final GetTablesReq req;
+
+    private TablesProvider(final UserSession session, final DrillbitContext dContext,
+        final GetTablesReq req, final ResponseSender responseSender) {
+      super(session, dContext, responseSender);
+      this.req = Preconditions.checkNotNull(req);
+    }
+
+    @Override
+    protected Response runInternal(final UserSession session, final SchemaTreeProvider schemaProvider) {
+      final GetTablesResp.Builder respBuilder = GetTablesResp.newBuilder();
+
+      final InfoSchemaFilter filter = createInfoSchemaFilter(
+          req.hasCatalogNameFilter() ? req.getCatalogNameFilter() : null,
+          req.hasSchameNameFilter() ? req.getSchameNameFilter() : null,
+          req.hasTableNameFilter() ? req.getTableNameFilter() : null,
+          null);
+
+      try {
+        final PojoRecordReader<Table> records =
+            (PojoRecordReader<Table>)getPojoRecordReader(TABLES, filter, schemaProvider, session);
+
+        for(Table t : records) {
+          final TableMetadata.Builder tableBuilder = TableMetadata.newBuilder();
+          tableBuilder.setCatalogName(t.TABLE_CATALOG);
+          tableBuilder.setSchemaName(t.TABLE_SCHEMA);
+          tableBuilder.setTableName(t.TABLE_NAME);
+          tableBuilder.setType(t.TABLE_TYPE);
+
+          respBuilder.addTables(tableBuilder.build());
+        }
+
+        respBuilder.setStatus(RequestStatus.OK);
+      } catch (Throwable e) {
+        respBuilder.setStatus(RequestStatus.FAILED);
+        respBuilder.setError(createPBError("get tables", e));
+      } finally {
+        return new Response(RpcType.TABLES, respBuilder.build());
+      }
+    }
+  }
+
+  private static class ColumnsProvider extends MetadataRunnable {
+    private final GetColumnsReq req;
+
+    private ColumnsProvider(final UserSession session, final DrillbitContext dContext,
+        final GetColumnsReq req, final ResponseSender responseSender) {
+      super(session, dContext, responseSender);
+      this.req = Preconditions.checkNotNull(req);
+    }
+
+    @Override
+    protected Response runInternal(final UserSession session, final SchemaTreeProvider schemaProvider) {
+      final GetColumnsResp.Builder respBuilder = GetColumnsResp.newBuilder();
+
+      final InfoSchemaFilter filter = createInfoSchemaFilter(
+          req.hasCatalogNameFilter() ? req.getCatalogNameFilter() : null,
+          req.hasSchameNameFilter() ? req.getSchameNameFilter() : null,
+          req.hasTableNameFilter() ? req.getTableNameFilter() : null,
+          req.hasColumnNameFilter() ? req.getColumnNameFilter() : null
+      );
+
+      try {
+        final PojoRecordReader<Column> records =
+            (PojoRecordReader<Column>)getPojoRecordReader(COLUMNS, filter, schemaProvider, session);
+
+        for(Column c : records) {
+          final ColumnMetadata.Builder columnBuilder = ColumnMetadata.newBuilder();
+          columnBuilder.setCatalogName(c.TABLE_CATALOG);
+          columnBuilder.setSchemaName(c.TABLE_SCHEMA);
+          columnBuilder.setTableName(c.TABLE_NAME);
+          columnBuilder.setColumnName(c.COLUMN_NAME);
+          columnBuilder.setOrdinalPosition(c.ORDINAL_POSITION);
+          if (c.COLUMN_DEFAULT != null) {
+            columnBuilder.setDefaultValue(c.COLUMN_DEFAULT);
+          }
+
+          if ("YES".equalsIgnoreCase(c.IS_NULLABLE)) {
+            columnBuilder.setIsNullable(true);
+          } else {
+            columnBuilder.setIsNullable(false);
+          }
+          columnBuilder.setDataType(c.DATA_TYPE);
+          if (c.CHARACTER_MAXIMUM_LENGTH != null) {
+            columnBuilder.setCharMaxLength(c.CHARACTER_MAXIMUM_LENGTH);
+          }
+
+          if (c.CHARACTER_OCTET_LENGTH != null) {
+            columnBuilder.setCharOctetLength(c.CHARACTER_OCTET_LENGTH);
+          }
+
+          if (c.NUMERIC_PRECISION != null) {
+            columnBuilder.setNumericPrecision(c.NUMERIC_PRECISION);
+          }
+
+          if (c.NUMERIC_PRECISION_RADIX != null) {
+            columnBuilder.setNumericPrecisionRadix(c.NUMERIC_PRECISION_RADIX);
+          }
+
+          if (c.DATETIME_PRECISION != null) {
+            columnBuilder.setDateTimePrecision(c.DATETIME_PRECISION);
+          }
+
+          if (c.INTERVAL_TYPE != null) {
+            columnBuilder.setIntervalType(c.INTERVAL_TYPE);
+          }
+
+          if (c.INTERVAL_PRECISION != null) {
+            columnBuilder.setIntervalPrecision(c.INTERVAL_PRECISION);
+          }
+
+          respBuilder.addColumns(columnBuilder.build());
+        }
+
+        respBuilder.setStatus(RequestStatus.OK);
+      } catch (Throwable e) { // catch Throwable, consistent with the tables provider above
+        respBuilder.setStatus(RequestStatus.FAILED);
+        respBuilder.setError(createPBError("get columns", e));
+      }
+
+      return new Response(RpcType.COLUMNS, respBuilder.build());
+    }
+  }
+
+  /**
+   * Helper method to create an {@link InfoSchemaFilter} that combines the given filters with an AND.
+   * @param catalogNameFilter Optional filter on <code>catalog name</code>
+   * @param schemaNameFilter Optional filter on <code>schema name</code>
+   * @param tableNameFilter Optional filter on <code>table name</code>
+   * @param columnNameFilter Optional filter on <code>column name</code>
+   * @return Combined {@link InfoSchemaFilter}, or null if none of the individual filters are set.
+   */
+  private static InfoSchemaFilter createInfoSchemaFilter(final LikeFilter catalogNameFilter,
+      final LikeFilter schemaNameFilter, final LikeFilter tableNameFilter, final LikeFilter columnNameFilter) {
+
+    FunctionExprNode exprNode = createLikeFunctionExprNode(CATS_COL_CATALOG_NAME,  catalogNameFilter);
+
+    exprNode = combineFunctions(AND_FUNCTION,
+        exprNode,
+        combineFunctions(OR_FUNCTION,
+            createLikeFunctionExprNode(SHRD_COL_TABLE_SCHEMA, schemaNameFilter),
+            createLikeFunctionExprNode(SCHS_COL_SCHEMA_NAME, schemaNameFilter)
+        )
+    );
+
+    exprNode = combineFunctions(AND_FUNCTION,
+        exprNode,
+        createLikeFunctionExprNode(SHRD_COL_TABLE_NAME, tableNameFilter)
+    );
+
+    exprNode = combineFunctions(AND_FUNCTION,
+        exprNode,
+        createLikeFunctionExprNode(InfoSchemaConstants.COLS_COL_COLUMN_NAME, columnNameFilter)
+    );
+
+    return exprNode != null ? new InfoSchemaFilter(exprNode) : null;
+  }
+
+  /**
+   * Helper method to create a {@link FunctionExprNode} from a {@link LikeFilter}.
+   * @param fieldName Name of the field on which the like expression is applied.
+   * @param likeFilter Optional like filter containing the regex and an optional escape character.
+   * @return {@link FunctionExprNode} for the given arguments. Null if the <code>likeFilter</code> is null.
+   */
+  private static FunctionExprNode createLikeFunctionExprNode(String fieldName, LikeFilter likeFilter) {
+    if (likeFilter == null) {
+      return null;
+    }
+
+    return new FunctionExprNode(LIKE_FUNCTION,
+        likeFilter.hasEscape() ?
+            ImmutableList.of(
+                new FieldExprNode(fieldName),
+                new ConstantExprNode(likeFilter.getRegex()),
+                new ConstantExprNode(likeFilter.getEscape())) :
+            ImmutableList.of(
+                new FieldExprNode(fieldName),
+                new ConstantExprNode(likeFilter.getRegex()))
+    );
+  }
+
+  /**
+   * Helper method to combine two {@link FunctionExprNode}s with the given <code>functionName</code>. If one of
+   * them is null, the other is returned as-is.
+   */
+  private static FunctionExprNode combineFunctions(final String functionName,
+      final FunctionExprNode func1, final FunctionExprNode func2) {
+    if (func1 == null) {
+      return func2;
+    }
+
+    if (func2 == null) {
+      return func1;
+    }
+
+    return new FunctionExprNode(functionName, ImmutableList.<ExprNode>of(func1, func2));
+  }
+
+  /**
+   * Helper method to create a {@link PojoRecordReader} for the given arguments.
+   * @param tableType INFORMATION_SCHEMA table to scan.
+   * @param filter Optional filter to push into the scan.
+   * @param provider Provider used to create the schema tree for the requesting user.
+   * @param userSession Session of the user who issued the metadata request.
+   * @return {@link PojoRecordReader} over the requested INFORMATION_SCHEMA table.
+   */
+  private static PojoRecordReader getPojoRecordReader(final InfoSchemaTableType tableType, final InfoSchemaFilter filter,
+      final SchemaTreeProvider provider, final UserSession userSession) {
+    final SchemaPlus rootSchema =
+        provider.createRootSchema(userSession.getCredentials().getUserName(), newSchemaConfigInfoProvider(userSession));
+    return tableType.getRecordReader(rootSchema, filter, userSession.getOptions());
+  }
+
+  /**
+   * Helper method to create a {@link SchemaConfigInfoProvider} instance for metadata purposes.
+   * @param session Session of the user who issued the metadata request.
+   * @return {@link SchemaConfigInfoProvider} backed by the session options. View expansion is not supported.
+   */
+  private static SchemaConfigInfoProvider newSchemaConfigInfoProvider(final UserSession session) {
+    return new SchemaConfigInfoProvider() {
+      @Override
+      public ViewExpansionContext getViewExpansionContext() {
+        // Metadata APIs don't expect to expand the views.
+        throw new UnsupportedOperationException("View expansion context is not supported");
+      }
+
+      @Override
+      public OptionValue getOption(String optionKey) {
+        return session.getOptions().getOption(optionKey);
+      }
+    };
+  }
+
+  /**
+   * Helper method to create a {@link DrillPBError} for the client response message.
+   * @param failedFunction Brief description of the failed function.
+   * @param ex Exception thrown.
+   * @return {@link DrillPBError} carrying a generated error id and the wrapped exception.
+   */
+  private static DrillPBError createPBError(final String failedFunction, final Throwable ex) {
+    final String errorId = UUID.randomUUID().toString();
+    logger.error("Failed to {}. ErrorId: {}", failedFunction, errorId, ex);
+
+    final DrillPBError.Builder builder = DrillPBError.newBuilder();
+    builder.setErrorType(ErrorType.SYSTEM); // Metadata requests shouldn't cause any user errors
+    builder.setErrorId(errorId);
+    if (ex.getMessage() != null) {
+      builder.setMessage(ex.getMessage());
+    }
+
+    builder.setException(ErrorHelper.getWrapper(ex));
+
+    return builder.build();
+  }
+}
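
A note on how the filter helpers above compose: for a GetTablesReq that sets
only the table name filter, createLikeFunctionExprNode returns null for the
unset filters and combineFunctions drops the null operands, so the resulting
InfoSchemaFilter contains a single LIKE predicate. The following is a minimal,
self-contained sketch of that composition pattern; the Node/Field/Constant/Func
classes and the "booleanand"/"like" strings are simplified stand-ins for the
real ExprNode hierarchy and the AND_FUNCTION/LIKE_FUNCTION constants, which are
not reproduced here.

import java.util.Arrays;
import java.util.List;

public class FilterCompositionSketch {

  /** Simplified stand-in for the ExprNode hierarchy used by InfoSchemaFilter. */
  interface Node {}

  static final class Field implements Node {
    final String name;
    Field(String name) { this.name = name; }
    @Override public String toString() { return name; }
  }

  static final class Constant implements Node {
    final String value;
    Constant(String value) { this.value = value; }
    @Override public String toString() { return "'" + value + "'"; }
  }

  static final class Func implements Node {
    final String name;
    final List<Node> args;
    Func(String name, List<Node> args) { this.name = name; this.args = args; }
    @Override public String toString() { return name + args; }
  }

  /** Mirrors combineFunctions: if one operand is null, the other is returned as-is. */
  static Func combine(String functionName, Func func1, Func func2) {
    if (func1 == null) { return func2; }
    if (func2 == null) { return func1; }
    return new Func(functionName, Arrays.<Node>asList(func1, func2));
  }

  /** Mirrors createLikeFunctionExprNode: returns null when no filter was supplied. */
  static Func like(String fieldName, String regex) {
    if (regex == null) { return null; }
    return new Func("like", Arrays.<Node>asList(new Field(fieldName), new Constant(regex)));
  }

  public static void main(String[] args) {
    // Equivalent of a GetTablesReq with only the table name filter set:
    Func expr = combine("booleanand",
        like("TABLE_SCHEMA", null),  // schema filter unset -> null -> dropped
        like("TABLE_NAME", "%o%"));
    System.out.println(expr);        // prints: like[TABLE_NAME, '%o%']
  }
}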

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 27126d3..cc614d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -21,16 +21,22 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
 import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasReq;
+import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
 import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.rpc.user.UserSession.QueryCountIncrementer;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.metadata.MetadataProvider;
 
 public class UserWorker{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class);
@@ -44,7 +50,6 @@ public class UserWorker{
   };
 
   public UserWorker(WorkerBee bee) {
-    super();
     this.bee = bee;
   }
 
@@ -52,7 +57,7 @@ public class UserWorker{
    * Helper method to generate QueryId
    * @return generated QueryId
    */
-  private QueryId queryIdGenerator() {
+  private static QueryId queryIdGenerator() {
     ThreadLocalRandom r = ThreadLocalRandom.current();
 
     // create a new queryid where the first four bytes are a growing time (each new value comes earlier in sequence).  Last 12 bytes are random.
@@ -97,4 +102,20 @@ public class UserWorker{
     final QueryPlanFragments qPlanFragments = new PlanSplitter().planFragments(bee.getContext(), queryId, req, connection);
     return qPlanFragments;
   }
+
+  public void submitCatalogMetadataWork(UserSession session, GetCatalogsReq req, ResponseSender sender) {
+    bee.addNewWork(MetadataProvider.catalogs(session, bee.getContext(), req, sender));
+  }
+
+  public void submitSchemasMetadataWork(UserSession session, GetSchemasReq req, ResponseSender sender) {
+    bee.addNewWork(MetadataProvider.schemas(session, bee.getContext(), req, sender));
+  }
+
+  public void submitTablesMetadataWork(UserSession session, GetTablesReq req, ResponseSender sender) {
+    bee.addNewWork(MetadataProvider.tables(session, bee.getContext(), req, sender));
+  }
+
+  public void submitColumnsMetadataWork(UserSession session, GetColumnsReq req, ResponseSender sender) {
+    bee.addNewWork(MetadataProvider.columns(session, bee.getContext(), req, sender));
+  }
 }
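
The submit methods above are intentionally thin: each wraps the request into a
Runnable built by MetadataProvider and hands it to the bee, so the request can
be processed off the RPC thread and the response pushed through the
ResponseSender when ready. Below is a minimal sketch of this hand-off pattern,
assuming addNewWork delegates to a plain ExecutorService; Sender and
MetadataDispatchSketch are illustrative stand-ins, not Drill classes.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MetadataDispatchSketch {

  /** Stand-in for ResponseSender: the worker pushes the response instead of returning it. */
  interface Sender {
    void send(String response);
  }

  private static final ExecutorService executor = Executors.newFixedThreadPool(2);

  /** Stand-in for WorkerBee.addNewWork(Runnable): submits generic work to the pool. */
  static void addNewWork(Runnable work) {
    executor.submit(work);
  }

  public static void main(String[] args) throws InterruptedException {
    final Sender sender = new Sender() {
      @Override
      public void send(String response) {
        System.out.println("sent: " + response);
      }
    };

    // The caller returns immediately; the response is sent from a worker
    // thread once the (possibly slow) metadata lookup completes.
    addNewWork(new Runnable() {
      @Override
      public void run() {
        sender.send("GetTablesResp(status=OK)");
      }
    });

    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.SECONDS);
  }
}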

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
new file mode 100644
index 0000000..bd2cdfb
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.metadata;
+
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_CONNECT;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCR;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.proto.UserProtos.CatalogMetadata;
+import org.apache.drill.exec.proto.UserProtos.ColumnMetadata;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
+import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
+import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
+import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
+import org.apache.drill.exec.proto.UserProtos.LikeFilter;
+import org.apache.drill.exec.proto.UserProtos.RequestStatus;
+import org.apache.drill.exec.proto.UserProtos.SchemaMetadata;
+import org.apache.drill.exec.proto.UserProtos.TableMetadata;
+
+import org.junit.Test;
+
+/**
+ * Tests for metadata provider APIs.
+ */
+public class TestMetadataProvider extends BaseTestQuery {
+
+  @Test
+  public void catalogs() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.CATALOGS"); // SQL equivalent
+
+    GetCatalogsResp resp = client.getCatalogs(null).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<CatalogMetadata> catalogs = resp.getCatalogsList();
+    assertEquals(1, catalogs.size());
+
+    CatalogMetadata c = catalogs.get(0);
+    assertEquals(IS_CATALOG_NAME, c.getCatalogName());
+    assertEquals(IS_CATALOG_DESCR, c.getDescription());
+    assertEquals(IS_CATALOG_CONNECT, c.getConnect());
+  }
+
+  @Test
+  public void catalogsWithFilter() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.CATALOGS " +
+    //    "WHERE CATALOG_NAME LIKE '%DRI%' ESCAPE '\\'"); // SQL equivalent
+    GetCatalogsResp resp =
+        client.getCatalogs(LikeFilter.newBuilder().setRegex("%DRI%").setEscape("\\").build()).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<CatalogMetadata> catalogs = resp.getCatalogsList();
+    assertEquals(1, catalogs.size());
+
+    CatalogMetadata c = catalogs.get(0);
+    assertEquals(IS_CATALOG_NAME, c.getCatalogName());
+    assertEquals(IS_CATALOG_DESCR, c.getDescription());
+    assertEquals(IS_CATALOG_CONNECT, c.getConnect());
+  }
+
+  @Test
+  public void catalogsWithFilterNegative() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.CATALOGS
+    //     WHERE CATALOG_NAME LIKE '%DRIj\\\\hgjh%' ESCAPE '\\'"); // SQL equivalent
+
+    GetCatalogsResp resp =
+        client.getCatalogs(LikeFilter.newBuilder().setRegex("%DRIj\\%hgjh%").setEscape("\\").build()).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<CatalogMetadata> catalogs = resp.getCatalogsList();
+    assertEquals(0, catalogs.size());
+  }
+
+  @Test
+  public void schemas() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.SCHEMATA"); // SQL equivalent
+
+    GetSchemasResp resp = client.getSchemas(null, null).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<SchemaMetadata> schemas = resp.getSchemasList();
+    assertEquals(9, schemas.size());
+
+    verifySchema("INFORMATION_SCHEMA", schemas);
+    verifySchema("cp.default", schemas);
+    verifySchema("dfs.default", schemas);
+    verifySchema("dfs.root", schemas);
+    verifySchema("dfs.tmp", schemas);
+    verifySchema("dfs_test.default", schemas);
+    verifySchema("dfs_test.home", schemas);
+    verifySchema("dfs_test.tmp", schemas);
+    verifySchema("sys", schemas);
+  }
+
+  @Test
+  public void schemasWithSchemaNameFilter() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%y%'"); // SQL equivalent
+
+    GetSchemasResp resp = client.getSchemas(null, LikeFilter.newBuilder().setRegex("%y%").build()).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<SchemaMetadata> schemas = resp.getSchemasList();
+    assertEquals(1, schemas.size());
+
+    verifySchema("sys", schemas);
+  }
+
+  @Test
+  public void schemasWithCatalogNameFilterAndSchemaNameFilter() throws Exception {
+
+    // test("SELECT * FROM INFORMATION_SCHEMA.SCHEMATA " +
+    //    "WHERE CATALOG_NAME LIKE '%RI%' AND SCHEMA_NAME LIKE '%y%'"); // SQL equivalent
+
+    GetSchemasResp resp = client.getSchemas(
+        LikeFilter.newBuilder().setRegex("%RI%").build(),
+        LikeFilter.newBuilder().setRegex("%dfs_test%").build()).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<SchemaMetadata> schemas = resp.getSchemasList();
+    assertEquals(3, schemas.size());
+
+    verifySchema("dfs_test.default", schemas);
+    verifySchema("dfs_test.home", schemas);
+    verifySchema("dfs_test.tmp", schemas);
+  }
+
+  @Test
+  public void tables() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.`TABLES`"); // SQL equivalent
+
+    GetTablesResp resp = client.getTables(null, null, null).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<TableMetadata> tables = resp.getTablesList();
+    assertEquals(11, tables.size());
+
+    verifyTable("INFORMATION_SCHEMA", "CATALOGS", tables);
+    verifyTable("INFORMATION_SCHEMA", "COLUMNS", tables);
+    verifyTable("INFORMATION_SCHEMA", "SCHEMATA", tables);
+    verifyTable("INFORMATION_SCHEMA", "TABLES", tables);
+    verifyTable("INFORMATION_SCHEMA", "VIEWS", tables);
+    verifyTable("sys", "boot", tables);
+    verifyTable("sys", "drillbits", tables);
+    verifyTable("sys", "memory", tables);
+    verifyTable("sys", "options", tables);
+    verifyTable("sys", "threads", tables);
+    verifyTable("sys", "version", tables);
+  }
+
+  @Test
+  public void tablesWithTableNameFilter() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE '%o%'"); // SQL equivalent
+
+    GetTablesResp resp = client.getTables(null, null,
+        LikeFilter.newBuilder().setRegex("%o%").build()).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<TableMetadata> tables = resp.getTablesList();
+    assertEquals(4, tables.size());
+
+    verifyTable("sys", "boot", tables);
+    verifyTable("sys", "memory", tables);
+    verifyTable("sys", "options", tables);
+    verifyTable("sys", "version", tables);
+  }
+
+  @Test
+  public void tablesWithTableNameFilterAndSchemaNameFilter() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` " +
+    //    "WHERE TABLE_SCHEMA LIKE '%N\\_S%' ESCAPE '\\' AND TABLE_NAME LIKE '%o%'"); // SQL equivalent
+
+    GetTablesResp resp = client.getTables(null,
+        LikeFilter.newBuilder().setRegex("%N\\_S%").setEscape("\\").build(),
+        LikeFilter.newBuilder().setRegex("%o%").build()).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<TableMetadata> tables = resp.getTablesList();
+    assertEquals(0, tables.size());
+  }
+
+  @Test
+  public void columns() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.COLUMNS"); // SQL equivalent
+
+    GetColumnsResp resp = client.getColumns(null, null, null, null).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<ColumnMetadata> columns = resp.getColumnsList();
+    assertEquals(70, columns.size());
+    // too many records to verify the output.
+  }
+
+  @Test
+  public void columnsWithColumnNameFilter() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE COLUMN_NAME LIKE '%\\_p%' ESCAPE '\\'"); // SQL equivalent
+
+    GetColumnsResp resp = client.getColumns(null, null, null,
+        LikeFilter.newBuilder().setRegex("%\\_p%").setEscape("\\").build()).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<ColumnMetadata> columns = resp.getColumnsList();
+    assertEquals(5, columns.size());
+
+    verifyColumn("sys", "drillbits", "user_port", columns);
+    verifyColumn("sys", "drillbits", "control_port", columns);
+    verifyColumn("sys", "drillbits", "data_port", columns);
+    verifyColumn("sys", "memory", "user_port", columns);
+    verifyColumn("sys", "threads", "user_port", columns);
+  }
+
+  @Test
+  public void columnsWithColumnNameFilterAndTableNameFilter() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.COLUMNS
+    //     WHERE TABLE_NAME LIKE '%bits' AND COLUMN_NAME LIKE '%\\_p%' ESCAPE '\\'"); // SQL equivalent
+
+    GetColumnsResp resp = client.getColumns(null, null,
+        LikeFilter.newBuilder().setRegex("%bits").build(),
+        LikeFilter.newBuilder().setRegex("%\\_p%").setEscape("\\").build()).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<ColumnMetadata> columns = resp.getColumnsList();
+    assertEquals(3, columns.size());
+
+    verifyColumn("sys", "drillbits", "user_port", columns);
+    verifyColumn("sys", "drillbits", "control_port", columns);
+    verifyColumn("sys", "drillbits", "data_port", columns);
+  }
+
+  @Test
+  public void columnsWithAllSupportedFilters() throws Exception {
+    // test("SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE " +
+    //    "TABLE_CATALOG LIKE '%ILL' AND TABLE_SCHEMA LIKE 'sys' AND " +
+    //    "TABLE_NAME LIKE '%bits' AND COLUMN_NAME LIKE '%\\_p%' ESCAPE '\\'"); // SQL equivalent
+
+    GetColumnsResp resp = client.getColumns(
+        LikeFilter.newBuilder().setRegex("%ILL").build(),
+        LikeFilter.newBuilder().setRegex("sys").build(),
+        LikeFilter.newBuilder().setRegex("%bits").build(),
+        LikeFilter.newBuilder().setRegex("%\\_p%").setEscape("\\").build()).get();
+
+    assertEquals(RequestStatus.OK, resp.getStatus());
+    List<ColumnMetadata> columns = resp.getColumnsList();
+    assertEquals(3, columns.size());
+
+    verifyColumn("sys", "drillbits", "user_port", columns);
+    verifyColumn("sys", "drillbits", "control_port", columns);
+    verifyColumn("sys", "drillbits", "data_port", columns);
+  }
+
+  /** Helper method to verify schema contents */
+  private static void verifySchema(String schemaName, List<SchemaMetadata> schemas) {
+    for(SchemaMetadata schema : schemas) {
+      if (schemaName.equals(schema.getSchemaName())) {
+        assertEquals(IS_CATALOG_NAME, schema.getCatalogName());
+        return;
+      }
+    }
+
+    fail("Failed to find schema '" + schemaName + "' in results: " + schemas);
+  }
+
+  /** Helper method to verify table contents */
+  private static void verifyTable(String schemaName, String tableName, List<TableMetadata> tables) {
+
+    for(TableMetadata table : tables) {
+      if (tableName.equals(table.getTableName()) && schemaName.equals(table.getSchemaName())) {
+        assertEquals(IS_CATALOG_NAME, table.getCatalogName());
+        return;
+      }
+    }
+
+    fail(String.format("Failed to find table '%s.%s' in results: %s", schemaName, tableName, tables));
+  }
+
+  /** Helper method to verify column contents */
+  private static void verifyColumn(String schemaName, String tableName, String columnName,
+      List<ColumnMetadata> columns) {
+
+    for(ColumnMetadata column : columns) {
+      if (schemaName.equals(column.getSchemaName()) && tableName.equals(column.getTableName()) &&
+          columnName.equals(column.getColumnName())) {
+        assertEquals(IS_CATALOG_NAME, column.getCatalogName());
+        return;
+      }
+    }
+
+    fail(String.format("Failed to find column '%s.%s.%s' in results: %s", schemaName, tableName, columnName, columns));
+  }
+}
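
Read together, these tests also document the client-side calling convention:
build optional LikeFilters, call the corresponding DrillClient method, and
block on the returned future. A condensed usage sketch under the same
assumptions as the tests (client is an already-connected DrillClient, as
BaseTestQuery provides; the surrounding class and method names are
illustrative only):

import java.util.concurrent.ExecutionException;

import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
import org.apache.drill.exec.proto.UserProtos.LikeFilter;
import org.apache.drill.exec.proto.UserProtos.RequestStatus;
import org.apache.drill.exec.proto.UserProtos.TableMetadata;

public class TablesLookupExample {

  /** Lists tables whose name contains 'o' in schemas matching 'sys'. */
  static void printMatchingTables(DrillClient client)
      throws InterruptedException, ExecutionException {
    LikeFilter schemaFilter = LikeFilter.newBuilder().setRegex("sys").build();
    LikeFilter tableFilter = LikeFilter.newBuilder().setRegex("%o%").build();

    // A null filter matches everything, as in the tests above.
    GetTablesResp resp = client.getTables(null, schemaFilter, tableFilter).get();

    if (resp.getStatus() != RequestStatus.OK) {
      System.err.println("getTables failed: " + resp.getError().getMessage());
      return;
    }

    for (TableMetadata table : resp.getTablesList()) {
      System.out.println(table.getSchemaName() + "." + table.getTableName());
    }
  }
}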

http://git-wip-us.apache.org/repos/asf/drill/blob/ef6e522c/exec/jdbc-all/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index fe6e5cd..49dbb3e 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -442,7 +442,7 @@
                  This is likely due to new dependencies being added to java-exec without updating the excludes in this module. Keeping the excludes current is important because it minimizes the size of the dependency that Drill application users pull in.
                   
                   </message>
-                  <maxsize>20000000</maxsize>
+                  <maxsize>21000000</maxsize>
                   <minsize>15000000</minsize>
                   <files>
                    <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>