You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/08/27 08:25:54 UTC

[GitHub] asfgit closed pull request #1439: DRILL-6492: Ensure schema / workspace case insensitivity in Drill

asfgit closed pull request #1439: DRILL-6492: Ensure schema / workspace case insensitivity in Drill
URL: https://github.com/apache/drill/pull/1439
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub does not display
its diff after the merge, so the full diff is reproduced below:

diff --git a/common/src/main/java/org/apache/drill/common/map/CaseInsensitiveMap.java b/common/src/main/java/org/apache/drill/common/map/CaseInsensitiveMap.java
index 20e46dd8f87..a9f18d31504 100644
--- a/common/src/main/java/org/apache/drill/common/map/CaseInsensitiveMap.java
+++ b/common/src/main/java/org/apache/drill/common/map/CaseInsensitiveMap.java
@@ -22,6 +22,7 @@
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -41,7 +42,7 @@
    * @return key case-insensitive concurrent map
    */
   public static <VALUE> CaseInsensitiveMap<VALUE> newConcurrentMap() {
-    return new CaseInsensitiveMap<>(Maps.<String, VALUE>newConcurrentMap());
+    return new CaseInsensitiveMap<>(Maps.newConcurrentMap());
   }
 
   /**
@@ -51,7 +52,7 @@
    * @return key case-insensitive hash map
    */
   public static <VALUE> CaseInsensitiveMap<VALUE> newHashMap() {
-    return new CaseInsensitiveMap<>(Maps.<String, VALUE>newHashMap());
+    return new CaseInsensitiveMap<>(Maps.newHashMap());
   }
 
   /**
@@ -63,7 +64,7 @@
    * @return key case-insensitive hash map
    */
   public static <VALUE> CaseInsensitiveMap<VALUE> newHashMapWithExpectedSize(final int expectedSize) {
-    return new CaseInsensitiveMap<>(Maps.<String, VALUE>newHashMapWithExpectedSize(expectedSize));
+    return new CaseInsensitiveMap<>(Maps.newHashMapWithExpectedSize(expectedSize));
   }
 
   /**
@@ -154,4 +155,22 @@ public void clear() {
   public Set<Entry<String, VALUE>> entrySet() {
     return underlyingMap.entrySet();
   }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(underlyingMap);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof CaseInsensitiveMap)) {
+      return false;
+    }
+    CaseInsensitiveMap<?> that = (CaseInsensitiveMap<?>) o;
+    return Objects.equals(underlyingMap, that.underlyingMap);
+  }
+
 }
diff --git a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
index 909e8110df1..552c0732918 100644
--- a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
+++ b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
@@ -133,12 +133,9 @@ public void scan(final Object cls) {
      * @return the class names of all children direct or indirect
      */
     public Set<ChildClassDescriptor> getChildrenOf(String name) {
-      Set<ChildClassDescriptor> result = new HashSet<>();
       Collection<ChildClassDescriptor> scannedChildren = children.get(name);
       // add all scanned children
-      for (ChildClassDescriptor child : scannedChildren) {
-        result.add(child);
-      }
+      Set<ChildClassDescriptor> result = new HashSet<>(scannedChildren);
       // recursively add children's children
       Collection<ChildClassDescriptor> allChildren = new ArrayList<>();
       allChildren.addAll(scannedChildren);
@@ -272,7 +269,7 @@ public void scan(final Object cls) {
           List<FieldDescriptor> fieldDescriptors = new ArrayList<>(classFields.size());
           for (FieldInfo field : classFields) {
             String fieldName = field.getName();
-            AnnotationsAttribute fieldAnnotations = ((AnnotationsAttribute)field.getAttribute(AnnotationsAttribute.visibleTag));
+            AnnotationsAttribute fieldAnnotations = ((AnnotationsAttribute) field.getAttribute(AnnotationsAttribute.visibleTag));
             fieldDescriptors.add(new FieldDescriptor(fieldName, field.getDescriptor(), getAnnotationDescriptors(fieldAnnotations)));
           }
           functions.add(new AnnotatedClassDescriptor(classFile.getName(), classAnnotations, fieldDescriptors));
@@ -281,6 +278,9 @@ public void scan(final Object cls) {
     }
 
     private List<AnnotationDescriptor> getAnnotationDescriptors(AnnotationsAttribute annotationsAttr) {
+      if (annotationsAttr == null) {
+        return Collections.emptyList();
+      }
       List<AnnotationDescriptor> annotationDescriptors = new ArrayList<>(annotationsAttr.numAnnotations());
       for (javassist.bytecode.annotation.Annotation annotation : annotationsAttr.getAnnotations()) {
         // Sigh: javassist uses raw collections (is this 2002?)
@@ -319,7 +319,7 @@ public void scan(final Object cls) {
    *           to scan for (relative to specified class loaders' classpath roots)
    * @param  returnRootPathname  whether to collect classpath root portion of
    *           URL for each resource instead of full URL of each resource
-   * @returns  ...; empty set if none
+   * @return  empty set if none
    */
   public static Set<URL> forResource(final String resourcePathname, final boolean returnRootPathname) {
     logger.debug("Scanning classpath for resources with pathname \"{}\".",
@@ -437,11 +437,11 @@ private static void verifyClassUnicity(List<AnnotatedClassDescriptor> annotatedC
 
   static ScanResult emptyResult() {
     return new ScanResult(
-        Collections.<String>emptyList(),
-        Collections.<String>emptyList(),
-        Collections.<String>emptyList(),
-        Collections.<AnnotatedClassDescriptor>emptyList(),
-        Collections.<ParentClassDescriptor>emptyList());
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.emptyList());
   }
 
   public static ScanResult fromPrescan(DrillConfig config) {
diff --git a/common/src/test/java/org/apache/drill/common/map/TestCaseInsensitiveMap.java b/common/src/test/java/org/apache/drill/common/map/TestCaseInsensitiveMap.java
index 8901d533131..19b2b93606c 100644
--- a/common/src/test/java/org/apache/drill/common/map/TestCaseInsensitiveMap.java
+++ b/common/src/test/java/org/apache/drill/common/map/TestCaseInsensitiveMap.java
@@ -24,6 +24,7 @@
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -66,4 +67,18 @@ public void addAllFromAnother() {
     final Set<Map.Entry<String, Integer>> entrySet = map.entrySet();
     assertEquals(2, entrySet.size());
   }
+
+  @Test
+  public void checkEquals() {
+    Map<String, String> map1 = CaseInsensitiveMap.newHashMap();
+    map1.put("key_1", "value_1");
+
+    Map<String, String> map2 = CaseInsensitiveMap.newHashMap();
+    map2.put("KEY_1", "value_1");
+    assertEquals(map1, map2);
+
+    map2.put("key_2", "value_2");
+    assertNotEquals(map1, map2);
+  }
+
 }
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index b8e825bfbfd..46e04441f34 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -24,36 +24,34 @@
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Admin;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
-public class HBaseSchemaFactory implements SchemaFactory {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSchemaFactory.class);
+public class HBaseSchemaFactory extends AbstractSchemaFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSchemaFactory.class);
 
-  final String schemaName;
-  final HBaseStoragePlugin plugin;
+  private final HBaseStoragePlugin plugin;
 
   public HBaseSchemaFactory(HBaseStoragePlugin plugin, String name) throws IOException {
+    super(name);
     this.plugin = plugin;
-    this.schemaName = name;
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    HBaseSchema schema = new HBaseSchema(schemaName);
-    SchemaPlus hPlus = parent.add(schemaName, schema);
+    HBaseSchema schema = new HBaseSchema(getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
   }
 
   class HBaseSchema extends AbstractSchema {
 
-    public HBaseSchema(String name) {
-      super(ImmutableList.<String>of(), name);
+    HBaseSchema(String name) {
+      super(Collections.emptyList(), name);
     }
 
     public void setHolder(SchemaPlus plusOfThis) {
@@ -73,13 +71,13 @@ public AbstractSchema getSubSchema(String name) {
     public Table getTable(String name) {
       HBaseScanSpec scanSpec = new HBaseScanSpec(name);
       try {
-        return new DrillHBaseTable(schemaName, plugin, scanSpec);
+        return new DrillHBaseTable(getName(), plugin, scanSpec);
       } catch (Exception e) {
         // Calcite firstly looks for a table in the default schema, if the table was not found,
         // it looks in the root schema.
         // If the table does not exist, a query will fail at validation stage,
         // so the error should not be thrown here.
-        logger.warn("Failure while loading table '{}' for database '{}'.", name, schemaName, e.getCause());
+        logger.warn("Failure while loading table '{}' for database '{}'.", name, getName(), e.getCause());
         return null;
       }
     }
@@ -94,7 +92,7 @@ public Table getTable(String name) {
         }
         return tableNames;
       } catch (Exception e) {
-        logger.warn("Failure while loading table names for database '{}'.", schemaName, e.getCause());
+        logger.warn("Failure while loading table names for database '{}'.", getName(), e.getCause());
         return Collections.emptySet();
       }
     }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
index ec1d0c69456..23f346f6919 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
@@ -38,7 +38,8 @@
 import java.util.List;
 import java.util.Set;
 
-public class HiveDatabaseSchema extends AbstractSchema{
+public class HiveDatabaseSchema extends AbstractSchema {
+
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDatabaseSchema.class);
 
   private final HiveSchema hiveSchema;
@@ -105,10 +106,16 @@ public String getTypeName() {
     return tableNameToTable;
   }
 
+  @Override
+  public boolean areTableNamesCaseSensitive() {
+    return false;
+  }
+
   private static class HiveTableWithoutStatisticAndRowType implements Table {
+
     private final TableType tableType;
 
-    public HiveTableWithoutStatisticAndRowType(final TableType tableType) {
+    HiveTableWithoutStatisticAndRowType(final TableType tableType) {
       this.tableType = tableType;
     }
 
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index e3cb3a2dd73..53f6c6074cc 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -33,8 +33,8 @@
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveStoragePlugin;
@@ -48,8 +48,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
-public class HiveSchemaFactory implements SchemaFactory {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaFactory.class);
+public class HiveSchemaFactory extends AbstractSchemaFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaFactory.class);
 
   // MetaStoreClient created using process user credentials
   private final DrillHiveMetaStoreClient processUserMetastoreClient;
@@ -57,13 +57,12 @@
   private final LoadingCache<String, DrillHiveMetaStoreClient> metaStoreClientLoadingCache;
 
   private final HiveStoragePlugin plugin;
-  private final String schemaName;
   private final HiveConf hiveConf;
   private final boolean isDrillImpersonationEnabled;
   private final boolean isHS2DoAsSet;
 
   public HiveSchemaFactory(final HiveStoragePlugin plugin, final String name, final HiveConf hiveConf) throws ExecutionSetupException {
-    this.schemaName = name;
+    super(name);
     this.plugin = plugin;
 
     this.hiveConf = hiveConf;
@@ -126,8 +125,8 @@ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws
         throw new IOException("Failure setting up Hive metastore client.", e);
       }
     }
-    HiveSchema schema = new HiveSchema(schemaConfig, mClientForSchemaTree, schemaName);
-    SchemaPlus hPlus = parent.add(schemaName, schema);
+    HiveSchema schema = new HiveSchema(schemaConfig, mClientForSchemaTree, getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
   }
 
@@ -137,7 +136,7 @@ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws
     private final DrillHiveMetaStoreClient mClient;
     private HiveDatabaseSchema defaultSchema;
 
-    public HiveSchema(final SchemaConfig schemaConfig, final DrillHiveMetaStoreClient mClient, final String name) {
+    HiveSchema(final SchemaConfig schemaConfig, final DrillHiveMetaStoreClient mClient, final String name) {
       super(ImmutableList.<String>of(), name);
       this.schemaConfig = schemaConfig;
       this.mClient = mClient;
@@ -149,7 +148,7 @@ public AbstractSchema getSubSchema(String name) {
       try {
         List<String> dbs = mClient.getDatabases(schemaConfig.getIgnoreAuthErrors());
         if (!dbs.contains(name)) {
-          logger.debug("Database '{}' doesn't exists in Hive storage '{}'", name, schemaName);
+          logger.debug("Database '{}' doesn't exists in Hive storage '{}'", name, getName());
           return null;
         }
         HiveDatabaseSchema schema = getSubSchemaKnownExists(name);
@@ -164,8 +163,7 @@ public AbstractSchema getSubSchema(String name) {
 
     /** Help method to get subschema when we know it exists (already checks the existence) */
     private HiveDatabaseSchema getSubSchemaKnownExists(String name) {
-      HiveDatabaseSchema schema = new HiveDatabaseSchema(this, name, mClient, schemaConfig);
-      return schema;
+      return new HiveDatabaseSchema(this, name, mClient, schemaConfig);
     }
 
     void setHolder(SchemaPlus plusOfThis) {
@@ -206,6 +204,11 @@ public boolean showInInformationSchema() {
       return defaultSchema.getTableNames();
     }
 
+    @Override
+    public boolean areTableNamesCaseSensitive() {
+      return false;
+    }
+
     DrillTable getDrillTable(String dbName, String t) {
       HiveReadEntry entry = getSelectionBaseOnName(dbName, t);
       if (entry == null) {
@@ -216,9 +219,9 @@ DrillTable getDrillTable(String dbName, String t) {
           ImpersonationUtil.getProcessUserName();
 
       if (entry.getJdbcTableType() == TableType.VIEW) {
-        return new DrillHiveViewTable(schemaName, plugin, userToImpersonate, entry);
+        return new DrillHiveViewTable(getName(), plugin, userToImpersonate, entry);
       } else {
-        return new DrillHiveTable(schemaName, plugin, userToImpersonate, entry);
+        return new DrillHiveTable(getName(), plugin, userToImpersonate, entry);
       }
     }
 
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 94f39b806d8..24100106aea 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -435,6 +435,11 @@ public void testHiveConfPropertiesAtSessionLevel() throws Exception {
     test(query);
   }
 
+  @Test
+  public void testSchemaCaseInsensitive() throws Exception {
+    test("select * from Hive.`Default`.Kv");
+  }
+
   private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> columnsList, Map<String, Integer> expectedResult) {
     for (UserProtos.ResultColumnMetadata columnMetadata : columnsList) {
       assertTrue("Column should be present in result set", expectedResult.containsKey(columnMetadata.getColumnName()));
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index c5c0d488c76..37b8ea0780a 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -90,7 +90,7 @@ public void showDatabases() throws Exception{
         .baselineValues("dfs.tmp")
         .baselineValues("sys")
         .baselineValues("cp.default")
-        .baselineValues("INFORMATION_SCHEMA")
+        .baselineValues("information_schema")
         .go();
   }
 
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
index 3c6e2c29832..e361c66053b 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
@@ -19,8 +19,6 @@
 
 import org.apache.calcite.schema.Schema.TableType;
 import org.apache.drill.test.TestBuilder;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.dotdrill.DotDrillType;
 import org.apache.drill.exec.impersonation.BaseTestImpersonation;
 import org.apache.drill.exec.store.hive.HiveStoragePluginConfig;
 import org.apache.hadoop.fs.FileSystem;
@@ -154,15 +152,6 @@ protected void fromInfoSchemaHelper(final String pluginName, final String db, Li
     testBuilder.go();
   }
 
-  protected static void createView(final String viewOwner, final String viewGroup, final String viewName,
-                                 final String viewDef) throws Exception {
-    updateClient(viewOwner);
-    test(String.format("ALTER SESSION SET `%s`='%o';", ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY, (short) 0750));
-    test("CREATE VIEW %s.%s.%s AS %s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", viewName, viewDef);
-    final Path viewFilePath = new Path("/tmp/", viewName + DotDrillType.VIEW.getEnding());
-    fs.setOwner(viewFilePath, viewOwner, viewGroup);
-  }
-
   public static void stopHiveMetaStore() throws Exception {
     // Unfortunately Hive metastore doesn't provide an API to shut it down. It will be exited as part of the test JVM
     // exit. As each metastore server instance is using its own resources and not sharing it with other metastore
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java
index 30b7430a187..7bc98a39aca 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java
@@ -21,7 +21,6 @@
 import com.google.common.collect.Maps;
 import org.apache.drill.categories.HiveStorageTest;
 import org.apache.drill.categories.SlowTest;
-import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.security.SessionStateConfigUserAuthenticator;
@@ -34,6 +33,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
@@ -66,10 +66,10 @@
   private static final String v_student_u1g1_750 = "v_student_u1g1_750";
 
   private static final String query_v_student_u0g0_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750);
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750);
 
   private static final String query_v_student_u1g1_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u1g1_750);
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u1g1_750);
 
   // Role for testing purpose
   private static final String test_role0 = "role0";
@@ -82,7 +82,7 @@ public static void setup() throws Exception {
     startHiveMetaStore();
     startDrillCluster(true);
     addHiveStoragePlugin(getHivePluginConfig());
-    addMiniDfsBasedStorage(Maps.<String, WorkspaceConfig>newHashMap());
+    addMiniDfsBasedStorage(new HashMap<>());
     generateTestData();
   }
 
@@ -134,8 +134,7 @@ private static void generateTestData() throws Exception {
             hivePluginName, db_general, g_student_user0));
 
     createView(org1Users[1], org1Groups[1], v_student_u1g1_750,
-        String.format("SELECT rownum, name, age FROM %s.%s.%s",
-            MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750));
+        String.format("SELECT rownum, name, age FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750));
   }
 
   private static void createTbl(final Driver driver, final String db, final String tbl, final String tblDef,
@@ -277,15 +276,15 @@ public void selectUser1_v_student_u0g0_750() throws Exception {
   @Test
   public void selectUser2_v_student_u0g0_750() throws Exception {
     updateClient(org1Users[2]);
-    errorMsgTestHelper(query_v_student_u0g0_750,
-        "Not authorized to read view [v_student_u0g0_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_student_u0g0_750, String.format(
+        "Not authorized to read view [v_student_u0g0_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
   public void selectUser0_v_student_u1g1_750() throws Exception {
     updateClient(org1Users[0]);
-    errorMsgTestHelper(query_v_student_u1g1_750,
-        "Not authorized to read view [v_student_u1g1_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_student_u1g1_750, String.format(
+        "Not authorized to read view [v_student_u1g1_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java
index 5a3e3732090..d54c4e041a6 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java
@@ -92,10 +92,10 @@
   private static final String v_student_u1g1_750 = "v_student_u1g1_750";
 
   private static final String query_v_student_u0g0_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750);
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750);
 
   private static final String query_v_student_u1g1_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u1g1_750);
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u1g1_750);
 
   // Create a view on "partitioned_student_u0_700". View is owned by user0:group0 and has permissions 750
   private static final String v_partitioned_student_u0g0_750 = "v_partitioned_student_u0g0_750";
@@ -104,11 +104,11 @@
   private static final String v_partitioned_student_u1g1_750 = "v_partitioned_student_u1g1_750";
 
   private static final String query_v_partitioned_student_u0g0_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp",
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp",
       v_partitioned_student_u0g0_750);
 
   private static final String query_v_partitioned_student_u1g1_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp",
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp",
       v_partitioned_student_u1g1_750);
 
   @BeforeClass
@@ -199,16 +199,14 @@ private static void generateTestData() throws Exception {
             hivePluginName, db_general, g_student_u0_700));
 
     createView(org1Users[1], org1Groups[1], v_student_u1g1_750,
-        String.format("SELECT rownum, name, age FROM %s.%s.%s",
-            MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750));
+        String.format("SELECT rownum, name, age FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750));
 
     createView(org1Users[0], org1Groups[0], v_partitioned_student_u0g0_750,
         String.format("SELECT rownum, name, age, studentnum FROM %s.%s.%s",
             hivePluginName, db_general, g_partitioned_student_u0_700));
 
     createView(org1Users[1], org1Groups[1], v_partitioned_student_u1g1_750,
-        String.format("SELECT rownum, name, age FROM %s.%s.%s",
-            MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_partitioned_student_u0g0_750));
+        String.format("SELECT rownum, name, age FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_partitioned_student_u0g0_750));
   }
 
   private static void createPartitionedTable(final Driver hiveDriver, final String db, final String tbl,
@@ -521,15 +519,15 @@ public void selectUser1_v_student_u0g0_750() throws Exception {
   @Test
   public void selectUser2_v_student_u0g0_750() throws Exception {
     updateClient(org1Users[2]);
-    errorMsgTestHelper(query_v_student_u0g0_750,
-        "Not authorized to read view [v_student_u0g0_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_student_u0g0_750, String.format(
+        "Not authorized to read view [v_student_u0g0_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
   public void selectUser0_v_student_u1g1_750() throws Exception {
     updateClient(org1Users[0]);
-    errorMsgTestHelper(query_v_student_u1g1_750,
-        "Not authorized to read view [v_student_u1g1_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_student_u1g1_750, String.format(
+        "Not authorized to read view [v_student_u1g1_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
@@ -555,15 +553,15 @@ public void selectUser1_v_partitioned_student_u0g0_750() throws Exception {
   @Test
   public void selectUser2_v_partitioned_student_u0g0_750() throws Exception {
     updateClient(org1Users[2]);
-    errorMsgTestHelper(query_v_partitioned_student_u0g0_750,
-        "Not authorized to read view [v_partitioned_student_u0g0_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_partitioned_student_u0g0_750, String.format(
+        "Not authorized to read view [v_partitioned_student_u0g0_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
   public void selectUser0_v_partitioned_student_u1g1_750() throws Exception {
     updateClient(org1Users[0]);
-    errorMsgTestHelper(query_v_partitioned_student_u1g1_750,
-        "Not authorized to read view [v_partitioned_student_u1g1_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_partitioned_student_u1g1_750, String.format(
+        "Not authorized to read view [v_partitioned_student_u1g1_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
index 8f44a936ccb..86ef09590bb 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
@@ -20,25 +20,23 @@
 import java.io.IOException;
 
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
 
-public class KafkaSchemaFactory implements SchemaFactory {
+public class KafkaSchemaFactory extends AbstractSchemaFactory {
 
-  private final String schemaName;
   private final KafkaStoragePlugin plugin;
 
   public KafkaSchemaFactory(KafkaStoragePlugin plugin, String schemaName) {
+    super(schemaName);
     this.plugin = plugin;
-    this.schemaName = schemaName;
   }
 
   @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent)
-      throws IOException {
-    KafkaMessageSchema schema = new KafkaMessageSchema(this.plugin, this.schemaName);
-    SchemaPlus hPlus = parent.add(schemaName, schema);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    KafkaMessageSchema schema = new KafkaMessageSchema(plugin, getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
   }
 
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
index fe2def33cba..c1eb1e591e8 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
@@ -29,37 +29,35 @@
 import org.apache.drill.exec.physical.base.Writer;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.ListTablesResponse;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
-public class KuduSchemaFactory implements SchemaFactory {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSchemaFactory.class);
+public class KuduSchemaFactory extends AbstractSchemaFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSchemaFactory.class);
 
-  final String schemaName;
-  final KuduStoragePlugin plugin;
+  private final KuduStoragePlugin plugin;
 
   public KuduSchemaFactory(KuduStoragePlugin plugin, String name) throws IOException {
+    super(name);
     this.plugin = plugin;
-    this.schemaName = name;
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    KuduTables schema = new KuduTables(schemaName);
-    SchemaPlus hPlus = parent.add(schemaName, schema);
+    KuduTables schema = new KuduTables(getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
   }
 
   class KuduTables extends AbstractSchema {
 
-    public KuduTables(String name) {
-      super(ImmutableList.<String>of(), name);
+    KuduTables(String name) {
+      super(Collections.emptyList(), name);
     }
 
     public void setHolder(SchemaPlus plusOfThis) {
@@ -81,7 +79,7 @@ public Table getTable(String name) {
       try {
         KuduTable table = plugin.getClient().openTable(name);
         Schema schema = table.getSchema();
-        return new DrillKuduTable(schemaName, plugin, schema, scanSpec);
+        return new DrillKuduTable(getName(), plugin, schema, scanSpec);
       } catch (Exception e) {
         logger.warn("Failure while retrieving kudu table {}", name, e);
         return null;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index 366f129bc99..34374203cd8 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -32,8 +32,8 @@
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.mongo.MongoScanSpec;
 import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
 import org.apache.drill.exec.store.mongo.MongoStoragePluginConfig;
@@ -49,21 +49,19 @@
 import com.mongodb.MongoException;
 import com.mongodb.client.MongoDatabase;
 
-public class MongoSchemaFactory implements SchemaFactory {
+public class MongoSchemaFactory extends AbstractSchemaFactory {
 
-  static final Logger logger = LoggerFactory
-      .getLogger(MongoSchemaFactory.class);
+  private static final Logger logger = LoggerFactory.getLogger(MongoSchemaFactory.class);
 
   private static final String DATABASES = "databases";
 
   private LoadingCache<String, List<String>> databases;
   private LoadingCache<String, List<String>> tableNameLoader;
-  private final String schemaName;
   private final MongoStoragePlugin plugin;
 
   public MongoSchemaFactory(MongoStoragePlugin plugin, String schemaName) throws ExecutionSetupException {
+    super(schemaName);
     this.plugin = plugin;
-    this.schemaName = schemaName;
 
     databases = CacheBuilder //
         .newBuilder() //
@@ -119,8 +117,8 @@ public MongoSchemaFactory(MongoStoragePlugin plugin, String schemaName) throws E
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    MongoSchema schema = new MongoSchema(schemaName);
-    SchemaPlus hPlus = parent.add(schemaName, schema);
+    MongoSchema schema = new MongoSchema(getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
   }
 
@@ -186,7 +184,7 @@ public boolean showInInformationSchema() {
 
     DrillTable getDrillTable(String dbName, String collectionName) {
       MongoScanSpec mongoScanSpec = new MongoScanSpec(dbName, collectionName);
-      return new DynamicDrillTable(plugin, schemaName, null, mongoScanSpec);
+      return new DynamicDrillTable(plugin, getName(), null, mongoScanSpec);
     }
 
     @Override
diff --git a/contrib/storage-opentsdb/README.md b/contrib/storage-opentsdb/README.md
index 0c616b505a6..04ac1a6fad8 100644
--- a/contrib/storage-opentsdb/README.md
+++ b/contrib/storage-opentsdb/README.md
@@ -1,4 +1,4 @@
-# drill-storage-openTSDB
+# drill-opentsdb-storage
 
 Implementation of TSDB storage plugin. Plugin uses REST API to work with TSDB. 
 
@@ -30,12 +30,12 @@ List of supported time
 
 Params must be specified in FROM clause of the query separated by commas. For example
 
-`openTSDB.(metric=metric_name, start=4d-ago, aggregator=sum)`
+`opentsdb.(metric=metric_name, start=4d-ago, aggregator=sum)`
 
 Supported queries for now are listed below:
 
 ```
-USE openTSDB
+USE opentsdb
 ```
 
 ```
@@ -44,26 +44,26 @@ SHOW tables
 Will print available metrics. Max number of the printed results is a Integer.MAX value
 
 ```
-SELECT * FROM openTSDB. `(metric=warp.speed.test, start=47y-ago, aggregator=sum)` 
+SELECT * FROM opentsdb.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`
 ```
 Return aggregated elements from `warp.speed.test` table since 47y-ago 
 
 ```
-SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)`
+SELECT * FROM opentsdb.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)`
 ```
 Return aggregated elements from `warp.speed.test` table
 
 ```
-SELECT `timestamp`, sum(`aggregated value`) FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)` GROUP BY `timestamp`
+SELECT `timestamp`, sum(`aggregated value`) FROM opentsdb.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)` GROUP BY `timestamp`
 ```
 Return aggregated and grouped value by standard drill functions from `warp.speed.test table`, but with the custom aggregator
 
 ```
-SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, downsample=5m-avg)`
+SELECT * FROM opentsdb.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, downsample=5m-avg)`
 ```
 Return aggregated data limited by downsample
 
 ```
-SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, end=1407165403000)`
+SELECT * FROM opentsdb.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, end=1407165403000)`
 ```
 Return aggregated data limited by end time
\ No newline at end of file
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java
index 2de763bdc54..539442affbf 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java
@@ -36,10 +36,9 @@
 
   private final ServiceImpl db;
 
-  public OpenTSDBStoragePlugin(OpenTSDBStoragePluginConfig configuration, DrillbitContext context, String name)
-      throws IOException {
+  public OpenTSDBStoragePlugin(OpenTSDBStoragePluginConfig configuration, DrillbitContext context, String name) throws IOException {
     super(context, name);
-    this.schemaFactory = new OpenTSDBSchemaFactory(this, name);
+    this.schemaFactory = new OpenTSDBSchemaFactory(this, getName());
     this.engineConfig = configuration;
     this.db = new ServiceImpl(configuration.getConnection());
   }
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java
index 3b86a950536..f7ae4f3e777 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java
@@ -20,8 +20,8 @@
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.openTSDB.DrillOpenTSDBTable;
 import org.apache.drill.exec.store.openTSDB.OpenTSDBScanSpec;
 import org.apache.drill.exec.store.openTSDB.OpenTSDBStoragePlugin;
@@ -34,41 +34,40 @@
 import java.util.Collections;
 import java.util.Set;
 
-public class OpenTSDBSchemaFactory implements SchemaFactory {
+public class OpenTSDBSchemaFactory extends AbstractSchemaFactory {
 
-  private static final Logger log = LoggerFactory.getLogger(OpenTSDBSchemaFactory.class);
+  private static final Logger logger = LoggerFactory.getLogger(OpenTSDBSchemaFactory.class);
 
-  private final String schemaName;
   private final OpenTSDBStoragePlugin plugin;
 
   public OpenTSDBSchemaFactory(OpenTSDBStoragePlugin plugin, String schemaName) {
+    super(schemaName);
     this.plugin = plugin;
-    this.schemaName = schemaName;
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    OpenTSDBSchema schema = new OpenTSDBSchema(schemaName);
-    parent.add(schemaName, schema);
+    OpenTSDBSchema schema = new OpenTSDBSchema(getName());
+    parent.add(getName(), schema);
   }
 
   class OpenTSDBSchema extends AbstractSchema {
 
     OpenTSDBSchema(String name) {
-      super(Collections.<String>emptyList(), name);
+      super(Collections.emptyList(), name);
     }
 
     @Override
     public Table getTable(String name) {
       OpenTSDBScanSpec scanSpec = new OpenTSDBScanSpec(name);
       try {
-        return new DrillOpenTSDBTable(schemaName, plugin, new Schema(plugin.getClient(), name), scanSpec);
+        return new DrillOpenTSDBTable(getName(), plugin, new Schema(plugin.getClient(), name), scanSpec);
       } catch (Exception e) {
         // Calcite firstly looks for a table in the default schema, if the table was not found,
         // it looks in the root schema.
         // If the table does not exist, a query will fail at validation stage,
         // so the error should not be thrown here.
-        logger.warn("Failure while loading table '{}' for database '{}'.", name, schemaName, e.getCause());
+        logger.warn("Failure while loading table '{}' for database '{}'.", name, getName(), e.getCause());
         return null;
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
index 08398986939..97dd2d2171f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
@@ -68,7 +68,7 @@ public ZookeeperClient(final CuratorFramework curator, final String root, final
    */
   public void start() throws Exception {
     curator.newNamespaceAwareEnsurePath(root).ensure(curator.getZookeeperClient()); // ensure root is created
-    getCache().start();
+    getCache().start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); //build cache at start up, to ensure we get correct results right away
   }
 
   public PathChildrenCache getCache() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
index 1318f72dcfb..9b1a34a3c82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.exec.expr.fn.registry;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -35,6 +37,7 @@
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.exception.FunctionValidationException;
 import org.apache.drill.exec.exception.JarValidationException;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
 import org.apache.drill.exec.expr.fn.DrillFuncHolder;
 import org.apache.drill.exec.expr.fn.FunctionConverter;
 import org.apache.drill.exec.planner.logical.DrillConstExecutor;
@@ -112,7 +115,7 @@ public long getVersion() {
   public List<String> validate(String jarName, ScanResult scanResult) {
     List<String> functions = Lists.newArrayList();
     FunctionConverter converter = new FunctionConverter();
-    List<AnnotatedClassDescriptor> providerClasses = scanResult.getAnnotatedClasses();
+    List<AnnotatedClassDescriptor> providerClasses = scanResult.getAnnotatedClasses(FunctionTemplate.class.getName());
 
     if (registryHolder.containsJar(jarName)) {
       throw new JarValidationException(String.format("Jar with %s name has been already registered", jarName));
@@ -158,11 +161,11 @@ public long getVersion() {
    * @param version remote function registry version number with which local function registry is synced
    */
   public void register(List<JarScan> jars, long version) {
-    Map<String, List<FunctionHolder>> newJars = Maps.newHashMap();
+    Map<String, List<FunctionHolder>> newJars = new HashMap<>();
     for (JarScan jarScan : jars) {
       FunctionConverter converter = new FunctionConverter();
-      List<AnnotatedClassDescriptor> providerClasses = jarScan.getScanResult().getAnnotatedClasses();
-      List<FunctionHolder> functions = Lists.newArrayList();
+      List<AnnotatedClassDescriptor> providerClasses = jarScan.getScanResult().getAnnotatedClasses(FunctionTemplate.class.getName());
+      List<FunctionHolder> functions = new ArrayList<>();
       newJars.put(jarScan.getJarName(), functions);
       for (AnnotatedClassDescriptor func : providerClasses) {
         DrillFuncHolder holder = converter.getHolder(func, jarScan.getClassLoader());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
index 1493a923c5a..ad0c2cc8f75 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
@@ -28,6 +28,7 @@
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.LogicalPlanPersistence;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 
@@ -38,11 +39,13 @@
 
 public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginConfig>> {
 
-  private Map<String, StoragePluginConfig> storage;
+  private final Map<String, StoragePluginConfig> storage;
 
   @JsonCreator
   public StoragePlugins(@JsonProperty("storage") Map<String, StoragePluginConfig> storage) {
-    this.storage = storage;
+    Map<String, StoragePluginConfig> caseInsensitiveStorage = CaseInsensitiveMap.newHashMap();
+    Optional.ofNullable(storage).ifPresent(caseInsensitiveStorage::putAll);
+    this.storage = caseInsensitiveStorage;
   }
 
   public static void main(String[] args) throws Exception{
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
index a2623634916..b47ab32ebe8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
@@ -104,7 +104,8 @@ public static String getPrefixSchemaPath(final String defaultSchema,
   /** Utility method to search for schema path starting from the given <i>schema</i> reference */
   private static SchemaPlus searchSchemaTree(SchemaPlus schema, final List<String> schemaPath) {
     for (String schemaName : schemaPath) {
-      schema = schema.getSubSchema(schemaName);
+      // schemas in Drill are case insensitive and stored in lower case
+      schema = schema.getSubSchema(schemaName.toLowerCase());
       if (schema == null) {
         return null;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
index bb51ef0b623..3f11fd16ca2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
@@ -80,9 +80,9 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
         .build(logger);
     }
 
+    AbstractSchema drillSchema = SchemaUtilites.unwrapAsDrillSchemaInstance(schemaPlus);
     StoragePlugin storagePlugin;
     try {
-      AbstractSchema drillSchema = SchemaUtilites.unwrapAsDrillSchemaInstance(schemaPlus);
       storagePlugin = context.getStorage().getPlugin(drillSchema.getSchemaPath().get(0));
       if (storagePlugin == null) {
         throw new DrillRuntimeException(String.format("Unable to find storage plugin with the following name [%s].",
@@ -95,10 +95,10 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
     try {
       Map configMap = mapper.convertValue(storagePlugin.getConfig(), Map.class);
       if (storagePlugin instanceof FileSystemPlugin) {
-        transformWorkspaces(schema.names, configMap);
+        transformWorkspaces(drillSchema.getSchemaPath(), configMap);
       }
       String properties = mapper.writeValueAsString(configMap);
-      return DirectPlan.createDirectPlan(context, new DescribeSchemaResult(Joiner.on(".").join(schema.names), properties));
+      return DirectPlan.createDirectPlan(context, new DescribeSchemaResult(drillSchema.getFullSchemaName(), properties));
     } catch (JsonProcessingException e) {
       throw new DrillRuntimeException("Error while trying to convert storage config to json string", e);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
index 32768f8222e..70dd5a845c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
@@ -24,10 +24,12 @@
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
@@ -37,6 +39,7 @@
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.drill.common.exceptions.UserException;
@@ -44,11 +47,10 @@
 import org.apache.drill.exec.planner.sql.SqlConverter;
 import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
+import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
-import com.google.common.collect.ImmutableList;
-
 public class DescribeTableHandler extends DefaultSqlHandler {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DescribeTableHandler.class);
 
@@ -60,23 +62,19 @@ public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     DrillSqlDescribeTable node = unwrap(sqlNode, DrillSqlDescribeTable.class);
 
     try {
-      List<SqlNode> selectList =
-          ImmutableList.of(new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
-                           new SqlIdentifier(COLS_COL_DATA_TYPE, SqlParserPos.ZERO),
-                           new SqlIdentifier(COLS_COL_IS_NULLABLE, SqlParserPos.ZERO));
+      List<SqlNode> selectList = Arrays.asList(
+          new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
+          new SqlIdentifier(COLS_COL_DATA_TYPE, SqlParserPos.ZERO),
+          new SqlIdentifier(COLS_COL_IS_NULLABLE, SqlParserPos.ZERO));
 
-      SqlNode fromClause = new SqlIdentifier(
-          ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.COLUMNS.name()), null, SqlParserPos.ZERO, null);
+      SqlNode fromClause = new SqlIdentifier(Arrays.asList(IS_SCHEMA_NAME, InfoSchemaTableType.COLUMNS.name()), SqlParserPos.ZERO);
 
-      final SqlIdentifier table = node.getTable();
-      final SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
-      final List<String> schemaPathGivenInCmd = Util.skipLast(table.names);
-      final SchemaPlus schema = SchemaUtilites.findSchema(defaultSchema, schemaPathGivenInCmd);
-      final String charset = Util.getDefaultCharset().name();
+      SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
+      List<String> schemaPathGivenInCmd = Util.skipLast(node.getTable().names);
+      SchemaPlus schema = SchemaUtilites.findSchema(defaultSchema, schemaPathGivenInCmd);
 
       if (schema == null) {
-        SchemaUtilites.throwSchemaNotFoundException(defaultSchema,
-            SchemaUtilites.SCHEMA_PATH_JOINER.join(schemaPathGivenInCmd));
+        SchemaUtilites.throwSchemaNotFoundException(defaultSchema, SchemaUtilites.getSchemaPath(schemaPathGivenInCmd));
       }
 
       if (SchemaUtilites.isRootSchema(schema)) {
@@ -85,10 +83,11 @@ public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
             .build(logger);
       }
 
-      final String tableName = Util.last(table.names);
-
       // find resolved schema path
-      final String schemaPath = SchemaUtilites.unwrapAsDrillSchemaInstance(schema).getFullSchemaName();
+      AbstractSchema drillSchema = SchemaUtilites.unwrapAsDrillSchemaInstance(schema);
+      String schemaPath = drillSchema.getFullSchemaName();
+
+      String tableName = Util.last(node.getTable().names);
 
       if (schema.getTable(tableName) == null) {
         throw UserException.validationError()
@@ -101,14 +100,21 @@ public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
         schemaCondition = DrillParserUtil.createCondition(
             new SqlIdentifier(SHRD_COL_TABLE_SCHEMA, SqlParserPos.ZERO),
             SqlStdOperatorTable.EQUALS,
-            SqlLiteral.createCharString(schemaPath, charset, SqlParserPos.ZERO)
+            SqlLiteral.createCharString(schemaPath, Util.getDefaultCharset().name(), SqlParserPos.ZERO)
         );
       }
 
-      SqlNode where = DrillParserUtil.createCondition(
-          new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO),
+      SqlNode tableNameColumn = new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO);
+
+      // if table names are case insensitive, wrap column values and condition in lower function
+      if (!drillSchema.areTableNamesCaseSensitive()) {
+        tableNameColumn = SqlStdOperatorTable.LOWER.createCall(SqlParserPos.ZERO, tableNameColumn);
+        tableName = tableName.toLowerCase();
+      }
+
+      SqlNode where = DrillParserUtil.createCondition(tableNameColumn,
           SqlStdOperatorTable.EQUALS,
-          SqlLiteral.createCharString(tableName, charset, SqlParserPos.ZERO));
+          SqlLiteral.createCharString(tableName, Util.getDefaultCharset().name(), SqlParserPos.ZERO));
 
       where = DrillParserUtil.createCondition(schemaCondition, SqlStdOperatorTable.AND, where);
 
@@ -116,14 +122,21 @@ public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
       if (node.getColumn() != null) {
         columnFilter =
             DrillParserUtil.createCondition(
-                new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
+                SqlStdOperatorTable.LOWER.createCall(SqlParserPos.ZERO, new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO)),
                 SqlStdOperatorTable.EQUALS,
-                SqlLiteral.createCharString(node.getColumn().toString(), charset, SqlParserPos.ZERO));
+                SqlLiteral.createCharString(node.getColumn().toString().toLowerCase(), Util.getDefaultCharset().name(), SqlParserPos.ZERO));
       } else if (node.getColumnQualifier() != null) {
-        columnFilter =
-            DrillParserUtil.createCondition(
-                new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
-                SqlStdOperatorTable.LIKE, node.getColumnQualifier());
+        SqlNode columnQualifier = node.getColumnQualifier();
+        SqlNode column = new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO);
+        if (columnQualifier instanceof SqlCharStringLiteral) {
+          NlsString conditionString = ((SqlCharStringLiteral) columnQualifier).getNlsString();
+          columnQualifier = SqlCharStringLiteral.createCharString(
+              conditionString.getValue().toLowerCase(),
+              conditionString.getCharsetName(),
+              columnQualifier.getParserPosition());
+          column = SqlStdOperatorTable.LOWER.createCall(SqlParserPos.ZERO, column);
+        }
+        columnFilter = DrillParserUtil.createCondition(column, SqlStdOperatorTable.LIKE, columnQualifier);
       }
 
       where = DrillParserUtil.createCondition(where, SqlStdOperatorTable.AND, columnFilter);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
index ab460ad92f4..2c07d2c5d51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
@@ -17,8 +17,12 @@
  */
 package org.apache.drill.exec.planner.sql.handlers;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.util.NlsString;
 import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
 import org.apache.drill.exec.planner.sql.parser.SqlShowSchemas;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
@@ -33,8 +37,6 @@
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
-import com.google.common.collect.ImmutableList;
-
 public class ShowSchemasHandler extends DefaultSqlHandler {
 
   public ShowSchemasHandler(SqlHandlerConfig config) { super(config); }
@@ -43,17 +45,24 @@
   @Override
   public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     SqlShowSchemas node = unwrap(sqlNode, SqlShowSchemas.class);
-    List<SqlNode> selectList =
-        ImmutableList.of(new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO));
+    List<SqlNode> selectList = Collections.singletonList(new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO));
 
-    SqlNode fromClause = new SqlIdentifier(
-        ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.SCHEMATA.name()), null, SqlParserPos.ZERO, null);
+    SqlNode fromClause = new SqlIdentifier(Arrays.asList(IS_SCHEMA_NAME, InfoSchemaTableType.SCHEMATA.name()), SqlParserPos.ZERO);
 
     SqlNode where = null;
-    final SqlNode likePattern = node.getLikePattern();
+    SqlNode likePattern = node.getLikePattern();
     if (likePattern != null) {
-      where = DrillParserUtil.createCondition(new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO),
-                                              SqlStdOperatorTable.LIKE, likePattern);
+      SqlNode column = new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO);
+      // schema names are case insensitive, wrap column in lower function, pattern to lower case
+      if (likePattern instanceof SqlCharStringLiteral) {
+        NlsString conditionString = ((SqlCharStringLiteral) likePattern).getNlsString();
+        likePattern = SqlCharStringLiteral.createCharString(
+            conditionString.getValue().toLowerCase(),
+            conditionString.getCharsetName(),
+            likePattern.getParserPosition());
+        column = SqlStdOperatorTable.LOWER.createCall(SqlParserPos.ZERO, column);
+      }
+      where = DrillParserUtil.createCondition(column, SqlStdOperatorTable.LIKE, likePattern);
     } else if (node.getWhereClause() != null) {
       where = node.getWhereClause();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
index e73e829a96d..a910f9a9375 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
@@ -21,10 +21,12 @@
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
@@ -34,6 +36,7 @@
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.drill.common.exceptions.UserException;
@@ -45,9 +48,6 @@
 import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
 public class ShowTablesHandler extends DefaultSqlHandler {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ShowTablesHandler.class);
 
@@ -57,48 +57,56 @@
   @Override
   public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     SqlShowTables node = unwrap(sqlNode, SqlShowTables.class);
-    List<SqlNode> selectList = Lists.newArrayList();
-    SqlNode fromClause;
-    SqlNode where;
-
-    // create select columns
-    selectList.add(new SqlIdentifier(SHRD_COL_TABLE_SCHEMA, SqlParserPos.ZERO));
-    selectList.add(new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO));
+    List<SqlNode> selectList = Arrays.asList(
+        new SqlIdentifier(SHRD_COL_TABLE_SCHEMA, SqlParserPos.ZERO),
+        new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO));
 
-    fromClause = new SqlIdentifier(ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.TABLES.name()), SqlParserPos.ZERO);
+    SqlNode fromClause = new SqlIdentifier(Arrays.asList(IS_SCHEMA_NAME, InfoSchemaTableType.TABLES.name()), SqlParserPos.ZERO);
 
-    final SqlIdentifier db = node.getDb();
-    String tableSchema;
-    if (db != null) {
-      tableSchema = db.toString();
-    } else {
-      // If no schema is given in SHOW TABLES command, list tables from current schema
-      SchemaPlus schema = config.getConverter().getDefaultSchema();
+    SchemaPlus schemaPlus;
+    if (node.getDb() != null) {
+      List<String> schemaNames = node.getDb().names;
+      schemaPlus = SchemaUtilites.findSchema(config.getConverter().getDefaultSchema(), schemaNames);
 
-      if (SchemaUtilites.isRootSchema(schema)) {
-        // If the default schema is a root schema, throw an error to select a default schema
+      if (schemaPlus == null) {
         throw UserException.validationError()
-            .message("No default schema selected. Select a schema using 'USE schema' command")
+            .message(String.format("Invalid schema name [%s]", SchemaUtilites.getSchemaPath(schemaNames)))
             .build(logger);
       }
 
-      final AbstractSchema drillSchema = SchemaUtilites.unwrapAsDrillSchemaInstance(schema);
-      tableSchema = drillSchema.getFullSchemaName();
+    } else {
+      // If no schema is given in SHOW TABLES command, list tables from current schema
+      schemaPlus = config.getConverter().getDefaultSchema();
+    }
+
+    if (SchemaUtilites.isRootSchema(schemaPlus)) {
+      // If the default schema is a root schema, throw an error to select a default schema
+      throw UserException.validationError()
+          .message("No default schema selected. Select a schema using 'USE schema' command")
+          .build(logger);
     }
 
-    final String charset = Util.getDefaultCharset().name();
-    where = DrillParserUtil.createCondition(
+    AbstractSchema drillSchema = SchemaUtilites.unwrapAsDrillSchemaInstance(schemaPlus);
+
+    SqlNode where = DrillParserUtil.createCondition(
         new SqlIdentifier(SHRD_COL_TABLE_SCHEMA, SqlParserPos.ZERO),
         SqlStdOperatorTable.EQUALS,
-        SqlLiteral.createCharString(tableSchema, charset, SqlParserPos.ZERO));
+        SqlLiteral.createCharString(drillSchema.getFullSchemaName(), Util.getDefaultCharset().name(), SqlParserPos.ZERO));
 
     SqlNode filter = null;
-    final SqlNode likePattern = node.getLikePattern();
-    if (likePattern != null) {
-      filter = DrillParserUtil.createCondition(
-          new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO),
-          SqlStdOperatorTable.LIKE,
-          likePattern);
+    if (node.getLikePattern() != null) {
+      SqlNode likePattern = node.getLikePattern();
+      SqlNode column = new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO);
+      // if table names are case insensitive, wrap the column in LOWER and lower-case the LIKE pattern
+      if (!drillSchema.areTableNamesCaseSensitive() && likePattern instanceof SqlCharStringLiteral) {
+        NlsString conditionString = ((SqlCharStringLiteral) likePattern).getNlsString();
+        likePattern = SqlCharStringLiteral.createCharString(
+            conditionString.getValue().toLowerCase(),
+            conditionString.getCharsetName(),
+            likePattern.getParserPosition());
+        column = SqlStdOperatorTable.LOWER.createCall(SqlParserPos.ZERO, column);
+      }
+      filter = DrillParserUtil.createCondition(column, SqlStdOperatorTable.LIKE, likePattern);
     } else if (node.getWhereClause() != null) {
       filter = node.getWhereClause();
     }
@@ -119,4 +127,4 @@ public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     converter.useRootSchemaAsDefault(false);
     return sqlNodeRelDataTypePair;
   }
-}
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index fb9bc6b687b..2b70c3ba15e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -46,7 +47,8 @@
   private static final Expression EXPRESSION = new DefaultExpression(Object.class);
 
   public AbstractSchema(List<String> parentSchemaPath, String name) {
-    schemaPath = Lists.newArrayList();
+    name = name == null ? null : name.toLowerCase();
+    schemaPath = new ArrayList<>();
     schemaPath.addAll(parentSchemaPath);
     schemaPath.add(name);
     this.name = name;
@@ -96,7 +98,7 @@ public Schema getDefaultSchema() {
    * Create a new view given definition.
    * @param view View info including name, definition etc.
    * @return Returns true if an existing view is replaced with the given view. False otherwise.
-   * @throws IOException
+   * @throws IOException in case of error creating a view
    */
   public boolean createView(View view) throws IOException {
     throw UserException.unsupportedError()
@@ -107,8 +109,8 @@ public boolean createView(View view) throws IOException {
   /**
    * Drop the view with given name.
    *
-   * @param viewName
-   * @throws IOException
+   * @param viewName view name
+   * @throws IOException in case of error dropping the view
    */
   public void dropView(String viewName) throws IOException {
     throw UserException.unsupportedError()
@@ -217,7 +219,7 @@ public void dropTable(String tableName) {
    * plugin supports).
    * It is not guaranteed that the retrieved tables would have RowType and Statistic being fully populated.
    *
-   * Specifically, calling {@link Table#getRowType(RelDataTypeFactory)} or {@link Table#getStatistic()} might incur
+   * Specifically, calling {@link Table#getRowType(org.apache.calcite.rel.type.RelDataTypeFactory)} or {@link Table#getStatistic()} might incur
    * {@link UnsupportedOperationException} being thrown.
    *
    * @param  tableNames the requested tables, specified by the table names
@@ -263,4 +265,15 @@ public void dropTable(String tableName) {
     return tableNamesAndTypes;
   }
 
+  /**
+   * Indicates if table names in schema are case sensitive. By default they are.
+   * If schema implementation claims its table names are case insensitive,
+   * it is responsible for performing case-insensitive lookup by table name.
+   *
+   * @return true if table names are case sensitive
+   */
+  public boolean areTableNamesCaseSensitive() {
+    return true;
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchemaFactory.java
new file mode 100644
index 00000000000..da5af1dee79
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchemaFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+/**
+ * Abstract implementation of {@link SchemaFactory}, ensures that given schema name is always converted to lower case.
+ */
+public abstract class AbstractSchemaFactory implements SchemaFactory {
+
+  private final String name;
+
+  protected AbstractSchemaFactory(String name) {
+    this.name = name == null ? null : name.toLowerCase();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 9818ff36882..b90fd63d81e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -34,7 +34,8 @@
 import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 
-/** Abstract class for StorePlugin implementations.
+/**
+ * Abstract class for StorePlugin implementations.
  * See StoragePlugin for description of the interface intent and its methods.
  */
 public abstract class AbstractStoragePlugin implements StoragePlugin {
@@ -44,7 +45,7 @@
 
   protected AbstractStoragePlugin(DrillbitContext inContext, String inName) {
     this.context = inContext;
-    this.name = inName;
+    this.name = inName == null ? null : inName.toLowerCase();
   }
 
   @Override
@@ -137,12 +138,13 @@ public FormatPlugin getFormatPlugin(FormatPluginConfig config) {
     throw new UnsupportedOperationException(String.format("%s doesn't support format plugins", getClass().getName()));
   }
 
-  public DrillbitContext getContext() {
-    return context;
-  }
-
+  @Override
   public String getName() {
     return name;
   }
 
+  public DrillbitContext getContext() {
+    return context;
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PartitionExplorerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PartitionExplorerImpl.java
index c87d8f6f5ea..a63d015e1b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PartitionExplorerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PartitionExplorerImpl.java
@@ -36,7 +36,7 @@ public PartitionExplorerImpl(SchemaPlus rootSchema) {
                                            List<String> partitionValues
                                            ) throws PartitionNotFoundException {
 
-    AbstractSchema subSchema = rootSchema.getSubSchema(schema).unwrap(AbstractSchema.class);
+    AbstractSchema subSchema = rootSchema.getSubSchema(schema.toLowerCase()).unwrap(AbstractSchema.class);
     return subSchema.getSubPartitions(table, partitionColumns, partitionValues);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
index 20275272a5b..4545169e7c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
@@ -25,14 +25,13 @@
  * StoragePlugins implements this interface to register the schemas they provide.
  */
 public interface SchemaFactory {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class);
 
   /**
    * Register the schemas provided by this SchemaFactory implementation under the given parent schema.
    *
    * @param schemaConfig Configuration for schema objects.
    * @param parent Reference to parent schema.
-   * @throws IOException
+   * @throws IOException in case of error during schema registration
    */
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException;
+  void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException;
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index 7bd7eaf7dd3..4e6a7c2cb3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -112,4 +112,6 @@
    * @throws UnsupportedOperationException, if storage plugin doesn't support format plugins.
    */
   FormatPlugin getFormatPlugin(FormatPluginConfig config);
+
+  String getName();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
index 582791e739d..62fd0316297 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
@@ -22,8 +22,6 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -31,17 +29,19 @@
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 
 /**
  * Holds maps to storage plugins. Supports name => plugin and config => plugin mappings.
  *
  * This is inspired by ConcurrentMap but provides a secondary key mapping that allows an alternative lookup mechanism.
  * The class is responsible for internally managing consistency between the two maps. This class is threadsafe.
+ * Name map is case insensitive.
  */
 class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>>, AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginMap.class);
 
-  private final ConcurrentMap<String, StoragePlugin> nameMap = new ConcurrentHashMap<>();
+  private final Map<String, StoragePlugin> nameMap = CaseInsensitiveMap.newConcurrentMap();
 
   @SuppressWarnings("unchecked")
   private final Multimap<StoragePluginConfig, StoragePlugin> configMap =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 313d3b985e0..036187ab9ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -27,8 +27,7 @@
 import org.apache.drill.exec.store.sys.PersistentStore;
 
 public interface StoragePluginRegistry extends Iterable<Map.Entry<String, StoragePlugin>>, AutoCloseable {
-  String SYS_PLUGIN = "sys";
-  String INFORMATION_SCHEMA_PLUGIN = "INFORMATION_SCHEMA";
+
   String STORAGE_PLUGIN_REGISTRY_IMPL = "drill.exec.storage.registry";
   String ACTION_ON_STORAGE_PLUGINS_OVERRIDE_FILE = "drill.exec.storage.action_on_plugins_override_file";
   String PSTORE_NAME = "sys.storage_plugins";
@@ -92,8 +91,7 @@
    * @return A FormatPlugin instance
    * @throws ExecutionSetupException if plugin cannot be obtained
    */
-  FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig)
-      throws ExecutionSetupException;
+  FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException;
 
   /**
    * Get the PStore for this StoragePluginRegistry. (Used in the management layer.)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 03ce5a9b68e..bd5a93d490a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -43,19 +43,18 @@
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.StoreException;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
-import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.apache.drill.exec.store.sys.CaseInsensitivePersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
-import org.apache.drill.exec.store.sys.SystemTablePlugin;
-import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
@@ -67,35 +66,28 @@
 import com.google.common.io.Resources;
 
 public class StoragePluginRegistryImpl implements StoragePluginRegistry {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistryImpl.class);
 
-  private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = Collections.emptyMap();
-  private final StoragePluginMap enabledPlugins = new StoragePluginMap();
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistryImpl.class);
 
-  private DrillbitContext context;
-  private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
-  private final PersistentStore<StoragePluginConfig> pluginSystemTable;
+  private final StoragePluginMap enabledPlugins;
+  private final DrillSchemaFactory schemaFactory;
+  private final DrillbitContext context;
   private final LogicalPlanPersistence lpPersistence;
   private final ScanResult classpathScan;
+  private final PersistentStore<StoragePluginConfig> pluginSystemTable;
   private final LoadingCache<StoragePluginConfig, StoragePlugin> ephemeralPlugins;
 
+  private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = Collections.emptyMap();
+  private Map<String, StoragePlugin> systemPlugins = Collections.emptyMap();
+
   public StoragePluginRegistryImpl(DrillbitContext context) {
+    this.enabledPlugins = new StoragePluginMap();
+    this.schemaFactory = new DrillSchemaFactory(null);
     this.context = checkNotNull(context);
     this.lpPersistence = checkNotNull(context.getLpPersistence());
     this.classpathScan = checkNotNull(context.getClasspathScan());
-    try {
-      this.pluginSystemTable = context
-          .getStoreProvider()
-          .getOrCreateStore(PersistentStoreConfig
-              .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class)
-              .name(PSTORE_NAME)
-              .build());
-    } catch (StoreException | RuntimeException e) {
-      logger.error("Failure while loading storage plugin registry.", e);
-      throw new RuntimeException("Failure while reading and loading storage plugin configuration.", e);
-    }
-
-    ephemeralPlugins = CacheBuilder.newBuilder()
+    this.pluginSystemTable = initPluginsSystemTable(context, lpPersistence);
+    this.ephemeralPlugins = CacheBuilder.newBuilder()
         .expireAfterAccess(24, TimeUnit.HOURS)
         .maximumSize(250)
         .removalListener(
@@ -108,16 +100,12 @@ public StoragePlugin load(StoragePluginConfig config) throws Exception {
         });
   }
 
-  @Override
-  public PersistentStore<StoragePluginConfig> getStore() {
-    return pluginSystemTable;
-  }
-
   @Override
   public void init() {
     availablePlugins = findAvailablePlugins(classpathScan);
+    systemPlugins = initSystemPlugins(classpathScan, context);
     try {
-      StoragePlugins bootstrapPlugins = pluginSystemTable.getAll().hasNext() ? null : loadBootstrapPlugins();
+      StoragePlugins bootstrapPlugins = pluginSystemTable.getAll().hasNext() ? null : loadBootstrapPlugins(lpPersistence);
 
       StoragePluginsHandler storagePluginsHandler = new StoragePluginsHandlerService(context);
       storagePluginsHandler.loadPlugins(pluginSystemTable, bootstrapPlugins);
@@ -129,115 +117,18 @@ public void init() {
     }
   }
 
-  /**
-   * Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE} files for the first fresh
-   * instantiating of Drill
-   *
-   * @return bootstrap storage plugins
-   * @throws IOException if a read error occurs
-   */
-  private StoragePlugins loadBootstrapPlugins() throws IOException {
-    // bootstrap load the config since no plugins are stored.
-    logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
-    Set<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
-    if (urls != null && !urls.isEmpty()) {
-      logger.info("Loading the storage plugin configs from URLs {}.", urls);
-      StoragePlugins bootstrapPlugins = new StoragePlugins(new HashMap<>());
-      Map<String, URL> pluginURLMap = new HashMap<>();
-      for (URL url : urls) {
-        String pluginsData = Resources.toString(url, Charsets.UTF_8);
-        StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
-        for (Entry<String, StoragePluginConfig> plugin : plugins) {
-          StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue());
-          if (oldPluginConfig != null) {
-            logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
-                plugin.getKey(), pluginURLMap.get(plugin.getKey()), url);
-          } else {
-            pluginURLMap.put(plugin.getKey(), url);
-          }
-        }
-      }
-      return bootstrapPlugins;
-    } else {
-      throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE);
-    }
-  }
-
-  /**
-   * It initializes {@link #enabledPlugins} with currently enabled plugins
-   */
-  private void defineEnabledPlugins() {
-    Map<String, StoragePlugin> activePlugins = new HashMap<>();
-    Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginSystemTable.getAll();
-    while (allPlugins.hasNext()) {
-      Entry<String, StoragePluginConfig> plugin = allPlugins.next();
-      String name = plugin.getKey();
-      StoragePluginConfig config = plugin.getValue();
-      if (config.isEnabled()) {
-        try {
-          StoragePlugin storagePlugin = create(name, config);
-          activePlugins.put(name, storagePlugin);
-        } catch (ExecutionSetupException e) {
-          logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e);
-          config.setEnabled(false);
-          pluginSystemTable.put(name, config);
-        }
-      }
-    }
-
-    activePlugins.put(INFORMATION_SCHEMA_PLUGIN, new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context,
-        INFORMATION_SCHEMA_PLUGIN));
-    activePlugins.put(SYS_PLUGIN, new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, SYS_PLUGIN));
-
-    enabledPlugins.putAll(activePlugins);
-  }
-
-  /**
-   * Add a plugin and configuration. Assumes neither exists. Primarily
-   * for testing.
-   *
-   * @param name plugin name
-   * @param config plugin config
-   * @param plugin plugin implementation
-   */
-  @VisibleForTesting
-  public void addPluginToPersistentStoreIfAbsent(String name, StoragePluginConfig config, StoragePlugin plugin) {
-    addEnabledPlugin(name, plugin);
-    pluginSystemTable.putIfAbsent(name, config);
-  }
-
-  @Override
-  public void addEnabledPlugin(String name, StoragePlugin plugin) {
-    enabledPlugins.put(name, plugin);
-  }
-
   @Override
   public void deletePlugin(String name) {
-    @SuppressWarnings("resource")
     StoragePlugin plugin = enabledPlugins.remove(name);
     closePlugin(plugin);
     pluginSystemTable.delete(name);
   }
 
-  private void closePlugin(StoragePlugin plugin) {
-    if (plugin == null) {
-      return;
-    }
-
-    try {
-      plugin.close();
-    } catch (Exception e) {
-      logger.warn("Exception while shutting down storage plugin.");
-    }
-  }
-
-  @SuppressWarnings("resource")
   @Override
-  public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist)
-      throws ExecutionSetupException {
+  public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException {
     for (;;) {
-      final StoragePlugin oldPlugin = enabledPlugins.get(name);
-      final StoragePlugin newPlugin = create(name, config);
+      StoragePlugin oldPlugin = enabledPlugins.get(name);
+      StoragePlugin newPlugin = create(name, config);
       boolean done = false;
       try {
         if (oldPlugin != null) {
@@ -273,12 +164,12 @@ public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boo
   @Override
   public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
     StoragePlugin plugin = enabledPlugins.get(name);
-    if (name.equals(SYS_PLUGIN) || name.equals(INFORMATION_SCHEMA_PLUGIN)) {
+    if (systemPlugins.get(name) != null) {
       return plugin;
     }
 
     // since we lazily manage the list of plugins per server, we need to update this once we know that it is time.
-    StoragePluginConfig config = this.pluginSystemTable.get(name);
+    StoragePluginConfig config = pluginSystemTable.get(name);
     if (config == null) {
       if (plugin != null) {
         enabledPlugins.remove(name);
@@ -294,7 +185,6 @@ public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
     }
   }
 
-
   @Override
   public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
     if (config instanceof NamedStoragePluginConfig) {
@@ -322,14 +212,246 @@ public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetup
     }
   }
 
-  @SuppressWarnings("resource")
   @Override
-  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig)
-      throws ExecutionSetupException {
+  public void addEnabledPlugin(String name, StoragePlugin plugin) {
+    enabledPlugins.put(name, plugin);
+  }
+
+  @Override
+  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException {
     StoragePlugin storagePlugin = getPlugin(storageConfig);
     return storagePlugin.getFormatPlugin(formatConfig);
   }
 
+  @Override
+  public PersistentStore<StoragePluginConfig> getStore() {
+    return pluginSystemTable;
+  }
+
+  @Override
+  public SchemaFactory getSchemaFactory() {
+    return schemaFactory;
+  }
+
+  @Override
+  public Iterator<Entry<String, StoragePlugin>> iterator() {
+    return enabledPlugins.iterator();
+  }
+
+  @Override
+  public synchronized void close() throws Exception {
+    ephemeralPlugins.invalidateAll();
+    enabledPlugins.close();
+    pluginSystemTable.close();
+  }
+
+  /**
+   * Add a plugin and configuration. Assumes neither exists. Primarily for testing.
+   *
+   * @param config plugin config
+   * @param plugin plugin implementation
+   */
+  @VisibleForTesting
+  public void addPluginToPersistentStoreIfAbsent(String name, StoragePluginConfig config, StoragePlugin plugin) {
+    addEnabledPlugin(name, plugin);
+    pluginSystemTable.putIfAbsent(name, config);
+  }
+
+  /**
+   * <ol>
+   *   <li>Initializes persistent store for storage plugins.</li>
+   *   <li>Since storage plugin names are case-insensitive in Drill, to ensure backward compatibility,
+   *   re-writes those not stored in lower case under their lower case names; issues a warning for duplicates.</li>
+   *   <li>Wraps plugin system table into case insensitive wrapper.</li>
+   * </ol>
+   *
+   * @param context drillbit context
+   * @param lpPersistence deserialization mapper provider
+   * @return persistent store for storage plugins
+   */
+  private PersistentStore<StoragePluginConfig> initPluginsSystemTable(DrillbitContext context, LogicalPlanPersistence lpPersistence) {
+
+    try {
+      PersistentStore<StoragePluginConfig> pluginSystemTable = context
+          .getStoreProvider()
+          .getOrCreateStore(PersistentStoreConfig
+              .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class)
+              .name(PSTORE_NAME)
+              .build());
+
+      Iterator<Entry<String, StoragePluginConfig>> storedPlugins = pluginSystemTable.getAll();
+      while (storedPlugins.hasNext()) {
+        Entry<String, StoragePluginConfig> entry = storedPlugins.next();
+        String pluginName = entry.getKey();
+        if (!pluginName.equals(pluginName.toLowerCase())) {
+          logger.debug("Replacing plugin name {} with its lower case equivalent.", pluginName);
+          pluginSystemTable.delete(pluginName);
+          if (!pluginSystemTable.putIfAbsent(pluginName.toLowerCase(), entry.getValue())) {
+            logger.warn("Duplicated storage plugin name [{}] is found. Duplicate is deleted from persistent storage.", pluginName);
+          }
+        }
+      }
+
+      return new CaseInsensitivePersistentStore<>(pluginSystemTable);
+    } catch (StoreException e) {
+      logger.error("Failure while loading storage plugin registry.", e);
+      throw new DrillRuntimeException("Failure while reading and loading storage plugin configuration.", e);
+    }
+  }
+
+  /**
+   * Reads bootstrap storage plugins from {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE} files; used when
+   * no storage plugins are stored yet (first fresh start of Drill).
+   *
+   * @param lpPersistence deserialization mapper provider
+   * @return bootstrap storage plugins
+   * @throws IOException if a read error occurs
+   */
+  private StoragePlugins loadBootstrapPlugins(LogicalPlanPersistence lpPersistence) throws IOException {
+    // bootstrap load the config since no plugins are stored.
+    logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
+    Set<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
+    if (urls != null && !urls.isEmpty()) {
+      logger.info("Loading the storage plugin configs from URLs {}.", urls);
+      StoragePlugins bootstrapPlugins = new StoragePlugins(new HashMap<>());
+      Map<String, URL> pluginURLMap = new HashMap<>();
+      for (URL url : urls) {
+        String pluginsData = Resources.toString(url, Charsets.UTF_8);
+        StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
+        for (Entry<String, StoragePluginConfig> plugin : plugins) {
+          StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue());
+          if (oldPluginConfig != null) {
+            logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
+                plugin.getKey(), pluginURLMap.get(plugin.getKey()), url);
+          } else {
+            pluginURLMap.put(plugin.getKey(), url);
+          }
+        }
+      }
+      return bootstrapPlugins;
+    } else {
+      throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE);
+    }
+  }
+
+  /**
+   * Dynamically loads system plugins annotated with {@link SystemPlugin}.
+   * Skips plugin initialization if no matching constructor is found, the class does not implement StoragePlugin, or the plugin name is null.
+   *
+   * @param classpathScan classpath scan result
+   * @param context drillbit context
+   * @return map with system plugins stored by name
+   */
+  private Map<String, StoragePlugin> initSystemPlugins(ScanResult classpathScan, DrillbitContext context) {
+    Map<String, StoragePlugin> plugins = CaseInsensitiveMap.newHashMap();
+    List<AnnotatedClassDescriptor> annotatedClasses = classpathScan.getAnnotatedClasses(SystemPlugin.class.getName());
+    logger.trace("Found {} annotated classes with SystemPlugin annotation: {}.", annotatedClasses.size(), annotatedClasses);
+
+    for (AnnotatedClassDescriptor annotatedClass : annotatedClasses) {
+      try {
+        Class<?> aClass = Class.forName(annotatedClass.getClassName());
+        boolean isPluginInitialized = false;
+
+        for (Constructor<?> constructor : aClass.getConstructors()) {
+          Class<?>[] parameterTypes = constructor.getParameterTypes();
+
+          if (parameterTypes.length != 1 || parameterTypes[0] != DrillbitContext.class) {
+            logger.trace("Not matching constructor for {}. Expecting constructor with one parameter for DrillbitContext class.",
+                annotatedClass.getClassName());
+            continue;
+          }
+
+          Object instance = constructor.newInstance(context);
+          if (!(instance instanceof StoragePlugin)) {
+            logger.debug("Created instance of {} does not implement StoragePlugin interface.", annotatedClass.getClassName());
+            continue;
+          }
+
+          StoragePlugin storagePlugin = (StoragePlugin) instance;
+          String name = storagePlugin.getName();
+          if (name == null) {
+            logger.debug("Storage plugin name {} is not defined. Skipping plugin initialization.", annotatedClass.getClassName());
+            continue;
+          }
+          plugins.put(name, storagePlugin);
+          isPluginInitialized = true;
+
+        }
+        if (!isPluginInitialized) {
+          logger.debug("Skipping plugin registration, did not find matching constructor or initialized object of wrong type.");
+        }
+      } catch (ReflectiveOperationException e) {
+        logger.warn("Error during system plugin {} initialization. Plugin initialization will be skipped.", annotatedClass.getClassName(), e);
+      }
+    }
+    logger.trace("The following system plugins have been initialized: {}.", plugins.keySet());
+    return plugins;
+  }
+
+  /**
+   * Get a list of all available storage plugin class constructors.
+   * @param classpathScan A classpath scan to use.
+   * @return A Map of StoragePluginConfig => StoragePlugin.<init>() constructors.
+   */
+  @SuppressWarnings("unchecked")
+  private Map<Object, Constructor<? extends StoragePlugin>> findAvailablePlugins(final ScanResult classpathScan) {
+    Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<>();
+    final Collection<Class<? extends StoragePlugin>> pluginClasses =
+        classpathScan.getImplementations(StoragePlugin.class);
+    final String lineBrokenList =
+        pluginClasses.size() == 0
+            ? "" : "\n\t- " + Joiner.on("\n\t- ").join(pluginClasses);
+    logger.debug("Found {} storage plugin configuration classes: {}.",
+        pluginClasses.size(), lineBrokenList);
+    for (Class<? extends StoragePlugin> plugin : pluginClasses) {
+      int i = 0;
+      for (Constructor<?> c : plugin.getConstructors()) {
+        Class<?>[] params = c.getParameterTypes();
+        if (params.length != 3
+            || params[1] != DrillbitContext.class
+            || !StoragePluginConfig.class.isAssignableFrom(params[0])
+            || params[2] != String.class) {
+          logger.debug("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a "
+              + "[constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
+          continue;
+        }
+        availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
+        i++;
+      }
+      if (i == 0) {
+        logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters "
+            + "of (StoragePluginConfig, DrillbitContext, String)", plugin.getCanonicalName());
+      }
+    }
+    return availablePlugins;
+  }
+
+  /**
+   * Initializes {@link #enabledPlugins} with the currently enabled plugins and the system plugins.
+   */
+  private void defineEnabledPlugins() {
+    Map<String, StoragePlugin> activePlugins = new HashMap<>();
+    Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginSystemTable.getAll();
+    while (allPlugins.hasNext()) {
+      Entry<String, StoragePluginConfig> plugin = allPlugins.next();
+      String name = plugin.getKey();
+      StoragePluginConfig config = plugin.getValue();
+      if (config.isEnabled()) {
+        try {
+          StoragePlugin storagePlugin = create(name, config);
+          activePlugins.put(name, storagePlugin);
+        } catch (ExecutionSetupException e) {
+          logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e);
+          config.setEnabled(false);
+          pluginSystemTable.put(name, config);
+        }
+      }
+    }
+
+    activePlugins.putAll(systemPlugins);
+    enabledPlugins.putAll(activePlugins);
+  }
+
   private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
     // TODO: DRILL-6412: clients for storage plugins shouldn't be created, if storage plugin is disabled
     // Creating of the StoragePlugin leads to instantiating storage clients
@@ -343,30 +465,33 @@ private StoragePlugin create(String name, StoragePluginConfig pluginConfig) thro
       plugin = c.newInstance(pluginConfig, context, name);
       plugin.start();
       return plugin;
-    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
-        | IOException e) {
+    } catch (ReflectiveOperationException | IOException e) {
       Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
       if (t instanceof ExecutionSetupException) {
         throw ((ExecutionSetupException) t);
       }
-      throw new ExecutionSetupException(String.format(
-          "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
+      throw new ExecutionSetupException(String.format("Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
     }
   }
 
-  @Override
-  public Iterator<Entry<String, StoragePlugin>> iterator() {
-    return enabledPlugins.iterator();
-  }
+  private void closePlugin(StoragePlugin plugin) {
+    if (plugin == null) {
+      return;
+    }
 
-  @Override
-  public SchemaFactory getSchemaFactory() {
-    return schemaFactory;
+    try {
+      plugin.close();
+    } catch (Exception e) {
+      logger.warn("Exception while shutting down storage plugin.", e);
+    }
   }
 
-  public class DrillSchemaFactory implements SchemaFactory {
+  public class DrillSchemaFactory extends AbstractSchemaFactory {
+
+    public DrillSchemaFactory(String name) {
+      super(name);
+    }
 
-    @SuppressWarnings("resource")
     @Override
     public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
       Stopwatch watch = Stopwatch.createStarted();
@@ -385,7 +510,7 @@ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws
         }
         // remove those which are no longer in the registry
         for (String pluginName : currentPluginNames) {
-          if (pluginName.equals(SYS_PLUGIN) || pluginName.equals(INFORMATION_SCHEMA_PLUGIN)) {
+          if (systemPlugins.get(pluginName) != null) {
             continue;
           }
           enabledPlugins.remove(pluginName);
@@ -448,49 +573,4 @@ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws
 
   }
 
-  @Override
-  public synchronized void close() throws Exception {
-    ephemeralPlugins.invalidateAll();
-    enabledPlugins.close();
-    pluginSystemTable.close();
-  }
-
-  /**
-   * Get a list of all available storage plugin class constructors.
-   * @param classpathScan A classpath scan to use.
-   * @return A Map of StoragePluginConfig => StoragePlugin.<init>() constructors.
-   */
-  @SuppressWarnings("unchecked")
-  public static Map<Object, Constructor<? extends StoragePlugin>> findAvailablePlugins(final ScanResult classpathScan) {
-    Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<>();
-    final Collection<Class<? extends StoragePlugin>> pluginClasses =
-        classpathScan.getImplementations(StoragePlugin.class);
-    final String lineBrokenList =
-        pluginClasses.size() == 0
-            ? "" : "\n\t- " + Joiner.on("\n\t- ").join(pluginClasses);
-    logger.debug("Found {} storage plugin configuration classes: {}.",
-        pluginClasses.size(), lineBrokenList);
-    for (Class<? extends StoragePlugin> plugin : pluginClasses) {
-      int i = 0;
-      for (Constructor<?> c : plugin.getConstructors()) {
-        Class<?>[] params = c.getParameterTypes();
-        if (params.length != 3
-            || params[1] != DrillbitContext.class
-            || !StoragePluginConfig.class.isAssignableFrom(params[0])
-            || params[2] != String.class) {
-          logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a "
-              + "[constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
-          continue;
-        }
-        availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
-        i++;
-      }
-      if (i == 0) {
-        logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters "
-            + "of (StorangePluginConfig, Config)", plugin.getCanonicalName());
-      }
-    }
-    return availablePlugins;
-  }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SystemPlugin.java
new file mode 100644
index 00000000000..85236af1b40
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SystemPlugin.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Is used to indicate system plugins which will be dynamically initialized during storage plugin registry init stage.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface SystemPlugin {
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 4eda955820c..be944a02ccc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.dfs;
 
 import java.util.Map;
+import java.util.Optional;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -25,6 +26,7 @@
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 
 @JsonTypeName(FileSystemConfig.NAME)
 public class FileSystemConfig extends StoragePluginConfig {
@@ -43,7 +45,9 @@ public FileSystemConfig(@JsonProperty("connection") String connection,
                           @JsonProperty("formats") Map<String, FormatPluginConfig> formats) {
     this.connection = connection;
     this.config = config;
-    this.workspaces = workspaces;
+    Map<String, WorkspaceConfig> caseInsensitiveWorkspaces = CaseInsensitiveMap.newHashMap();
+    Optional.ofNullable(workspaces).ifPresent(caseInsensitiveWorkspaces::putAll);
+    this.workspaces = caseInsensitiveWorkspaces;
     this.formats = formats;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index cb66913ad13..ed6636641b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -57,6 +57,8 @@
  */
 public class FileSystemPlugin extends AbstractStoragePlugin {
 
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
+
   private final FileSystemSchemaFactory schemaFactory;
   private final FormatCreator formatCreator;
   private final Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 0d7dce4cab8..1a97e60d367 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -19,6 +19,8 @@
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,54 +29,50 @@
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
-
 /**
  * This is the top level schema that responds to root level path requests. Also supports
  */
-public class FileSystemSchemaFactory implements SchemaFactory{
+public class FileSystemSchemaFactory extends AbstractSchemaFactory {
 
   public static final String DEFAULT_WS_NAME = "default";
 
   public static final String LOCAL_FS_SCHEME = "file";
 
   private List<WorkspaceSchemaFactory> factories;
-  private String schemaName;
   protected FileSystemPlugin plugin;
 
   public FileSystemSchemaFactory(String schemaName, List<WorkspaceSchemaFactory> factories) {
+    super(schemaName);
     // when the correspondent FileSystemPlugin is not passed in, we dig into ANY workspace factory to get it.
     if (factories.size() > 0) {
       this.plugin = factories.get(0).getPlugin();
     }
-    this.schemaName = schemaName;
     this.factories = factories;
   }
 
   public FileSystemSchemaFactory(FileSystemPlugin plugin, String schemaName, List<WorkspaceSchemaFactory> factories) {
+    super(schemaName);
     this.plugin = plugin;
-    this.schemaName = schemaName;
     this.factories = factories;
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     @SuppressWarnings("resource")
-    FileSystemSchema schema = new FileSystemSchema(schemaName, schemaConfig);
+    FileSystemSchema schema = new FileSystemSchema(getName(), schemaConfig);
     SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
     schema.setPlus(plusOfThis);
   }
@@ -82,10 +80,10 @@ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws
   public class FileSystemSchema extends AbstractSchema {
 
     private final WorkspaceSchema defaultSchema;
-    private final Map<String, WorkspaceSchema> schemaMap = Maps.newHashMap();
+    private final Map<String, WorkspaceSchema> schemaMap = new HashMap<>();
 
     public FileSystemSchema(String name, SchemaConfig schemaConfig) throws IOException {
-      super(ImmutableList.<String>of(), name);
+      super(Collections.emptyList(), name);
       final DrillFileSystem fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), plugin.getFsConf());
       for(WorkspaceSchemaFactory f :  factories){
         WorkspaceSchema s = f.createSchema(getSchemaPath(), schemaConfig, fs);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
index adf15a21595..317e608f482 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
@@ -20,10 +20,11 @@
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 public class InfoSchemaConfig extends StoragePluginConfig {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaConfig.class);
 
   public static final String NAME = "ischema";
 
+  public static final InfoSchemaConfig INSTANCE = new InfoSchemaConfig();
+
   @Override
   public int hashCode(){
     return 1;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
index 15bdcd9b364..63e19d3ffbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
@@ -29,7 +29,7 @@
    String IS_CATALOG_CONNECT = "";
 
   /** Name of information schema. */
-   String IS_SCHEMA_NAME = "INFORMATION_SCHEMA";
+   String IS_SCHEMA_NAME = "information_schema";
 
   // CATALOGS column names:
    String CATS_COL_CATALOG_CONNECT = "CATALOG_CONNECT";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index fc1f01ab81d..4005a402a0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -17,35 +17,41 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
+
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.SystemPlugin;
 
+@SystemPlugin
 public class InfoSchemaStoragePlugin extends AbstractStoragePlugin {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaStoragePlugin.class);
 
   private final InfoSchemaConfig config;
 
+  @SuppressWarnings("unused") // used in StoragePluginRegistryImpl to dynamically init system plugins
+  public InfoSchemaStoragePlugin(DrillbitContext context) {
+    this(InfoSchemaConfig.INSTANCE, context, InfoSchemaConstants.IS_SCHEMA_NAME);
+  }
+
   public InfoSchemaStoragePlugin(InfoSchemaConfig config, DrillbitContext context, String name){
     super(context, name);
     this.config = config;
@@ -57,8 +63,7 @@ public boolean supportsRead() {
   }
 
   @Override
-  public InfoSchemaGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
-      throws IOException {
+  public InfoSchemaGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) {
     InfoSchemaTableType table = selection.getWith(getContext().getLpPersistence(),  InfoSchemaTableType.class);
     return new InfoSchemaGroupScan(table);
   }
@@ -69,23 +74,35 @@ public StoragePluginConfig getConfig() {
   }
 
   @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    ISchema s = new ISchema(parent, this);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    ISchema s = new ISchema(this);
     parent.add(s.getName(), s);
   }
 
   /**
    * Representation of the INFORMATION_SCHEMA schema.
    */
-  private class ISchema extends AbstractSchema{
-    private Map<String, InfoSchemaDrillTable> tables;
-    public ISchema(SchemaPlus parent, InfoSchemaStoragePlugin plugin){
-      super(ImmutableList.<String>of(), IS_SCHEMA_NAME);
-      Map<String, InfoSchemaDrillTable> tbls = Maps.newHashMap();
-      for(InfoSchemaTableType tbl : InfoSchemaTableType.values()){
-        tbls.put(tbl.name(), new InfoSchemaDrillTable(plugin, IS_SCHEMA_NAME, tbl, config));
-      }
-      this.tables = ImmutableMap.copyOf(tbls);
+  private class ISchema extends AbstractSchema {
+
+    private final Map<String, InfoSchemaDrillTable> tables;
+    // for backward compatibility keep IS schema table names in upper case
+    // the way they used to appear in INFORMATION_SCHEMA.TABLES table
+    // though user can query them in any case
+    private final Set<String> originalTableNames;
+
+    ISchema(InfoSchemaStoragePlugin plugin) {
+
+      super(Collections.emptyList(), IS_SCHEMA_NAME);
+
+      this.tables = CaseInsensitiveMap.newHashMap();
+      this.originalTableNames = new HashSet<>();
+
+      Arrays.stream(InfoSchemaTableType.values()).forEach(
+          table -> {
+            tables.put(table.name(), new InfoSchemaDrillTable(plugin, getName(), table, config));
+            originalTableNames.add(table.name());
+          }
+      );
     }
 
     @Override
@@ -95,13 +112,18 @@ public Table getTable(String name) {
 
     @Override
     public Set<String> getTableNames() {
-      return tables.keySet();
+      return originalTableNames;
     }
 
     @Override
     public String getTypeName() {
       return InfoSchemaConfig.NAME;
     }
+
+    @Override
+    public boolean areTableNamesCaseSensitive() {
+      return false;
+    }
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CaseInsensitivePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CaseInsensitivePersistentStore.java
new file mode 100644
index 00000000000..38bd529c519
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CaseInsensitivePersistentStore.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Wrapper around {@link PersistentStore} to ensure all passed keys are converted to lower case and stored this way.
+ * This will ensure case-insensitivity during insert, update, deletion or search.
+ */
+public class CaseInsensitivePersistentStore<V> implements PersistentStore<V> {
+
+  private final PersistentStore<V> underlyingStore;
+
+  public CaseInsensitivePersistentStore(PersistentStore<V> underlyingStore) {
+    this.underlyingStore = underlyingStore;
+  }
+
+  @Override
+  public boolean contains(String key) {
+    return underlyingStore.contains(key.toLowerCase());
+  }
+
+  @Override
+  public V get(String key) {
+    return underlyingStore.get(key.toLowerCase());
+  }
+
+  @Override
+  public void put(String key, V value) {
+    underlyingStore.put(key.toLowerCase(), value);
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> getAll() {
+    return underlyingStore.getAll();
+  }
+
+  @Override
+  public PersistentStoreMode getMode() {
+    return underlyingStore.getMode();
+  }
+
+  @Override
+  public void delete(String key) {
+    underlyingStore.delete(key.toLowerCase());
+  }
+
+  @Override
+  public boolean putIfAbsent(String key, V value) {
+    return underlyingStore.putIfAbsent(key.toLowerCase(), value);
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
+    return underlyingStore.getRange(skip, take);
+  }
+
+  @Override
+  public void close() throws Exception {
+    underlyingStore.close();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index 10e082fd618..d282017a167 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -17,39 +17,47 @@
  */
 package org.apache.drill.exec.store.sys;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SystemPlugin;
 import org.apache.drill.exec.store.pojo.PojoDataType;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-
 /**
  * A "storage" plugin for system tables.
  */
+@SystemPlugin
 public class SystemTablePlugin extends AbstractStoragePlugin {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePlugin.class);
 
   public static final String SYS_SCHEMA_NAME = "sys";
 
   private final SystemTablePluginConfig config;
-  private final SystemSchema schema = new SystemSchema();
+  private final SystemSchema schema;
+
+  @SuppressWarnings("unused") // used in StoragePluginRegistryImpl to dynamically init system plugins
+  public SystemTablePlugin(DrillbitContext context) {
+    this(SystemTablePluginConfig.INSTANCE, context, SYS_SCHEMA_NAME);
+  }
 
   public SystemTablePlugin(SystemTablePluginConfig config, DrillbitContext context, String name) {
     super(context, name);
     this.config = config;
+    this.schema = new SystemSchema(this);
   }
 
   @Override
@@ -73,31 +81,29 @@ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
    */
   private class SystemSchema extends AbstractSchema {
 
-    private final Set<String> tableNames;
+    private final Map<String, StaticDrillTable> tables;
+
+    SystemSchema(SystemTablePlugin plugin) {
 
-    public SystemSchema() {
-      super(ImmutableList.of(), SYS_SCHEMA_NAME);
-      Set<String> names = Sets.newHashSet();
-      for (SystemTable t : SystemTable.values()) {
-        names.add(t.getTableName());
-      }
-      this.tableNames = ImmutableSet.copyOf(names);
+      super(Collections.emptyList(), SYS_SCHEMA_NAME);
+
+      this.tables = Arrays.stream(SystemTable.values())
+          .collect(
+              Collectors.toMap(
+                  SystemTable::getTableName,
+                  table -> new StaticDrillTable(getName(), plugin, TableType.SYSTEM_TABLE, table, new PojoDataType(table.getPojoClass())),
+                  (o, n) -> n,
+                  CaseInsensitiveMap::newHashMap));
     }
 
     @Override
     public Set<String> getTableNames() {
-      return tableNames;
+      return tables.keySet();
     }
 
     @Override
     public DrillTable getTable(String name) {
-      for (SystemTable table : SystemTable.values()) {
-        if (table.getTableName().equalsIgnoreCase(name)) {
-          return new StaticDrillTable(getName(), SystemTablePlugin.this, TableType.SYSTEM_TABLE,
-            table, new PojoDataType(table.getPojoClass()));
-        }
-      }
-      return null;
+      return tables.get(name);
     }
 
     @Override
@@ -105,5 +111,9 @@ public String getTypeName() {
       return SystemTablePluginConfig.NAME;
     }
 
+    @Override
+    public boolean areTableNamesCaseSensitive() {
+      return false;
+    }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
index 360182e9696..914fcf00f50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
@@ -23,7 +23,6 @@
  * A namesake plugin configuration for system tables.
  */
 public class SystemTablePluginConfig extends StoragePluginConfig {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePluginConfig.class);
 
   public static final String NAME = "system-tables";
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
index 0f717756927..7a9c9a6f246 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
@@ -434,18 +434,36 @@ protected Response runInternal(final UserSession session, final SchemaTreeProvid
 
   /**
    * Helper method to create a {@link InfoSchemaFilter} that combines the given filters with an AND.
+   *
    * @param catalogNameFilter Optional filter on <code>catalog name</code>
    * @param schemaNameFilter Optional filter on <code>schema name</code>
    * @param tableNameFilter Optional filter on <code>table name</code>
    * @param tableTypeFilter Optional filter on <code>table type</code>
    * @param columnNameFilter Optional filter on <code>column name</code>
-   * @return
+   * @return information schema filter
    */
-  private static InfoSchemaFilter createInfoSchemaFilter(final LikeFilter catalogNameFilter,
-      final LikeFilter schemaNameFilter, final LikeFilter tableNameFilter, List<String> tableTypeFilter, final LikeFilter columnNameFilter) {
+  private static InfoSchemaFilter createInfoSchemaFilter(LikeFilter catalogNameFilter,
+                                                         LikeFilter schemaNameFilter,
+                                                         LikeFilter tableNameFilter,
+                                                         List<String> tableTypeFilter,
+                                                         LikeFilter columnNameFilter) {
 
     FunctionExprNode exprNode = createLikeFunctionExprNode(CATS_COL_CATALOG_NAME,  catalogNameFilter);
 
+    // Schema names are case insensitive in Drill and are stored in lower case,
+    // so convert the schema LIKE filter's pattern and escape to lower case as well.
+    if (schemaNameFilter != null) {
+      LikeFilter.Builder builder = LikeFilter.newBuilder();
+      if (schemaNameFilter.hasPattern()) {
+        builder.setPattern(schemaNameFilter.getPattern().toLowerCase());
+      }
+
+      if (schemaNameFilter.hasEscape()) {
+        builder.setEscape(schemaNameFilter.getEscape().toLowerCase());
+      }
+      schemaNameFilter = builder.build();
+    }
+
     exprNode = combineFunctions(AND_FUNCTION,
         exprNode,
         combineFunctions(OR_FUNCTION,
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 06c6978d199..d8915a09086 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -32,16 +32,23 @@ drill {
       org.apache.drill.exec.store.StoragePlugin
     ],
 
-    annotations += org.apache.drill.exec.expr.annotations.FunctionTemplate
+    annotations: ${?drill.classpath.scanning.annotations} [
+      org.apache.drill.exec.expr.annotations.FunctionTemplate,
+      org.apache.drill.exec.store.SystemPlugin
+    ],
 
     packages: ${?drill.classpath.scanning.packages} [
-          org.apache.drill.exec.expr,
-          org.apache.drill.exec.physical,
-          org.apache.drill.exec.store,
-          org.apache.drill.exec.rpc.user.security,
-          org.apache.drill.exec.rpc.security,
-          org.apache.drill.exec.server.rest.auth
-    ]
+      org.apache.drill.exec.expr,
+      org.apache.drill.exec.physical,
+      org.apache.drill.exec.store,
+      org.apache.drill.exec.rpc.user.security,
+      org.apache.drill.exec.rpc.security,
+      org.apache.drill.exec.server.rest.auth
+    ],
+
+    // caches scanned result during build time
+    // set to false to avoid the need for a full Drill build during development
+    cache.enabled: true
   }
 }
 
@@ -231,7 +238,7 @@ drill.exec: {
   },
   debug: {
     // If true, inserts the iterator validator atop each operator.
-    // Primrily used for testing.
+    // Primarily used for testing.
     validate_iterators: false,
     // If iterator validation is enabled, also validates the vectors
     // in each batch. Primarily used for testing. To enable from
diff --git a/exec/java-exec/src/test/java/org/apache/drill/common/scanner/TestClassPathScanner.java b/exec/java-exec/src/test/java/org/apache/drill/common/scanner/TestClassPathScanner.java
index f4dc837acec..01b1b3babd3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/common/scanner/TestClassPathScanner.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/common/scanner/TestClassPathScanner.java
@@ -17,21 +17,19 @@
  */
 package org.apache.drill.common.scanner;
 
-import static java.util.Arrays.asList;
-import static java.util.Collections.sort;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.config.DrillConfig;
@@ -43,26 +41,27 @@
 import org.apache.drill.exec.expr.annotations.FunctionTemplate;
 import org.apache.drill.exec.fn.impl.testing.GeneratorFunctions.RandomBigIntGauss;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.SystemPlugin;
+import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePlugin;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({SlowTest.class})
 public class TestClassPathScanner {
 
-  @SafeVarargs
-  final private <T extends Comparable<? super T>> void assertListEqualsUnordered(Collection<T> list, T... expected) {
-    List<T> expectedList = asList(expected);
-    sort(expectedList);
-    List<T> gotList = new ArrayList<>(list);
-    sort(gotList);
-    assertEquals(expectedList.toString(), gotList.toString());
+  private static ScanResult result;
+
+  @BeforeClass
+  public static void init() {
+    result = ClassPathScanner.fromPrescan(DrillConfig.create());
   }
 
   @Test
-  public void test() throws Exception {
-    ScanResult result = ClassPathScanner.fromPrescan(DrillConfig.create());
-    List<AnnotatedClassDescriptor> functions = result.getAnnotatedClasses();
+  public void testFunctionTemplates() throws Exception {
+    List<AnnotatedClassDescriptor> functions = result.getAnnotatedClasses(FunctionTemplate.class.getName());
     Set<String> scanned = new HashSet<>();
     AnnotatedClassDescriptor functionRandomBigIntGauss = null;
     for (AnnotatedClassDescriptor function : functions) {
@@ -94,6 +93,7 @@ public void test() throws Exception {
       List<AnnotationDescriptor> scannedAnnotations = function.getAnnotations();
       verifyAnnotations(annotations, scannedAnnotations);
       FunctionTemplate bytecodeAnnotation = function.getAnnotationProxy(FunctionTemplate.class);
+      assertNotNull(bytecodeAnnotation);
       FunctionTemplate reflectionAnnotation = c.getAnnotation(FunctionTemplate.class);
       assertEquals(reflectionAnnotation.name(), bytecodeAnnotation.name());
       assertArrayEquals(reflectionAnnotation.names(), bytecodeAnnotation.names());
@@ -110,6 +110,17 @@ public void test() throws Exception {
     assertTrue(result.getImplementations(DrillFunc.class).size() > 0);
   }
 
+  @Test
+  public void testSystemPlugins() {
+    Set<String> expectedPlugins = new HashSet<>(
+        Arrays.asList(InfoSchemaStoragePlugin.class.getName(), SystemTablePlugin.class.getName()));
+    List<String> foundPlugins = result.getAnnotatedClasses(SystemPlugin.class.getName()).stream()
+        .map(AnnotatedClassDescriptor::getClassName)
+        .filter(expectedPlugins::contains) // keep only the two system plugins this test expects
+        .collect(Collectors.toList());
+    assertEquals(2, foundPlugins.size());
+  }
+
   private <T> void validateType(ScanResult result, String baseType) throws ClassNotFoundException {
     if (baseType.startsWith("org.apache.hadoop.hive")) {
       return;
@@ -132,8 +143,8 @@ private void verifyAnnotations(Annotation[] annotations, List<AnnotationDescript
       Class<? extends Annotation> annotationType = annotation.annotationType();
       assertEquals(annotationType.getName(), scannedAnnotation.getAnnotationType());
       if (annotation instanceof FunctionTemplate) {
-        FunctionTemplate ft = (FunctionTemplate)annotation;
-        if (ft.name() != null && !ft.name().equals("")) {
+        FunctionTemplate ft = (FunctionTemplate) annotation;
+        if (!"".equals(ft.name())) {
           assertEquals(ft.name(), scannedAnnotation.getSingleValue("name"));
         }
       }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
index e0e6c799054..aaa5ee96516 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
@@ -57,10 +57,10 @@
   private CuratorFramework curator;
   private ZookeeperClient client;
 
-  static class ClientWithMockCache extends ZookeeperClient {
+  private static class ClientWithMockCache extends ZookeeperClient {
     private final PathChildrenCache cacheMock = Mockito.mock(PathChildrenCache.class);
 
-    public ClientWithMockCache(final CuratorFramework curator, final String root, final CreateMode mode) {
+    ClientWithMockCache(final CuratorFramework curator, final String root, final CreateMode mode) {
       super(curator, root, mode);
     }
 
@@ -97,7 +97,7 @@ public void testStartingClientEnablesCacheAndEnsuresRootNodeExists() throws Exce
 
     Mockito
         .verify(client.getCache())
-        .start();
+        .start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
   }
 
   @Test
@@ -163,7 +163,7 @@ public void testGetWithEventualConsistencyHitsCache() {
         .when(client.getCache().getCurrentData(abspath))
         .thenReturn(null);
 
-    assertEquals("get should return null", null, client.get(path));
+    assertNull("get should return null", client.get(path));
 
     Mockito
         .when(client.getCache().getCurrentData(abspath))
@@ -198,7 +198,7 @@ public void testDelete() throws Exception {
 
 
   @Test
-  public void testEntriesReturnsRelativePaths() throws Exception {
+  public void testEntriesReturnsRelativePaths() {
     final ChildData child = Mockito.mock(ChildData.class);
     Mockito
         .when(child.getPath())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
index 3b25ddbfddf..17756a8bb1f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
@@ -44,7 +44,7 @@
 import static org.junit.Assert.assertTrue;
 
 public class BaseTestImpersonation extends PlanTestBase {
-  protected static final String MINIDFS_STORAGE_PLUGIN_NAME = "miniDfsPlugin";
+  protected static final String MINI_DFS_STORAGE_PLUGIN_NAME = "mini_dfs_plugin";
   protected static final String processUser = System.getProperty("user.name");
 
   protected static MiniDFSCluster dfsCluster;
@@ -74,8 +74,8 @@
 
   /**
    * Start a MiniDFS cluster backed Drillbit cluster with impersonation enabled.
-   * @param testClass
-   * @throws Exception
+   * @param testClass test class
+   * @throws Exception in case of errors during start up
    */
   protected static void startMiniDfsCluster(String testClass) throws Exception {
     startMiniDfsCluster(testClass, true);
@@ -83,12 +83,11 @@ protected static void startMiniDfsCluster(String testClass) throws Exception {
 
   /**
    * Start a MiniDFS cluster backed Drillbit cluster
-   * @param testClass
+   * @param testClass test class
    * @param isImpersonationEnabled Enable impersonation in the cluster?
-   * @throws Exception
+   * @throws Exception in case of errors during start up
    */
-  protected static void startMiniDfsCluster(
-      final String testClass, final boolean isImpersonationEnabled) throws Exception {
+  protected static void startMiniDfsCluster(String testClass, boolean isImpersonationEnabled) throws Exception {
     Preconditions.checkArgument(!Strings.isNullOrEmpty(testClass), "Expected a non-null and non-empty test class name");
     dfsConf = new Configuration();
 
@@ -133,7 +132,7 @@ protected static void addMiniDfsBasedStorage(final Map<String, WorkspaceConfig>
 
     FileSystemConfig miniDfsPluginConfig = new FileSystemConfig(connection, null, workspaces, lfsPluginConfig.getFormats());
     miniDfsPluginConfig.setEnabled(true);
-    pluginRegistry.createOrUpdate(MINIDFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true);
+    pluginRegistry.createOrUpdate(MINI_DFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true);
   }
 
   protected static void createAndAddWorkspace(String name, String path, short permissions, String owner,
@@ -168,7 +167,7 @@ protected static void stopMiniDfsCluster() {
 
   // Return the user workspace for given user.
   protected static String getWSSchema(String user) {
-    return MINIDFS_STORAGE_PLUGIN_NAME + "." + user;
+    return MINI_DFS_STORAGE_PLUGIN_NAME + "." + user;
   }
 
   protected static String getUserHome(String user) {
@@ -194,7 +193,7 @@ protected static void createView(final String viewOwner, final String viewGroup,
                                  final String viewDef) throws Exception {
     updateClient(viewOwner);
     test(String.format("ALTER SESSION SET `%s`='%o';", ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY, (short) 0750));
-    test("CREATE VIEW %s.%s.%s AS %s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", viewName, viewDef);
+    test("CREATE VIEW %s.%s.%s AS %s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", viewName, viewDef);
     final Path viewFilePath = new Path("/tmp/", viewName + DotDrillType.VIEW.getEnding());
     fs.setOwner(viewFilePath, viewOwner, viewGroup);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
index cebf6492c80..700d820ab51 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
@@ -45,8 +45,7 @@ public static void setup() throws Exception {
 
   private static void createTestData() throws Exception {
     // Create test table in minidfs.tmp schema for use in test queries
-    test(String.format("CREATE TABLE %s.tmp.dfsRegion AS SELECT * FROM cp.`region.json`",
-        MINIDFS_STORAGE_PLUGIN_NAME));
+    test(String.format("CREATE TABLE %s.tmp.dfsRegion AS SELECT * FROM cp.`region.json`", MINI_DFS_STORAGE_PLUGIN_NAME));
 
     // generate a large enough file that the DFS will not fulfill requests to read a
     // page of data all at once, see notes above testReadLargeParquetFileFromDFS()
@@ -59,8 +58,7 @@ private static void createTestData() throws Exception {
             "UNION ALL (SELECT employee_id, full_name FROM cp.`employee.json`)" +
             "UNION ALL (SELECT employee_id, full_name FROM cp.`employee.json`)" +
             "UNION ALL (SELECT employee_id, full_name FROM cp.`employee.json`)" +
-        "UNION ALL (SELECT employee_id, full_name FROM cp.`employee.json`)",
-        MINIDFS_STORAGE_PLUGIN_NAME));
+        "UNION ALL (SELECT employee_id, full_name FROM cp.`employee.json`)", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   /**
@@ -77,7 +75,7 @@ private static void createTestData() throws Exception {
    */
   @Test
   public void testReadLargeParquetFileFromDFS() throws Exception {
-    test(String.format("USE %s", MINIDFS_STORAGE_PLUGIN_NAME));
+    test(String.format("USE %s", MINI_DFS_STORAGE_PLUGIN_NAME));
     test("SELECT * FROM tmp.`large_employee`");
   }
 
@@ -87,7 +85,7 @@ public void testSimpleQuery() throws Exception {
         String.format("SELECT sales_city, sales_country FROM tmp.dfsRegion ORDER BY region_id DESC LIMIT 2");
 
     testBuilder()
-        .optionSettingQueriesForTestQuery(String.format("USE %s", MINIDFS_STORAGE_PLUGIN_NAME))
+        .optionSettingQueriesForTestQuery(String.format("USE %s", MINI_DFS_STORAGE_PLUGIN_NAME))
         .sqlQuery(query)
         .unOrdered()
         .baselineColumns("sales_city", "sales_country")
@@ -98,7 +96,7 @@ public void testSimpleQuery() throws Exception {
 
   @AfterClass
   public static void removeMiniDfsBasedStorage() throws Exception {
-    getDrillbitContext().getStorage().deletePlugin(MINIDFS_STORAGE_PLUGIN_NAME);
+    getDrillbitContext().getStorage().deletePlugin(MINI_DFS_STORAGE_PLUGIN_NAME);
     stopMiniDfsCluster();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
index 08c09d186cc..d14c7ada852 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
@@ -52,8 +52,8 @@
   private static final String user1 = "drillTestUser1";
   private static final String user2 = "drillTestUser2";
 
-  private static final String group0 = "drillTestGrp0";
-  private static final String group1 = "drillTestGrp1";
+  private static final String group0 = "drill_test_grp_0";
+  private static final String group1 = "drill_test_grp_1";
 
   static {
     UserGroupInformation.createUserForTesting(user1, new String[]{ group1, group0 });
@@ -75,23 +75,23 @@ public static void setup() throws Exception {
 
     Map<String, WorkspaceConfig> workspaces = Maps.newHashMap();
 
-    // Create /drillTestGrp0_700 directory with permissions 700 (owned by user running the tests)
-    createAndAddWorkspace("drillTestGrp0_700", "/drillTestGrp0_700", (short)0700, processUser, group0, workspaces);
+    // Create /drill_test_grp_0_700 directory with permissions 700 (owned by user running the tests)
+    createAndAddWorkspace("drill_test_grp_0_700", "/drill_test_grp_0_700", (short)0700, processUser, group0, workspaces);
 
-    // Create /drillTestGrp0_750 directory with permissions 750 (owned by user running the tests)
-    createAndAddWorkspace("drillTestGrp0_750", "/drillTestGrp0_750", (short)0750, processUser, group0, workspaces);
+    // Create /drill_test_grp_0_750 directory with permissions 750 (owned by user running the tests)
+    createAndAddWorkspace("drill_test_grp_0_750", "/drill_test_grp_0_750", (short)0750, processUser, group0, workspaces);
 
-    // Create /drillTestGrp0_755 directory with permissions 755 (owned by user running the tests)
-    createAndAddWorkspace("drillTestGrp0_755", "/drillTestGrp0_755", (short)0755, processUser, group0, workspaces);
+    // Create /drill_test_grp_0_755 directory with permissions 755 (owned by user running the tests)
+    createAndAddWorkspace("drill_test_grp_0_755", "/drill_test_grp_0_755", (short)0755, processUser, group0, workspaces);
 
-    // Create /drillTestGrp0_770 directory with permissions 770 (owned by user running the tests)
-    createAndAddWorkspace("drillTestGrp0_770", "/drillTestGrp0_770", (short)0770, processUser, group0, workspaces);
+    // Create /drill_test_grp_0_770 directory with permissions 770 (owned by user running the tests)
+    createAndAddWorkspace("drill_test_grp_0_770", "/drill_test_grp_0_770", (short)0770, processUser, group0, workspaces);
 
-    // Create /drillTestGrp0_777 directory with permissions 777 (owned by user running the tests)
-    createAndAddWorkspace("drillTestGrp0_777", "/drillTestGrp0_777", (short)0777, processUser, group0, workspaces);
+    // Create /drill_test_grp_0_777 directory with permissions 777 (owned by user running the tests)
+    createAndAddWorkspace("drill_test_grp_0_777", "/drill_test_grp_0_777", (short)0777, processUser, group0, workspaces);
 
-    // Create /drillTestGrp1_700 directory with permissions 700 (owned by user1)
-    createAndAddWorkspace("drillTestGrp1_700", "/drillTestGrp1_700", (short)0700, user1, group1, workspaces);
+    // Create /drill_test_grp_1_700 directory with permissions 700 (owned by user1)
+    createAndAddWorkspace("drill_test_grp_1_700", "/drill_test_grp_1_700", (short)0700, user1, group1, workspaces);
 
     // create /user2_workspace1 with 775 permissions (owner by user1)
     createAndAddWorkspace("user2_workspace1", "/user2_workspace1", (short)0775, user2, group1, workspaces);
@@ -107,17 +107,17 @@ public void testDropTable() throws Exception {
 
     // create tables as user2
     updateClient(user2);
-    test("use `%s.user2_workspace1`", MINIDFS_STORAGE_PLUGIN_NAME);
+    test("use `%s.user2_workspace1`", MINI_DFS_STORAGE_PLUGIN_NAME);
     // create a table that can be dropped by another user in a different group
     test("create table parquet_table_775 as select * from cp.`employee.json`");
 
     // create a table that cannot be dropped by another user
-    test("use `%s.user2_workspace2`", MINIDFS_STORAGE_PLUGIN_NAME);
+    test("use `%s.user2_workspace2`", MINI_DFS_STORAGE_PLUGIN_NAME);
     test("create table parquet_table_700 as select * from cp.`employee.json`");
 
     // Drop tables as user1
     updateClient(user1);
-    test("use `%s.user2_workspace1`", MINIDFS_STORAGE_PLUGIN_NAME);
+    test("use `%s.user2_workspace1`", MINI_DFS_STORAGE_PLUGIN_NAME);
     testBuilder()
         .sqlQuery("drop table parquet_table_775")
         .unOrdered()
@@ -125,7 +125,7 @@ public void testDropTable() throws Exception {
         .baselineValues(true, String.format("Table [%s] dropped", "parquet_table_775"))
         .go();
 
-    test("use `%s.user2_workspace2`", MINIDFS_STORAGE_PLUGIN_NAME);
+    test("use `%s.user2_workspace2`", MINI_DFS_STORAGE_PLUGIN_NAME);
     boolean dropFailed = false;
     try {
       test("drop table parquet_table_700");
@@ -142,7 +142,7 @@ public void testImpersonatingProcessUser() throws Exception {
     updateClient(processUser);
 
     // Process user start the mini dfs, he has read/write permissions by default
-    final String viewName = String.format("%s.drillTestGrp0_700.testView", MINIDFS_STORAGE_PLUGIN_NAME);
+    final String viewName = String.format("%s.drill_test_grp_0_700.testView", MINI_DFS_STORAGE_PLUGIN_NAME);
     try {
       test("CREATE VIEW " + viewName + " AS SELECT * FROM cp.`region.json`");
       test("SELECT * FROM " + viewName + " LIMIT 2");
@@ -155,20 +155,20 @@ public void testImpersonatingProcessUser() throws Exception {
   public void testShowFilesInWSWithUserAndGroupPermissionsForQueryUser() throws Exception {
     updateClient(user1);
 
-    // Try show tables in schema "drillTestGrp1_700" which is owned by "user1"
-    int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+    // Try to show files in schema "drill_test_grp_1_700", which is owned by "user1"
+    int count = testSql(String.format("SHOW FILES IN %s.drill_test_grp_1_700", MINI_DFS_STORAGE_PLUGIN_NAME));
     assertTrue(count > 0);
 
-    // Try show tables in schema "drillTestGrp0_750" which is owned by "processUser" and has group permissions for "user1"
-    count = testSql(String.format("SHOW FILES IN %s.drillTestGrp0_750", MINIDFS_STORAGE_PLUGIN_NAME));
+    // Try to show files in schema "drill_test_grp_0_750", which is owned by "processUser" and has group permissions for "user1"
+    count = testSql(String.format("SHOW FILES IN %s.drill_test_grp_0_750", MINI_DFS_STORAGE_PLUGIN_NAME));
     assertTrue(count > 0);
   }
 
   @Test
   public void testShowFilesInWSWithOtherPermissionsForQueryUser() throws Exception {
     updateClient(user2);
-    // Try show tables in schema "drillTestGrp0_755" which is owned by "processUser" and group0. "user2" is not part of the "group0"
-    int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME));
+    // Try to show files in schema "drill_test_grp_0_755", which is owned by "processUser" and group0. "user2" is not part of "group0"
+    int count = testSql(String.format("SHOW FILES IN %s.drill_test_grp_0_755", MINI_DFS_STORAGE_PLUGIN_NAME));
     assertTrue(count > 0);
   }
 
@@ -178,8 +178,8 @@ public void testShowFilesInWSWithNoPermissionsForQueryUser() throws Exception {
 
     try {
       setSessionOption(ExecConstants.LIST_FILES_RECURSIVELY, true);
-      // Try show tables in schema "drillTestGrp1_700" which is owned by "user1"
-      int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+      // Try to show files in schema "drill_test_grp_1_700", which is owned by "user1"
+      int count = testSql(String.format("SHOW FILES IN %s.drill_test_grp_1_700", MINI_DFS_STORAGE_PLUGIN_NAME));
       assertEquals("Counts should match", 0, count);
     } finally {
       resetSessionOption(ExecConstants.LIST_FILES_RECURSIVELY);
@@ -189,53 +189,53 @@ public void testShowFilesInWSWithNoPermissionsForQueryUser() throws Exception {
   @Test
   public void testShowSchemasAsUser1() throws Exception {
     // "user1" is part of "group0" and has access to following workspaces
-    // drillTestGrp1_700 (through ownership)
-    // drillTestGrp0_750, drillTestGrp0_770 (through "group" category permissions)
-    // drillTestGrp0_755, drillTestGrp0_777 (through "others" category permissions)
+    // drill_test_grp_1_700 (through ownership)
+    // drill_test_grp_0_750, drill_test_grp_0_770 (through "group" category permissions)
+    // drill_test_grp_0_755, drill_test_grp_0_777 (through "others" category permissions)
     updateClient(user1);
     testBuilder()
-        .sqlQuery("SHOW SCHEMAS LIKE '%drillTest%'")
+        .sqlQuery("SHOW SCHEMAS LIKE '%drill_test%'")
         .unOrdered()
         .baselineColumns("SCHEMA_NAME")
-        .baselineValues(String.format("%s.drillTestGrp0_750", MINIDFS_STORAGE_PLUGIN_NAME))
-        .baselineValues(String.format("%s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME))
-        .baselineValues(String.format("%s.drillTestGrp0_770", MINIDFS_STORAGE_PLUGIN_NAME))
-        .baselineValues(String.format("%s.drillTestGrp0_777", MINIDFS_STORAGE_PLUGIN_NAME))
-        .baselineValues(String.format("%s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_750", MINI_DFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_755", MINI_DFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_770", MINI_DFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_777", MINI_DFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_1_700", MINI_DFS_STORAGE_PLUGIN_NAME))
         .go();
   }
 
   @Test
   public void testShowSchemasAsUser2() throws Exception {
     // "user2" is part of "group0", but part of "group1" and has access to following workspaces
-    // drillTestGrp0_755, drillTestGrp0_777 (through "others" category permissions)
+    // drill_test_grp_0_755, drill_test_grp_0_777 (through "others" category permissions)
     updateClient(user2);
     testBuilder()
-        .sqlQuery("SHOW SCHEMAS LIKE '%drillTest%'")
+        .sqlQuery("SHOW SCHEMAS LIKE '%drill_test%'")
         .unOrdered()
         .baselineColumns("SCHEMA_NAME")
-        .baselineValues(String.format("%s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME))
-        .baselineValues(String.format("%s.drillTestGrp0_777", MINIDFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_755", MINI_DFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_777", MINI_DFS_STORAGE_PLUGIN_NAME))
         .go();
   }
 
   @Test
   public void testCreateViewInDirWithUserPermissionsForQueryUser() throws Exception {
-    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp1_700"; // Workspace dir owned by "user1"
+    final String viewSchema = MINI_DFS_STORAGE_PLUGIN_NAME + ".drill_test_grp_1_700"; // Workspace dir owned by "user1"
     testCreateViewTestHelper(user1, viewSchema, "view1");
   }
 
   @Test
   public void testCreateViewInDirWithGroupPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user1" is part of "group0"
-    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_770";
+    final String viewSchema = MINI_DFS_STORAGE_PLUGIN_NAME + ".drill_test_grp_0_770";
     testCreateViewTestHelper(user1, viewSchema, "view1");
   }
 
   @Test
   public void testCreateViewInDirWithOtherPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
-    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_777";
+    final String viewSchema = MINI_DFS_STORAGE_PLUGIN_NAME + ".drill_test_grp_0_777";
     testCreateViewTestHelper(user2, viewSchema, "view1");
   }
 
@@ -273,7 +273,7 @@ private static void testCreateViewTestHelper(String user, String viewSchema,
   @Test
   public void testCreateViewInWSWithNoPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
-    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_755";
+    final String viewSchema = MINI_DFS_STORAGE_PLUGIN_NAME + ".drill_test_grp_0_755";
     final String viewName = "view1";
 
     updateClient(user2);
@@ -282,7 +282,7 @@ public void testCreateViewInWSWithNoPermissionsForQueryUser() throws Exception {
 
     final String query = "CREATE VIEW " + viewName + " AS SELECT " +
         "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;";
-    final String expErrorMsg = "PERMISSION ERROR: Permission denied: user=drillTestUser2, access=WRITE, inode=\"/drillTestGrp0_755/";
+    final String expErrorMsg = "PERMISSION ERROR: Permission denied: user=drillTestUser2, access=WRITE, inode=\"/drill_test_grp_0_755/";
     errorMsgTestHelper(query, expErrorMsg);
 
     // SHOW TABLES is expected to return no records as view creation fails above.
@@ -296,21 +296,21 @@ public void testCreateViewInWSWithNoPermissionsForQueryUser() throws Exception {
 
   @Test
   public void testCreateTableInDirWithUserPermissionsForQueryUser() throws Exception {
-    final String tableWS = "drillTestGrp1_700"; // Workspace dir owned by "user1"
+    final String tableWS = "drill_test_grp_1_700"; // Workspace dir owned by "user1"
     testCreateTableTestHelper(user1, tableWS, "table1");
   }
 
   @Test
   public void testCreateTableInDirWithGroupPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user1" is part of "group0"
-    final String tableWS = "drillTestGrp0_770";
+    final String tableWS = "drill_test_grp_0_770";
     testCreateTableTestHelper(user1, tableWS, "table1");
   }
 
   @Test
   public void testCreateTableInDirWithOtherPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
-    final String tableWS = "drillTestGrp0_777";
+    final String tableWS = "drill_test_grp_0_777";
     testCreateTableTestHelper(user2, tableWS, "table1");
   }
 
@@ -319,7 +319,7 @@ private static void testCreateTableTestHelper(String user, String tableWS,
     try {
       updateClient(user);
 
-      test("USE " + Joiner.on(".").join(MINIDFS_STORAGE_PLUGIN_NAME, tableWS));
+      test("USE " + Joiner.on(".").join(MINI_DFS_STORAGE_PLUGIN_NAME, tableWS));
 
       test("CREATE TABLE " + tableName + " AS SELECT " +
           "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
@@ -345,7 +345,7 @@ private static void testCreateTableTestHelper(String user, String tableWS,
   @Test
   public void testCreateTableInWSWithNoPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
-    final String tableWS = "drillTestGrp0_755";
+    final String tableWS = "drill_test_grp_0_755";
     final String tableName = "table1";
 
     UserRemoteException ex = null;
@@ -353,7 +353,7 @@ public void testCreateTableInWSWithNoPermissionsForQueryUser() throws Exception
     try {
       updateClient(user2);
 
-      test("USE " + Joiner.on(".").join(MINIDFS_STORAGE_PLUGIN_NAME, tableWS));
+      test("USE " + Joiner.on(".").join(MINI_DFS_STORAGE_PLUGIN_NAME, tableWS));
 
       test("CREATE TABLE " + tableName + " AS SELECT " +
           "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
@@ -363,16 +363,16 @@ public void testCreateTableInWSWithNoPermissionsForQueryUser() throws Exception
 
     assertNotNull("UserRemoteException is expected", ex);
     assertThat(ex.getMessage(),
-        containsString("SYSTEM ERROR: RemoteException: Permission denied: user=drillTestUser2, access=WRITE, inode=\"/drillTestGrp0_755/"));
+        containsString("SYSTEM ERROR: RemoteException: Permission denied: user=drillTestUser2, access=WRITE, inode=\"/drill_test_grp_0_755/"));
   }
 
   @Test
   public void testRefreshMetadata() throws Exception {
     final String tableName = "nation1";
-    final String tableWS = "drillTestGrp1_700";
+    final String tableWS = "drill_test_grp_1_700";
 
     updateClient(user1);
-    test("USE " + Joiner.on(".").join(MINIDFS_STORAGE_PLUGIN_NAME, tableWS));
+    test("USE " + Joiner.on(".").join(MINI_DFS_STORAGE_PLUGIN_NAME, tableWS));
 
     test("CREATE TABLE " + tableName + " partition by (n_regionkey) AS SELECT * " +
               "FROM cp.`tpch/nation.parquet`;");
@@ -390,8 +390,8 @@ public void testRefreshMetadata() throws Exception {
   }
 
   @AfterClass
-  public static void removeMiniDfsBasedStorage() throws Exception {
-    getDrillbitContext().getStorage().deletePlugin(MINIDFS_STORAGE_PLUGIN_NAME);
+  public static void removeMiniDfsBasedStorage() {
+    getDrillbitContext().getStorage().deletePlugin(MINI_DFS_STORAGE_PLUGIN_NAME);
     stopMiniDfsCluster();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
index c3c1fc58512..ec9d75a3c9b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
@@ -205,8 +205,7 @@ public void testDirectImpersonation_NoReadPermissions() throws Exception {
 
     assertNotNull("UserRemoteException is expected", ex);
     assertThat(ex.getMessage(), containsString("PERMISSION ERROR: " +
-      String.format("Not authorized to read table [lineitem] in schema [%s.user0_1]",
-        MINIDFS_STORAGE_PLUGIN_NAME)));
+      String.format("Not authorized to read table [lineitem] in schema [%s.user0_1]", MINI_DFS_STORAGE_PLUGIN_NAME)));
   }
 
   @Test
@@ -261,19 +260,19 @@ public void testMultiLevelImpersonationJoinOneSideExceedsMaxUserHops() throws Ex
   public void sequenceFileChainedImpersonationWithView() throws Exception {
     // create a view named "simple_seq_view" on "simple.seq". View is owned by user0:group0 and has permissions 750
     createView(org1Users[0], org1Groups[0], "simple_seq_view",
-      String.format("SELECT convert_from(t.binary_key, 'UTF8') as k FROM %s.`%s` t", MINIDFS_STORAGE_PLUGIN_NAME,
+      String.format("SELECT convert_from(t.binary_key, 'UTF8') as k FROM %s.`%s` t", MINI_DFS_STORAGE_PLUGIN_NAME,
         new Path(getUserHome(org1Users[0]), "simple.seq")));
     try {
       updateClient(org1Users[1]);
-      test("SELECT k FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view");
+      test("SELECT k FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view");
     } catch (UserRemoteException e) {
       assertNull("This test should pass.", e);
     }
     createView(org1Users[1], org1Groups[1], "simple_seq_view_2",
-      String.format("SELECT k FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view"));
+      String.format("SELECT k FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view"));
     try {
       updateClient(org1Users[2]);
-      test("SELECT k FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view_2");
+      test("SELECT k FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view_2");
     } catch (UserRemoteException e) {
       assertNull("This test should pass.", e);
     }
@@ -282,11 +281,11 @@ public void sequenceFileChainedImpersonationWithView() throws Exception {
   @Test
   public void avroChainedImpersonationWithView() throws Exception {
     createView(org1Users[0], org1Groups[0], "simple_avro_view",
-      String.format("SELECT h_boolean, e_double FROM %s.`%s` t", MINIDFS_STORAGE_PLUGIN_NAME,
+      String.format("SELECT h_boolean, e_double FROM %s.`%s` t", MINI_DFS_STORAGE_PLUGIN_NAME,
         new Path(getUserHome(org1Users[0]), "simple.avro")));
     try {
       updateClient(org1Users[1]);
-      test("SELECT h_boolean FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_avro_view");
+      test("SELECT h_boolean FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_avro_view");
     } catch (UserRemoteException e) {
       assertNull("This test should pass.", e);
     }
@@ -294,7 +293,7 @@ public void avroChainedImpersonationWithView() throws Exception {
 
   @AfterClass
   public static void removeMiniDfsBasedStorage() throws Exception {
-    getDrillbitContext().getStorage().deletePlugin(MINIDFS_STORAGE_PLUGIN_NAME);
+    getDrillbitContext().getStorage().deletePlugin(MINI_DFS_STORAGE_PLUGIN_NAME);
     stopMiniDfsCluster();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
index dea4c596acd..f7c329c660b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
@@ -96,7 +96,7 @@ public void testConstExprFolding_maxDir0() throws Exception {
         .add("BIGFILE_2")
         .build();
 
-    String query = "select * from dfs.`%s/*/*.csv` where dir0 = %s('dfs.root','%s')";
+    String query = "select * from dfs.`%s/*/*.csv` where dir0 = %s('dFs.RoOt','%s')";
     for (ConstantFoldingTestConfig config : tests) {
       // make all of the other folders unexpected patterns, except for the one expected in this case
       List<String> excludedPatterns = Lists.newArrayList();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
index 6e7d05406a9..e295eeee7e3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
@@ -17,33 +17,33 @@
  */
 package org.apache.drill.exec.sql;
 
-import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_CONNECT;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_DESCRIPTION;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.SqlTest;
-import org.apache.drill.test.TestBuilder;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.TestBuilder;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_CONNECT;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_DESCRIPTION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Contains tests for
  * -- InformationSchema
@@ -84,15 +84,13 @@ public void catalogs() throws Exception {
 
   @Test
   public void showTablesFromDb() throws Exception{
-    final List<String[]> expected =
-        ImmutableList.of(
-            new String[] { "INFORMATION_SCHEMA", "VIEWS" },
-            new String[] { "INFORMATION_SCHEMA", "COLUMNS" },
-            new String[] { "INFORMATION_SCHEMA", "TABLES" },
-            new String[] { "INFORMATION_SCHEMA", "CATALOGS" },
-            new String[] { "INFORMATION_SCHEMA", "SCHEMATA" },
-            new String[] { "INFORMATION_SCHEMA", "FILES" }
-        );
+    final List<String[]> expected = Arrays.asList(
+        new String[]{"information_schema", "VIEWS"},
+        new String[]{"information_schema", "COLUMNS"},
+        new String[]{"information_schema", "TABLES"},
+        new String[]{"information_schema", "CATALOGS"},
+        new String[]{"information_schema", "SCHEMATA"},
+        new String[]{"information_schema", "FILES"});
 
     final TestBuilder t1 = testBuilder()
         .sqlQuery("SHOW TABLES FROM INFORMATION_SCHEMA")
@@ -119,7 +117,7 @@ public void showTablesFromDbWhere() throws Exception{
         .sqlQuery("SHOW TABLES FROM INFORMATION_SCHEMA WHERE TABLE_NAME='VIEWS'")
         .unOrdered()
         .baselineColumns("TABLE_SCHEMA", "TABLE_NAME")
-        .baselineValues("INFORMATION_SCHEMA", "VIEWS")
+        .baselineValues("information_schema", "VIEWS")
         .go();
   }
 
@@ -130,38 +128,26 @@ public void showTablesLike() throws Exception{
         .unOrdered()
         .optionSettingQueriesForTestQuery("USE INFORMATION_SCHEMA")
         .baselineColumns("TABLE_SCHEMA", "TABLE_NAME")
-        .baselineValues("INFORMATION_SCHEMA", "SCHEMATA")
+        .baselineValues("information_schema", "SCHEMATA")
         .go();
   }
 
   @Test
   public void showDatabases() throws Exception{
-    final List<String[]> expected =
-        ImmutableList.of(
-            new String[] { "dfs.default" },
-            new String[] { "dfs.root" },
-            new String[] { "dfs.tmp" },
-            new String[] { "cp.default" },
-            new String[] { "sys" },
-            new String[] { "INFORMATION_SCHEMA" }
-        );
+    List<String> expected = Arrays.asList("dfs.default", "dfs.root", "dfs.tmp", "cp.default", "sys", "information_schema");
 
-    final TestBuilder t1 = testBuilder()
+    TestBuilder t1 = testBuilder()
         .sqlQuery("SHOW DATABASES")
         .unOrdered()
         .baselineColumns("SCHEMA_NAME");
-    for(String[] expectedRow : expected) {
-      t1.baselineValues(expectedRow);
-    }
+    expected.forEach(t1::baselineValues);
     t1.go();
 
-    final TestBuilder t2 = testBuilder()
+    TestBuilder t2 = testBuilder()
         .sqlQuery("SHOW SCHEMAS")
         .unOrdered()
         .baselineColumns("SCHEMA_NAME");
-    for(String[] expectedRow : expected) {
-      t2.baselineValues(expectedRow);
-    }
+    expected.forEach(t2::baselineValues);
     t2.go();
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestSchemaCaseInsensitivity.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestSchemaCaseInsensitivity.java
new file mode 100644
index 00000000000..c274e0b5b64
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestSchemaCaseInsensitivity.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.sql;
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSchemaCaseInsensitivity extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+  }
+
+  @Test
+  public void testUseCommand() throws Exception {
+    queryBuilder().sql("use Information_Schema").run();
+    queryBuilder().sql("use Sys").run();
+    queryBuilder().sql("use Dfs").run();
+    queryBuilder().sql("use Dfs.Tmp").run();
+  }
+
+  @Test
+  public void testDescribeSchema() throws Exception {
+    checkRecordCount(1, "describe schema SyS");
+    checkRecordCount(1, "describe schema Information_Schema");
+
+    client.testBuilder()
+        .sqlQuery("describe schema DfS.tMp")
+        .unOrdered()
+        .sqlBaselineQuery("describe schema dfs.tmp")
+        .go();
+  }
+
+  @Test
+  public void testDescribeTable() throws Exception {
+    checkRecordCount(4, "describe Information_Schema.`Tables`");
+    checkRecordCount(1, "describe Information_Schema.`Tables` Table_Catalog");
+    checkRecordCount(1, "describe Information_Schema.`Tables` '%Catalog'");
+    checkRecordCount(6, "describe SyS.Version");
+  }
+
+
+  @Test
+  public void testShowSchemas() throws Exception {
+    checkRecordCount(1, "show schemas like '%Y%'");
+    checkRecordCount(1, "show schemas like 'Info%'");
+    checkRecordCount(1, "show schemas like 'D%Tmp'");
+  }
+
+  @Test
+  public void testShowTables() throws Exception {
+    checkRecordCount(1, "show tables in Information_Schema like 'SC%'");
+    checkRecordCount(1, "show tables in Sys like '%ION'");
+  }
+
+  @Test
+  public void testSelectStatement() throws Exception {
+    checkRecordCount(1, "select * from Information_Schema.Schemata where Schema_Name = 'dfs.tmp'");
+    checkRecordCount(1, "select * from Sys.Version");
+  }
+
+  private void checkRecordCount(long recordCount, String sqlQuery) throws Exception {
+    QueryBuilder.QuerySummary summary = queryBuilder().sql(sqlQuery).run();
+    assertTrue(summary.succeeded());
+    assertEquals(recordCount, summary.recordCount());
+  }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
index 8d34ebc8a9e..c5b607b1f4e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
@@ -24,7 +24,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.test.BaseTestQuery;
@@ -105,7 +105,7 @@ public void schemas() throws Exception {
     List<SchemaMetadata> schemas = resp.getSchemasList();
     assertEquals(6, schemas.size());
 
-    verifySchema("INFORMATION_SCHEMA", schemas);
+    verifySchema("information_schema", schemas);
     verifySchema("cp.default", schemas);
     verifySchema("dfs.default", schemas);
     verifySchema("dfs.root", schemas);
@@ -115,15 +115,15 @@ public void schemas() throws Exception {
 
   @Test
   public void schemasWithSchemaNameFilter() throws Exception {
-    // test("SELECT * FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%y%'"); // SQL equivalent
+    // test("SELECT * FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%tion_sc%'"); // SQL equivalent
 
-    GetSchemasResp resp = client.getSchemas(null, LikeFilter.newBuilder().setPattern("%y%").build()).get();
+    GetSchemasResp resp = client.getSchemas(null, LikeFilter.newBuilder().setPattern("%TiOn_Sc%").build()).get();
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<SchemaMetadata> schemas = resp.getSchemasList();
     assertEquals(1, schemas.size());
 
-    verifySchema("sys", schemas);
+    verifySchema("information_schema", schemas);
   }
 
   @Test
@@ -151,12 +151,12 @@ public void tables() throws Exception {
     List<TableMetadata> tables = resp.getTablesList();
     assertEquals(18, tables.size());
 
-    verifyTable("INFORMATION_SCHEMA", "CATALOGS", tables);
-    verifyTable("INFORMATION_SCHEMA", "COLUMNS", tables);
-    verifyTable("INFORMATION_SCHEMA", "SCHEMATA", tables);
-    verifyTable("INFORMATION_SCHEMA", "TABLES", tables);
-    verifyTable("INFORMATION_SCHEMA", "VIEWS", tables);
-    verifyTable("INFORMATION_SCHEMA", "FILES", tables);
+    verifyTable("information_schema", "CATALOGS", tables);
+    verifyTable("information_schema", "COLUMNS", tables);
+    verifyTable("information_schema", "SCHEMATA", tables);
+    verifyTable("information_schema", "TABLES", tables);
+    verifyTable("information_schema", "VIEWS", tables);
+    verifyTable("information_schema", "FILES", tables);
     verifyTable("sys", "boot", tables);
     verifyTable("sys", "drillbits", tables);
     verifyTable("sys", "memory", tables);
@@ -172,7 +172,7 @@ public void tables() throws Exception {
   public void tablesWithTableFilter() throws Exception {
     // test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_TYPE IN ('TABLE')"); // SQL equivalent
 
-    GetTablesResp resp = client.getTables(null, null, null, Arrays.asList("TABLE")).get();
+    GetTablesResp resp = client.getTables(null, null, null, Collections.singletonList("TABLE")).get();
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<TableMetadata> tables = resp.getTablesList();
@@ -183,18 +183,18 @@ public void tablesWithTableFilter() throws Exception {
   public void tablesWithSystemTableFilter() throws Exception {
     // test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_TYPE IN ('SYSTEM_TABLE')"); // SQL equivalent
 
-    GetTablesResp resp = client.getTables(null, null, null, Arrays.asList("SYSTEM_TABLE")).get();
+    GetTablesResp resp = client.getTables(null, null, null, Collections.singletonList("SYSTEM_TABLE")).get();
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<TableMetadata> tables = resp.getTablesList();
     assertEquals(18, tables.size());
 
-    verifyTable("INFORMATION_SCHEMA", "CATALOGS", tables);
-    verifyTable("INFORMATION_SCHEMA", "COLUMNS", tables);
-    verifyTable("INFORMATION_SCHEMA", "SCHEMATA", tables);
-    verifyTable("INFORMATION_SCHEMA", "TABLES", tables);
-    verifyTable("INFORMATION_SCHEMA", "VIEWS", tables);
-    verifyTable("INFORMATION_SCHEMA", "FILES", tables);
+    verifyTable("information_schema", "CATALOGS", tables);
+    verifyTable("information_schema", "COLUMNS", tables);
+    verifyTable("information_schema", "SCHEMATA", tables);
+    verifyTable("information_schema", "TABLES", tables);
+    verifyTable("information_schema", "VIEWS", tables);
+    verifyTable("information_schema", "FILES", tables);
     verifyTable("sys", "boot", tables);
     verifyTable("sys", "drillbits", tables);
     verifyTable("sys", "memory", tables);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services