Posted to commits@drill.apache.org by ar...@apache.org on 2018/08/27 08:25:32 UTC

[drill] 01/06: DRILL-6492: Ensure schema / workspace case insensitivity in Drill

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 8bcb103a0e3bcc5f85a03cbed3c6c0cea254ec4e
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Fri Aug 17 17:04:30 2018 +0300

    DRILL-6492: Ensure schema / workspace case insensitivity in Drill
    
    1. StoragePluginRegistryImpl was updated:
    a. for backward compatibility, at init to convert all existing storage plugin names to lower case; in case of duplicates, to log a warning and skip the duplicate.
    b. to wrap the persistent plugins registry into a case-insensitive store wrapper (CaseInsensitivePersistentStore) to ensure all given keys are converted to lower case when performing insert, update, delete and search operations.
    c. to load system storage plugins dynamically via the @SystemPlugin annotation.
    2. StoragePlugins class was updated to store storage plugin configs by name in a case-insensitive map (a minimal sketch of this behavior follows below).
    3. SchemaUtilites.searchSchemaTree method was updated to convert all schema names to lower case to ensure they are matched case-insensitively (all schemas are stored in Drill in lower case).
    4. FileSystemConfig was updated to store workspaces by name in a case-insensitive hash map.
    5. All plugin schema factories now extend AbstractSchemaFactory to ensure the given schema name is converted to lower case.
    6. New method areTableNamesCaseSensitive was added to AbstractSchema to indicate whether schema table names are case sensitive; by default they are. A schema implementation is responsible for case-insensitive table name search if it supports one; currently information_schema, sys and hive do.
    7. System storage plugins (information_schema, sys) were refactored to ensure their schema and table names are case insensitive; the @SystemPlugin annotation and an additional constructor were added to allow loading system plugins dynamically into the storage plugin registry during the init phase.
    8. MetadataProvider was updated to convert all schema filter conditions to lower case to ensure schemas are matched case-insensitively.
    9. ShowSchemasHandler, ShowTablesHandler and DescribeTableHandler were updated to ensure schema / table names are found case-insensitively (for tables, depending on whether the schema supports case-insensitive table names).
    
    git closes #1439
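
For illustration: nearly every item above reduces to one primitive, a map that normalizes its keys to lower case. A minimal sketch of that behavior, using only CaseInsensitiveMap and its factory method from this patch (the plugin names and values are illustrative, not taken from the diff):

```
import java.util.Map;

import org.apache.drill.common.map.CaseInsensitiveMap;

public class CaseInsensitiveLookupSketch {
  public static void main(String[] args) {
    // put() and get() lower-case the key, so "DFS", "Dfs" and "dfs"
    // all address the same entry.
    Map<String, String> plugins = CaseInsensitiveMap.newHashMap();
    plugins.put("DFS", "file system plugin");
    System.out.println(plugins.get("dfs"));    // file system plugin
    System.out.println(plugins.get("Dfs"));    // file system plugin

    // Maps populated with differently-cased keys compare equal, which is
    // what the equals()/hashCode() overrides added in this commit provide.
    Map<String, String> other = CaseInsensitiveMap.newHashMap();
    other.put("dfs", "file system plugin");
    System.out.println(plugins.equals(other)); // true
  }
}
```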
---
 .../drill/common/map/CaseInsensitiveMap.java       |  25 +-
 .../drill/common/scanner/ClassPathScanner.java     |  22 +-
 .../drill/common/map/TestCaseInsensitiveMap.java   |  15 +
 .../drill/exec/store/hbase/HBaseSchemaFactory.java |  26 +-
 .../exec/store/hive/schema/HiveDatabaseSchema.java |  11 +-
 .../exec/store/hive/schema/HiveSchemaFactory.java  |  29 +-
 .../apache/drill/exec/hive/TestHiveStorage.java    |   5 +
 .../exec/hive/TestInfoSchemaOnHiveStorage.java     |   2 +-
 .../hive/BaseTestHiveImpersonation.java            |  11 -
 .../hive/TestSqlStdBasedAuthorization.java         |  19 +-
 .../hive/TestStorageBasedHiveAuthorization.java    |  30 +-
 .../store/kafka/schema/KafkaSchemaFactory.java     |  14 +-
 .../drill/exec/store/kudu/KuduSchemaFactory.java   |  22 +-
 .../store/mongo/schema/MongoSchemaFactory.java     |  16 +-
 contrib/storage-opentsdb/README.md                 |  16 +-
 .../exec/store/openTSDB/OpenTSDBStoragePlugin.java |   5 +-
 .../openTSDB/schema/OpenTSDBSchemaFactory.java     |  19 +-
 .../drill/exec/coord/zk/ZookeeperClient.java       |   2 +-
 .../expr/fn/registry/LocalFunctionRegistry.java    |  11 +-
 .../drill/exec/planner/logical/StoragePlugins.java |   7 +-
 .../drill/exec/planner/sql/SchemaUtilites.java     |   3 +-
 .../sql/handlers/DescribeSchemaHandler.java        |   6 +-
 .../planner/sql/handlers/DescribeTableHandler.java |  69 +--
 .../planner/sql/handlers/ShowSchemasHandler.java   |  27 +-
 .../planner/sql/handlers/ShowTablesHandler.java    |  74 ++--
 .../apache/drill/exec/store/AbstractSchema.java    |  23 +-
 ...chemaConfig.java => AbstractSchemaFactory.java} |  22 +-
 .../drill/exec/store/AbstractStoragePlugin.java    |  14 +-
 .../drill/exec/store/PartitionExplorerImpl.java    |   2 +-
 .../org/apache/drill/exec/store/SchemaFactory.java |   5 +-
 .../org/apache/drill/exec/store/StoragePlugin.java |   2 +
 .../apache/drill/exec/store/StoragePluginMap.java  |   6 +-
 .../drill/exec/store/StoragePluginRegistry.java    |   6 +-
 .../exec/store/StoragePluginRegistryImpl.java      | 468 ++++++++++++---------
 .../InfoSchemaConfig.java => SystemPlugin.java}    |  28 +-
 .../drill/exec/store/dfs/FileSystemConfig.java     |   6 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java     |   2 +
 .../exec/store/dfs/FileSystemSchemaFactory.java    |  20 +-
 .../drill/exec/store/ischema/InfoSchemaConfig.java |   3 +-
 .../exec/store/ischema/InfoSchemaConstants.java    |   2 +-
 .../store/ischema/InfoSchemaStoragePlugin.java     |  62 ++-
 .../store/sys/CaseInsensitivePersistentStore.java  |  79 ++++
 .../drill/exec/store/sys/SystemTablePlugin.java    |  54 ++-
 .../exec/store/sys/SystemTablePluginConfig.java    |   1 -
 .../drill/exec/work/metadata/MetadataProvider.java |  24 +-
 .../java-exec/src/main/resources/drill-module.conf |  25 +-
 .../drill/common/scanner/TestClassPathScanner.java |  43 +-
 .../drill/exec/coord/zk/TestZookeeperClient.java   |  10 +-
 .../exec/impersonation/BaseTestImpersonation.java  |  19 +-
 .../TestImpersonationDisabledWithMiniDFS.java      |  12 +-
 .../impersonation/TestImpersonationMetadata.java   | 112 ++---
 .../impersonation/TestImpersonationQueries.java    |  17 +-
 .../exec/planner/TestDirectoryExplorerUDFs.java    |   2 +-
 .../org/apache/drill/exec/sql/TestInfoSchema.java  |  64 ++-
 .../exec/sql/TestSchemaCaseInsensitivity.java      |  92 ++++
 .../exec/work/metadata/TestMetadataProvider.java   |  38 +-
 56 files changed, 1061 insertions(+), 688 deletions(-)
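
Item 6 of the message in code form: the Hive hunks below override the new AbstractSchema hook, and any schema that wants case-insensitive table lookup would follow the same pattern. A sketch under that assumption (ExampleSchema, its table map and the "example" type name are invented for illustration; AbstractSchema, CaseInsensitiveMap and areTableNamesCaseSensitive come from the patch):

```
import java.util.Collections;
import java.util.Map;

import org.apache.calcite.schema.Table;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.exec.store.AbstractSchema;

// Hypothetical schema that declares its table names case insensitive and,
// as the commit message notes, performs the case-insensitive lookup itself.
public class ExampleSchema extends AbstractSchema {

  private final Map<String, Table> tables = CaseInsensitiveMap.newHashMap();

  public ExampleSchema(String name) {
    super(Collections.emptyList(), name);
  }

  @Override
  public boolean areTableNamesCaseSensitive() {
    return false; // e.g. SELECT * FROM example.MyTable matches "mytable"
  }

  @Override
  public Table getTable(String name) {
    // CaseInsensitiveMap lower-cases the key, so any casing finds the table
    return tables.get(name);
  }

  @Override
  public String getTypeName() {
    return "example";
  }
}
```

Plugin registration and registerSchemas wiring are omitted; the point is only pairing the override with a lookup that ignores case.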

diff --git a/common/src/main/java/org/apache/drill/common/map/CaseInsensitiveMap.java b/common/src/main/java/org/apache/drill/common/map/CaseInsensitiveMap.java
index 20e46dd..a9f18d3 100644
--- a/common/src/main/java/org/apache/drill/common/map/CaseInsensitiveMap.java
+++ b/common/src/main/java/org/apache/drill/common/map/CaseInsensitiveMap.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Maps;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -41,7 +42,7 @@ public class CaseInsensitiveMap<VALUE> implements Map<String, VALUE> {
    * @return key case-insensitive concurrent map
    */
   public static <VALUE> CaseInsensitiveMap<VALUE> newConcurrentMap() {
-    return new CaseInsensitiveMap<>(Maps.<String, VALUE>newConcurrentMap());
+    return new CaseInsensitiveMap<>(Maps.newConcurrentMap());
   }
 
   /**
@@ -51,7 +52,7 @@ public class CaseInsensitiveMap<VALUE> implements Map<String, VALUE> {
    * @return key case-insensitive hash map
    */
   public static <VALUE> CaseInsensitiveMap<VALUE> newHashMap() {
-    return new CaseInsensitiveMap<>(Maps.<String, VALUE>newHashMap());
+    return new CaseInsensitiveMap<>(Maps.newHashMap());
   }
 
   /**
@@ -63,7 +64,7 @@ public class CaseInsensitiveMap<VALUE> implements Map<String, VALUE> {
    * @return key case-insensitive hash map
    */
   public static <VALUE> CaseInsensitiveMap<VALUE> newHashMapWithExpectedSize(final int expectedSize) {
-    return new CaseInsensitiveMap<>(Maps.<String, VALUE>newHashMapWithExpectedSize(expectedSize));
+    return new CaseInsensitiveMap<>(Maps.newHashMapWithExpectedSize(expectedSize));
   }
 
   /**
@@ -154,4 +155,22 @@ public class CaseInsensitiveMap<VALUE> implements Map<String, VALUE> {
   public Set<Entry<String, VALUE>> entrySet() {
     return underlyingMap.entrySet();
   }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(underlyingMap);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof CaseInsensitiveMap)) {
+      return false;
+    }
+    CaseInsensitiveMap<?> that = (CaseInsensitiveMap<?>) o;
+    return Objects.equals(underlyingMap, that.underlyingMap);
+  }
+
 }
diff --git a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
index 909e811..552c073 100644
--- a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
+++ b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
@@ -133,12 +133,9 @@ public final class ClassPathScanner {
      * @return the class names of all children direct or indirect
      */
     public Set<ChildClassDescriptor> getChildrenOf(String name) {
-      Set<ChildClassDescriptor> result = new HashSet<>();
       Collection<ChildClassDescriptor> scannedChildren = children.get(name);
       // add all scanned children
-      for (ChildClassDescriptor child : scannedChildren) {
-        result.add(child);
-      }
+      Set<ChildClassDescriptor> result = new HashSet<>(scannedChildren);
       // recursively add children's children
       Collection<ChildClassDescriptor> allChildren = new ArrayList<>();
       allChildren.addAll(scannedChildren);
@@ -272,7 +269,7 @@ public final class ClassPathScanner {
           List<FieldDescriptor> fieldDescriptors = new ArrayList<>(classFields.size());
           for (FieldInfo field : classFields) {
             String fieldName = field.getName();
-            AnnotationsAttribute fieldAnnotations = ((AnnotationsAttribute)field.getAttribute(AnnotationsAttribute.visibleTag));
+            AnnotationsAttribute fieldAnnotations = ((AnnotationsAttribute) field.getAttribute(AnnotationsAttribute.visibleTag));
             fieldDescriptors.add(new FieldDescriptor(fieldName, field.getDescriptor(), getAnnotationDescriptors(fieldAnnotations)));
           }
           functions.add(new AnnotatedClassDescriptor(classFile.getName(), classAnnotations, fieldDescriptors));
@@ -281,6 +278,9 @@ public final class ClassPathScanner {
     }
 
     private List<AnnotationDescriptor> getAnnotationDescriptors(AnnotationsAttribute annotationsAttr) {
+      if (annotationsAttr == null) {
+        return Collections.emptyList();
+      }
       List<AnnotationDescriptor> annotationDescriptors = new ArrayList<>(annotationsAttr.numAnnotations());
       for (javassist.bytecode.annotation.Annotation annotation : annotationsAttr.getAnnotations()) {
         // Sigh: javassist uses raw collections (is this 2002?)
@@ -319,7 +319,7 @@ public final class ClassPathScanner {
    *           to scan for (relative to specified class loaders' classpath roots)
    * @param  returnRootPathname  whether to collect classpath root portion of
    *           URL for each resource instead of full URL of each resource
-   * @returns  ...; empty set if none
+   * @return  empty set if none
    */
   public static Set<URL> forResource(final String resourcePathname, final boolean returnRootPathname) {
     logger.debug("Scanning classpath for resources with pathname \"{}\".",
@@ -437,11 +437,11 @@ public final class ClassPathScanner {
 
   static ScanResult emptyResult() {
     return new ScanResult(
-        Collections.<String>emptyList(),
-        Collections.<String>emptyList(),
-        Collections.<String>emptyList(),
-        Collections.<AnnotatedClassDescriptor>emptyList(),
-        Collections.<ParentClassDescriptor>emptyList());
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.emptyList());
   }
 
   public static ScanResult fromPrescan(DrillConfig config) {
diff --git a/common/src/test/java/org/apache/drill/common/map/TestCaseInsensitiveMap.java b/common/src/test/java/org/apache/drill/common/map/TestCaseInsensitiveMap.java
index 8901d53..19b2b93 100644
--- a/common/src/test/java/org/apache/drill/common/map/TestCaseInsensitiveMap.java
+++ b/common/src/test/java/org/apache/drill/common/map/TestCaseInsensitiveMap.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -66,4 +67,18 @@ public class TestCaseInsensitiveMap {
     final Set<Map.Entry<String, Integer>> entrySet = map.entrySet();
     assertEquals(2, entrySet.size());
   }
+
+  @Test
+  public void checkEquals() {
+    Map<String, String> map1 = CaseInsensitiveMap.newHashMap();
+    map1.put("key_1", "value_1");
+
+    Map<String, String> map2 = CaseInsensitiveMap.newHashMap();
+    map2.put("KEY_1", "value_1");
+    assertEquals(map1, map2);
+
+    map2.put("key_2", "value_2");
+    assertNotEquals(map1, map2);
+  }
+
 }
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index b8e825b..46e0444 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -24,36 +24,34 @@ import java.util.Set;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Admin;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
-public class HBaseSchemaFactory implements SchemaFactory {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSchemaFactory.class);
+public class HBaseSchemaFactory extends AbstractSchemaFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSchemaFactory.class);
 
-  final String schemaName;
-  final HBaseStoragePlugin plugin;
+  private final HBaseStoragePlugin plugin;
 
   public HBaseSchemaFactory(HBaseStoragePlugin plugin, String name) throws IOException {
+    super(name);
     this.plugin = plugin;
-    this.schemaName = name;
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    HBaseSchema schema = new HBaseSchema(schemaName);
-    SchemaPlus hPlus = parent.add(schemaName, schema);
+    HBaseSchema schema = new HBaseSchema(getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
   }
 
   class HBaseSchema extends AbstractSchema {
 
-    public HBaseSchema(String name) {
-      super(ImmutableList.<String>of(), name);
+    HBaseSchema(String name) {
+      super(Collections.emptyList(), name);
     }
 
     public void setHolder(SchemaPlus plusOfThis) {
@@ -73,13 +71,13 @@ public class HBaseSchemaFactory implements SchemaFactory {
     public Table getTable(String name) {
       HBaseScanSpec scanSpec = new HBaseScanSpec(name);
       try {
-        return new DrillHBaseTable(schemaName, plugin, scanSpec);
+        return new DrillHBaseTable(getName(), plugin, scanSpec);
       } catch (Exception e) {
         // Calcite firstly looks for a table in the default schema, if the table was not found,
         // it looks in the root schema.
         // If the table does not exist, a query will fail at validation stage,
         // so the error should not be thrown here.
-        logger.warn("Failure while loading table '{}' for database '{}'.", name, schemaName, e.getCause());
+        logger.warn("Failure while loading table '{}' for database '{}'.", name, getName(), e.getCause());
         return null;
       }
     }
@@ -94,7 +92,7 @@ public class HBaseSchemaFactory implements SchemaFactory {
         }
         return tableNames;
       } catch (Exception e) {
-        logger.warn("Failure while loading table names for database '{}'.", schemaName, e.getCause());
+        logger.warn("Failure while loading table names for database '{}'.", getName(), e.getCause());
         return Collections.emptySet();
       }
     }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
index ec1d0c6..23f346f 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
@@ -38,7 +38,8 @@ import org.apache.thrift.TException;
 import java.util.List;
 import java.util.Set;
 
-public class HiveDatabaseSchema extends AbstractSchema{
+public class HiveDatabaseSchema extends AbstractSchema {
+
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDatabaseSchema.class);
 
   private final HiveSchema hiveSchema;
@@ -105,10 +106,16 @@ public class HiveDatabaseSchema extends AbstractSchema{
     return tableNameToTable;
   }
 
+  @Override
+  public boolean areTableNamesCaseSensitive() {
+    return false;
+  }
+
   private static class HiveTableWithoutStatisticAndRowType implements Table {
+
     private final TableType tableType;
 
-    public HiveTableWithoutStatisticAndRowType(final TableType tableType) {
+    HiveTableWithoutStatisticAndRowType(final TableType tableType) {
       this.tableType = tableType;
     }
 
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index e3cb3a2..53f6c60 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -33,8 +33,8 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveStoragePlugin;
@@ -48,8 +48,8 @@ import org.apache.thrift.TException;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
-public class HiveSchemaFactory implements SchemaFactory {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaFactory.class);
+public class HiveSchemaFactory extends AbstractSchemaFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaFactory.class);
 
   // MetaStoreClient created using process user credentials
   private final DrillHiveMetaStoreClient processUserMetastoreClient;
@@ -57,13 +57,12 @@ public class HiveSchemaFactory implements SchemaFactory {
   private final LoadingCache<String, DrillHiveMetaStoreClient> metaStoreClientLoadingCache;
 
   private final HiveStoragePlugin plugin;
-  private final String schemaName;
   private final HiveConf hiveConf;
   private final boolean isDrillImpersonationEnabled;
   private final boolean isHS2DoAsSet;
 
   public HiveSchemaFactory(final HiveStoragePlugin plugin, final String name, final HiveConf hiveConf) throws ExecutionSetupException {
-    this.schemaName = name;
+    super(name);
     this.plugin = plugin;
 
     this.hiveConf = hiveConf;
@@ -126,8 +125,8 @@ public class HiveSchemaFactory implements SchemaFactory {
         throw new IOException("Failure setting up Hive metastore client.", e);
       }
     }
-    HiveSchema schema = new HiveSchema(schemaConfig, mClientForSchemaTree, schemaName);
-    SchemaPlus hPlus = parent.add(schemaName, schema);
+    HiveSchema schema = new HiveSchema(schemaConfig, mClientForSchemaTree, getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
   }
 
@@ -137,7 +136,7 @@ public class HiveSchemaFactory implements SchemaFactory {
     private final DrillHiveMetaStoreClient mClient;
     private HiveDatabaseSchema defaultSchema;
 
-    public HiveSchema(final SchemaConfig schemaConfig, final DrillHiveMetaStoreClient mClient, final String name) {
+    HiveSchema(final SchemaConfig schemaConfig, final DrillHiveMetaStoreClient mClient, final String name) {
       super(ImmutableList.<String>of(), name);
       this.schemaConfig = schemaConfig;
       this.mClient = mClient;
@@ -149,7 +148,7 @@ public class HiveSchemaFactory implements SchemaFactory {
       try {
         List<String> dbs = mClient.getDatabases(schemaConfig.getIgnoreAuthErrors());
         if (!dbs.contains(name)) {
-          logger.debug("Database '{}' doesn't exists in Hive storage '{}'", name, schemaName);
+          logger.debug("Database '{}' doesn't exists in Hive storage '{}'", name, getName());
           return null;
         }
         HiveDatabaseSchema schema = getSubSchemaKnownExists(name);
@@ -164,8 +163,7 @@ public class HiveSchemaFactory implements SchemaFactory {
 
     /** Help method to get subschema when we know it exists (already checks the existence) */
     private HiveDatabaseSchema getSubSchemaKnownExists(String name) {
-      HiveDatabaseSchema schema = new HiveDatabaseSchema(this, name, mClient, schemaConfig);
-      return schema;
+      return new HiveDatabaseSchema(this, name, mClient, schemaConfig);
     }
 
     void setHolder(SchemaPlus plusOfThis) {
@@ -206,6 +204,11 @@ public class HiveSchemaFactory implements SchemaFactory {
       return defaultSchema.getTableNames();
     }
 
+    @Override
+    public boolean areTableNamesCaseSensitive() {
+      return false;
+    }
+
     DrillTable getDrillTable(String dbName, String t) {
       HiveReadEntry entry = getSelectionBaseOnName(dbName, t);
       if (entry == null) {
@@ -216,9 +219,9 @@ public class HiveSchemaFactory implements SchemaFactory {
           ImpersonationUtil.getProcessUserName();
 
       if (entry.getJdbcTableType() == TableType.VIEW) {
-        return new DrillHiveViewTable(schemaName, plugin, userToImpersonate, entry);
+        return new DrillHiveViewTable(getName(), plugin, userToImpersonate, entry);
       } else {
-        return new DrillHiveTable(schemaName, plugin, userToImpersonate, entry);
+        return new DrillHiveTable(getName(), plugin, userToImpersonate, entry);
       }
     }
 
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 94f39b8..2410010 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -435,6 +435,11 @@ public class TestHiveStorage extends HiveTestBase {
     test(query);
   }
 
+  @Test
+  public void testSchemaCaseInsensitive() throws Exception {
+    test("select * from Hive.`Default`.Kv");
+  }
+
   private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> columnsList, Map<String, Integer> expectedResult) {
     for (UserProtos.ResultColumnMetadata columnMetadata : columnsList) {
       assertTrue("Column should be present in result set", expectedResult.containsKey(columnMetadata.getColumnName()));
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index c5c0d48..37b8ea0 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -90,7 +90,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("dfs.tmp")
         .baselineValues("sys")
         .baselineValues("cp.default")
-        .baselineValues("INFORMATION_SCHEMA")
+        .baselineValues("information_schema")
         .go();
   }
 
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
index 3c6e2c2..e361c66 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.impersonation.hive;
 
 import org.apache.calcite.schema.Schema.TableType;
 import org.apache.drill.test.TestBuilder;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.dotdrill.DotDrillType;
 import org.apache.drill.exec.impersonation.BaseTestImpersonation;
 import org.apache.drill.exec.store.hive.HiveStoragePluginConfig;
 import org.apache.hadoop.fs.FileSystem;
@@ -154,15 +152,6 @@ public class BaseTestHiveImpersonation extends BaseTestImpersonation {
     testBuilder.go();
   }
 
-  protected static void createView(final String viewOwner, final String viewGroup, final String viewName,
-                                 final String viewDef) throws Exception {
-    updateClient(viewOwner);
-    test(String.format("ALTER SESSION SET `%s`='%o';", ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY, (short) 0750));
-    test("CREATE VIEW %s.%s.%s AS %s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", viewName, viewDef);
-    final Path viewFilePath = new Path("/tmp/", viewName + DotDrillType.VIEW.getEnding());
-    fs.setOwner(viewFilePath, viewOwner, viewGroup);
-  }
-
   public static void stopHiveMetaStore() throws Exception {
     // Unfortunately Hive metastore doesn't provide an API to shut it down. It will be exited as part of the test JVM
     // exit. As each metastore server instance is using its own resources and not sharing it with other metastore
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java
index 30b7430..7bc98a3 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestSqlStdBasedAuthorization.java
@@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import org.apache.drill.categories.HiveStorageTest;
 import org.apache.drill.categories.SlowTest;
-import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.security.SessionStateConfigUserAuthenticator;
@@ -34,6 +33,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
@@ -66,10 +66,10 @@ public class TestSqlStdBasedAuthorization extends BaseTestHiveImpersonation {
   private static final String v_student_u1g1_750 = "v_student_u1g1_750";
 
   private static final String query_v_student_u0g0_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750);
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750);
 
   private static final String query_v_student_u1g1_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u1g1_750);
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u1g1_750);
 
   // Role for testing purpose
   private static final String test_role0 = "role0";
@@ -82,7 +82,7 @@ public class TestSqlStdBasedAuthorization extends BaseTestHiveImpersonation {
     startHiveMetaStore();
     startDrillCluster(true);
     addHiveStoragePlugin(getHivePluginConfig());
-    addMiniDfsBasedStorage(Maps.<String, WorkspaceConfig>newHashMap());
+    addMiniDfsBasedStorage(new HashMap<>());
     generateTestData();
   }
 
@@ -134,8 +134,7 @@ public class TestSqlStdBasedAuthorization extends BaseTestHiveImpersonation {
             hivePluginName, db_general, g_student_user0));
 
     createView(org1Users[1], org1Groups[1], v_student_u1g1_750,
-        String.format("SELECT rownum, name, age FROM %s.%s.%s",
-            MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750));
+        String.format("SELECT rownum, name, age FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750));
   }
 
   private static void createTbl(final Driver driver, final String db, final String tbl, final String tblDef,
@@ -277,15 +276,15 @@ public class TestSqlStdBasedAuthorization extends BaseTestHiveImpersonation {
   @Test
   public void selectUser2_v_student_u0g0_750() throws Exception {
     updateClient(org1Users[2]);
-    errorMsgTestHelper(query_v_student_u0g0_750,
-        "Not authorized to read view [v_student_u0g0_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_student_u0g0_750, String.format(
+        "Not authorized to read view [v_student_u0g0_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
   public void selectUser0_v_student_u1g1_750() throws Exception {
     updateClient(org1Users[0]);
-    errorMsgTestHelper(query_v_student_u1g1_750,
-        "Not authorized to read view [v_student_u1g1_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_student_u1g1_750, String.format(
+        "Not authorized to read view [v_student_u1g1_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java
index 5a3e373..d54c4e0 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/TestStorageBasedHiveAuthorization.java
@@ -92,10 +92,10 @@ public class TestStorageBasedHiveAuthorization extends BaseTestHiveImpersonation
   private static final String v_student_u1g1_750 = "v_student_u1g1_750";
 
   private static final String query_v_student_u0g0_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750);
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750);
 
   private static final String query_v_student_u1g1_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u1g1_750);
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u1g1_750);
 
   // Create a view on "partitioned_student_u0_700". View is owned by user0:group0 and has permissions 750
   private static final String v_partitioned_student_u0g0_750 = "v_partitioned_student_u0g0_750";
@@ -104,11 +104,11 @@ public class TestStorageBasedHiveAuthorization extends BaseTestHiveImpersonation
   private static final String v_partitioned_student_u1g1_750 = "v_partitioned_student_u1g1_750";
 
   private static final String query_v_partitioned_student_u0g0_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp",
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp",
       v_partitioned_student_u0g0_750);
 
   private static final String query_v_partitioned_student_u1g1_750 = String.format(
-      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINIDFS_STORAGE_PLUGIN_NAME, "tmp",
+      "SELECT rownum FROM %s.%s.%s ORDER BY rownum LIMIT 1", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp",
       v_partitioned_student_u1g1_750);
 
   @BeforeClass
@@ -199,16 +199,14 @@ public class TestStorageBasedHiveAuthorization extends BaseTestHiveImpersonation
             hivePluginName, db_general, g_student_u0_700));
 
     createView(org1Users[1], org1Groups[1], v_student_u1g1_750,
-        String.format("SELECT rownum, name, age FROM %s.%s.%s",
-            MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750));
+        String.format("SELECT rownum, name, age FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_student_u0g0_750));
 
     createView(org1Users[0], org1Groups[0], v_partitioned_student_u0g0_750,
         String.format("SELECT rownum, name, age, studentnum FROM %s.%s.%s",
             hivePluginName, db_general, g_partitioned_student_u0_700));
 
     createView(org1Users[1], org1Groups[1], v_partitioned_student_u1g1_750,
-        String.format("SELECT rownum, name, age FROM %s.%s.%s",
-            MINIDFS_STORAGE_PLUGIN_NAME, "tmp", v_partitioned_student_u0g0_750));
+        String.format("SELECT rownum, name, age FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", v_partitioned_student_u0g0_750));
   }
 
   private static void createPartitionedTable(final Driver hiveDriver, final String db, final String tbl,
@@ -521,15 +519,15 @@ public class TestStorageBasedHiveAuthorization extends BaseTestHiveImpersonation
   @Test
   public void selectUser2_v_student_u0g0_750() throws Exception {
     updateClient(org1Users[2]);
-    errorMsgTestHelper(query_v_student_u0g0_750,
-        "Not authorized to read view [v_student_u0g0_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_student_u0g0_750, String.format(
+        "Not authorized to read view [v_student_u0g0_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
   public void selectUser0_v_student_u1g1_750() throws Exception {
     updateClient(org1Users[0]);
-    errorMsgTestHelper(query_v_student_u1g1_750,
-        "Not authorized to read view [v_student_u1g1_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_student_u1g1_750, String.format(
+        "Not authorized to read view [v_student_u1g1_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
@@ -555,15 +553,15 @@ public class TestStorageBasedHiveAuthorization extends BaseTestHiveImpersonation
   @Test
   public void selectUser2_v_partitioned_student_u0g0_750() throws Exception {
     updateClient(org1Users[2]);
-    errorMsgTestHelper(query_v_partitioned_student_u0g0_750,
-        "Not authorized to read view [v_partitioned_student_u0g0_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_partitioned_student_u0g0_750, String.format(
+        "Not authorized to read view [v_partitioned_student_u0g0_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
   public void selectUser0_v_partitioned_student_u1g1_750() throws Exception {
     updateClient(org1Users[0]);
-    errorMsgTestHelper(query_v_partitioned_student_u1g1_750,
-        "Not authorized to read view [v_partitioned_student_u1g1_750] in schema [miniDfsPlugin.tmp]");
+    errorMsgTestHelper(query_v_partitioned_student_u1g1_750, String.format(
+        "Not authorized to read view [v_partitioned_student_u1g1_750] in schema [%s.tmp]", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   @Test
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
index 8f44a93..86ef095 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
@@ -20,25 +20,23 @@ package org.apache.drill.exec.store.kafka.schema;
 import java.io.IOException;
 
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
 
-public class KafkaSchemaFactory implements SchemaFactory {
+public class KafkaSchemaFactory extends AbstractSchemaFactory {
 
-  private final String schemaName;
   private final KafkaStoragePlugin plugin;
 
   public KafkaSchemaFactory(KafkaStoragePlugin plugin, String schemaName) {
+    super(schemaName);
     this.plugin = plugin;
-    this.schemaName = schemaName;
   }
 
   @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent)
-      throws IOException {
-    KafkaMessageSchema schema = new KafkaMessageSchema(this.plugin, this.schemaName);
-    SchemaPlus hPlus = parent.add(schemaName, schema);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    KafkaMessageSchema schema = new KafkaMessageSchema(plugin, getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
   }
 
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
index fe2def3..c1eb1e5 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
@@ -29,37 +29,35 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Writer;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.ListTablesResponse;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
-public class KuduSchemaFactory implements SchemaFactory {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSchemaFactory.class);
+public class KuduSchemaFactory extends AbstractSchemaFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSchemaFactory.class);
 
-  final String schemaName;
-  final KuduStoragePlugin plugin;
+  private final KuduStoragePlugin plugin;
 
   public KuduSchemaFactory(KuduStoragePlugin plugin, String name) throws IOException {
+    super(name);
     this.plugin = plugin;
-    this.schemaName = name;
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    KuduTables schema = new KuduTables(schemaName);
-    SchemaPlus hPlus = parent.add(schemaName, schema);
+    KuduTables schema = new KuduTables(getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
   }
 
   class KuduTables extends AbstractSchema {
 
-    public KuduTables(String name) {
-      super(ImmutableList.<String>of(), name);
+    KuduTables(String name) {
+      super(Collections.emptyList(), name);
     }
 
     public void setHolder(SchemaPlus plusOfThis) {
@@ -81,7 +79,7 @@ public class KuduSchemaFactory implements SchemaFactory {
       try {
         KuduTable table = plugin.getClient().openTable(name);
         Schema schema = table.getSchema();
-        return new DrillKuduTable(schemaName, plugin, schema, scanSpec);
+        return new DrillKuduTable(getName(), plugin, schema, scanSpec);
       } catch (Exception e) {
         logger.warn("Failure while retrieving kudu table {}", name, e);
         return null;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index 366f129..3437420 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -32,8 +32,8 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.mongo.MongoScanSpec;
 import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
 import org.apache.drill.exec.store.mongo.MongoStoragePluginConfig;
@@ -49,21 +49,19 @@ import com.google.common.collect.Sets;
 import com.mongodb.MongoException;
 import com.mongodb.client.MongoDatabase;
 
-public class MongoSchemaFactory implements SchemaFactory {
+public class MongoSchemaFactory extends AbstractSchemaFactory {
 
-  static final Logger logger = LoggerFactory
-      .getLogger(MongoSchemaFactory.class);
+  private static final Logger logger = LoggerFactory.getLogger(MongoSchemaFactory.class);
 
   private static final String DATABASES = "databases";
 
   private LoadingCache<String, List<String>> databases;
   private LoadingCache<String, List<String>> tableNameLoader;
-  private final String schemaName;
   private final MongoStoragePlugin plugin;
 
   public MongoSchemaFactory(MongoStoragePlugin plugin, String schemaName) throws ExecutionSetupException {
+    super(schemaName);
     this.plugin = plugin;
-    this.schemaName = schemaName;
 
     databases = CacheBuilder //
         .newBuilder() //
@@ -119,8 +117,8 @@ public class MongoSchemaFactory implements SchemaFactory {
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    MongoSchema schema = new MongoSchema(schemaName);
-    SchemaPlus hPlus = parent.add(schemaName, schema);
+    MongoSchema schema = new MongoSchema(getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
   }
 
@@ -186,7 +184,7 @@ public class MongoSchemaFactory implements SchemaFactory {
 
     DrillTable getDrillTable(String dbName, String collectionName) {
       MongoScanSpec mongoScanSpec = new MongoScanSpec(dbName, collectionName);
-      return new DynamicDrillTable(plugin, schemaName, null, mongoScanSpec);
+      return new DynamicDrillTable(plugin, getName(), null, mongoScanSpec);
     }
 
     @Override
diff --git a/contrib/storage-opentsdb/README.md b/contrib/storage-opentsdb/README.md
index 0c616b5..04ac1a6 100644
--- a/contrib/storage-opentsdb/README.md
+++ b/contrib/storage-opentsdb/README.md
@@ -1,4 +1,4 @@
-# drill-storage-openTSDB
+# drill-opentsdb-storage
 
 Implementation of TSDB storage plugin. Plugin uses REST API to work with TSDB. 
 
@@ -30,12 +30,12 @@ List of supported time
 
 Params must be specified in FROM clause of the query separated by commas. For example
 
-`openTSDB.(metric=metric_name, start=4d-ago, aggregator=sum)`
+`opentsdb.(metric=metric_name, start=4d-ago, aggregator=sum)`
 
 Supported queries for now are listed below:
 
 ```
-USE openTSDB
+USE opentsdb
 ```
 
 ```
@@ -44,26 +44,26 @@ SHOW tables
 Will print available metrics. Max number of the printed results is the Integer.MAX value
 
 ```
-SELECT * FROM openTSDB. `(metric=warp.speed.test, start=47y-ago, aggregator=sum)` 
+SELECT * FROM opentsdb. `(metric=warp.speed.test, start=47y-ago, aggregator=sum)` 
 ```
 Return aggregated elements from `warp.speed.test` table since 47y-ago 
 
 ```
-SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)`
+SELECT * FROM opentsdb.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)`
 ```
 Return aggregated elements from `warp.speed.test` table
 
 ```
-SELECT `timestamp`, sum(`aggregated value`) FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)` GROUP BY `timestamp`
+SELECT `timestamp`, sum(`aggregated value`) FROM opentsdb.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)` GROUP BY `timestamp`
 ```
 Return aggregated and grouped value by standard drill functions from `warp.speed.test table`, but with the custom aggregator
 
 ```
-SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, downsample=5m-avg)`
+SELECT * FROM opentsdb.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, downsample=5m-avg)`
 ```
 Return aggregated data limited by downsample
 
 ```
-SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, end=1407165403000)`
+SELECT * FROM opentsdb.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, end=1407165403000)`
 ```
 Return aggregated data limited by end time
\ No newline at end of file
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java
index 2de763b..539442a 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java
@@ -36,10 +36,9 @@ public class OpenTSDBStoragePlugin extends AbstractStoragePlugin {
 
   private final ServiceImpl db;
 
-  public OpenTSDBStoragePlugin(OpenTSDBStoragePluginConfig configuration, DrillbitContext context, String name)
-      throws IOException {
+  public OpenTSDBStoragePlugin(OpenTSDBStoragePluginConfig configuration, DrillbitContext context, String name) throws IOException {
     super(context, name);
-    this.schemaFactory = new OpenTSDBSchemaFactory(this, name);
+    this.schemaFactory = new OpenTSDBSchemaFactory(this, getName());
     this.engineConfig = configuration;
     this.db = new ServiceImpl(configuration.getConnection());
   }
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java
index 3b86a95..f7ae4f3 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/schema/OpenTSDBSchemaFactory.java
@@ -20,8 +20,8 @@ package org.apache.drill.exec.store.openTSDB.schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.openTSDB.DrillOpenTSDBTable;
 import org.apache.drill.exec.store.openTSDB.OpenTSDBScanSpec;
 import org.apache.drill.exec.store.openTSDB.OpenTSDBStoragePlugin;
@@ -34,41 +34,40 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
 
-public class OpenTSDBSchemaFactory implements SchemaFactory {
+public class OpenTSDBSchemaFactory extends AbstractSchemaFactory {
 
-  private static final Logger log = LoggerFactory.getLogger(OpenTSDBSchemaFactory.class);
+  private static final Logger logger = LoggerFactory.getLogger(OpenTSDBSchemaFactory.class);
 
-  private final String schemaName;
   private final OpenTSDBStoragePlugin plugin;
 
   public OpenTSDBSchemaFactory(OpenTSDBStoragePlugin plugin, String schemaName) {
+    super(schemaName);
     this.plugin = plugin;
-    this.schemaName = schemaName;
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    OpenTSDBSchema schema = new OpenTSDBSchema(schemaName);
-    parent.add(schemaName, schema);
+    OpenTSDBSchema schema = new OpenTSDBSchema(getName());
+    parent.add(getName(), schema);
   }
 
   class OpenTSDBSchema extends AbstractSchema {
 
     OpenTSDBSchema(String name) {
-      super(Collections.<String>emptyList(), name);
+      super(Collections.emptyList(), name);
     }
 
     @Override
     public Table getTable(String name) {
       OpenTSDBScanSpec scanSpec = new OpenTSDBScanSpec(name);
       try {
-        return new DrillOpenTSDBTable(schemaName, plugin, new Schema(plugin.getClient(), name), scanSpec);
+        return new DrillOpenTSDBTable(getName(), plugin, new Schema(plugin.getClient(), name), scanSpec);
       } catch (Exception e) {
         // Calcite firstly looks for a table in the default schema, if the table was not found,
         // it looks in the root schema.
         // If the table does not exist, a query will fail at validation stage,
         // so the error should not be thrown here.
-        logger.warn("Failure while loading table '{}' for database '{}'.", name, schemaName, e.getCause());
+        logger.warn("Failure while loading table '{}' for database '{}'.", name, getName(), e.getCause());
         return null;
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
index 0839898..97dd2d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
@@ -68,7 +68,7 @@ public class ZookeeperClient implements AutoCloseable {
    */
   public void start() throws Exception {
     curator.newNamespaceAwareEnsurePath(root).ensure(curator.getZookeeperClient()); // ensure root is created
-    getCache().start();
+    getCache().start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); //build cache at start up, to ensure we get correct results right away
   }
 
   public PathChildrenCache getCache() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
index 1318f72..9b1a34a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.exec.expr.fn.registry;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -35,6 +37,7 @@ import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.exception.FunctionValidationException;
 import org.apache.drill.exec.exception.JarValidationException;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
 import org.apache.drill.exec.expr.fn.DrillFuncHolder;
 import org.apache.drill.exec.expr.fn.FunctionConverter;
 import org.apache.drill.exec.planner.logical.DrillConstExecutor;
@@ -112,7 +115,7 @@ public class LocalFunctionRegistry {
   public List<String> validate(String jarName, ScanResult scanResult) {
     List<String> functions = Lists.newArrayList();
     FunctionConverter converter = new FunctionConverter();
-    List<AnnotatedClassDescriptor> providerClasses = scanResult.getAnnotatedClasses();
+    List<AnnotatedClassDescriptor> providerClasses = scanResult.getAnnotatedClasses(FunctionTemplate.class.getName());
 
     if (registryHolder.containsJar(jarName)) {
       throw new JarValidationException(String.format("Jar with %s name has been already registered", jarName));
@@ -158,11 +161,11 @@ public class LocalFunctionRegistry {
    * @param version remote function registry version number with which local function registry is synced
    */
   public void register(List<JarScan> jars, long version) {
-    Map<String, List<FunctionHolder>> newJars = Maps.newHashMap();
+    Map<String, List<FunctionHolder>> newJars = new HashMap<>();
     for (JarScan jarScan : jars) {
       FunctionConverter converter = new FunctionConverter();
-      List<AnnotatedClassDescriptor> providerClasses = jarScan.getScanResult().getAnnotatedClasses();
-      List<FunctionHolder> functions = Lists.newArrayList();
+      List<AnnotatedClassDescriptor> providerClasses = jarScan.getScanResult().getAnnotatedClasses(FunctionTemplate.class.getName());
+      List<FunctionHolder> functions = new ArrayList<>();
       newJars.put(jarScan.getJarName(), functions);
       for (AnnotatedClassDescriptor func : providerClasses) {
         DrillFuncHolder holder = converter.getHolder(func, jarScan.getClassLoader());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
index 1493a92..ad0c2cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.LogicalPlanPersistence;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 
@@ -38,11 +39,13 @@ import com.google.common.io.Resources;
 
 public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginConfig>> {
 
-  private Map<String, StoragePluginConfig> storage;
+  private final Map<String, StoragePluginConfig> storage;
 
   @JsonCreator
   public StoragePlugins(@JsonProperty("storage") Map<String, StoragePluginConfig> storage) {
-    this.storage = storage;
+    Map<String, StoragePluginConfig> caseInsensitiveStorage = CaseInsensitiveMap.newHashMap();
+    Optional.ofNullable(storage).ifPresent(caseInsensitiveStorage::putAll);
+    this.storage = caseInsensitiveStorage;
   }
 
   public static void main(String[] args) throws Exception{
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
index a262363..b47ab32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
@@ -104,7 +104,8 @@ public class SchemaUtilites {
   /** Utility method to search for schema path starting from the given <i>schema</i> reference */
   private static SchemaPlus searchSchemaTree(SchemaPlus schema, final List<String> schemaPath) {
     for (String schemaName : schemaPath) {
-      schema = schema.getSubSchema(schemaName);
+      // schemas in Drill are case insensitive and stored in lower case
+      schema = schema.getSubSchema(schemaName.toLowerCase());
       if (schema == null) {
         return null;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
index bb51ef0..3f11fd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
@@ -80,9 +80,9 @@ public class DescribeSchemaHandler extends DefaultSqlHandler {
         .build(logger);
     }
 
+    AbstractSchema drillSchema = SchemaUtilites.unwrapAsDrillSchemaInstance(schemaPlus);
     StoragePlugin storagePlugin;
     try {
-      AbstractSchema drillSchema = SchemaUtilites.unwrapAsDrillSchemaInstance(schemaPlus);
       storagePlugin = context.getStorage().getPlugin(drillSchema.getSchemaPath().get(0));
       if (storagePlugin == null) {
         throw new DrillRuntimeException(String.format("Unable to find storage plugin with the following name [%s].",
@@ -95,10 +95,10 @@ public class DescribeSchemaHandler extends DefaultSqlHandler {
     try {
       Map configMap = mapper.convertValue(storagePlugin.getConfig(), Map.class);
       if (storagePlugin instanceof FileSystemPlugin) {
-        transformWorkspaces(schema.names, configMap);
+        transformWorkspaces(drillSchema.getSchemaPath(), configMap);
       }
       String properties = mapper.writeValueAsString(configMap);
-      return DirectPlan.createDirectPlan(context, new DescribeSchemaResult(Joiner.on(".").join(schema.names), properties));
+      return DirectPlan.createDirectPlan(context, new DescribeSchemaResult(drillSchema.getFullSchemaName(), properties));
     } catch (JsonProcessingException e) {
       throw new DrillRuntimeException("Error while trying to convert storage config to json string", e);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
index 32768f8..70dd5a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
@@ -24,10 +24,12 @@ import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
@@ -37,6 +39,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.drill.common.exceptions.UserException;
@@ -44,11 +47,10 @@ import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.SqlConverter;
 import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
+import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
-import com.google.common.collect.ImmutableList;
-
 public class DescribeTableHandler extends DefaultSqlHandler {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DescribeTableHandler.class);
 
@@ -60,23 +62,19 @@ public class DescribeTableHandler extends DefaultSqlHandler {
     DrillSqlDescribeTable node = unwrap(sqlNode, DrillSqlDescribeTable.class);
 
     try {
-      List<SqlNode> selectList =
-          ImmutableList.of(new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
-                           new SqlIdentifier(COLS_COL_DATA_TYPE, SqlParserPos.ZERO),
-                           new SqlIdentifier(COLS_COL_IS_NULLABLE, SqlParserPos.ZERO));
+      List<SqlNode> selectList = Arrays.asList(
+          new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
+          new SqlIdentifier(COLS_COL_DATA_TYPE, SqlParserPos.ZERO),
+          new SqlIdentifier(COLS_COL_IS_NULLABLE, SqlParserPos.ZERO));
 
-      SqlNode fromClause = new SqlIdentifier(
-          ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.COLUMNS.name()), null, SqlParserPos.ZERO, null);
+      SqlNode fromClause = new SqlIdentifier(Arrays.asList(IS_SCHEMA_NAME, InfoSchemaTableType.COLUMNS.name()), SqlParserPos.ZERO);
 
-      final SqlIdentifier table = node.getTable();
-      final SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
-      final List<String> schemaPathGivenInCmd = Util.skipLast(table.names);
-      final SchemaPlus schema = SchemaUtilites.findSchema(defaultSchema, schemaPathGivenInCmd);
-      final String charset = Util.getDefaultCharset().name();
+      SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
+      List<String> schemaPathGivenInCmd = Util.skipLast(node.getTable().names);
+      SchemaPlus schema = SchemaUtilites.findSchema(defaultSchema, schemaPathGivenInCmd);
 
       if (schema == null) {
-        SchemaUtilites.throwSchemaNotFoundException(defaultSchema,
-            SchemaUtilites.SCHEMA_PATH_JOINER.join(schemaPathGivenInCmd));
+        SchemaUtilites.throwSchemaNotFoundException(defaultSchema, SchemaUtilites.getSchemaPath(schemaPathGivenInCmd));
       }
 
       if (SchemaUtilites.isRootSchema(schema)) {
@@ -85,10 +83,11 @@ public class DescribeTableHandler extends DefaultSqlHandler {
             .build(logger);
       }
 
-      final String tableName = Util.last(table.names);
-
       // find resolved schema path
-      final String schemaPath = SchemaUtilites.unwrapAsDrillSchemaInstance(schema).getFullSchemaName();
+      AbstractSchema drillSchema = SchemaUtilites.unwrapAsDrillSchemaInstance(schema);
+      String schemaPath = drillSchema.getFullSchemaName();
+
+      String tableName = Util.last(node.getTable().names);
 
       if (schema.getTable(tableName) == null) {
         throw UserException.validationError()
@@ -101,14 +100,21 @@ public class DescribeTableHandler extends DefaultSqlHandler {
         schemaCondition = DrillParserUtil.createCondition(
             new SqlIdentifier(SHRD_COL_TABLE_SCHEMA, SqlParserPos.ZERO),
             SqlStdOperatorTable.EQUALS,
-            SqlLiteral.createCharString(schemaPath, charset, SqlParserPos.ZERO)
+            SqlLiteral.createCharString(schemaPath, Util.getDefaultCharset().name(), SqlParserPos.ZERO)
         );
       }
 
-      SqlNode where = DrillParserUtil.createCondition(
-          new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO),
+      SqlNode tableNameColumn = new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO);
+
+      // if table names are case insensitive, wrap the column in the lower function and convert the value to lower case
+      if (!drillSchema.areTableNamesCaseSensitive()) {
+        tableNameColumn = SqlStdOperatorTable.LOWER.createCall(SqlParserPos.ZERO, tableNameColumn);
+        tableName = tableName.toLowerCase();
+      }
+
+      SqlNode where = DrillParserUtil.createCondition(tableNameColumn,
           SqlStdOperatorTable.EQUALS,
-          SqlLiteral.createCharString(tableName, charset, SqlParserPos.ZERO));
+          SqlLiteral.createCharString(tableName, Util.getDefaultCharset().name(), SqlParserPos.ZERO));
 
       where = DrillParserUtil.createCondition(schemaCondition, SqlStdOperatorTable.AND, where);
 
@@ -116,14 +122,21 @@ public class DescribeTableHandler extends DefaultSqlHandler {
       if (node.getColumn() != null) {
         columnFilter =
             DrillParserUtil.createCondition(
-                new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
+                SqlStdOperatorTable.LOWER.createCall(SqlParserPos.ZERO, new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO)),
                 SqlStdOperatorTable.EQUALS,
-                SqlLiteral.createCharString(node.getColumn().toString(), charset, SqlParserPos.ZERO));
+                SqlLiteral.createCharString(node.getColumn().toString().toLowerCase(), Util.getDefaultCharset().name(), SqlParserPos.ZERO));
       } else if (node.getColumnQualifier() != null) {
-        columnFilter =
-            DrillParserUtil.createCondition(
-                new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
-                SqlStdOperatorTable.LIKE, node.getColumnQualifier());
+        SqlNode columnQualifier = node.getColumnQualifier();
+        SqlNode column = new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO);
+        if (columnQualifier instanceof SqlCharStringLiteral) {
+          NlsString conditionString = ((SqlCharStringLiteral) columnQualifier).getNlsString();
+          columnQualifier = SqlCharStringLiteral.createCharString(
+              conditionString.getValue().toLowerCase(),
+              conditionString.getCharsetName(),
+              columnQualifier.getParserPosition());
+          column = SqlStdOperatorTable.LOWER.createCall(SqlParserPos.ZERO, column);
+        }
+        columnFilter = DrillParserUtil.createCondition(column, SqlStdOperatorTable.LIKE, columnQualifier);
       }
 
       where = DrillParserUtil.createCondition(where, SqlStdOperatorTable.AND, columnFilter);
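
For illustration, a hedged sketch of the rewrite above for a schema with case-insensitive table names; the column name and table name are simplified placeholders:

    // Sketch: DESCRIBE dfs.tmp.MyTable is rewritten roughly into
    //   SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE
    //   FROM INFORMATION_SCHEMA.COLUMNS
    //   WHERE TABLE_SCHEMA = 'dfs.tmp' AND lower(TABLE_NAME) = 'mytable'
    SqlNode tableNameColumn = new SqlIdentifier("TABLE_NAME", SqlParserPos.ZERO);
    tableNameColumn = SqlStdOperatorTable.LOWER.createCall(SqlParserPos.ZERO, tableNameColumn);
    String tableName = "MyTable".toLowerCase(); // compared against the lowered column
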
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
index ab460ad..2c07d2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
@@ -17,8 +17,12 @@
  */
 package org.apache.drill.exec.planner.sql.handlers;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.util.NlsString;
 import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
 import org.apache.drill.exec.planner.sql.parser.SqlShowSchemas;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
@@ -33,8 +37,6 @@ import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
-import com.google.common.collect.ImmutableList;
-
 public class ShowSchemasHandler extends DefaultSqlHandler {
 
   public ShowSchemasHandler(SqlHandlerConfig config) { super(config); }
@@ -43,17 +45,24 @@ public class ShowSchemasHandler extends DefaultSqlHandler {
   @Override
   public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     SqlShowSchemas node = unwrap(sqlNode, SqlShowSchemas.class);
-    List<SqlNode> selectList =
-        ImmutableList.of(new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO));
+    List<SqlNode> selectList = Collections.singletonList(new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO));
 
-    SqlNode fromClause = new SqlIdentifier(
-        ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.SCHEMATA.name()), null, SqlParserPos.ZERO, null);
+    SqlNode fromClause = new SqlIdentifier(Arrays.asList(IS_SCHEMA_NAME, InfoSchemaTableType.SCHEMATA.name()), SqlParserPos.ZERO);
 
     SqlNode where = null;
-    final SqlNode likePattern = node.getLikePattern();
+    SqlNode likePattern = node.getLikePattern();
     if (likePattern != null) {
-      where = DrillParserUtil.createCondition(new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO),
-                                              SqlStdOperatorTable.LIKE, likePattern);
+      SqlNode column = new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO);
+      // schema names are case insensitive: wrap the column in the lower function and convert the pattern to lower case
+      if (likePattern instanceof SqlCharStringLiteral) {
+        NlsString conditionString = ((SqlCharStringLiteral) likePattern).getNlsString();
+        likePattern = SqlCharStringLiteral.createCharString(
+            conditionString.getValue().toLowerCase(),
+            conditionString.getCharsetName(),
+            likePattern.getParserPosition());
+        column = SqlStdOperatorTable.LOWER.createCall(SqlParserPos.ZERO, column);
+      }
+      where = DrillParserUtil.createCondition(column, SqlStdOperatorTable.LIKE, likePattern);
     } else if (node.getWhereClause() != null) {
       where = node.getWhereClause();
     }
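
For illustration, a hedged sketch of the literal handling above; the pattern value is hypothetical:

    // Sketch: SHOW SCHEMAS LIKE 'DFS%' becomes, roughly,
    //   SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA
    //   WHERE lower(SCHEMA_NAME) LIKE 'dfs%'
    NlsString pattern = ((SqlCharStringLiteral) likePattern).getNlsString();
    SqlNode lowered = SqlCharStringLiteral.createCharString(
        pattern.getValue().toLowerCase(),
        pattern.getCharsetName(),
        likePattern.getParserPosition());
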
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
index e73e829..a910f9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
@@ -21,10 +21,12 @@ import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
@@ -34,6 +36,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.drill.common.exceptions.UserException;
@@ -45,9 +48,6 @@ import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
 public class ShowTablesHandler extends DefaultSqlHandler {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ShowTablesHandler.class);
 
@@ -57,48 +57,56 @@ public class ShowTablesHandler extends DefaultSqlHandler {
   @Override
   public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     SqlShowTables node = unwrap(sqlNode, SqlShowTables.class);
-    List<SqlNode> selectList = Lists.newArrayList();
-    SqlNode fromClause;
-    SqlNode where;
-
-    // create select columns
-    selectList.add(new SqlIdentifier(SHRD_COL_TABLE_SCHEMA, SqlParserPos.ZERO));
-    selectList.add(new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO));
+    List<SqlNode> selectList = Arrays.asList(
+        new SqlIdentifier(SHRD_COL_TABLE_SCHEMA, SqlParserPos.ZERO),
+        new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO));
 
-    fromClause = new SqlIdentifier(ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.TABLES.name()), SqlParserPos.ZERO);
+    SqlNode fromClause = new SqlIdentifier(Arrays.asList(IS_SCHEMA_NAME, InfoSchemaTableType.TABLES.name()), SqlParserPos.ZERO);
 
-    final SqlIdentifier db = node.getDb();
-    String tableSchema;
-    if (db != null) {
-      tableSchema = db.toString();
-    } else {
-      // If no schema is given in SHOW TABLES command, list tables from current schema
-      SchemaPlus schema = config.getConverter().getDefaultSchema();
+    SchemaPlus schemaPlus;
+    if (node.getDb() != null) {
+      List<String> schemaNames = node.getDb().names;
+      schemaPlus = SchemaUtilites.findSchema(config.getConverter().getDefaultSchema(), schemaNames);
 
-      if (SchemaUtilites.isRootSchema(schema)) {
-        // If the default schema is a root schema, throw an error to select a default schema
+      if (schemaPlus == null) {
         throw UserException.validationError()
-            .message("No default schema selected. Select a schema using 'USE schema' command")
+            .message(String.format("Invalid schema name [%s]", SchemaUtilites.getSchemaPath(schemaNames)))
             .build(logger);
       }
 
-      final AbstractSchema drillSchema = SchemaUtilites.unwrapAsDrillSchemaInstance(schema);
-      tableSchema = drillSchema.getFullSchemaName();
+    } else {
+      // If no schema is given in SHOW TABLES command, list tables from current schema
+      schemaPlus = config.getConverter().getDefaultSchema();
+    }
+
+    if (SchemaUtilites.isRootSchema(schemaPlus)) {
+      // If the default schema is a root schema, throw an error to select a default schema
+      throw UserException.validationError()
+          .message("No default schema selected. Select a schema using 'USE schema' command")
+          .build(logger);
     }
 
-    final String charset = Util.getDefaultCharset().name();
-    where = DrillParserUtil.createCondition(
+    AbstractSchema drillSchema = SchemaUtilites.unwrapAsDrillSchemaInstance(schemaPlus);
+
+    SqlNode where = DrillParserUtil.createCondition(
         new SqlIdentifier(SHRD_COL_TABLE_SCHEMA, SqlParserPos.ZERO),
         SqlStdOperatorTable.EQUALS,
-        SqlLiteral.createCharString(tableSchema, charset, SqlParserPos.ZERO));
+        SqlLiteral.createCharString(drillSchema.getFullSchemaName(), Util.getDefaultCharset().name(), SqlParserPos.ZERO));
 
     SqlNode filter = null;
-    final SqlNode likePattern = node.getLikePattern();
-    if (likePattern != null) {
-      filter = DrillParserUtil.createCondition(
-          new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO),
-          SqlStdOperatorTable.LIKE,
-          likePattern);
+    if (node.getLikePattern() != null) {
+      SqlNode likePattern = node.getLikePattern();
+      SqlNode column = new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO);
+      // if table names are case insensitive, wrap the column in the lower function and convert the pattern to lower case
+      if (!drillSchema.areTableNamesCaseSensitive() && likePattern instanceof SqlCharStringLiteral) {
+        NlsString conditionString = ((SqlCharStringLiteral) likePattern).getNlsString();
+        likePattern = SqlCharStringLiteral.createCharString(
+            conditionString.getValue().toLowerCase(),
+            conditionString.getCharsetName(),
+            likePattern.getParserPosition());
+        column = SqlStdOperatorTable.LOWER.createCall(SqlParserPos.ZERO, column);
+      }
+      filter = DrillParserUtil.createCondition(column, SqlStdOperatorTable.LIKE, likePattern);
     } else if (node.getWhereClause() != null) {
       filter = node.getWhereClause();
     }
@@ -119,4 +127,4 @@ public class ShowTablesHandler extends DefaultSqlHandler {
     converter.useRootSchemaAsDefault(false);
     return sqlNodeRelDataTypePair;
   }
-}
+}
\ No newline at end of file
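
For illustration, a hedged sketch of the resulting query; schema and pattern are hypothetical:

    // Sketch: against a schema with case-insensitive table names,
    // SHOW TABLES IN dfs.tmp LIKE 'My%' is rewritten roughly into
    //   SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.`TABLES`
    //   WHERE TABLE_SCHEMA = 'dfs.tmp' AND lower(TABLE_NAME) LIKE 'my%'
    // For a case-sensitive schema the LOWER wrapping is skipped and the
    // pattern is passed through unchanged.
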
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index fb9bc6b..2b70c3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -46,7 +47,8 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
   private static final Expression EXPRESSION = new DefaultExpression(Object.class);
 
   public AbstractSchema(List<String> parentSchemaPath, String name) {
-    schemaPath = Lists.newArrayList();
+    name = name == null ? null : name.toLowerCase();
+    schemaPath = new ArrayList<>();
     schemaPath.addAll(parentSchemaPath);
     schemaPath.add(name);
     this.name = name;
@@ -96,7 +98,7 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
    * Create a new view given definition.
    * @param view View info including name, definition etc.
    * @return Returns true if an existing view is replaced with the given view. False otherwise.
-   * @throws IOException
+   * @throws IOException in case of error creating a view
    */
   public boolean createView(View view) throws IOException {
     throw UserException.unsupportedError()
@@ -107,8 +109,8 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
   /**
    * Drop the view with given name.
    *
-   * @param viewName
-   * @throws IOException
+   * @param viewName view name
+   * @throws IOException in case of error dropping the view
    */
   public void dropView(String viewName) throws IOException {
     throw UserException.unsupportedError()
@@ -217,7 +219,7 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
    * plugin supports).
    * It is not guaranteed that the retrieved tables would have RowType and Statistic being fully populated.
    *
-   * Specifically, calling {@link Table#getRowType(RelDataTypeFactory)} or {@link Table#getStatistic()} might incur
+   * Specifically, calling {@link Table#getRowType(org.apache.calcite.rel.type.RelDataTypeFactory)} or {@link Table#getStatistic()} might incur
    * {@link UnsupportedOperationException} being thrown.
    *
    * @param  tableNames the requested tables, specified by the table names
@@ -263,4 +265,15 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
     return tableNamesAndTypes;
   }
 
+  /**
+   * Indicates if table names in schema are case sensitive. By default they are.
+   * If a schema implementation claims its table names are case insensitive,
+   * it is responsible for performing case insensitive lookups by table name.
+   *
+   * @return true if table names are case sensitive
+   */
+  public boolean areTableNamesCaseSensitive() {
+    return true;
+  }
+
 }
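
For illustration, a minimal sketch of a schema opting into case-insensitive table names; the class and its table map are hypothetical and other members are elided:

    // Sketch: overriding the new flag obliges the schema to resolve table
    // names case-insensitively itself, e.g. by keying tables in a
    // CaseInsensitiveMap.
    public class ExampleSchema extends AbstractSchema {

      private final Map<String, Table> tables = CaseInsensitiveMap.newHashMap();

      public ExampleSchema(List<String> parentSchemaPath, String name) {
        super(parentSchemaPath, name);
      }

      @Override
      public boolean areTableNamesCaseSensitive() {
        return false;
      }

      @Override
      public Table getTable(String name) {
        return tables.get(name); // map performs the case-insensitive lookup
      }

      @Override
      public String getTypeName() {
        return "example";
      }
    }
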
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchemaFactory.java
similarity index 64%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchemaFactory.java
index adf15a2..da5af1d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchemaFactory.java
@@ -15,23 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.ischema;
+package org.apache.drill.exec.store;
 
-import org.apache.drill.common.logical.StoragePluginConfig;
-
-public class InfoSchemaConfig extends StoragePluginConfig {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaConfig.class);
+/**
+ * Abstract implementation of {@link SchemaFactory}, ensures that the given schema name is always converted to lower case.
+ */
+public abstract class AbstractSchemaFactory implements SchemaFactory {
 
-  public static final String NAME = "ischema";
+  private final String name;
 
-  @Override
-  public int hashCode(){
-    return 1;
+  protected AbstractSchemaFactory(String name) {
+    this.name = name == null ? null : name.toLowerCase();
   }
 
-  @Override
-  public boolean equals(Object o){
-    return o instanceof InfoSchemaConfig;
+  public String getName() {
+    return name;
   }
 
 }
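
For illustration, a hedged sketch of a factory built on the new base class; the class and schema names are hypothetical (ExampleSchema is the sketch above):

    // Sketch: the base class lower-cases the name once, so registerSchemas
    // can rely on getName() returning the canonical lower-case schema name.
    public class ExampleSchemaFactory extends AbstractSchemaFactory {

      public ExampleSchemaFactory() {
        super("Example"); // stored and exposed as "example"
      }

      @Override
      public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
        parent.add(getName(), new ExampleSchema(Collections.emptyList(), getName()));
      }
    }
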
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 9818ff3..b90fd63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -34,7 +34,8 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 
-/** Abstract class for StorePlugin implementations.
+/**
+ * Abstract class for StoragePlugin implementations.
  * See StoragePlugin for description of the interface intent and its methods.
  */
 public abstract class AbstractStoragePlugin implements StoragePlugin {
@@ -44,7 +45,7 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
 
   protected AbstractStoragePlugin(DrillbitContext inContext, String inName) {
     this.context = inContext;
-    this.name = inName;
+    this.name = inName == null ? null : inName.toLowerCase();
   }
 
   @Override
@@ -137,12 +138,13 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
     throw new UnsupportedOperationException(String.format("%s doesn't support format plugins", getClass().getName()));
   }
 
-  public DrillbitContext getContext() {
-    return context;
-  }
-
+  @Override
   public String getName() {
     return name;
   }
 
+  public DrillbitContext getContext() {
+    return context;
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PartitionExplorerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PartitionExplorerImpl.java
index c87d8f6..a63d015 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PartitionExplorerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PartitionExplorerImpl.java
@@ -36,7 +36,7 @@ public class PartitionExplorerImpl implements PartitionExplorer {
                                            List<String> partitionValues
                                            ) throws PartitionNotFoundException {
 
-    AbstractSchema subSchema = rootSchema.getSubSchema(schema).unwrap(AbstractSchema.class);
+    AbstractSchema subSchema = rootSchema.getSubSchema(schema.toLowerCase()).unwrap(AbstractSchema.class);
     return subSchema.getSubPartitions(table, partitionColumns, partitionValues);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
index 2027527..4545169 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
@@ -25,14 +25,13 @@ import java.io.IOException;
  * StoragePlugins implements this interface to register the schemas they provide.
  */
 public interface SchemaFactory {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class);
 
   /**
    * Register the schemas provided by this SchemaFactory implementation under the given parent schema.
    *
    * @param schemaConfig Configuration for schema objects.
    * @param parent Reference to parent schema.
-   * @throws IOException
+   * @throws IOException in case of error during schema registration
    */
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException;
+  void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException;
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index 7bd7eaf..4e6a7c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -112,4 +112,6 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
    * @throws UnsupportedOperationException, if storage plugin doesn't support format plugins.
    */
   FormatPlugin getFormatPlugin(FormatPluginConfig config);
+
+  String getName();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
index 582791e..62fd031 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
@@ -22,8 +22,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -31,17 +29,19 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 
 /**
  * Holds maps to storage plugins. Supports name => plugin and config => plugin mappings.
  *
  * This is inspired by ConcurrentMap but provides a secondary key mapping that allows an alternative lookup mechanism.
  * The class is responsible for internally managing consistency between the two maps. This class is threadsafe.
+ * The name map is case insensitive.
  */
 class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>>, AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginMap.class);
 
-  private final ConcurrentMap<String, StoragePlugin> nameMap = new ConcurrentHashMap<>();
+  private final Map<String, StoragePlugin> nameMap = CaseInsensitiveMap.newConcurrentMap();
 
   @SuppressWarnings("unchecked")
   private final Multimap<StoragePluginConfig, StoragePlugin> configMap =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 313d3b9..036187a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -27,8 +27,7 @@ import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.sys.PersistentStore;
 
 public interface StoragePluginRegistry extends Iterable<Map.Entry<String, StoragePlugin>>, AutoCloseable {
-  String SYS_PLUGIN = "sys";
-  String INFORMATION_SCHEMA_PLUGIN = "INFORMATION_SCHEMA";
+
   String STORAGE_PLUGIN_REGISTRY_IMPL = "drill.exec.storage.registry";
   String ACTION_ON_STORAGE_PLUGINS_OVERRIDE_FILE = "drill.exec.storage.action_on_plugins_override_file";
   String PSTORE_NAME = "sys.storage_plugins";
@@ -92,8 +91,7 @@ public interface StoragePluginRegistry extends Iterable<Map.Entry<String, Storag
    * @return A FormatPlugin instance
    * @throws ExecutionSetupException if plugin cannot be obtained
    */
-  FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig)
-      throws ExecutionSetupException;
+  FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException;
 
   /**
    * Get the PStore for this StoragePluginRegistry. (Used in the management layer.)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 03ce5a9..bd5a93d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -43,19 +43,18 @@ import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.StoreException;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
-import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.apache.drill.exec.store.sys.CaseInsensitivePersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
-import org.apache.drill.exec.store.sys.SystemTablePlugin;
-import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
@@ -67,35 +66,28 @@ import com.google.common.cache.RemovalListener;
 import com.google.common.io.Resources;
 
 public class StoragePluginRegistryImpl implements StoragePluginRegistry {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistryImpl.class);
 
-  private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = Collections.emptyMap();
-  private final StoragePluginMap enabledPlugins = new StoragePluginMap();
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistryImpl.class);
 
-  private DrillbitContext context;
-  private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
-  private final PersistentStore<StoragePluginConfig> pluginSystemTable;
+  private final StoragePluginMap enabledPlugins;
+  private final DrillSchemaFactory schemaFactory;
+  private final DrillbitContext context;
   private final LogicalPlanPersistence lpPersistence;
   private final ScanResult classpathScan;
+  private final PersistentStore<StoragePluginConfig> pluginSystemTable;
   private final LoadingCache<StoragePluginConfig, StoragePlugin> ephemeralPlugins;
 
+  private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = Collections.emptyMap();
+  private Map<String, StoragePlugin> systemPlugins = Collections.emptyMap();
+
   public StoragePluginRegistryImpl(DrillbitContext context) {
+    this.enabledPlugins = new StoragePluginMap();
+    this.schemaFactory = new DrillSchemaFactory(null);
     this.context = checkNotNull(context);
     this.lpPersistence = checkNotNull(context.getLpPersistence());
     this.classpathScan = checkNotNull(context.getClasspathScan());
-    try {
-      this.pluginSystemTable = context
-          .getStoreProvider()
-          .getOrCreateStore(PersistentStoreConfig
-              .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class)
-              .name(PSTORE_NAME)
-              .build());
-    } catch (StoreException | RuntimeException e) {
-      logger.error("Failure while loading storage plugin registry.", e);
-      throw new RuntimeException("Failure while reading and loading storage plugin configuration.", e);
-    }
-
-    ephemeralPlugins = CacheBuilder.newBuilder()
+    this.pluginSystemTable = initPluginsSystemTable(context, lpPersistence);
+    this.ephemeralPlugins = CacheBuilder.newBuilder()
         .expireAfterAccess(24, TimeUnit.HOURS)
         .maximumSize(250)
         .removalListener(
@@ -109,15 +101,11 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
   }
 
   @Override
-  public PersistentStore<StoragePluginConfig> getStore() {
-    return pluginSystemTable;
-  }
-
-  @Override
   public void init() {
     availablePlugins = findAvailablePlugins(classpathScan);
+    systemPlugins = initSystemPlugins(classpathScan, context);
     try {
-      StoragePlugins bootstrapPlugins = pluginSystemTable.getAll().hasNext() ? null : loadBootstrapPlugins();
+      StoragePlugins bootstrapPlugins = pluginSystemTable.getAll().hasNext() ? null : loadBootstrapPlugins(lpPersistence);
 
       StoragePluginsHandler storagePluginsHandler = new StoragePluginsHandlerService(context);
       storagePluginsHandler.loadPlugins(pluginSystemTable, bootstrapPlugins);
@@ -129,115 +117,18 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
     }
   }
 
-  /**
-   * Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE} files for the first fresh
-   * instantiating of Drill
-   *
-   * @return bootstrap storage plugins
-   * @throws IOException if a read error occurs
-   */
-  private StoragePlugins loadBootstrapPlugins() throws IOException {
-    // bootstrap load the config since no plugins are stored.
-    logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
-    Set<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
-    if (urls != null && !urls.isEmpty()) {
-      logger.info("Loading the storage plugin configs from URLs {}.", urls);
-      StoragePlugins bootstrapPlugins = new StoragePlugins(new HashMap<>());
-      Map<String, URL> pluginURLMap = new HashMap<>();
-      for (URL url : urls) {
-        String pluginsData = Resources.toString(url, Charsets.UTF_8);
-        StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
-        for (Entry<String, StoragePluginConfig> plugin : plugins) {
-          StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue());
-          if (oldPluginConfig != null) {
-            logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
-                plugin.getKey(), pluginURLMap.get(plugin.getKey()), url);
-          } else {
-            pluginURLMap.put(plugin.getKey(), url);
-          }
-        }
-      }
-      return bootstrapPlugins;
-    } else {
-      throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE);
-    }
-  }
-
-  /**
-   * It initializes {@link #enabledPlugins} with currently enabled plugins
-   */
-  private void defineEnabledPlugins() {
-    Map<String, StoragePlugin> activePlugins = new HashMap<>();
-    Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginSystemTable.getAll();
-    while (allPlugins.hasNext()) {
-      Entry<String, StoragePluginConfig> plugin = allPlugins.next();
-      String name = plugin.getKey();
-      StoragePluginConfig config = plugin.getValue();
-      if (config.isEnabled()) {
-        try {
-          StoragePlugin storagePlugin = create(name, config);
-          activePlugins.put(name, storagePlugin);
-        } catch (ExecutionSetupException e) {
-          logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e);
-          config.setEnabled(false);
-          pluginSystemTable.put(name, config);
-        }
-      }
-    }
-
-    activePlugins.put(INFORMATION_SCHEMA_PLUGIN, new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context,
-        INFORMATION_SCHEMA_PLUGIN));
-    activePlugins.put(SYS_PLUGIN, new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, SYS_PLUGIN));
-
-    enabledPlugins.putAll(activePlugins);
-  }
-
-  /**
-   * Add a plugin and configuration. Assumes neither exists. Primarily
-   * for testing.
-   *
-   * @param name plugin name
-   * @param config plugin config
-   * @param plugin plugin implementation
-   */
-  @VisibleForTesting
-  public void addPluginToPersistentStoreIfAbsent(String name, StoragePluginConfig config, StoragePlugin plugin) {
-    addEnabledPlugin(name, plugin);
-    pluginSystemTable.putIfAbsent(name, config);
-  }
-
-  @Override
-  public void addEnabledPlugin(String name, StoragePlugin plugin) {
-    enabledPlugins.put(name, plugin);
-  }
-
   @Override
   public void deletePlugin(String name) {
-    @SuppressWarnings("resource")
     StoragePlugin plugin = enabledPlugins.remove(name);
     closePlugin(plugin);
     pluginSystemTable.delete(name);
   }
 
-  private void closePlugin(StoragePlugin plugin) {
-    if (plugin == null) {
-      return;
-    }
-
-    try {
-      plugin.close();
-    } catch (Exception e) {
-      logger.warn("Exception while shutting down storage plugin.");
-    }
-  }
-
-  @SuppressWarnings("resource")
   @Override
-  public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist)
-      throws ExecutionSetupException {
+  public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException {
     for (;;) {
-      final StoragePlugin oldPlugin = enabledPlugins.get(name);
-      final StoragePlugin newPlugin = create(name, config);
+      StoragePlugin oldPlugin = enabledPlugins.get(name);
+      StoragePlugin newPlugin = create(name, config);
       boolean done = false;
       try {
         if (oldPlugin != null) {
@@ -273,12 +164,12 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
   @Override
   public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
     StoragePlugin plugin = enabledPlugins.get(name);
-    if (name.equals(SYS_PLUGIN) || name.equals(INFORMATION_SCHEMA_PLUGIN)) {
+    if (systemPlugins.get(name) != null) {
       return plugin;
     }
 
     // since we lazily manage the list of plugins per server, we need to update this once we know that it is time.
-    StoragePluginConfig config = this.pluginSystemTable.get(name);
+    StoragePluginConfig config = pluginSystemTable.get(name);
     if (config == null) {
       if (plugin != null) {
         enabledPlugins.remove(name);
@@ -294,7 +185,6 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
     }
   }
 
-
   @Override
   public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
     if (config instanceof NamedStoragePluginConfig) {
@@ -322,14 +212,246 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
     }
   }
 
-  @SuppressWarnings("resource")
   @Override
-  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig)
-      throws ExecutionSetupException {
+  public void addEnabledPlugin(String name, StoragePlugin plugin) {
+    enabledPlugins.put(name, plugin);
+  }
+
+  @Override
+  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException {
     StoragePlugin storagePlugin = getPlugin(storageConfig);
     return storagePlugin.getFormatPlugin(formatConfig);
   }
 
+  @Override
+  public PersistentStore<StoragePluginConfig> getStore() {
+    return pluginSystemTable;
+  }
+
+  @Override
+  public SchemaFactory getSchemaFactory() {
+    return schemaFactory;
+  }
+
+  @Override
+  public Iterator<Entry<String, StoragePlugin>> iterator() {
+    return enabledPlugins.iterator();
+  }
+
+  @Override
+  public synchronized void close() throws Exception {
+    ephemeralPlugins.invalidateAll();
+    enabledPlugins.close();
+    pluginSystemTable.close();
+  }
+
+  /**
+   * Add a plugin and configuration. Assumes neither exists. Primarily for testing.
+   *
+   * @param name plugin name
+   * @param config plugin config
+   * @param plugin plugin implementation
+   */
+  @VisibleForTesting
+  public void addPluginToPersistentStoreIfAbsent(String name, StoragePluginConfig config, StoragePlugin plugin) {
+    addEnabledPlugin(name, plugin);
+    pluginSystemTable.putIfAbsent(name, config);
+  }
+
+  /**
+   * <ol>
+   *   <li>Initializes persistent store for storage plugins.</li>
+   *   <li>Since storage plugin names are case-insensitive in Drill, to ensure backward compatibility,
+   *   re-writes entries not stored in lower case under their lower-case names, issuing a warning for duplicates.</li>
+   *   <li>Wraps the plugin system table in a case-insensitive wrapper.</li>
+   * </ol>
+   *
+   * @param context drillbit context
+   * @param lpPersistence deserialization mapper provider
+   * @return persistent store for storage plugins
+   */
+  private PersistentStore<StoragePluginConfig> initPluginsSystemTable(DrillbitContext context, LogicalPlanPersistence lpPersistence) {
+
+    try {
+      PersistentStore<StoragePluginConfig> pluginSystemTable = context
+          .getStoreProvider()
+          .getOrCreateStore(PersistentStoreConfig
+              .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class)
+              .name(PSTORE_NAME)
+              .build());
+
+      Iterator<Entry<String, StoragePluginConfig>> storedPlugins = pluginSystemTable.getAll();
+      while (storedPlugins.hasNext()) {
+        Entry<String, StoragePluginConfig> entry = storedPlugins.next();
+        String pluginName = entry.getKey();
+        if (!pluginName.equals(pluginName.toLowerCase())) {
+          logger.debug("Replacing plugin name {} with its lower case equivalent.", pluginName);
+          pluginSystemTable.delete(pluginName);
+          if (!pluginSystemTable.putIfAbsent(pluginName.toLowerCase(), entry.getValue())) {
+            logger.warn("Duplicated storage plugin name [{}] is found. Duplicate is deleted from persistent storage.", pluginName);
+          }
+        }
+      }
+
+      return new CaseInsensitivePersistentStore<>(pluginSystemTable);
+    } catch (StoreException e) {
+      logger.error("Failure while loading storage plugin registry.", e);
+      throw new DrillRuntimeException("Failure while reading and loading storage plugin configuration.", e);
+    }
+  }
+
+  /**
+   * Reads bootstrap storage plugins from {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE} files on the first
+   * fresh instantiation of Drill.
+   *
+   * @param lpPersistence deserialization mapper provider
+   * @return bootstrap storage plugins
+   * @throws IOException if a read error occurs
+   */
+  private StoragePlugins loadBootstrapPlugins(LogicalPlanPersistence lpPersistence) throws IOException {
+    // bootstrap load the config since no plugins are stored.
+    logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
+    Set<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
+    if (urls != null && !urls.isEmpty()) {
+      logger.info("Loading the storage plugin configs from URLs {}.", urls);
+      StoragePlugins bootstrapPlugins = new StoragePlugins(new HashMap<>());
+      Map<String, URL> pluginURLMap = new HashMap<>();
+      for (URL url : urls) {
+        String pluginsData = Resources.toString(url, Charsets.UTF_8);
+        StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
+        for (Entry<String, StoragePluginConfig> plugin : plugins) {
+          StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue());
+          if (oldPluginConfig != null) {
+            logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
+                plugin.getKey(), pluginURLMap.get(plugin.getKey()), url);
+          } else {
+            pluginURLMap.put(plugin.getKey(), url);
+          }
+        }
+      }
+      return bootstrapPlugins;
+    } else {
+      throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE);
+    }
+  }
+
+  /**
+   * Dynamically loads system plugins annotated with {@link SystemPlugin}.
+   * Plugin initialization is skipped if no matching constructor is found, the class does not implement StoragePlugin, or the plugin name is absent.
+   *
+   * @param classpathScan classpath scan result
+   * @param context drillbit context
+   * @return map with system plugins stored by name
+   */
+  private Map<String, StoragePlugin> initSystemPlugins(ScanResult classpathScan, DrillbitContext context) {
+    Map<String, StoragePlugin> plugins = CaseInsensitiveMap.newHashMap();
+    List<AnnotatedClassDescriptor> annotatedClasses = classpathScan.getAnnotatedClasses(SystemPlugin.class.getName());
+    logger.trace("Found {} annotated classes with SystemPlugin annotation: {}.", annotatedClasses.size(), annotatedClasses);
+
+    for (AnnotatedClassDescriptor annotatedClass : annotatedClasses) {
+      try {
+        Class<?> aClass = Class.forName(annotatedClass.getClassName());
+        boolean isPluginInitialized = false;
+
+        for (Constructor<?> constructor : aClass.getConstructors()) {
+          Class<?>[] parameterTypes = constructor.getParameterTypes();
+
+          if (parameterTypes.length != 1 || parameterTypes[0] != DrillbitContext.class) {
+            logger.trace("Not matching constructor for {}. Expecting constructor with one parameter for DrillbitContext class.",
+                annotatedClass.getClassName());
+            continue;
+          }
+
+          Object instance = constructor.newInstance(context);
+          if (!(instance instanceof StoragePlugin)) {
+            logger.debug("Created instance of {} does not implement StoragePlugin interface.", annotatedClass.getClassName());
+            continue;
+          }
+
+          StoragePlugin storagePlugin = (StoragePlugin) instance;
+          String name = storagePlugin.getName();
+          if (name == null) {
+            logger.debug("Storage plugin name {} is not defined. Skipping plugin initialization.", annotatedClass.getClassName());
+            continue;
+          }
+          plugins.put(name, storagePlugin);
+          isPluginInitialized = true;
+
+        }
+        if (!isPluginInitialized) {
+          logger.debug("Skipping plugin registration, did not find matching constructor or initialized object of wrong type.");
+        }
+      } catch (ReflectiveOperationException e) {
+        logger.warn("Error during system plugin {} initialization. Plugin initialization will be skipped.", annotatedClass.getClassName(), e);
+      }
+    }
+    logger.trace("The following system plugins have been initialized: {}.", plugins.keySet());
+    return plugins;
+  }
+
+  /**
+   * Get a list of all available storage plugin class constructors.
+   * @param classpathScan A classpath scan to use.
+   * @return A Map of StoragePluginConfig => StoragePlugin.<init>() constructors.
+   */
+  @SuppressWarnings("unchecked")
+  private Map<Object, Constructor<? extends StoragePlugin>> findAvailablePlugins(final ScanResult classpathScan) {
+    Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<>();
+    final Collection<Class<? extends StoragePlugin>> pluginClasses =
+        classpathScan.getImplementations(StoragePlugin.class);
+    final String lineBrokenList =
+        pluginClasses.size() == 0
+            ? "" : "\n\t- " + Joiner.on("\n\t- ").join(pluginClasses);
+    logger.debug("Found {} storage plugin configuration classes: {}.",
+        pluginClasses.size(), lineBrokenList);
+    for (Class<? extends StoragePlugin> plugin : pluginClasses) {
+      int i = 0;
+      for (Constructor<?> c : plugin.getConstructors()) {
+        Class<?>[] params = c.getParameterTypes();
+        if (params.length != 3
+            || params[1] != DrillbitContext.class
+            || !StoragePluginConfig.class.isAssignableFrom(params[0])
+            || params[2] != String.class) {
+          logger.debug("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a "
+              + "[constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
+          continue;
+        }
+        availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
+        i++;
+      }
+      if (i == 0) {
+        logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters "
+            + "of (StorangePluginConfig, Config)", plugin.getCanonicalName());
+      }
+    }
+    return availablePlugins;
+  }
+
+  /**
+   * Initializes {@link #enabledPlugins} with the currently enabled plugins.
+   */
+  private void defineEnabledPlugins() {
+    Map<String, StoragePlugin> activePlugins = new HashMap<>();
+    Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginSystemTable.getAll();
+    while (allPlugins.hasNext()) {
+      Entry<String, StoragePluginConfig> plugin = allPlugins.next();
+      String name = plugin.getKey();
+      StoragePluginConfig config = plugin.getValue();
+      if (config.isEnabled()) {
+        try {
+          StoragePlugin storagePlugin = create(name, config);
+          activePlugins.put(name, storagePlugin);
+        } catch (ExecutionSetupException e) {
+          logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e);
+          config.setEnabled(false);
+          pluginSystemTable.put(name, config);
+        }
+      }
+    }
+
+    activePlugins.putAll(systemPlugins);
+    enabledPlugins.putAll(activePlugins);
+  }
+
   private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
     // TODO: DRILL-6412: clients for storage plugins shouldn't be created, if storage plugin is disabled
     // Creating of the StoragePlugin leads to instantiating storage clients
@@ -343,30 +465,33 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
       plugin = c.newInstance(pluginConfig, context, name);
       plugin.start();
       return plugin;
-    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
-        | IOException e) {
+    } catch (ReflectiveOperationException | IOException e) {
       Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
       if (t instanceof ExecutionSetupException) {
         throw ((ExecutionSetupException) t);
       }
-      throw new ExecutionSetupException(String.format(
-          "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
+      throw new ExecutionSetupException(String.format("Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
     }
   }
 
-  @Override
-  public Iterator<Entry<String, StoragePlugin>> iterator() {
-    return enabledPlugins.iterator();
-  }
+  private void closePlugin(StoragePlugin plugin) {
+    if (plugin == null) {
+      return;
+    }
 
-  @Override
-  public SchemaFactory getSchemaFactory() {
-    return schemaFactory;
+    try {
+      plugin.close();
+    } catch (Exception e) {
+      logger.warn("Exception while shutting down storage plugin.");
+    }
   }
 
-  public class DrillSchemaFactory implements SchemaFactory {
+  public class DrillSchemaFactory extends AbstractSchemaFactory {
+
+    public DrillSchemaFactory(String name) {
+      super(name);
+    }
 
-    @SuppressWarnings("resource")
     @Override
     public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
       Stopwatch watch = Stopwatch.createStarted();
@@ -385,7 +510,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
         }
         // remove those which are no longer in the registry
         for (String pluginName : currentPluginNames) {
-          if (pluginName.equals(SYS_PLUGIN) || pluginName.equals(INFORMATION_SCHEMA_PLUGIN)) {
+          if (systemPlugins.get(pluginName) != null) {
             continue;
           }
           enabledPlugins.remove(pluginName);
@@ -448,49 +573,4 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
 
   }
 
-  @Override
-  public synchronized void close() throws Exception {
-    ephemeralPlugins.invalidateAll();
-    enabledPlugins.close();
-    pluginSystemTable.close();
-  }
-
-  /**
-   * Get a list of all available storage plugin class constructors.
-   * @param classpathScan A classpath scan to use.
-   * @return A Map of StoragePluginConfig => StoragePlugin.<init>() constructors.
-   */
-  @SuppressWarnings("unchecked")
-  public static Map<Object, Constructor<? extends StoragePlugin>> findAvailablePlugins(final ScanResult classpathScan) {
-    Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<>();
-    final Collection<Class<? extends StoragePlugin>> pluginClasses =
-        classpathScan.getImplementations(StoragePlugin.class);
-    final String lineBrokenList =
-        pluginClasses.size() == 0
-            ? "" : "\n\t- " + Joiner.on("\n\t- ").join(pluginClasses);
-    logger.debug("Found {} storage plugin configuration classes: {}.",
-        pluginClasses.size(), lineBrokenList);
-    for (Class<? extends StoragePlugin> plugin : pluginClasses) {
-      int i = 0;
-      for (Constructor<?> c : plugin.getConstructors()) {
-        Class<?>[] params = c.getParameterTypes();
-        if (params.length != 3
-            || params[1] != DrillbitContext.class
-            || !StoragePluginConfig.class.isAssignableFrom(params[0])
-            || params[2] != String.class) {
-          logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a "
-              + "[constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
-          continue;
-        }
-        availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
-        i++;
-      }
-      if (i == 0) {
-        logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters "
-            + "of (StorangePluginConfig, Config)", plugin.getCanonicalName());
-      }
-    }
-    return availablePlugins;
-  }
-
 }
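
Note on item 1c of the commit message (dynamic loading of system plugins): the registry scans the classpath for types carrying the @SystemPlugin annotation (registered for scanning in drill-module.conf below) and instantiates each through a single-argument DrillbitContext constructor. A minimal sketch of that flow, assuming the ScanResult / AnnotatedClassDescriptor APIs exercised in TestClassPathScanner further down; the helper name, exception handling, and getName() usage are illustrative rather than the literal registry code:

    import java.lang.reflect.Constructor;
    import java.util.Map;

    import org.apache.drill.common.map.CaseInsensitiveMap;
    import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
    import org.apache.drill.common.scanner.persistence.ScanResult;
    import org.apache.drill.exec.server.DrillbitContext;
    import org.apache.drill.exec.store.StoragePlugin;
    import org.apache.drill.exec.store.SystemPlugin;

    // illustrative sketch, not the committed implementation
    private Map<String, StoragePlugin> initSystemPlugins(ScanResult scanResult, DrillbitContext context) {
      Map<String, StoragePlugin> plugins = CaseInsensitiveMap.newHashMap();
      for (AnnotatedClassDescriptor descriptor : scanResult.getAnnotatedClasses(SystemPlugin.class.getName())) {
        try {
          Class<?> clazz = Class.forName(descriptor.getClassName());
          Constructor<?> constructor = clazz.getConstructor(DrillbitContext.class);
          StoragePlugin plugin = (StoragePlugin) constructor.newInstance(context);
          plugins.put(plugin.getName(), plugin); // getName() assumed available via AbstractStoragePlugin
        } catch (ReflectiveOperationException e) {
          throw new IllegalStateException("Failure initializing system plugin " + descriptor.getClassName(), e);
        }
      }
      return plugins;
    }
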
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SystemPlugin.java
similarity index 63%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/store/SystemPlugin.java
index adf15a2..85236af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SystemPlugin.java
@@ -15,23 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.ischema;
+package org.apache.drill.exec.store;
 
-import org.apache.drill.common.logical.StoragePluginConfig;
-
-public class InfoSchemaConfig extends StoragePluginConfig {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaConfig.class);
-
-  public static final String NAME = "ischema";
-
-  @Override
-  public int hashCode(){
-    return 1;
-  }
-
-  @Override
-  public boolean equals(Object o){
-    return o instanceof InfoSchemaConfig;
-  }
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
+/**
+ * Indicates system plugins that are dynamically initialized during the storage plugin registry init stage.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface SystemPlugin {
 }
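
For context, a type opts into this mechanism simply by carrying the annotation and exposing the DrillbitContext constructor the registry expects, as InfoSchemaStoragePlugin and SystemTablePlugin do in the diffs below. A condensed, hypothetical example (class name, schema name, and the null config are invented for illustration):

    import org.apache.calcite.schema.SchemaPlus;
    import org.apache.drill.common.logical.StoragePluginConfig;
    import org.apache.drill.exec.server.DrillbitContext;
    import org.apache.drill.exec.store.AbstractStoragePlugin;
    import org.apache.drill.exec.store.SchemaConfig;
    import org.apache.drill.exec.store.SystemPlugin;

    @SystemPlugin
    public class ExampleSystemPlugin extends AbstractStoragePlugin {

      // invoked reflectively by StoragePluginRegistryImpl during init
      public ExampleSystemPlugin(DrillbitContext context) {
        super(context, "example_sys");
      }

      @Override
      public StoragePluginConfig getConfig() {
        return null; // a real plugin returns its singleton config, cf. InfoSchemaConfig.INSTANCE
      }

      @Override
      public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
        // a real plugin registers its AbstractSchema implementation here
      }
    }
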
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 4eda955..be944a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.dfs;
 
 import java.util.Map;
+import java.util.Optional;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -25,6 +26,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 
 @JsonTypeName(FileSystemConfig.NAME)
 public class FileSystemConfig extends StoragePluginConfig {
@@ -43,7 +45,9 @@ public class FileSystemConfig extends StoragePluginConfig {
                           @JsonProperty("formats") Map<String, FormatPluginConfig> formats) {
     this.connection = connection;
     this.config = config;
-    this.workspaces = workspaces;
+    Map<String, WorkspaceConfig> caseInsensitiveWorkspaces = CaseInsensitiveMap.newHashMap();
+    Optional.ofNullable(workspaces).ifPresent(caseInsensitiveWorkspaces::putAll);
+    this.workspaces = caseInsensitiveWorkspaces;
     this.formats = formats;
   }
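
The practical effect of the case-insensitive map above: workspace lookups ignore case while the configured values are kept as-is. A small illustration (the WorkspaceConfig instance is a placeholder):

    Map<String, WorkspaceConfig> workspaces = CaseInsensitiveMap.newHashMap();
    workspaces.put("TmpWorkspace", tmpWorkspaceConfig);   // tmpWorkspaceConfig: any WorkspaceConfig

    workspaces.get("tmpworkspace");          // same entry
    workspaces.get("TMPWORKSPACE");          // same entry
    workspaces.containsKey("tmpWorkspace");  // true
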
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index cb66913..ed66366 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -57,6 +57,8 @@ import com.google.common.collect.ImmutableSet.Builder;
  */
 public class FileSystemPlugin extends AbstractStoragePlugin {
 
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
+
   private final FileSystemSchemaFactory schemaFactory;
   private final FormatCreator formatCreator;
   private final Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 0d7dce4..1a97e60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.store.dfs;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,54 +29,50 @@ import org.apache.calcite.schema.Function;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 
+import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
 import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
-
 /**
  * This is the top level schema that responds to root level path requests. Also supports
  */
-public class FileSystemSchemaFactory implements SchemaFactory{
+public class FileSystemSchemaFactory extends AbstractSchemaFactory {
 
   public static final String DEFAULT_WS_NAME = "default";
 
   public static final String LOCAL_FS_SCHEME = "file";
 
   private List<WorkspaceSchemaFactory> factories;
-  private String schemaName;
   protected FileSystemPlugin plugin;
 
   public FileSystemSchemaFactory(String schemaName, List<WorkspaceSchemaFactory> factories) {
+    super(schemaName);
     // when the corresponding FileSystemPlugin is not passed in, we dig into ANY workspace factory to get it.
     if (factories.size() > 0) {
       this.plugin = factories.get(0).getPlugin();
     }
-    this.schemaName = schemaName;
     this.factories = factories;
   }
 
   public FileSystemSchemaFactory(FileSystemPlugin plugin, String schemaName, List<WorkspaceSchemaFactory> factories) {
+    super(schemaName);
     this.plugin = plugin;
-    this.schemaName = schemaName;
     this.factories = factories;
   }
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     @SuppressWarnings("resource")
-    FileSystemSchema schema = new FileSystemSchema(schemaName, schemaConfig);
+    FileSystemSchema schema = new FileSystemSchema(getName(), schemaConfig);
     SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
     schema.setPlus(plusOfThis);
   }
@@ -82,10 +80,10 @@ public class FileSystemSchemaFactory implements SchemaFactory{
   public class FileSystemSchema extends AbstractSchema {
 
     private final WorkspaceSchema defaultSchema;
-    private final Map<String, WorkspaceSchema> schemaMap = Maps.newHashMap();
+    private final Map<String, WorkspaceSchema> schemaMap = new HashMap<>();
 
     public FileSystemSchema(String name, SchemaConfig schemaConfig) throws IOException {
-      super(ImmutableList.<String>of(), name);
+      super(Collections.emptyList(), name);
       final DrillFileSystem fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), plugin.getFsConf());
       for(WorkspaceSchemaFactory f :  factories){
         WorkspaceSchema s = f.createSchema(getSchemaPath(), schemaConfig, fs);
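
AbstractSchemaFactory itself is not part of this excerpt; judging from the super(schemaName) / getName() usage here and item 5 of the commit message (schema names are converted to lower case), it is presumably a thin base class along these lines. A speculative sketch, not the committed source:

    import org.apache.drill.exec.store.SchemaFactory;

    public abstract class AbstractSchemaFactory implements SchemaFactory {

      private final String name;

      protected AbstractSchemaFactory(String name) {
        // assumption per the commit notes: normalize the schema name to lower case
        this.name = name == null ? null : name.toLowerCase();
      }

      public String getName() {
        return name;
      }
    }
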
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
index adf15a2..317e608 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
@@ -20,10 +20,11 @@ package org.apache.drill.exec.store.ischema;
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 public class InfoSchemaConfig extends StoragePluginConfig {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaConfig.class);
 
   public static final String NAME = "ischema";
 
+  public static final InfoSchemaConfig INSTANCE = new InfoSchemaConfig();
+
   @Override
   public int hashCode(){
     return 1;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
index 15bdcd9..63e19d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
@@ -29,7 +29,7 @@ public interface InfoSchemaConstants {
    String IS_CATALOG_CONNECT = "";
 
   /** Name of information schema. */
-   String IS_SCHEMA_NAME = "INFORMATION_SCHEMA";
+   String IS_SCHEMA_NAME = "information_schema";
 
   // CATALOGS column names:
    String CATS_COL_CATALOG_CONNECT = "CATALOG_CONNECT";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index fc1f01a..4005a40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -17,35 +17,41 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
+
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.SystemPlugin;
 
+@SystemPlugin
 public class InfoSchemaStoragePlugin extends AbstractStoragePlugin {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaStoragePlugin.class);
 
   private final InfoSchemaConfig config;
 
+  @SuppressWarnings("unused") // used in StoragePluginRegistryImpl to dynamically init system plugins
+  public InfoSchemaStoragePlugin(DrillbitContext context) {
+    this(InfoSchemaConfig.INSTANCE, context, InfoSchemaConstants.IS_SCHEMA_NAME);
+  }
+
   public InfoSchemaStoragePlugin(InfoSchemaConfig config, DrillbitContext context, String name){
     super(context, name);
     this.config = config;
@@ -57,8 +63,7 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public InfoSchemaGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
-      throws IOException {
+  public InfoSchemaGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) {
     InfoSchemaTableType table = selection.getWith(getContext().getLpPersistence(),  InfoSchemaTableType.class);
     return new InfoSchemaGroupScan(table);
   }
@@ -69,23 +74,35 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    ISchema s = new ISchema(parent, this);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    ISchema s = new ISchema(this);
     parent.add(s.getName(), s);
   }
 
   /**
    * Representation of the INFORMATION_SCHEMA schema.
    */
-  private class ISchema extends AbstractSchema{
-    private Map<String, InfoSchemaDrillTable> tables;
-    public ISchema(SchemaPlus parent, InfoSchemaStoragePlugin plugin){
-      super(ImmutableList.<String>of(), IS_SCHEMA_NAME);
-      Map<String, InfoSchemaDrillTable> tbls = Maps.newHashMap();
-      for(InfoSchemaTableType tbl : InfoSchemaTableType.values()){
-        tbls.put(tbl.name(), new InfoSchemaDrillTable(plugin, IS_SCHEMA_NAME, tbl, config));
-      }
-      this.tables = ImmutableMap.copyOf(tbls);
+  private class ISchema extends AbstractSchema {
+
+    private final Map<String, InfoSchemaDrillTable> tables;
+    // for backward compatibility, keep IS schema table names in upper case,
+    // the way they used to appear in the INFORMATION_SCHEMA.TABLES table,
+    // though users can query them in any case
+    private final Set<String> originalTableNames;
+
+    ISchema(InfoSchemaStoragePlugin plugin) {
+
+      super(Collections.emptyList(), IS_SCHEMA_NAME);
+
+      this.tables = CaseInsensitiveMap.newHashMap();
+      this.originalTableNames = new HashSet<>();
+
+      Arrays.stream(InfoSchemaTableType.values()).forEach(
+          table -> {
+            tables.put(table.name(), new InfoSchemaDrillTable(plugin, getName(), table, config));
+            originalTableNames.add(table.name());
+          }
+      );
     }
 
     @Override
@@ -95,13 +112,18 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin {
 
     @Override
     public Set<String> getTableNames() {
-      return tables.keySet();
+      return originalTableNames;
     }
 
     @Override
     public String getTypeName() {
       return InfoSchemaConfig.NAME;
     }
+
+    @Override
+    public boolean areTableNamesCaseSensitive() {
+      return false;
+    }
   }
 
   @Override
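
Net effect of the ISchema changes: getTableNames() keeps reporting the historical upper-case names, while getTable() accepts any case because the backing map is case insensitive. Illustrative calls against an initialized schema instance:

    // given an ISchema instance "schema" built as above
    schema.getTableNames();      // original names, e.g. TABLES, COLUMNS, ... (upper case)
    schema.getTable("tables");   // resolves via the case-insensitive map
    schema.getTable("TABLES");   // same table instance
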
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CaseInsensitivePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CaseInsensitivePersistentStore.java
new file mode 100644
index 0000000..38bd529
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CaseInsensitivePersistentStore.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Wrapper around {@link PersistentStore} that converts all passed keys to lower case before delegating.
+ * This ensures case insensitivity for insert, update, delete and search operations.
+ */
+public class CaseInsensitivePersistentStore<V> implements PersistentStore<V> {
+
+  private final PersistentStore<V> underlyingStore;
+
+  public CaseInsensitivePersistentStore(PersistentStore<V> underlyingStore) {
+    this.underlyingStore = underlyingStore;
+  }
+
+  @Override
+  public boolean contains(String key) {
+    return underlyingStore.contains(key.toLowerCase());
+  }
+
+  @Override
+  public V get(String key) {
+    return underlyingStore.get(key.toLowerCase());
+  }
+
+  @Override
+  public void put(String key, V value) {
+    underlyingStore.put(key.toLowerCase(), value);
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> getAll() {
+    return underlyingStore.getAll();
+  }
+
+  @Override
+  public PersistentStoreMode getMode() {
+    return underlyingStore.getMode();
+  }
+
+  @Override
+  public void delete(String key) {
+    underlyingStore.delete(key.toLowerCase());
+  }
+
+  @Override
+  public boolean putIfAbsent(String key, V value) {
+    return underlyingStore.putIfAbsent(key.toLowerCase(), value);
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
+    return underlyingStore.getRange(skip, take);
+  }
+
+  @Override
+  public void close() throws Exception {
+    underlyingStore.close();
+  }
+}
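
Per item 1b of the commit message, the registry wraps its persistent plugin store in this class once, so every caller gets case-insensitive key handling for free. A minimal usage sketch; how the underlying store is obtained from the store provider is elided:

    PersistentStore<StoragePluginConfig> rawStore = ...;  // obtained from the persistent store provider
    PersistentStore<StoragePluginConfig> pluginSystemTable = new CaseInsensitivePersistentStore<>(rawStore);

    pluginSystemTable.put("MyPlugin", config);  // stored under key "myplugin"
    pluginSystemTable.get("MYPLUGIN");          // returns config
    pluginSystemTable.contains("myPlugin");     // true
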
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index 10e082f..d282017 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -17,39 +17,47 @@
  */
 package org.apache.drill.exec.store.sys;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SystemPlugin;
 import org.apache.drill.exec.store.pojo.PojoDataType;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-
 /**
  * A "storage" plugin for system tables.
  */
+@SystemPlugin
 public class SystemTablePlugin extends AbstractStoragePlugin {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePlugin.class);
 
   public static final String SYS_SCHEMA_NAME = "sys";
 
   private final SystemTablePluginConfig config;
-  private final SystemSchema schema = new SystemSchema();
+  private final SystemSchema schema;
+
+  @SuppressWarnings("unused") // used in StoragePluginRegistryImpl to dynamically init system plugins
+  public SystemTablePlugin(DrillbitContext context) {
+    this(SystemTablePluginConfig.INSTANCE, context, SYS_SCHEMA_NAME);
+  }
 
   public SystemTablePlugin(SystemTablePluginConfig config, DrillbitContext context, String name) {
     super(context, name);
     this.config = config;
+    this.schema = new SystemSchema(this);
   }
 
   @Override
@@ -73,31 +81,29 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
    */
   private class SystemSchema extends AbstractSchema {
 
-    private final Set<String> tableNames;
+    private final Map<String, StaticDrillTable> tables;
+
+    SystemSchema(SystemTablePlugin plugin) {
 
-    public SystemSchema() {
-      super(ImmutableList.of(), SYS_SCHEMA_NAME);
-      Set<String> names = Sets.newHashSet();
-      for (SystemTable t : SystemTable.values()) {
-        names.add(t.getTableName());
-      }
-      this.tableNames = ImmutableSet.copyOf(names);
+      super(Collections.emptyList(), SYS_SCHEMA_NAME);
+
+      this.tables = Arrays.stream(SystemTable.values())
+          .collect(
+              Collectors.toMap(
+                  SystemTable::getTableName,
+                  table -> new StaticDrillTable(getName(), plugin, TableType.SYSTEM_TABLE, table, new PojoDataType(table.getPojoClass())),
+                  (o, n) -> n,
+                  CaseInsensitiveMap::newHashMap));
     }
 
     @Override
     public Set<String> getTableNames() {
-      return tableNames;
+      return tables.keySet();
     }
 
     @Override
     public DrillTable getTable(String name) {
-      for (SystemTable table : SystemTable.values()) {
-        if (table.getTableName().equalsIgnoreCase(name)) {
-          return new StaticDrillTable(getName(), SystemTablePlugin.this, TableType.SYSTEM_TABLE,
-            table, new PojoDataType(table.getPojoClass()));
-        }
-      }
-      return null;
+      return tables.get(name);
     }
 
     @Override
@@ -105,5 +111,9 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
       return SystemTablePluginConfig.NAME;
     }
 
+    @Override
+    public boolean areTableNamesCaseSensitive() {
+      return false;
+    }
   }
 }
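
The four-argument Collectors.toMap overload used above deserves a note: its last argument supplies the map implementation, which is what makes the resulting table map case insensitive. The same idiom in isolation (example data is arbitrary):

    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    import org.apache.drill.common.map.CaseInsensitiveMap;

    Map<String, Integer> lengths = Stream.of("Version", "MEMORY", "threads")
        .collect(Collectors.toMap(
            name -> name,                      // key: name as declared
            String::length,                    // value
            (oldV, newV) -> newV,              // merge function, required by this overload
            CaseInsensitiveMap::newHashMap));  // map supplier: case-insensitive lookups

    lengths.get("version");  // 7, regardless of the case used in the lookup
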
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
index 360182e..914fcf0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.logical.StoragePluginConfig;
  * A namesake plugin configuration for system tables.
  */
 public class SystemTablePluginConfig extends StoragePluginConfig {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePluginConfig.class);
 
   public static final String NAME = "system-tables";
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
index 0f71775..7a9c9a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
@@ -434,18 +434,36 @@ public class MetadataProvider {
 
   /**
   * Helper method to create an {@link InfoSchemaFilter} that combines the given filters with an AND.
+   *
    * @param catalogNameFilter Optional filter on <code>catalog name</code>
    * @param schemaNameFilter Optional filter on <code>schema name</code>
    * @param tableNameFilter Optional filter on <code>table name</code>
    * @param tableTypeFilter Optional filter on <code>table type</code>
    * @param columnNameFilter Optional filter on <code>column name</code>
-   * @return
+   * @return information schema filter
    */
-  private static InfoSchemaFilter createInfoSchemaFilter(final LikeFilter catalogNameFilter,
-      final LikeFilter schemaNameFilter, final LikeFilter tableNameFilter, List<String> tableTypeFilter, final LikeFilter columnNameFilter) {
+  private static InfoSchemaFilter createInfoSchemaFilter(LikeFilter catalogNameFilter,
+                                                         LikeFilter schemaNameFilter,
+                                                         LikeFilter tableNameFilter,
+                                                         List<String> tableTypeFilter,
+                                                         LikeFilter columnNameFilter) {
 
     FunctionExprNode exprNode = createLikeFunctionExprNode(CATS_COL_CATALOG_NAME,  catalogNameFilter);
 
+    // schema names in Drill are case insensitive and stored in lower case,
+    // so convert the LIKE filter condition elements to lower case as well
+    if (schemaNameFilter != null) {
+      LikeFilter.Builder builder = LikeFilter.newBuilder();
+      if (schemaNameFilter.hasPattern()) {
+        builder.setPattern(schemaNameFilter.getPattern().toLowerCase());
+      }
+
+      if (schemaNameFilter.hasEscape()) {
+        builder.setEscape(schemaNameFilter.getEscape().toLowerCase());
+      }
+      schemaNameFilter = builder.build();
+    }
+
     exprNode = combineFunctions(AND_FUNCTION,
         exprNode,
         combineFunctions(OR_FUNCTION,
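
The rewrite above is what keeps a client-sent schema pattern such as 'INFO%SCHEMA' matching the lower-case schema names Drill stores. The same transformation in isolation, mirroring the code in this hunk:

    LikeFilter original = LikeFilter.newBuilder().setPattern("INFO%SCHEMA").build();

    LikeFilter.Builder builder = LikeFilter.newBuilder();
    if (original.hasPattern()) {
      builder.setPattern(original.getPattern().toLowerCase());
    }
    if (original.hasEscape()) {
      builder.setEscape(original.getEscape().toLowerCase());
    }
    LikeFilter lowered = builder.build();  // pattern is now "info%schema"
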
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index e3b0604..eca59b9 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -32,16 +32,23 @@ drill {
       org.apache.drill.exec.store.StoragePlugin
     ],
 
-    annotations += org.apache.drill.exec.expr.annotations.FunctionTemplate
+    annotations: ${?drill.classpath.scanning.annotations} [
+      org.apache.drill.exec.expr.annotations.FunctionTemplate,
+      org.apache.drill.exec.store.SystemPlugin
+    ],
 
     packages: ${?drill.classpath.scanning.packages} [
-          org.apache.drill.exec.expr,
-          org.apache.drill.exec.physical,
-          org.apache.drill.exec.store,
-          org.apache.drill.exec.rpc.user.security,
-          org.apache.drill.exec.rpc.security,
-          org.apache.drill.exec.server.rest.auth
-    ]
+      org.apache.drill.exec.expr,
+      org.apache.drill.exec.physical,
+      org.apache.drill.exec.store,
+      org.apache.drill.exec.rpc.user.security,
+      org.apache.drill.exec.rpc.security,
+      org.apache.drill.exec.server.rest.auth
+    ],
+
+    // caches the classpath scanning result produced at build time;
+    // set to false to avoid the need for a full Drill build during development
+    cache.enabled: true
   }
 }
 
@@ -231,7 +238,7 @@ drill.exec: {
   },
   debug: {
     // If true, inserts the iterator validator atop each operator.
-    // Primrily used for testing.
+    // Primarily used for testing.
     validate_iterators: false,
     // If iterator validation is enabled, also validates the vectors
     // in each batch. Primarily used for testing. To enable from
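
Because the annotations list above is seeded with the self-reference ${?drill.classpath.scanning.annotations}, a downstream module can append to it rather than replace it. A hypothetical drill-module.conf fragment from a plugin jar (module and annotation names invented):

    drill.classpath.scanning {
      annotations += com.example.drill.MyMarkerAnnotation
    }
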
diff --git a/exec/java-exec/src/test/java/org/apache/drill/common/scanner/TestClassPathScanner.java b/exec/java-exec/src/test/java/org/apache/drill/common/scanner/TestClassPathScanner.java
index f4dc837..01b1b3b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/common/scanner/TestClassPathScanner.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/common/scanner/TestClassPathScanner.java
@@ -17,21 +17,19 @@
  */
 package org.apache.drill.common.scanner;
 
-import static java.util.Arrays.asList;
-import static java.util.Collections.sort;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.config.DrillConfig;
@@ -43,26 +41,27 @@ import org.apache.drill.exec.expr.DrillFunc;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate;
 import org.apache.drill.exec.fn.impl.testing.GeneratorFunctions.RandomBigIntGauss;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.SystemPlugin;
+import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePlugin;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({SlowTest.class})
 public class TestClassPathScanner {
 
-  @SafeVarargs
-  final private <T extends Comparable<? super T>> void assertListEqualsUnordered(Collection<T> list, T... expected) {
-    List<T> expectedList = asList(expected);
-    sort(expectedList);
-    List<T> gotList = new ArrayList<>(list);
-    sort(gotList);
-    assertEquals(expectedList.toString(), gotList.toString());
+  private static ScanResult result;
+
+  @BeforeClass
+  public static void init() {
+    result = ClassPathScanner.fromPrescan(DrillConfig.create());
   }
 
   @Test
-  public void test() throws Exception {
-    ScanResult result = ClassPathScanner.fromPrescan(DrillConfig.create());
-    List<AnnotatedClassDescriptor> functions = result.getAnnotatedClasses();
+  public void testFunctionTemplates() throws Exception {
+    List<AnnotatedClassDescriptor> functions = result.getAnnotatedClasses(FunctionTemplate.class.getName());
     Set<String> scanned = new HashSet<>();
     AnnotatedClassDescriptor functionRandomBigIntGauss = null;
     for (AnnotatedClassDescriptor function : functions) {
@@ -94,6 +93,7 @@ public class TestClassPathScanner {
       List<AnnotationDescriptor> scannedAnnotations = function.getAnnotations();
       verifyAnnotations(annotations, scannedAnnotations);
       FunctionTemplate bytecodeAnnotation = function.getAnnotationProxy(FunctionTemplate.class);
+      assertNotNull(bytecodeAnnotation);
       FunctionTemplate reflectionAnnotation = c.getAnnotation(FunctionTemplate.class);
       assertEquals(reflectionAnnotation.name(), bytecodeAnnotation.name());
       assertArrayEquals(reflectionAnnotation.names(), bytecodeAnnotation.names());
@@ -110,6 +110,17 @@ public class TestClassPathScanner {
     assertTrue(result.getImplementations(DrillFunc.class).size() > 0);
   }
 
+  @Test
+  public void testSystemPlugins() {
+    List<AnnotatedClassDescriptor> annotatedClasses = result.getAnnotatedClasses(SystemPlugin.class.getName());
+    List<AnnotatedClassDescriptor> foundPlugins =
+        annotatedClasses.stream()
+            .filter(a -> InfoSchemaStoragePlugin.class.getName().equals(a.getClassName())
+                || SystemTablePlugin.class.getName().equals(a.getClassName()))
+            .collect(Collectors.toList());
+    assertEquals(2, foundPlugins.size());
+  }
+
   private <T> void validateType(ScanResult result, String baseType) throws ClassNotFoundException {
     if (baseType.startsWith("org.apache.hadoop.hive")) {
       return;
@@ -132,8 +143,8 @@ public class TestClassPathScanner {
       Class<? extends Annotation> annotationType = annotation.annotationType();
       assertEquals(annotationType.getName(), scannedAnnotation.getAnnotationType());
       if (annotation instanceof FunctionTemplate) {
-        FunctionTemplate ft = (FunctionTemplate)annotation;
-        if (ft.name() != null && !ft.name().equals("")) {
+        FunctionTemplate ft = (FunctionTemplate) annotation;
+        if (!"".equals(ft.name())) {
           assertEquals(ft.name(), scannedAnnotation.getSingleValue("name"));
         }
       }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
index e0e6c79..aaa5ee9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
@@ -57,10 +57,10 @@ public class TestZookeeperClient {
   private CuratorFramework curator;
   private ZookeeperClient client;
 
-  static class ClientWithMockCache extends ZookeeperClient {
+  private static class ClientWithMockCache extends ZookeeperClient {
     private final PathChildrenCache cacheMock = Mockito.mock(PathChildrenCache.class);
 
-    public ClientWithMockCache(final CuratorFramework curator, final String root, final CreateMode mode) {
+    ClientWithMockCache(final CuratorFramework curator, final String root, final CreateMode mode) {
       super(curator, root, mode);
     }
 
@@ -97,7 +97,7 @@ public class TestZookeeperClient {
 
     Mockito
         .verify(client.getCache())
-        .start();
+        .start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
   }
 
   @Test
@@ -163,7 +163,7 @@ public class TestZookeeperClient {
         .when(client.getCache().getCurrentData(abspath))
         .thenReturn(null);
 
-    assertEquals("get should return null", null, client.get(path));
+    assertNull("get should return null", client.get(path));
 
     Mockito
         .when(client.getCache().getCurrentData(abspath))
@@ -198,7 +198,7 @@ public class TestZookeeperClient {
 
 
   @Test
-  public void testEntriesReturnsRelativePaths() throws Exception {
+  public void testEntriesReturnsRelativePaths() {
     final ChildData child = Mockito.mock(ChildData.class);
     Mockito
         .when(child.getPath())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
index 3b25ddb..17756a8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
@@ -44,7 +44,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class BaseTestImpersonation extends PlanTestBase {
-  protected static final String MINIDFS_STORAGE_PLUGIN_NAME = "miniDfsPlugin";
+  protected static final String MINI_DFS_STORAGE_PLUGIN_NAME = "mini_dfs_plugin";
   protected static final String processUser = System.getProperty("user.name");
 
   protected static MiniDFSCluster dfsCluster;
@@ -74,8 +74,8 @@ public class BaseTestImpersonation extends PlanTestBase {
 
   /**
    * Start a MiniDFS cluster backed Drillbit cluster with impersonation enabled.
-   * @param testClass
-   * @throws Exception
+   * @param testClass test class
+   * @throws Exception in case of errors during start up
    */
   protected static void startMiniDfsCluster(String testClass) throws Exception {
     startMiniDfsCluster(testClass, true);
@@ -83,12 +83,11 @@ public class BaseTestImpersonation extends PlanTestBase {
 
   /**
    * Start a MiniDFS cluster backed Drillbit cluster
-   * @param testClass
+   * @param testClass test class
    * @param isImpersonationEnabled Enable impersonation in the cluster?
-   * @throws Exception
+   * @throws Exception in case of errors during start up
    */
-  protected static void startMiniDfsCluster(
-      final String testClass, final boolean isImpersonationEnabled) throws Exception {
+  protected static void startMiniDfsCluster(String testClass, boolean isImpersonationEnabled) throws Exception {
     Preconditions.checkArgument(!Strings.isNullOrEmpty(testClass), "Expected a non-null and non-empty test class name");
     dfsConf = new Configuration();
 
@@ -133,7 +132,7 @@ public class BaseTestImpersonation extends PlanTestBase {
 
     FileSystemConfig miniDfsPluginConfig = new FileSystemConfig(connection, null, workspaces, lfsPluginConfig.getFormats());
     miniDfsPluginConfig.setEnabled(true);
-    pluginRegistry.createOrUpdate(MINIDFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true);
+    pluginRegistry.createOrUpdate(MINI_DFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true);
   }
 
   protected static void createAndAddWorkspace(String name, String path, short permissions, String owner,
@@ -168,7 +167,7 @@ public class BaseTestImpersonation extends PlanTestBase {
 
   // Return the user workspace for given user.
   protected static String getWSSchema(String user) {
-    return MINIDFS_STORAGE_PLUGIN_NAME + "." + user;
+    return MINI_DFS_STORAGE_PLUGIN_NAME + "." + user;
   }
 
   protected static String getUserHome(String user) {
@@ -194,7 +193,7 @@ public class BaseTestImpersonation extends PlanTestBase {
                                  final String viewDef) throws Exception {
     updateClient(viewOwner);
     test(String.format("ALTER SESSION SET `%s`='%o';", ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY, (short) 0750));
-    test("CREATE VIEW %s.%s.%s AS %s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", viewName, viewDef);
+    test("CREATE VIEW %s.%s.%s AS %s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", viewName, viewDef);
     final Path viewFilePath = new Path("/tmp/", viewName + DotDrillType.VIEW.getEnding());
     fs.setOwner(viewFilePath, viewOwner, viewGroup);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
index cebf649..700d820 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
@@ -45,8 +45,7 @@ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation
 
   private static void createTestData() throws Exception {
     // Create test table in minidfs.tmp schema for use in test queries
-    test(String.format("CREATE TABLE %s.tmp.dfsRegion AS SELECT * FROM cp.`region.json`",
-        MINIDFS_STORAGE_PLUGIN_NAME));
+    test(String.format("CREATE TABLE %s.tmp.dfsRegion AS SELECT * FROM cp.`region.json`", MINI_DFS_STORAGE_PLUGIN_NAME));
 
     // generate a large enough file that the DFS will not fulfill requests to read a
     // page of data all at once, see notes above testReadLargeParquetFileFromDFS()
@@ -59,8 +58,7 @@ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation
             "UNION ALL (SELECT employee_id, full_name FROM cp.`employee.json`)" +
             "UNION ALL (SELECT employee_id, full_name FROM cp.`employee.json`)" +
             "UNION ALL (SELECT employee_id, full_name FROM cp.`employee.json`)" +
-        "UNION ALL (SELECT employee_id, full_name FROM cp.`employee.json`)",
-        MINIDFS_STORAGE_PLUGIN_NAME));
+        "UNION ALL (SELECT employee_id, full_name FROM cp.`employee.json`)", MINI_DFS_STORAGE_PLUGIN_NAME));
   }
 
   /**
@@ -77,7 +75,7 @@ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation
    */
   @Test
   public void testReadLargeParquetFileFromDFS() throws Exception {
-    test(String.format("USE %s", MINIDFS_STORAGE_PLUGIN_NAME));
+    test(String.format("USE %s", MINI_DFS_STORAGE_PLUGIN_NAME));
     test("SELECT * FROM tmp.`large_employee`");
   }
 
@@ -87,7 +85,7 @@ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation
         String.format("SELECT sales_city, sales_country FROM tmp.dfsRegion ORDER BY region_id DESC LIMIT 2");
 
     testBuilder()
-        .optionSettingQueriesForTestQuery(String.format("USE %s", MINIDFS_STORAGE_PLUGIN_NAME))
+        .optionSettingQueriesForTestQuery(String.format("USE %s", MINI_DFS_STORAGE_PLUGIN_NAME))
         .sqlQuery(query)
         .unOrdered()
         .baselineColumns("sales_city", "sales_country")
@@ -98,7 +96,7 @@ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation
 
   @AfterClass
   public static void removeMiniDfsBasedStorage() throws Exception {
-    getDrillbitContext().getStorage().deletePlugin(MINIDFS_STORAGE_PLUGIN_NAME);
+    getDrillbitContext().getStorage().deletePlugin(MINI_DFS_STORAGE_PLUGIN_NAME);
     stopMiniDfsCluster();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
index 08c09d1..d14c7ad 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
@@ -52,8 +52,8 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
   private static final String user1 = "drillTestUser1";
   private static final String user2 = "drillTestUser2";
 
-  private static final String group0 = "drillTestGrp0";
-  private static final String group1 = "drillTestGrp1";
+  private static final String group0 = "drill_test_grp_0";
+  private static final String group1 = "drill_test_grp_1";
 
   static {
     UserGroupInformation.createUserForTesting(user1, new String[]{ group1, group0 });
@@ -75,23 +75,23 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
 
     Map<String, WorkspaceConfig> workspaces = Maps.newHashMap();
 
-    // Create /drillTestGrp0_700 directory with permissions 700 (owned by user running the tests)
-    createAndAddWorkspace("drillTestGrp0_700", "/drillTestGrp0_700", (short)0700, processUser, group0, workspaces);
+    // Create /drill_test_grp_0_700 directory with permissions 700 (owned by user running the tests)
+    createAndAddWorkspace("drill_test_grp_0_700", "/drill_test_grp_0_700", (short)0700, processUser, group0, workspaces);
 
-    // Create /drillTestGrp0_750 directory with permissions 750 (owned by user running the tests)
-    createAndAddWorkspace("drillTestGrp0_750", "/drillTestGrp0_750", (short)0750, processUser, group0, workspaces);
+    // Create /drill_test_grp_0_750 directory with permissions 750 (owned by user running the tests)
+    createAndAddWorkspace("drill_test_grp_0_750", "/drill_test_grp_0_750", (short)0750, processUser, group0, workspaces);
 
-    // Create /drillTestGrp0_755 directory with permissions 755 (owned by user running the tests)
-    createAndAddWorkspace("drillTestGrp0_755", "/drillTestGrp0_755", (short)0755, processUser, group0, workspaces);
+    // Create /drill_test_grp_0_755 directory with permissions 755 (owned by user running the tests)
+    createAndAddWorkspace("drill_test_grp_0_755", "/drill_test_grp_0_755", (short)0755, processUser, group0, workspaces);
 
-    // Create /drillTestGrp0_770 directory with permissions 770 (owned by user running the tests)
-    createAndAddWorkspace("drillTestGrp0_770", "/drillTestGrp0_770", (short)0770, processUser, group0, workspaces);
+    // Create /drill_test_grp_0_770 directory with permissions 770 (owned by user running the tests)
+    createAndAddWorkspace("drill_test_grp_0_770", "/drill_test_grp_0_770", (short)0770, processUser, group0, workspaces);
 
-    // Create /drillTestGrp0_777 directory with permissions 777 (owned by user running the tests)
-    createAndAddWorkspace("drillTestGrp0_777", "/drillTestGrp0_777", (short)0777, processUser, group0, workspaces);
+    // Create /drill_test_grp_0_777 directory with permissions 777 (owned by user running the tests)
+    createAndAddWorkspace("drill_test_grp_0_777", "/drill_test_grp_0_777", (short)0777, processUser, group0, workspaces);
 
-    // Create /drillTestGrp1_700 directory with permissions 700 (owned by user1)
-    createAndAddWorkspace("drillTestGrp1_700", "/drillTestGrp1_700", (short)0700, user1, group1, workspaces);
+    // Create /drill_test_grp_1_700 directory with permissions 700 (owned by user1)
+    createAndAddWorkspace("drill_test_grp_1_700", "/drill_test_grp_1_700", (short)0700, user1, group1, workspaces);
 
     // create /user2_workspace1 with 775 permissions (owner by user1)
     createAndAddWorkspace("user2_workspace1", "/user2_workspace1", (short)0775, user2, group1, workspaces);
@@ -107,17 +107,17 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
 
     // create tables as user2
     updateClient(user2);
-    test("use `%s.user2_workspace1`", MINIDFS_STORAGE_PLUGIN_NAME);
+    test("use `%s.user2_workspace1`", MINI_DFS_STORAGE_PLUGIN_NAME);
     // create a table that can be dropped by another user in a different group
     test("create table parquet_table_775 as select * from cp.`employee.json`");
 
     // create a table that cannot be dropped by another user
-    test("use `%s.user2_workspace2`", MINIDFS_STORAGE_PLUGIN_NAME);
+    test("use `%s.user2_workspace2`", MINI_DFS_STORAGE_PLUGIN_NAME);
     test("create table parquet_table_700 as select * from cp.`employee.json`");
 
     // Drop tables as user1
     updateClient(user1);
-    test("use `%s.user2_workspace1`", MINIDFS_STORAGE_PLUGIN_NAME);
+    test("use `%s.user2_workspace1`", MINI_DFS_STORAGE_PLUGIN_NAME);
     testBuilder()
         .sqlQuery("drop table parquet_table_775")
         .unOrdered()
@@ -125,7 +125,7 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
         .baselineValues(true, String.format("Table [%s] dropped", "parquet_table_775"))
         .go();
 
-    test("use `%s.user2_workspace2`", MINIDFS_STORAGE_PLUGIN_NAME);
+    test("use `%s.user2_workspace2`", MINI_DFS_STORAGE_PLUGIN_NAME);
     boolean dropFailed = false;
     try {
       test("drop table parquet_table_700");
@@ -142,7 +142,7 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
     updateClient(processUser);
 
     // Process user start the mini dfs, he has read/write permissions by default
-    final String viewName = String.format("%s.drillTestGrp0_700.testView", MINIDFS_STORAGE_PLUGIN_NAME);
+    final String viewName = String.format("%s.drill_test_grp_0_700.testView", MINI_DFS_STORAGE_PLUGIN_NAME);
     try {
       test("CREATE VIEW " + viewName + " AS SELECT * FROM cp.`region.json`");
       test("SELECT * FROM " + viewName + " LIMIT 2");
@@ -155,20 +155,20 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
   public void testShowFilesInWSWithUserAndGroupPermissionsForQueryUser() throws Exception {
     updateClient(user1);
 
-    // Try show tables in schema "drillTestGrp1_700" which is owned by "user1"
-    int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+    // Try show tables in schema "drill_test_grp_1_700" which is owned by "user1"
+    int count = testSql(String.format("SHOW FILES IN %s.drill_test_grp_1_700", MINI_DFS_STORAGE_PLUGIN_NAME));
     assertTrue(count > 0);
 
-    // Try show tables in schema "drillTestGrp0_750" which is owned by "processUser" and has group permissions for "user1"
-    count = testSql(String.format("SHOW FILES IN %s.drillTestGrp0_750", MINIDFS_STORAGE_PLUGIN_NAME));
+    // Try show tables in schema "drill_test_grp_0_750" which is owned by "processUser" and has group permissions for "user1"
+    count = testSql(String.format("SHOW FILES IN %s.drill_test_grp_0_750", MINI_DFS_STORAGE_PLUGIN_NAME));
     assertTrue(count > 0);
   }
 
   @Test
   public void testShowFilesInWSWithOtherPermissionsForQueryUser() throws Exception {
     updateClient(user2);
-    // Try show tables in schema "drillTestGrp0_755" which is owned by "processUser" and group0. "user2" is not part of the "group0"
-    int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME));
+    // Try show tables in schema "drill_test_grp_0_755" which is owned by "processUser" and group0. "user2" is not part of the "group0"
+    int count = testSql(String.format("SHOW FILES IN %s.drill_test_grp_0_755", MINI_DFS_STORAGE_PLUGIN_NAME));
     assertTrue(count > 0);
   }
 
@@ -178,8 +178,8 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
 
     try {
       setSessionOption(ExecConstants.LIST_FILES_RECURSIVELY, true);
-      // Try show tables in schema "drillTestGrp1_700" which is owned by "user1"
-      int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+      // Try show tables in schema "drill_test_grp_1_700" which is owned by "user1"
+      int count = testSql(String.format("SHOW FILES IN %s.drill_test_grp_1_700", MINI_DFS_STORAGE_PLUGIN_NAME));
       assertEquals("Counts should match", 0, count);
     } finally {
       resetSessionOption(ExecConstants.LIST_FILES_RECURSIVELY);
@@ -189,53 +189,53 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
   @Test
   public void testShowSchemasAsUser1() throws Exception {
     // "user1" is part of "group0" and has access to following workspaces
-    // drillTestGrp1_700 (through ownership)
-    // drillTestGrp0_750, drillTestGrp0_770 (through "group" category permissions)
-    // drillTestGrp0_755, drillTestGrp0_777 (through "others" category permissions)
+    // drill_test_grp_1_700 (through ownership)
+    // drill_test_grp_0_750, drill_test_grp_0_770 (through "group" category permissions)
+    // drill_test_grp_0_755, drill_test_grp_0_777 (through "others" category permissions)
     updateClient(user1);
     testBuilder()
-        .sqlQuery("SHOW SCHEMAS LIKE '%drillTest%'")
+        .sqlQuery("SHOW SCHEMAS LIKE '%drill_test%'")
         .unOrdered()
         .baselineColumns("SCHEMA_NAME")
-        .baselineValues(String.format("%s.drillTestGrp0_750", MINIDFS_STORAGE_PLUGIN_NAME))
-        .baselineValues(String.format("%s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME))
-        .baselineValues(String.format("%s.drillTestGrp0_770", MINIDFS_STORAGE_PLUGIN_NAME))
-        .baselineValues(String.format("%s.drillTestGrp0_777", MINIDFS_STORAGE_PLUGIN_NAME))
-        .baselineValues(String.format("%s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_750", MINI_DFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_755", MINI_DFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_770", MINI_DFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_777", MINI_DFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_1_700", MINI_DFS_STORAGE_PLUGIN_NAME))
         .go();
   }
 
   @Test
   public void testShowSchemasAsUser2() throws Exception {
     // "user2" is part of "group0", but part of "group1" and has access to following workspaces
-    // drillTestGrp0_755, drillTestGrp0_777 (through "others" category permissions)
+    // drill_test_grp_0_755, drill_test_grp_0_777 (through "others" category permissions)
     updateClient(user2);
     testBuilder()
-        .sqlQuery("SHOW SCHEMAS LIKE '%drillTest%'")
+        .sqlQuery("SHOW SCHEMAS LIKE '%drill_test%'")
         .unOrdered()
         .baselineColumns("SCHEMA_NAME")
-        .baselineValues(String.format("%s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME))
-        .baselineValues(String.format("%s.drillTestGrp0_777", MINIDFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_755", MINI_DFS_STORAGE_PLUGIN_NAME))
+        .baselineValues(String.format("%s.drill_test_grp_0_777", MINI_DFS_STORAGE_PLUGIN_NAME))
         .go();
   }
 
   @Test
   public void testCreateViewInDirWithUserPermissionsForQueryUser() throws Exception {
-    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp1_700"; // Workspace dir owned by "user1"
+    final String viewSchema = MINI_DFS_STORAGE_PLUGIN_NAME + ".drill_test_grp_1_700"; // Workspace dir owned by "user1"
     testCreateViewTestHelper(user1, viewSchema, "view1");
   }
 
   @Test
   public void testCreateViewInDirWithGroupPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user1" is part of "group0"
-    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_770";
+    final String viewSchema = MINI_DFS_STORAGE_PLUGIN_NAME + ".drill_test_grp_0_770";
     testCreateViewTestHelper(user1, viewSchema, "view1");
   }
 
   @Test
   public void testCreateViewInDirWithOtherPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
-    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_777";
+    final String viewSchema = MINI_DFS_STORAGE_PLUGIN_NAME + ".drill_test_grp_0_777";
     testCreateViewTestHelper(user2, viewSchema, "view1");
   }
 
@@ -273,7 +273,7 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
   @Test
   public void testCreateViewInWSWithNoPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
-    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_755";
+    final String viewSchema = MINI_DFS_STORAGE_PLUGIN_NAME + ".drill_test_grp_0_755";
     final String viewName = "view1";
 
     updateClient(user2);
@@ -282,7 +282,7 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
 
     final String query = "CREATE VIEW " + viewName + " AS SELECT " +
         "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;";
-    final String expErrorMsg = "PERMISSION ERROR: Permission denied: user=drillTestUser2, access=WRITE, inode=\"/drillTestGrp0_755/";
+    final String expErrorMsg = "PERMISSION ERROR: Permission denied: user=drillTestUser2, access=WRITE, inode=\"/drill_test_grp_0_755/";
     errorMsgTestHelper(query, expErrorMsg);
 
     // SHOW TABLES is expected to return no records as view creation fails above.
@@ -296,21 +296,21 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
 
   @Test
   public void testCreateTableInDirWithUserPermissionsForQueryUser() throws Exception {
-    final String tableWS = "drillTestGrp1_700"; // Workspace dir owned by "user1"
+    final String tableWS = "drill_test_grp_1_700"; // Workspace dir owned by "user1"
     testCreateTableTestHelper(user1, tableWS, "table1");
   }
 
   @Test
   public void testCreateTableInDirWithGroupPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user1" is part of "group0"
-    final String tableWS = "drillTestGrp0_770";
+    final String tableWS = "drill_test_grp_0_770";
     testCreateTableTestHelper(user1, tableWS, "table1");
   }
 
   @Test
   public void testCreateTableInDirWithOtherPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
-    final String tableWS = "drillTestGrp0_777";
+    final String tableWS = "drill_test_grp_0_777";
     testCreateTableTestHelper(user2, tableWS, "table1");
   }
 
@@ -319,7 +319,7 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
     try {
       updateClient(user);
 
-      test("USE " + Joiner.on(".").join(MINIDFS_STORAGE_PLUGIN_NAME, tableWS));
+      test("USE " + Joiner.on(".").join(MINI_DFS_STORAGE_PLUGIN_NAME, tableWS));
 
       test("CREATE TABLE " + tableName + " AS SELECT " +
           "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
@@ -345,7 +345,7 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
   @Test
   public void testCreateTableInWSWithNoPermissionsForQueryUser() throws Exception {
     // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
-    final String tableWS = "drillTestGrp0_755";
+    final String tableWS = "drill_test_grp_0_755";
     final String tableName = "table1";
 
     UserRemoteException ex = null;
@@ -353,7 +353,7 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
     try {
       updateClient(user2);
 
-      test("USE " + Joiner.on(".").join(MINIDFS_STORAGE_PLUGIN_NAME, tableWS));
+      test("USE " + Joiner.on(".").join(MINI_DFS_STORAGE_PLUGIN_NAME, tableWS));
 
       test("CREATE TABLE " + tableName + " AS SELECT " +
           "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
@@ -363,16 +363,16 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
 
     assertNotNull("UserRemoteException is expected", ex);
     assertThat(ex.getMessage(),
-        containsString("SYSTEM ERROR: RemoteException: Permission denied: user=drillTestUser2, access=WRITE, inode=\"/drillTestGrp0_755/"));
+        containsString("SYSTEM ERROR: RemoteException: Permission denied: user=drillTestUser2, access=WRITE, inode=\"/drill_test_grp_0_755/"));
   }
 
   @Test
   public void testRefreshMetadata() throws Exception {
     final String tableName = "nation1";
-    final String tableWS = "drillTestGrp1_700";
+    final String tableWS = "drill_test_grp_1_700";
 
     updateClient(user1);
-    test("USE " + Joiner.on(".").join(MINIDFS_STORAGE_PLUGIN_NAME, tableWS));
+    test("USE " + Joiner.on(".").join(MINI_DFS_STORAGE_PLUGIN_NAME, tableWS));
 
     test("CREATE TABLE " + tableName + " partition by (n_regionkey) AS SELECT * " +
               "FROM cp.`tpch/nation.parquet`;");
@@ -390,8 +390,8 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
   }
 
   @AfterClass
-  public static void removeMiniDfsBasedStorage() throws Exception {
-    getDrillbitContext().getStorage().deletePlugin(MINIDFS_STORAGE_PLUGIN_NAME);
+  public static void removeMiniDfsBasedStorage() {
+    getDrillbitContext().getStorage().deletePlugin(MINI_DFS_STORAGE_PLUGIN_NAME);
     stopMiniDfsCluster();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
index c3c1fc5..ec9d75a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
@@ -205,8 +205,7 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
 
     assertNotNull("UserRemoteException is expected", ex);
     assertThat(ex.getMessage(), containsString("PERMISSION ERROR: " +
-      String.format("Not authorized to read table [lineitem] in schema [%s.user0_1]",
-        MINIDFS_STORAGE_PLUGIN_NAME)));
+      String.format("Not authorized to read table [lineitem] in schema [%s.user0_1]", MINI_DFS_STORAGE_PLUGIN_NAME)));
   }
 
   @Test
@@ -261,19 +260,19 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
   public void sequenceFileChainedImpersonationWithView() throws Exception {
     // create a view named "simple_seq_view" on "simple.seq". View is owned by user0:group0 and has permissions 750
     createView(org1Users[0], org1Groups[0], "simple_seq_view",
-      String.format("SELECT convert_from(t.binary_key, 'UTF8') as k FROM %s.`%s` t", MINIDFS_STORAGE_PLUGIN_NAME,
+      String.format("SELECT convert_from(t.binary_key, 'UTF8') as k FROM %s.`%s` t", MINI_DFS_STORAGE_PLUGIN_NAME,
         new Path(getUserHome(org1Users[0]), "simple.seq")));
     try {
       updateClient(org1Users[1]);
-      test("SELECT k FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view");
+      test("SELECT k FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view");
     } catch (UserRemoteException e) {
       assertNull("This test should pass.", e);
     }
     createView(org1Users[1], org1Groups[1], "simple_seq_view_2",
-      String.format("SELECT k FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view"));
+      String.format("SELECT k FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view"));
     try {
       updateClient(org1Users[2]);
-      test("SELECT k FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view_2");
+      test("SELECT k FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view_2");
     } catch (UserRemoteException e) {
       assertNull("This test should pass.", e);
     }
@@ -282,11 +281,11 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
   @Test
   public void avroChainedImpersonationWithView() throws Exception {
     createView(org1Users[0], org1Groups[0], "simple_avro_view",
-      String.format("SELECT h_boolean, e_double FROM %s.`%s` t", MINIDFS_STORAGE_PLUGIN_NAME,
+      String.format("SELECT h_boolean, e_double FROM %s.`%s` t", MINI_DFS_STORAGE_PLUGIN_NAME,
         new Path(getUserHome(org1Users[0]), "simple.avro")));
     try {
       updateClient(org1Users[1]);
-      test("SELECT h_boolean FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_avro_view");
+      test("SELECT h_boolean FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_avro_view");
     } catch (UserRemoteException e) {
       assertNull("This test should pass.", e);
     }
@@ -294,7 +293,7 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
 
   @AfterClass
   public static void removeMiniDfsBasedStorage() throws Exception {
-    getDrillbitContext().getStorage().deletePlugin(MINIDFS_STORAGE_PLUGIN_NAME);
+    getDrillbitContext().getStorage().deletePlugin(MINI_DFS_STORAGE_PLUGIN_NAME);
     stopMiniDfsCluster();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
index dea4c59..f7c329c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
@@ -96,7 +96,7 @@ public class TestDirectoryExplorerUDFs extends PlanTestBase {
         .add("BIGFILE_2")
         .build();
 
-    String query = "select * from dfs.`%s/*/*.csv` where dir0 = %s('dfs.root','%s')";
+    String query = "select * from dfs.`%s/*/*.csv` where dir0 = %s('dFs.RoOt','%s')";
     for (ConstantFoldingTestConfig config : tests) {
       // make all of the other folders unexpected patterns, except for the one expected in this case
       List<String> excludedPatterns = Lists.newArrayList();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
index 6e7d054..e295eee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
@@ -17,33 +17,33 @@
  */
 package org.apache.drill.exec.sql;
 
-import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_CONNECT;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_DESCRIPTION;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.SqlTest;
-import org.apache.drill.test.TestBuilder;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.TestBuilder;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_CONNECT;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_DESCRIPTION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Contains tests for
  * -- InformationSchema
@@ -84,15 +84,13 @@ public class TestInfoSchema extends BaseTestQuery {
 
   @Test
   public void showTablesFromDb() throws Exception{
-    final List<String[]> expected =
-        ImmutableList.of(
-            new String[] { "INFORMATION_SCHEMA", "VIEWS" },
-            new String[] { "INFORMATION_SCHEMA", "COLUMNS" },
-            new String[] { "INFORMATION_SCHEMA", "TABLES" },
-            new String[] { "INFORMATION_SCHEMA", "CATALOGS" },
-            new String[] { "INFORMATION_SCHEMA", "SCHEMATA" },
-            new String[] { "INFORMATION_SCHEMA", "FILES" }
-        );
+    final List<String[]> expected = Arrays.asList(
+        new String[]{"information_schema", "VIEWS"},
+        new String[]{"information_schema", "COLUMNS"},
+        new String[]{"information_schema", "TABLES"},
+        new String[]{"information_schema", "CATALOGS"},
+        new String[]{"information_schema", "SCHEMATA"},
+        new String[]{"information_schema", "FILES"});
 
     final TestBuilder t1 = testBuilder()
         .sqlQuery("SHOW TABLES FROM INFORMATION_SCHEMA")
@@ -119,7 +117,7 @@ public class TestInfoSchema extends BaseTestQuery {
         .sqlQuery("SHOW TABLES FROM INFORMATION_SCHEMA WHERE TABLE_NAME='VIEWS'")
         .unOrdered()
         .baselineColumns("TABLE_SCHEMA", "TABLE_NAME")
-        .baselineValues("INFORMATION_SCHEMA", "VIEWS")
+        .baselineValues("information_schema", "VIEWS")
         .go();
   }
 
@@ -130,38 +128,26 @@ public class TestInfoSchema extends BaseTestQuery {
         .unOrdered()
         .optionSettingQueriesForTestQuery("USE INFORMATION_SCHEMA")
         .baselineColumns("TABLE_SCHEMA", "TABLE_NAME")
-        .baselineValues("INFORMATION_SCHEMA", "SCHEMATA")
+        .baselineValues("information_schema", "SCHEMATA")
         .go();
   }
 
   @Test
   public void showDatabases() throws Exception{
-    final List<String[]> expected =
-        ImmutableList.of(
-            new String[] { "dfs.default" },
-            new String[] { "dfs.root" },
-            new String[] { "dfs.tmp" },
-            new String[] { "cp.default" },
-            new String[] { "sys" },
-            new String[] { "INFORMATION_SCHEMA" }
-        );
+    List<String> expected = Arrays.asList("dfs.default", "dfs.root", "dfs.tmp", "cp.default", "sys", "information_schema");
 
-    final TestBuilder t1 = testBuilder()
+    TestBuilder t1 = testBuilder()
         .sqlQuery("SHOW DATABASES")
         .unOrdered()
         .baselineColumns("SCHEMA_NAME");
-    for(String[] expectedRow : expected) {
-      t1.baselineValues(expectedRow);
-    }
+    expected.forEach(t1::baselineValues);
     t1.go();
 
-    final TestBuilder t2 = testBuilder()
+    TestBuilder t2 = testBuilder()
         .sqlQuery("SHOW SCHEMAS")
         .unOrdered()
         .baselineColumns("SCHEMA_NAME");
-    for(String[] expectedRow : expected) {
-      t2.baselineValues(expectedRow);
-    }
+    expected.forEach(t2::baselineValues);
     t2.go();
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestSchemaCaseInsensitivity.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestSchemaCaseInsensitivity.java
new file mode 100644
index 0000000..c274e0b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestSchemaCaseInsensitivity.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.sql;
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
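+/**
+ * Verifies that schema (storage plugin and workspace) names are matched
+ * case insensitively: Drill stores schema names in lower case, so
+ * identifiers such as Information_Schema, Sys or Dfs.Tmp in USE, DESCRIBE,
+ * SHOW and SELECT statements should all resolve to the same schemas.
+ */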
+public class TestSchemaCaseInsensitivity extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+  }
+
+  @Test
+  public void testUseCommand() throws Exception {
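+    // any casing of plugin and workspace names should resolve to the lower case schema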
+    queryBuilder().sql("use Information_Schema").run();
+    queryBuilder().sql("use Sys").run();
+    queryBuilder().sql("use Dfs").run();
+    queryBuilder().sql("use Dfs.Tmp").run();
+  }
+
+  @Test
+  public void testDescribeSchema() throws Exception {
+    checkRecordCount(1, "describe schema SyS");
+    checkRecordCount(1, "describe schema Information_Schema");
+
+    client.testBuilder()
+        .sqlQuery("describe schema DfS.tMp")
+        .unOrdered()
+        .sqlBaselineQuery("describe schema dfs.tmp")
+        .go();
+  }
+
+  @Test
+  public void testDescribeTable() throws Exception {
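+    // information_schema supports case insensitive table names, so `Tables` resolves to TABLES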
+    checkRecordCount(4, "describe Information_Schema.`Tables`");
+    checkRecordCount(1, "describe Information_Schema.`Tables` Table_Catalog");
+    checkRecordCount(1, "describe Information_Schema.`Tables` '%Catalog'");
+    checkRecordCount(6, "describe SyS.Version");
+  }
+
+  @Test
+  public void testShowSchemas() throws Exception {
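+    // LIKE patterns are matched case insensitively against the lower case schema names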
+    checkRecordCount(1, "show schemas like '%Y%'");
+    checkRecordCount(1, "show schemas like 'Info%'");
+    checkRecordCount(1, "show schemas like 'D%Tmp'");
+  }
+
+  @Test
+  public void testShowTables() throws Exception {
+    checkRecordCount(1, "show tables in Information_Schema like 'SC%'");
+    checkRecordCount(1, "show tables in Sys like '%ION'");
+  }
+
+  @Test
+  public void testSelectStatement() throws Exception {
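+    // table references are case insensitive, but stored SCHEMA_NAME values stay lower case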
+    checkRecordCount(1, "select * from Information_Schema.Schemata where Schema_Name = 'dfs.tmp'");
+    checkRecordCount(1, "select * from Sys.Version");
+  }
+
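+  /** Runs the given query and asserts that it succeeds and returns the expected record count. */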
+  private void checkRecordCount(long recordCount, String sqlQuery) throws Exception {
+    QueryBuilder.QuerySummary summary = queryBuilder().sql(sqlQuery).run();
+    assertTrue(summary.succeeded());
+    assertEquals(recordCount, summary.recordCount());
+  }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
index 8d34ebc..c5b607b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.test.BaseTestQuery;
@@ -105,7 +105,7 @@ public class TestMetadataProvider extends BaseTestQuery {
     List<SchemaMetadata> schemas = resp.getSchemasList();
     assertEquals(6, schemas.size());
 
-    verifySchema("INFORMATION_SCHEMA", schemas);
+    verifySchema("information_schema", schemas);
     verifySchema("cp.default", schemas);
     verifySchema("dfs.default", schemas);
     verifySchema("dfs.root", schemas);
@@ -115,15 +115,15 @@ public class TestMetadataProvider extends BaseTestQuery {
 
   @Test
   public void schemasWithSchemaNameFilter() throws Exception {
-    // test("SELECT * FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%y%'"); // SQL equivalent
+    // test("SELECT * FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%tion_sc%'"); // SQL equivalent
 
-    GetSchemasResp resp = client.getSchemas(null, LikeFilter.newBuilder().setPattern("%y%").build()).get();
+    GetSchemasResp resp = client.getSchemas(null, LikeFilter.newBuilder().setPattern("%TiOn_Sc%").build()).get();
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<SchemaMetadata> schemas = resp.getSchemasList();
     assertEquals(1, schemas.size());
 
-    verifySchema("sys", schemas);
+    verifySchema("information_schema", schemas);
   }
 
   @Test
@@ -151,12 +151,12 @@ public class TestMetadataProvider extends BaseTestQuery {
     List<TableMetadata> tables = resp.getTablesList();
     assertEquals(18, tables.size());
 
-    verifyTable("INFORMATION_SCHEMA", "CATALOGS", tables);
-    verifyTable("INFORMATION_SCHEMA", "COLUMNS", tables);
-    verifyTable("INFORMATION_SCHEMA", "SCHEMATA", tables);
-    verifyTable("INFORMATION_SCHEMA", "TABLES", tables);
-    verifyTable("INFORMATION_SCHEMA", "VIEWS", tables);
-    verifyTable("INFORMATION_SCHEMA", "FILES", tables);
+    verifyTable("information_schema", "CATALOGS", tables);
+    verifyTable("information_schema", "COLUMNS", tables);
+    verifyTable("information_schema", "SCHEMATA", tables);
+    verifyTable("information_schema", "TABLES", tables);
+    verifyTable("information_schema", "VIEWS", tables);
+    verifyTable("information_schema", "FILES", tables);
     verifyTable("sys", "boot", tables);
     verifyTable("sys", "drillbits", tables);
     verifyTable("sys", "memory", tables);
@@ -172,7 +172,7 @@ public class TestMetadataProvider extends BaseTestQuery {
   public void tablesWithTableFilter() throws Exception {
     // test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_TYPE IN ('TABLE')"); // SQL equivalent
 
-    GetTablesResp resp = client.getTables(null, null, null, Arrays.asList("TABLE")).get();
+    GetTablesResp resp = client.getTables(null, null, null, Collections.singletonList("TABLE")).get();
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<TableMetadata> tables = resp.getTablesList();
@@ -183,18 +183,18 @@ public class TestMetadataProvider extends BaseTestQuery {
   public void tablesWithSystemTableFilter() throws Exception {
     // test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_TYPE IN ('SYSTEM_TABLE')"); // SQL equivalent
 
-    GetTablesResp resp = client.getTables(null, null, null, Arrays.asList("SYSTEM_TABLE")).get();
+    GetTablesResp resp = client.getTables(null, null, null, Collections.singletonList("SYSTEM_TABLE")).get();
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<TableMetadata> tables = resp.getTablesList();
     assertEquals(18, tables.size());
 
-    verifyTable("INFORMATION_SCHEMA", "CATALOGS", tables);
-    verifyTable("INFORMATION_SCHEMA", "COLUMNS", tables);
-    verifyTable("INFORMATION_SCHEMA", "SCHEMATA", tables);
-    verifyTable("INFORMATION_SCHEMA", "TABLES", tables);
-    verifyTable("INFORMATION_SCHEMA", "VIEWS", tables);
-    verifyTable("INFORMATION_SCHEMA", "FILES", tables);
+    verifyTable("information_schema", "CATALOGS", tables);
+    verifyTable("information_schema", "COLUMNS", tables);
+    verifyTable("information_schema", "SCHEMATA", tables);
+    verifyTable("information_schema", "TABLES", tables);
+    verifyTable("information_schema", "VIEWS", tables);
+    verifyTable("information_schema", "FILES", tables);
     verifyTable("sys", "boot", tables);
     verifyTable("sys", "drillbits", tables);
     verifyTable("sys", "memory", tables);