You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2016/06/17 17:37:32 UTC

drill git commit: DRILL-4725: Improvements to InfoSchema RecordGenerator needed for DRILL-4714

Repository: drill
Updated Branches:
  refs/heads/master 7c7e9d171 -> f70df990f


DRILL-4725: Improvements to InfoSchema RecordGenerator needed for DRILL-4714

1. Add support for pushing filters on the following fields into InfoSchemaRecordGenerator:
   - CATALOG_NAME
   - COLUMN_NAME

2. Pushdown LIKE with ESCAPE. Add test TestInfoSchemaFilterPushDown#testFilterPushdown_LikeWithEscape

3. Add a method visitCatalog() to InfoSchemaRecordGenerator to decide whether to explore the catalog or not

4. Refactor CATALOG_DESCRIPTION and CATALOG_CONNECT as constant strings in InfoSchemaConstants.java

5. Update TestInfoSchemaFilterPushDown#testPartialFilterPushDownWithProject as
   we are now pushing the filter on COLUMN_NAME field

6. Cleanup:
   Rename RecordGenerator -> InfoSchemaRecordGenerator
   Add comments in RecordGenerator
   Rename SelectedTable -> InfoSchemaTableType

this closes #524

Change-Id: I0b2e16d04cb72fe3ce5961f5f357a00655f1cb05


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f70df990
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f70df990
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f70df990

Branch: refs/heads/master
Commit: f70df990f8d28f836f23e2a45145fc07c75f31bb
Parents: 7c7e9d1
Author: vkorukanti <ve...@dremio.com>
Authored: Thu Jun 9 00:00:31 2016 -0700
Committer: vkorukanti <ve...@dremio.com>
Committed: Fri Jun 17 10:34:49 2016 -0700

----------------------------------------------------------------------
 .../exec/store/ischema/InfoSchemaConstants.java |   6 +
 .../store/ischema/InfoSchemaDrillTable.java     |   4 +-
 .../exec/store/ischema/InfoSchemaFilter.java    |  19 +-
 .../store/ischema/InfoSchemaFilterBuilder.java  |  38 +-
 .../exec/store/ischema/InfoSchemaGroupScan.java |  10 +-
 .../ischema/InfoSchemaRecordGenerator.java      | 367 +++++++++++++++++++
 .../store/ischema/InfoSchemaStoragePlugin.java  |   4 +-
 .../exec/store/ischema/InfoSchemaSubScan.java   |   6 +-
 .../exec/store/ischema/InfoSchemaTable.java     |  22 +-
 .../exec/store/ischema/InfoSchemaTableType.java |  66 ++++
 .../exec/store/ischema/RecordGenerator.java     | 283 --------------
 .../drill/exec/store/ischema/SelectedTable.java |  65 ----
 .../apache/drill/exec/sql/TestInfoSchema.java   |  14 +
 .../ischema/TestInfoSchemaFilterPushDown.java   |  16 +-
 14 files changed, 529 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
index 78c19c1..d73894a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
@@ -25,6 +25,12 @@ public final class InfoSchemaConstants {
   /** Name of catalog containing information schema. */
   public static final String IS_CATALOG_NAME = "DRILL";
 
+  /** Catalog description */
+  public static final String IS_CATALOG_DESCR = "The internal metadata used by Drill";
+
+  /** Catalog connect string. Currently empty */
+  public static final String IS_CATALOG_CONNECT = "";
+
   /** Name of information schema. */
   public static final String IS_SCHEMA_NAME = "INFORMATION_SCHEMA";
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaDrillTable.java
index 4232940..9c03bcf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaDrillTable.java
@@ -25,9 +25,9 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 public class InfoSchemaDrillTable extends DrillTable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaDrillTable.class);
 
-  private final SelectedTable table;
+  private final InfoSchemaTableType table;
 
-  public InfoSchemaDrillTable(InfoSchemaStoragePlugin plugin, String storageEngineName, SelectedTable selection, StoragePluginConfig storageEngineConfig) {
+  public InfoSchemaDrillTable(InfoSchemaStoragePlugin plugin, String storageEngineName, InfoSchemaTableType selection, StoragePluginConfig storageEngineConfig) {
     super(storageEngineName, plugin, selection);
     this.table = selection;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java
index 537ea19..4197a26 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.ischema;
 
+import static org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -144,11 +146,18 @@ public class InfoSchemaFilter {
   private Result evaluateHelperFunction(Map<String, String> recordValues, FunctionExprNode exprNode) {
     switch(exprNode.function) {
       case "like": {
-        FieldExprNode arg0 = (FieldExprNode) exprNode.args.get(0);
-        ConstantExprNode arg1 = (ConstantExprNode) exprNode.args.get(1);
-        if (recordValues.get(arg0.field.toString()) != null) {
-          return Pattern.matches(RegexpUtil.sqlToRegexLike(arg1.value), recordValues.get(arg0.field.toString())) ?
-              Result.TRUE : Result.FALSE;
+        FieldExprNode col = (FieldExprNode) exprNode.args.get(0);
+        ConstantExprNode pattern = (ConstantExprNode) exprNode.args.get(1);
+        ConstantExprNode escape = exprNode.args.size() > 2 ? (ConstantExprNode) exprNode.args.get(2) : null;
+        final String fieldValue = recordValues.get(col.field.toString());
+        if (fieldValue != null) {
+          if (escape == null) {
+            return Pattern.matches(sqlToRegexLike(pattern.value), fieldValue) ?
+                Result.TRUE : Result.FALSE;
+          } else {
+            return Pattern.matches(sqlToRegexLike(pattern.value, escape.value), fieldValue) ?
+                Result.TRUE : Result.FALSE;
+          }
         }
 
         return Result.INCONCLUSIVE;

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
index 9d0bc06..22fb48c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
@@ -37,7 +37,7 @@ import java.util.List;
 
 /**
  * Builds a InfoSchemaFilter out of the Filter condition. Currently we look only for certain conditions. Mainly
- * conditions involving columns "TABLE_NAME", "SCHEMA_NAME" and "TABLE_SCHEMA" and
+ * conditions involving columns "CATALOG_NAME", "TABLE_NAME", "SCHEMA_NAME", "TABLE_SCHEMA" and "COLUMN_NAME", and
  * functions EQUAL, NOT EQUAL, LIKE, OR and AND.
  */
 public class InfoSchemaFilterBuilder extends AbstractExprVisitor<ExprNode, Void, RuntimeException> {
@@ -69,13 +69,25 @@ public class InfoSchemaFilterBuilder extends AbstractExprVisitor<ExprNode, Void,
       case "equal":
       case "not equal":
       case "notequal":
-      case "not_equal":
-      case "like": {
-        ExprNode arg0 = call.args.get(0).accept(this, value);
-        ExprNode arg1 = call.args.get(1).accept(this, value);
+      case "not_equal": {
+        final ExprNode col = call.args.get(0).accept(this, value);
+        final ExprNode constant = call.args.get(1).accept(this, value);
+
+        if (col instanceof FieldExprNode && constant instanceof ConstantExprNode) {
+          return new FunctionExprNode(funcName, ImmutableList.of(col, constant));
+        }
+        break;
+      }
 
-        if (arg0 != null && arg0 instanceof FieldExprNode && arg1 != null && arg1 instanceof ConstantExprNode) {
-          return new FunctionExprNode(funcName, ImmutableList.of(arg0, arg1));
+      case "like": {
+        final ExprNode col = call.args.get(0).accept(this, value);
+        final ExprNode pattern = call.args.get(1).accept(this, value);
+        final ExprNode escape = call.args.size() > 2 ? call.args.get(2).accept(this, value) : null;
+
+        if (col instanceof FieldExprNode && pattern instanceof ConstantExprNode &&
+            (escape == null || escape instanceof ConstantExprNode)) {
+          return new FunctionExprNode(funcName,
+              escape == null ? ImmutableList.of(col, pattern) : ImmutableList.of(col, pattern, escape));
         }
         break;
       }
@@ -127,9 +139,11 @@ public class InfoSchemaFilterBuilder extends AbstractExprVisitor<ExprNode, Void,
     if (e.getInput() instanceof FieldReference) {
       FieldReference fieldRef = (FieldReference) e.getInput();
       String field = fieldRef.getAsUnescapedPath().toUpperCase();
-      if (field.equals(SCHS_COL_SCHEMA_NAME)
+      if (field.equals(CATS_COL_CATALOG_NAME)
+          || field.equals(SCHS_COL_SCHEMA_NAME)
           || field.equals(SHRD_COL_TABLE_NAME)
-          || field.equals(SHRD_COL_TABLE_SCHEMA)) {
+          || field.equals(SHRD_COL_TABLE_SCHEMA)
+          || field.equals(COLS_COL_COLUMN_NAME)) {
         return new FieldExprNode(field);
       }
     }
@@ -145,9 +159,11 @@ public class InfoSchemaFilterBuilder extends AbstractExprVisitor<ExprNode, Void,
   @Override
   public ExprNode visitSchemaPath(SchemaPath path, Void value) throws RuntimeException {
     String field = path.getAsUnescapedPath().toUpperCase();
-    if (field.equals(SCHS_COL_SCHEMA_NAME)
+    if (field.equals(CATS_COL_CATALOG_NAME)
+        || field.equals(SCHS_COL_SCHEMA_NAME)
         || field.equals(SHRD_COL_TABLE_NAME)
-        || field.equals(SHRD_COL_TABLE_SCHEMA)) {
+        || field.equals(SHRD_COL_TABLE_SCHEMA)
+        || field.equals(COLS_COL_COLUMN_NAME)) {
       return new FieldExprNode(field);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
index bd0d582..698004f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
@@ -17,13 +17,11 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-import java.util.Collections;
 import java.util.List;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
@@ -42,17 +40,17 @@ import com.google.common.base.Preconditions;
 public class InfoSchemaGroupScan extends AbstractGroupScan{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaGroupScan.class);
 
-  private final SelectedTable table;
+  private final InfoSchemaTableType table;
   private final InfoSchemaFilter filter;
 
   private boolean isFilterPushedDown = false;
 
-  public InfoSchemaGroupScan(SelectedTable table) {
+  public InfoSchemaGroupScan(InfoSchemaTableType table) {
     this(table, null);
   }
 
   @JsonCreator
-  public InfoSchemaGroupScan(@JsonProperty("table") SelectedTable table,
+  public InfoSchemaGroupScan(@JsonProperty("table") InfoSchemaTableType table,
                              @JsonProperty("filter") InfoSchemaFilter filter) {
     super((String)null);
     this.table = table;
@@ -67,7 +65,7 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{
   }
 
   @JsonProperty("table")
-  public SelectedTable getTable() {
+  public InfoSchemaTableType getTable() {
     return table;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
new file mode 100644
index 0000000..5223595
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.ischema;
+
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_CONNECT;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCR;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.logical.DrillViewInfoProvider;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter.Result;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+/**
+ * Generates records for POJO RecordReader by scanning the given schema. At every level (catalog, schema, table, field),
+ * the level-specific object is visited and a decision is made whether to visit the contents of that object. The object
+ * here is a catalog, schema, table or field.
+ */
+public abstract class InfoSchemaRecordGenerator {
+  protected InfoSchemaFilter filter;
+
+  protected OptionManager optionManager;
+  public InfoSchemaRecordGenerator(OptionManager optionManager) {
+    this.optionManager = optionManager;
+  }
+
+  public void setInfoSchemaFilter(InfoSchemaFilter filter) {
+    this.filter = filter;
+  }
+
+  /**
+   * Visit the catalog. Drill has only one catalog.
+   *
+   * @return Whether to continue exploring the contents of the catalog or not. Contents are schema/schema tree.
+   */
+  public boolean visitCatalog() {
+    return true;
+  }
+
+  /**
+   * Visit the given schema.
+   *
+   * @param schemaName Name of the schema
+   * @param schema Schema object
+   * @return Whether to continue exploring the contents of the schema or not. Contents are tables within the schema.
+   */
+  public boolean visitSchema(String schemaName, SchemaPlus schema) {
+    return true;
+  }
+
+  /**
+   * Visit the given table.
+   *
+   * @param schemaName Name of the schema where the table is present
+   * @param tableName Name of the table
+   * @param table Table object
+   * @return Whether to continue exploring the contents of the table or not. Contents are fields within the table.
+   */
+  public boolean visitTable(String schemaName, String tableName, Table table) {
+    return true;
+  }
+
+  /**
+   * Visit the given field.
+   *
+   * @param schemaName Schema where the table of the field is present
+   * @param tableName Table name
+   * @param field Field object
+   */
+  public void visitField(String schemaName, String tableName, RelDataTypeField field) {
+  }
+
+  protected boolean shouldVisitCatalog() {
+    if (filter == null) {
+      return true;
+    }
+
+    final Map<String, String> recordValues = ImmutableMap.of(CATS_COL_CATALOG_NAME, IS_CATALOG_NAME);
+
+    // If the filter evaluates to false then we don't need to visit the catalog.
+    // For other two results (TRUE, INCONCLUSIVE) continue to visit the catalog.
+    return filter.evaluate(recordValues) != Result.FALSE;
+  }
+
+  protected boolean shouldVisitSchema(String schemaName, SchemaPlus schema) {
+    try {
+      // Skip if the schema path is null or empty (true for the root schema)
+      if (schemaName == null || schemaName.isEmpty()) {
+        return false;
+      }
+
+      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+      if (!drillSchema.showInInformationSchema()) {
+        return false;
+      }
+
+      if (filter == null) {
+        return true;
+      }
+
+      final Map<String, String> recordValues =
+          ImmutableMap.of(
+              CATS_COL_CATALOG_NAME, IS_CATALOG_NAME,
+              SHRD_COL_TABLE_SCHEMA, schemaName,
+              SCHS_COL_SCHEMA_NAME, schemaName);
+
+      // If the filter evaluates to false then we don't need to visit the schema.
+      // For other two results (TRUE, INCONCLUSIVE) continue to visit the schema.
+      return filter.evaluate(recordValues) != Result.FALSE;
+    } catch(ClassCastException e) {
+      // ignore and return true as this is not a Drill schema
+    }
+    return true;
+  }
+
+  protected boolean shouldVisitTable(String schemaName, String tableName) {
+    if (filter == null) {
+      return true;
+    }
+
+    final Map<String, String> recordValues =
+        ImmutableMap.of(
+            CATS_COL_CATALOG_NAME, IS_CATALOG_NAME,
+            SHRD_COL_TABLE_SCHEMA, schemaName,
+            SCHS_COL_SCHEMA_NAME, schemaName,
+            SHRD_COL_TABLE_NAME, tableName);
+
+    // If the filter evaluates to false then we don't need to visit the table.
+    // For other two results (TRUE, INCONCLUSIVE) continue to visit the table.
+    return filter.evaluate(recordValues) != Result.FALSE;
+  }
+
+  protected boolean shouldVisitColumn(String schemaName, String tableName, String columnName) {
+    if (filter == null) {
+      return true;
+    }
+
+    final Map<String, String> recordValues =
+        ImmutableMap.of(
+            CATS_COL_CATALOG_NAME, IS_CATALOG_NAME,
+            SHRD_COL_TABLE_SCHEMA, schemaName,
+            SCHS_COL_SCHEMA_NAME, schemaName,
+            SHRD_COL_TABLE_NAME, tableName,
+            COLS_COL_COLUMN_NAME, columnName);
+
+    // If the filter evaluates to false then we don't need to visit the column.
+    // For other two results (TRUE, INCONCLUSIVE) continue to visit the column.
+    return filter.evaluate(recordValues) != Result.FALSE;
+  }
+
+  public abstract PojoRecordReader<?> getRecordReader();
+
+  public void scanSchema(SchemaPlus root) {
+    if (shouldVisitCatalog() && visitCatalog()) {
+      scanSchema(root.getName(), root);
+    }
+  }
+
+  /**
+   * Recursively scans the given schema, sub-schemas first and then the schema itself, invoking the visitor as appropriate.
+   * @param  schemaPath  the dot-separated path to the given schema, so far
+   * @param  schema  the given schema
+   */
+  private void scanSchema(String schemaPath, SchemaPlus schema) {
+
+    // Recursively scan any subschema.
+    for (String name: schema.getSubSchemaNames()) {
+      scanSchema(schemaPath +
+          ("".equals(schemaPath) ? "" : ".") + // Empty path: no leading dot. equals(), since == compares references.
+          name, schema.getSubSchema(name));
+    }
+
+    // Visit this schema and, if requested, the tables it contains.
+    if (shouldVisitSchema(schemaPath, schema) && visitSchema(schemaPath, schema)) {
+      visitTables(schemaPath, schema);
+    }
+  }
+
+  /**
+   * Visit the tables in the given schema and, for each table the visitor asks to explore, its fields.
+   * @param  schemaPath  the path to the given schema
+   * @param  schema  the given schema
+   */
+  public void visitTables(String schemaPath, SchemaPlus schema) {
+    final AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+    final List<String> tableNames = Lists.newArrayList(schema.getTableNames());
+    for(Pair<String, ? extends Table> tableNameToTable : drillSchema.getTablesByNames(tableNames)) {
+      final String tableName = tableNameToTable.getKey();
+      final Table table = tableNameToTable.getValue();
+      // Visit the table, and if requested ...
+      if(shouldVisitTable(schemaPath, tableName) && visitTable(schemaPath,  tableName, table)) {
+        // ... do for each of the table's fields.
+        final RelDataType tableRow = table.getRowType(new JavaTypeFactoryImpl());
+        for (RelDataTypeField field: tableRow.getFieldList()) {
+          if (shouldVisitColumn(schemaPath, tableName, field.getName())) {
+            visitField(schemaPath, tableName, field);
+          }
+        }
+      }
+    }
+  }
+
+  public static class Catalogs extends InfoSchemaRecordGenerator {
+    List<Records.Catalog> records = ImmutableList.of();
+
+    public Catalogs(OptionManager optionManager) {
+      super(optionManager);
+    }
+
+    @Override
+    public PojoRecordReader<Records.Catalog> getRecordReader() {
+      return new PojoRecordReader<>(Records.Catalog.class, records.iterator());
+    }
+
+    @Override
+    public boolean visitCatalog() {
+      records = ImmutableList.of(new Records.Catalog(IS_CATALOG_NAME, IS_CATALOG_DESCR, IS_CATALOG_CONNECT));
+      return false;
+    }
+  }
+
+  public static class Schemata extends InfoSchemaRecordGenerator {
+    List<Records.Schema> records = Lists.newArrayList();
+
+    public Schemata(OptionManager optionManager) {
+      super(optionManager);
+    }
+
+    @Override
+    public PojoRecordReader<Records.Schema> getRecordReader() {
+      return new PojoRecordReader<>(Records.Schema.class, records.iterator());
+    }
+
+    @Override
+    public boolean visitSchema(String schemaName, SchemaPlus schema) {
+      AbstractSchema as = schema.unwrap(AbstractSchema.class);
+      records.add(new Records.Schema(IS_CATALOG_NAME, schemaName, "<owner>",
+                                     as.getTypeName(), as.isMutable()));
+      return false;
+    }
+  }
+
+  public static class Tables extends InfoSchemaRecordGenerator {
+    List<Records.Table> records = Lists.newArrayList();
+
+    public Tables(OptionManager optionManager) {
+      super(optionManager);
+    }
+
+    @Override
+    public PojoRecordReader<Records.Table> getRecordReader() {
+      return new PojoRecordReader<>(Records.Table.class, records.iterator());
+    }
+
+    @Override
+    public void visitTables(String schemaPath, SchemaPlus schema) {
+      final AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+
+      final List<String> tableNames = Lists.newArrayList(schema.getTableNames());
+      final List<Pair<String, ? extends Table>> tableNameToTables;
+      if(optionManager.getOption(ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST)) {
+        tableNameToTables = drillSchema.getTablesByNamesByBulkLoad(tableNames);
+      } else {
+        tableNameToTables = drillSchema.getTablesByNames(tableNames);
+      }
+
+      for(Pair<String, ? extends Table> tableNameToTable : tableNameToTables) {
+        final String tableName = tableNameToTable.getKey();
+        final Table table = tableNameToTable.getValue();
+        // Visit the table, and if requested ...
+        if(shouldVisitTable(schemaPath, tableName)) {
+          visitTable(schemaPath, tableName, table);
+        }
+      }
+    }
+
+    @Override
+    public boolean visitTable(String schemaName, String tableName, Table table) {
+      Preconditions.checkNotNull(table, "Error. Table %s.%s provided is null.", schemaName, tableName);
+
+      // skip over unknown table types
+      if (table.getJdbcTableType() != null) {
+        records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName,
+            table.getJdbcTableType().toString()));
+      }
+
+      return false;
+    }
+  }
+
+  public static class Views extends InfoSchemaRecordGenerator {
+    List<Records.View> records = Lists.newArrayList();
+
+    public Views(OptionManager optionManager) {
+      super(optionManager);
+    }
+
+    @Override
+    public PojoRecordReader<Records.View> getRecordReader() {
+      return new PojoRecordReader<>(Records.View.class, records.iterator());
+    }
+
+    @Override
+    public boolean visitTable(String schemaName, String tableName, Table table) {
+      if (table.getJdbcTableType() == TableType.VIEW) {
+        records.add(new Records.View(IS_CATALOG_NAME, schemaName, tableName,
+                    ((DrillViewInfoProvider) table).getViewSql()));
+      }
+      return false;
+    }
+  }
+
+  public static class Columns extends InfoSchemaRecordGenerator {
+    List<Records.Column> records = Lists.newArrayList();
+    public Columns(OptionManager optionManager) {
+      super(optionManager);
+    }
+
+    @Override
+    public PojoRecordReader<Records.Column> getRecordReader() {
+      return new PojoRecordReader<>(Records.Column.class, records.iterator());
+    }
+
+    @Override
+    public void visitField(String schemaName, String tableName, RelDataTypeField field) {
+      records.add(new Records.Column(IS_CATALOG_NAME, schemaName, tableName, field));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index 27648c5..e59899f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -62,7 +62,7 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin {
   @Override
   public InfoSchemaGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
       throws IOException {
-    SelectedTable table = selection.getWith(context.getLpPersistence(),  SelectedTable.class);
+    InfoSchemaTableType table = selection.getWith(context.getLpPersistence(),  InfoSchemaTableType.class);
     return new InfoSchemaGroupScan(table);
   }
 
@@ -85,7 +85,7 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin {
     public ISchema(SchemaPlus parent, InfoSchemaStoragePlugin plugin){
       super(ImmutableList.<String>of(), IS_SCHEMA_NAME);
       Map<String, InfoSchemaDrillTable> tbls = Maps.newHashMap();
-      for(SelectedTable tbl : SelectedTable.values()){
+      for(InfoSchemaTableType tbl : InfoSchemaTableType.values()){
         tbls.put(tbl.name(), new InfoSchemaDrillTable(plugin, IS_SCHEMA_NAME, tbl, config));
       }
       this.tables = ImmutableMap.copyOf(tbls);

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
index 7a479d9..116a026 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
@@ -26,11 +26,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 public class InfoSchemaSubScan extends AbstractSubScan{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaSubScan.class);
 
-  private final SelectedTable table;
+  private final InfoSchemaTableType table;
   private final InfoSchemaFilter filter;
 
   @JsonCreator
-  public InfoSchemaSubScan(@JsonProperty("table") SelectedTable table,
+  public InfoSchemaSubScan(@JsonProperty("table") InfoSchemaTableType table,
                            @JsonProperty("filter") InfoSchemaFilter filter) {
     super(null);
     this.table = table;
@@ -38,7 +38,7 @@ public class InfoSchemaSubScan extends AbstractSubScan{
   }
 
   @JsonProperty("table")
-  public SelectedTable getTable() {
+  public InfoSchemaTableType getTable() {
     return table;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
index a6d56b3..eb66bc1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
@@ -86,7 +86,7 @@ public abstract class InfoSchemaTable {
     return typeFactory.createStructType(relTypes, fieldNames);
   }
 
-  public abstract RecordGenerator getRecordGenerator(OptionManager optionManager);
+  public abstract InfoSchemaRecordGenerator getRecordGenerator(OptionManager optionManager);
 
   /** Layout for the CATALOGS table. */
   static public class Catalogs extends InfoSchemaTable {
@@ -102,8 +102,8 @@ public abstract class InfoSchemaTable {
     }
 
     @Override
-    public RecordGenerator getRecordGenerator(OptionManager optionManager) {
-      return new RecordGenerator.Catalogs(optionManager);
+    public InfoSchemaRecordGenerator getRecordGenerator(OptionManager optionManager) {
+      return new InfoSchemaRecordGenerator.Catalogs(optionManager);
     }
   }
 
@@ -123,8 +123,8 @@ public abstract class InfoSchemaTable {
     }
 
     @Override
-    public RecordGenerator getRecordGenerator(OptionManager optionManager) {
-      return new RecordGenerator.Schemata(optionManager);
+    public InfoSchemaRecordGenerator getRecordGenerator(OptionManager optionManager) {
+      return new InfoSchemaRecordGenerator.Schemata(optionManager);
     }
   }
 
@@ -143,8 +143,8 @@ public abstract class InfoSchemaTable {
     }
 
     @Override
-    public RecordGenerator getRecordGenerator(OptionManager optionManager) {
-      return new RecordGenerator.Tables(optionManager);
+    public InfoSchemaRecordGenerator getRecordGenerator(OptionManager optionManager) {
+      return new InfoSchemaRecordGenerator.Tables(optionManager);
     }
   }
 
@@ -163,8 +163,8 @@ public abstract class InfoSchemaTable {
     }
 
     @Override
-    public RecordGenerator getRecordGenerator(OptionManager optionManager) {
-      return new RecordGenerator.Views(optionManager);
+    public InfoSchemaRecordGenerator getRecordGenerator(OptionManager optionManager) {
+      return new InfoSchemaRecordGenerator.Views(optionManager);
     }
   }
 
@@ -215,8 +215,8 @@ public abstract class InfoSchemaTable {
     }
 
     @Override
-    public RecordGenerator getRecordGenerator(OptionManager optionManager) {
-      return new RecordGenerator.Columns(optionManager);
+    public InfoSchemaRecordGenerator getRecordGenerator(OptionManager optionManager) {
+      return new InfoSchemaRecordGenerator.Columns(optionManager);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
new file mode 100644
index 0000000..ec914b2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.ischema;
+
+import org.apache.calcite.schema.SchemaPlus;
+
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.ischema.InfoSchemaTable.Catalogs;
+import org.apache.drill.exec.store.ischema.InfoSchemaTable.Columns;
+import org.apache.drill.exec.store.ischema.InfoSchemaTable.Schemata;
+import org.apache.drill.exec.store.ischema.InfoSchemaTable.Tables;
+import org.apache.drill.exec.store.ischema.InfoSchemaTable.Views;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
+
+/**
+ * The set of tables/views in INFORMATION_SCHEMA.
+ */
+public enum InfoSchemaTableType {
+  // TODO:  Resolve how to not have two different places defining table names:
+  // NOTE: These identifiers have to match the string values in
+  // InfoSchemaConstants.
+  CATALOGS(new Catalogs()),
+  SCHEMATA(new Schemata()),
+  VIEWS(new Views()),
+  COLUMNS(new Columns()),
+  TABLES(new Tables());
+
+  private final InfoSchemaTable tableDef;
+
+  /**
+   * Associates this INFORMATION_SCHEMA table type with its table definition.
+   * @param  tableDef  the definition (columns and data generator) of the table
+   */
+  InfoSchemaTableType(InfoSchemaTable tableDef) {
+    this.tableDef = tableDef;
+  }
+
+  public PojoRecordReader<?> getRecordReader(SchemaPlus rootSchema, InfoSchemaFilter filter, OptionManager optionManager) {
+    InfoSchemaRecordGenerator recordGenerator = tableDef.getRecordGenerator(optionManager);
+    recordGenerator.setInfoSchemaFilter(filter);
+    recordGenerator.scanSchema(rootSchema);
+    return recordGenerator.getRecordReader();
+  }
+
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    return tableDef.getRowType(typeFactory);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
deleted file mode 100644
index 29ccbce..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.ischema;
-
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.schema.Schema.TableType;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.planner.logical.DrillViewInfoProvider;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractSchema;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.ischema.InfoSchemaFilter.Result;
-import org.apache.drill.exec.store.pojo.PojoRecordReader;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-/**
- * Generates records for POJO RecordReader by scanning the given schema.
- */
-public abstract class RecordGenerator {
-  protected InfoSchemaFilter filter;
-
-  protected OptionManager optionManager;
-  public RecordGenerator(OptionManager optionManager) {
-    this.optionManager = optionManager;
-  }
-
-  public void setInfoSchemaFilter(InfoSchemaFilter filter) {
-    this.filter = filter;
-  }
-
-  public boolean visitSchema(String schemaName, SchemaPlus schema) {
-    return true;
-  }
-
-  public boolean visitTable(String schemaName, String tableName, Table table) {
-    return true;
-  }
-
-  public boolean visitField(String schemaName, String tableName, RelDataTypeField field) {
-    return true;
-  }
-
-  protected boolean shouldVisitSchema(String schemaName, SchemaPlus schema) {
-    try {
-      // if the schema path is null or empty (try for root schema)
-      if (schemaName == null || schemaName.isEmpty()) {
-        return false;
-      }
-
-      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
-      if (!drillSchema.showInInformationSchema()) {
-        return false;
-      }
-
-      Map<String, String> recordValues =
-          ImmutableMap.of(SHRD_COL_TABLE_SCHEMA, schemaName,
-                          SCHS_COL_SCHEMA_NAME, schemaName);
-      if (filter != null && filter.evaluate(recordValues) == Result.FALSE) {
-        // If the filter evaluates to false then we don't need to visit the schema.
-        // For other two results (TRUE, INCONCLUSIVE) continue to visit the schema.
-        return false;
-      }
-    } catch(ClassCastException e) {
-      // ignore and return true as this is not a Drill schema
-    }
-    return true;
-  }
-
-  protected boolean shouldVisitTable(String schemaName, String tableName) {
-    Map<String, String> recordValues =
-        ImmutableMap.of( SHRD_COL_TABLE_SCHEMA, schemaName,
-                         SCHS_COL_SCHEMA_NAME, schemaName,
-                         SHRD_COL_TABLE_NAME, tableName);
-    if (filter != null && filter.evaluate(recordValues) == Result.FALSE) {
-      return false;
-    }
-
-    return true;
-  }
-
-  public abstract RecordReader getRecordReader();
-
-  public void scanSchema(SchemaPlus root) {
-    scanSchema(root.getName(), root);
-  }
-
-  /**
-   * Recursively scans the given schema, invoking the visitor as appropriate.
-   * @param  schemaPath  the path to the given schema, so far
-   * @param  schema  the given schema
-   */
-  private void scanSchema(String schemaPath, SchemaPlus schema) {
-
-    // Recursively scan any subschema.
-    for (String name: schema.getSubSchemaNames()) {
-      scanSchema(schemaPath +
-          (schemaPath == "" ? "" : ".") + // If we have an empty schema path, then don't insert a leading dot.
-          name, schema.getSubSchema(name));
-    }
-
-    // Visit this schema and if requested ...
-    if (shouldVisitSchema(schemaPath, schema) && visitSchema(schemaPath, schema)) {
-      visitTables(schemaPath, schema);
-    }
-  }
-
-  /**
-   * Visit the tables in the given schema. The
-   * @param  schemaPath  the path to the given schema
-   * @param  schema  the given schema
-   */
-  public void visitTables(String schemaPath, SchemaPlus schema) {
-    final AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
-    final List<String> tableNames = Lists.newArrayList(schema.getTableNames());
-    for(Pair<String, ? extends Table> tableNameToTable : drillSchema.getTablesByNames(tableNames)) {
-      final String tableName = tableNameToTable.getKey();
-      final Table table = tableNameToTable.getValue();
-      // Visit the table, and if requested ...
-      if(shouldVisitTable(schemaPath, tableName) && visitTable(schemaPath,  tableName, table)) {
-        // ... do for each of the table's fields.
-        final RelDataType tableRow = table.getRowType(new JavaTypeFactoryImpl());
-        for (RelDataTypeField field: tableRow.getFieldList()) {
-          visitField(schemaPath, tableName, field);
-        }
-      }
-    }
-  }
-
-  public static class Catalogs extends RecordGenerator {
-    public Catalogs(OptionManager optionManager) {
-      super(optionManager);
-    }
-
-    @Override
-    public RecordReader getRecordReader() {
-      Records.Catalog catalogRecord =
-          new Records.Catalog(IS_CATALOG_NAME,
-                              "The internal metadata used by Drill", "");
-      return new PojoRecordReader<>(Records.Catalog.class, ImmutableList.of(catalogRecord).iterator());
-    }
-  }
-
-  public static class Schemata extends RecordGenerator {
-    List<Records.Schema> records = Lists.newArrayList();
-
-    public Schemata(OptionManager optionManager) {
-      super(optionManager);
-    }
-
-    @Override
-    public RecordReader getRecordReader() {
-      return new PojoRecordReader<>(Records.Schema.class, records.iterator());
-    }
-
-    @Override
-    public boolean visitSchema(String schemaName, SchemaPlus schema) {
-      AbstractSchema as = schema.unwrap(AbstractSchema.class);
-      records.add(new Records.Schema(IS_CATALOG_NAME, schemaName, "<owner>",
-                                     as.getTypeName(), as.isMutable()));
-      return false;
-    }
-  }
-
-  public static class Tables extends RecordGenerator {
-    List<Records.Table> records = Lists.newArrayList();
-
-    public Tables(OptionManager optionManager) {
-      super(optionManager);
-    }
-
-    @Override
-    public RecordReader getRecordReader() {
-      return new PojoRecordReader<>(Records.Table.class, records.iterator());
-    }
-
-    @Override
-    public void visitTables(String schemaPath, SchemaPlus schema) {
-      final AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
-
-      final List<String> tableNames = Lists.newArrayList(schema.getTableNames());
-      final List<Pair<String, ? extends Table>> tableNameToTables;
-      if(optionManager.getOption(ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST)) {
-        tableNameToTables = drillSchema.getTablesByNamesByBulkLoad(tableNames);
-      } else {
-        tableNameToTables = drillSchema.getTablesByNames(tableNames);
-      }
-
-      for(Pair<String, ? extends Table> tableNameToTable : tableNameToTables) {
-        final String tableName = tableNameToTable.getKey();
-        final Table table = tableNameToTable.getValue();
-        // Visit the table, and if requested ...
-        if(shouldVisitTable(schemaPath, tableName)) {
-          visitTable(schemaPath, tableName, table);
-        }
-      }
-    }
-
-    @Override
-    public boolean visitTable(String schemaName, String tableName, Table table) {
-      Preconditions.checkNotNull(table, "Error. Table %s.%s provided is null.", schemaName, tableName);
-
-      // skip over unknown table types
-      if (table.getJdbcTableType() != null) {
-        records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName,
-            table.getJdbcTableType().toString()));
-      }
-
-      return false;
-    }
-  }
-
-  public static class Views extends RecordGenerator {
-    List<Records.View> records = Lists.newArrayList();
-
-    public Views(OptionManager optionManager) {
-      super(optionManager);
-    }
-
-    @Override
-    public RecordReader getRecordReader() {
-      return new PojoRecordReader<>(Records.View.class, records.iterator());
-    }
-
-    @Override
-    public boolean visitTable(String schemaName, String tableName, Table table) {
-      if (table.getJdbcTableType() == TableType.VIEW) {
-        records.add(new Records.View(IS_CATALOG_NAME, schemaName, tableName,
-                    ((DrillViewInfoProvider) table).getViewSql()));
-      }
-      return false;
-    }
-  }
-
-  public static class Columns extends RecordGenerator {
-    List<Records.Column> records = Lists.newArrayList();
-    public Columns(OptionManager optionManager) {
-      super(optionManager);
-    }
-
-    @Override
-    public RecordReader getRecordReader() {
-      return new PojoRecordReader<>(Records.Column.class, records.iterator());
-    }
-
-    @Override
-    public boolean visitField(String schemaName, String tableName, RelDataTypeField field) {
-      records.add(new Records.Column(IS_CATALOG_NAME, schemaName, tableName, field));
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/SelectedTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/SelectedTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/SelectedTable.java
deleted file mode 100644
index e2a2b2f..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/SelectedTable.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.ischema;
-
-import org.apache.calcite.schema.SchemaPlus;
-
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.ischema.InfoSchemaTable.Catalogs;
-import org.apache.drill.exec.store.ischema.InfoSchemaTable.Columns;
-import org.apache.drill.exec.store.ischema.InfoSchemaTable.Schemata;
-import org.apache.drill.exec.store.ischema.InfoSchemaTable.Tables;
-import org.apache.drill.exec.store.ischema.InfoSchemaTable.Views;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-
-/**
- * The set of tables/views in INFORMATION_SCHEMA.
- */
-public enum SelectedTable{
-  // TODO:  Resolve how to not have two different place defining table names:
-  // NOTE: These identifiers have to match the string values in
-  // InfoSchemaConstants.
-  CATALOGS(new Catalogs()),
-  SCHEMATA(new Schemata()),
-  VIEWS(new Views()),
-  COLUMNS(new Columns()),
-  TABLES(new Tables());
-
-  private final InfoSchemaTable tableDef;
-
-  /**
-   * ...
-   * @param  tableDef  the definition (columns and data generator) of the table
-   */
-  private SelectedTable(InfoSchemaTable tableDef) {
-    this.tableDef = tableDef;
-  }
-
-  public RecordReader getRecordReader(SchemaPlus rootSchema, InfoSchemaFilter filter, OptionManager optionManager) {
-    RecordGenerator recordGenerator = tableDef.getRecordGenerator(optionManager);
-    recordGenerator.setInfoSchemaFilter(filter);
-    recordGenerator.scanSchema(rootSchema);
-    return recordGenerator.getRecordReader();
-  }
-
-  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-    return tableDef.getRowType(typeFactory);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
index 8e7498f..15d98d8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.exec.sql;
 
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_CONNECT;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_DESCRIPTION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.TestBuilder;
@@ -42,6 +46,16 @@ public class TestInfoSchema extends BaseTestQuery {
   }
 
   @Test
+  public void catalogs() throws Exception {
+    testBuilder()
+        .sqlQuery("SELECT * FROM INFORMATION_SCHEMA.CATALOGS")
+        .unOrdered()
+        .baselineColumns(CATS_COL_CATALOG_NAME, CATS_COL_CATALOG_DESCRIPTION, CATS_COL_CATALOG_CONNECT)
+        .baselineValues("DRILL", "The internal metadata used by Drill", "")
+        .go();
+  }
+
+  @Test
   public void showTablesFromDb() throws Exception{
     final List<String[]> expected =
         ImmutableList.of(

http://git-wip-us.apache.org/repos/asf/drill/blob/f70df990/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestInfoSchemaFilterPushDown.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestInfoSchemaFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestInfoSchemaFilterPushDown.java
index f0d216b..7c66c7b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestInfoSchemaFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestInfoSchemaFilterPushDown.java
@@ -50,6 +50,14 @@ public class TestInfoSchemaFilterPushDown extends PlanTestBase {
   }
 
   @Test
+  public void testFilterPushdown_LikeWithEscape() throws Exception {
+    final String query = "SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_SCHEMA LIKE '%\\\\SCH%' ESCAPE '\\'";
+    final String scan = "Scan(groupscan=[TABLES, filter=like(Field=TABLE_SCHEMA,Literal=%\\\\SCH%,Literal=\\)])";
+
+    testHelper(query, scan, false);
+  }
+
+  @Test
   public void testFilterPushdown_And() throws Exception {
     final String query = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE " +
         "TABLE_SCHEMA = 'sys' AND " +
@@ -98,9 +106,11 @@ public class TestInfoSchemaFilterPushDown extends PlanTestBase {
     final String query = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE " +
         "TABLE_SCHEMA = 'sys' AND " +
         "TABLE_NAME = 'version' AND " +
-        "COLUMN_NAME like 'commit%s'";
-    final String scan = "Scan(groupscan=[COLUMNS, filter=booleanand(equal(Field=TABLE_SCHEMA,Literal=sys)," +
-        "equal(Field=TABLE_NAME,Literal=version))])";
+        "COLUMN_NAME like 'commit%s' AND " +
+        "IS_NULLABLE = 'YES'"; // this is not expected to pushdown into scan
+    final String scan = "Scan(groupscan=[COLUMNS, " +
+        "filter=booleanand(equal(Field=TABLE_SCHEMA,Literal=sys),equal(Field=TABLE_NAME,Literal=version)," +
+        "like(Field=COLUMN_NAME,Literal=commit%s))]";
 
     testHelper(query, scan, true);
   }