You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2020/03/13 16:15:33 UTC

[drill] branch master updated: DRILL-7620: Fix plugin mutability issues

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 63e64c2  DRILL-7620: Fix plugin mutability issues
63e64c2 is described below

commit 63e64c2a8156a66ba1b1c8c1ec62e8da467bbbc9
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Tue Mar 3 15:03:49 2020 -0800

    DRILL-7620: Fix plugin mutability issues
    
    A recent commit made the plugin registry more strict about
    the rule that, once a plugin is registered, it must be
    immutable. A flaw in enforcing that rule in the UI put the
    registry in an inconsistent state.
    
    Also:
    * Registry-specific errors
    * Push more operations from UI layer into registry
    * Clean up semantics of "resolve" for plugins
    * Add more unit tests
    * Better handling of "bad" plugins
    * Force plugin names to lower case
    * Fix comparison bugs in some format plugins
---
 .../drill/exec/store/mapr/PluginConstants.java     |  74 +--
 .../drill/exec/store/mapr/PluginErrorHandler.java  |   1 -
 .../drill/exec/store/mapr/TableFormatMatcher.java  |   2 +-
 .../drill/exec/store/mapr/TableFormatPlugin.java   |  12 +-
 .../exec/store/mapr/TableFormatPluginConfig.java   |   1 -
 .../drill/exec/store/mapr/db/MapRDBSubScan.java    |   2 +-
 .../store/mapr/db/RestrictedMapRDBSubScan.java     |   2 +-
 .../store/mapr/db/binary/BinaryTableGroupScan.java |   7 +-
 .../store/mapr/db/binary/MapRDBFilterBuilder.java  |   2 +-
 .../store/mapr/db/json/JsonTableGroupScan.java     |   7 +-
 .../db/json/JsonTableRangePartitionFunction.java   |   5 +-
 .../drill/exec/store/syslog/TestSyslogFormat.java  |  19 +-
 .../exec/store/hbase/AbstractHBaseDrillTable.java  |   4 +-
 .../exec/store/hbase/DrillHBaseConstants.java      |  16 +-
 .../drill/exec/store/hbase/DrillHBaseTable.java    |   2 -
 .../exec/store/hbase/HBaseConnectionManager.java   |   5 +-
 .../drill/exec/store/hbase/HBaseGroupScan.java     |   8 +-
 .../exec/store/hbase/HBasePushFilterIntoScan.java  |   5 +-
 .../drill/exec/store/hbase/HBaseRecordReader.java  |  11 +-
 .../drill/exec/store/hbase/HBaseRegexParser.java   |   8 +-
 .../exec/store/hbase/HBaseScanBatchCreator.java    |   2 +-
 .../drill/exec/store/hbase/HBaseSchemaFactory.java |   5 +-
 .../exec/store/hbase/HBaseStoragePluginConfig.java |   6 +-
 .../drill/exec/store/hbase/HBaseSubScan.java       |   6 +-
 .../hive/HiveDrillNativeParquetRowGroupScan.java   |   2 +-
 .../store/hive/HiveDrillNativeParquetScan.java     |   2 +-
 .../org/apache/drill/exec/store/hive/HiveScan.java |   6 +-
 .../apache/drill/exec/store/hive/HiveSubScan.java  |   3 +-
 .../exec/store/hive/HiveTableWithColumnCache.java  |   1 +
 .../drill/exec/store/hive/HiveUtilities.java       |   4 +-
 .../exec/store/hive/schema/HiveSchemaFactory.java  |   6 +-
 .../org/apache/drill/exec/hive/HiveTestBase.java   |   1 -
 .../apache/drill/exec/hive/HiveTestFixture.java    |  21 +-
 .../drill/exec/store/jdbc/JdbcGroupScan.java       |   3 +-
 .../apache/drill/exec/store/jdbc/JdbcSubScan.java  |   2 +-
 .../drill/exec/store/kafka/KafkaGroupScan.java     |   3 +-
 .../drill/exec/store/kafka/KafkaSubScan.java       |   2 +-
 .../drill/exec/store/kudu/DrillKuduTable.java      |   1 -
 .../drill/exec/store/kudu/KuduGroupScan.java       |   9 +-
 .../drill/exec/store/kudu/KuduRecordReader.java    |   5 +-
 .../exec/store/kudu/KuduRecordWriterImpl.java      |  13 +-
 .../exec/store/kudu/KuduScanBatchCreator.java      |   2 -
 .../apache/drill/exec/store/kudu/KuduScanSpec.java |   2 -
 .../drill/exec/store/kudu/KuduSchemaFactory.java   |   9 +-
 .../drill/exec/store/kudu/KuduStoragePlugin.java   |   7 -
 .../exec/store/kudu/KuduStoragePluginConfig.java   |   1 -
 .../apache/drill/exec/store/kudu/KuduSubScan.java  |  21 +-
 .../apache/drill/exec/store/kudu/KuduWriter.java   |   3 +-
 .../exec/store/kudu/KuduWriterBatchCreator.java    |   1 -
 .../drill/exec/store/mongo/MongoFilterBuilder.java |   2 +-
 .../drill/exec/store/mongo/MongoGroupScan.java     |   7 +-
 .../exec/store/mongo/MongoStoragePluginConfig.java |   6 +-
 .../drill/exec/store/mongo/MongoSubScan.java       |   4 +-
 .../drill/exec/store/openTSDB/Constants.java       |  14 +-
 .../exec/store/openTSDB/OpenTSDBGroupScan.java     |   9 +-
 .../drill/exec/store/openTSDB/OpenTSDBSubScan.java |   7 +-
 .../org/apache/calcite/jdbc/DynamicRootSchema.java |   4 +-
 .../org/apache/drill/exec/opt/BasicOptimizer.java  |   6 +-
 .../exec/physical/base/AbstractGroupScan.java      |   3 +-
 .../logical/FileSystemCreateTableEntry.java        |   8 +-
 .../sql/handlers/DescribeSchemaHandler.java        |  21 +-
 .../exec/planner/sql/handlers/SqlHandlerUtil.java  |   4 +-
 .../drill/exec/server/rest/DrillRestServer.java    |  13 +-
 .../exec/server/rest/PluginConfigWrapper.java      |  41 +-
 .../drill/exec/server/rest/StorageResources.java   | 129 +++--
 .../drill/exec/store/ClassicConnectorLocator.java  |   3 +-
 .../exec/store/DrillbitPluginRegistryContext.java  |  10 +-
 .../exec/store/PluginBootstrapLoaderImpl.java      |   4 +-
 .../org/apache/drill/exec/store/PluginHandle.java  |   5 +
 .../drill/exec/store/PluginRegistryContext.java    |   1 +
 .../apache/drill/exec/store/StoragePluginMap.java  |  78 +--
 .../drill/exec/store/StoragePluginRegistry.java    | 242 +++++++-
 .../exec/store/StoragePluginRegistryImpl.java      | 433 +++++++++++---
 .../drill/exec/store/dfs/FileSystemConfig.java     |  11 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java     |   1 -
 .../drill/exec/store/dfs/easy/EasyGroupScan.java   |   7 +-
 .../drill/exec/store/dfs/easy/EasySubScan.java     |   4 +-
 .../drill/exec/store/dfs/easy/EasyWriter.java      |  12 +-
 .../sequencefile/SequenceFileFormatConfig.java     |  31 +-
 .../exec/store/httpd/HttpdLogFormatPlugin.java     |  19 +-
 .../drill/exec/store/image/ImageFormatConfig.java  |  50 +-
 .../exec/store/parquet/ParquetFormatConfig.java    |  26 +-
 .../drill/exec/store/parquet/ParquetGroupScan.java |   3 +-
 .../exec/store/parquet/ParquetRowGroupScan.java    |   2 +-
 .../drill/exec/store/parquet/ParquetWriter.java    |   8 +-
 .../drill/exec/store/pcap/PcapFormatConfig.java    |  27 +-
 .../exec/store/pcapng/PcapngFormatConfig.java      |   9 +
 .../drill/exec/store/sys/SystemTableScan.java      |   5 +-
 .../drill/exec/util/StoragePluginTestUtils.java    |  12 +-
 .../impersonation/TestImpersonationMetadata.java   |   3 +-
 .../impersonation/TestImpersonationQueries.java    |   3 +-
 .../drill/exec/store/BasePluginRegistryTest.java   |   5 +-
 .../drill/exec/store/TestPluginRegistry.java       | 630 +++++++++++++++++----
 .../apache/drill/exec/store/TestPluginsMap.java    |  29 +-
 .../drill/exec/store/httpd/TestHTTPDLogReader.java |  23 +-
 .../apache/drill/exec/store/log/TestLogReader.java |  24 +-
 .../drill/exec/store/pcap/TestPcapEVFReader.java   |  19 +-
 .../java/org/apache/drill/test/ClusterFixture.java |  20 +-
 .../drill/test/ClusterMockStorageFixture.java      |   4 +-
 .../java/org/apache/drill/test/TestBuilder.java    |  14 +-
 .../common/config/LogicalPlanPersistence.java      |   6 +-
 .../drill/common/logical/StoragePluginConfig.java  |   3 +
 102 files changed, 1634 insertions(+), 772 deletions(-)

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java
index 7a175a2..f355812 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java
@@ -22,19 +22,21 @@ import org.apache.drill.common.expression.SchemaPath;
 import com.mapr.db.DBConstants;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.PluginCost.CheckValid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class PluginConstants {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PluginConstants.class);
+public interface PluginConstants {
+  Logger logger = LoggerFactory.getLogger(PluginConstants.class);
 
-  public final static CheckValid alwaysValid = new CheckValid<Integer>() {
+  public final static CheckValid<Integer> alwaysValid = new CheckValid<Integer>() {
     @Override
     public boolean isValid(Integer parameter) {
       return true;
     }
   };
 
-  public final static CheckValid isNonNegative = new CheckValid<Integer>() {
-    @Override
+  public final static CheckValid<Integer> isNonNegative = new CheckValid<Integer>() {
+     @Override
     public boolean isValid(Integer paramValue) {
       if (paramValue > 0 && paramValue <= Integer.MAX_VALUE) {
         return true;
@@ -45,49 +47,49 @@ public class PluginConstants {
     }
   };
 
-  public static final String SSD = "SSD";
-  public static final String HDD = "HDD";
-  public static final SchemaPath ID_SCHEMA_PATH = SchemaPath.getSimplePath(ID_KEY);
+  String SSD = "SSD";
+  String HDD = "HDD";
+  SchemaPath ID_SCHEMA_PATH = SchemaPath.getSimplePath(ID_KEY);
 
-  public static final SchemaPath DOCUMENT_SCHEMA_PATH = SchemaPath.getSimplePath(DBConstants.DOCUMENT_FIELD);
+  SchemaPath DOCUMENT_SCHEMA_PATH = SchemaPath.getSimplePath(DBConstants.DOCUMENT_FIELD);
 
-  public static final int JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT = 32;
+  int JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT = 32;
 
-  public static final int JSON_TABLE_SCAN_SIZE_MB_MIN = 32;
-  public static final int JSON_TABLE_SCAN_SIZE_MB_MAX = 8192;
+  int JSON_TABLE_SCAN_SIZE_MB_MIN = 32;
+  int JSON_TABLE_SCAN_SIZE_MB_MAX = 8192;
 
-  public static final String JSON_TABLE_SCAN_SIZE_MB = "format-maprdb.json.scanSizeMB";
-  public static final int JSON_TABLE_SCAN_SIZE_MB_DEFAULT = 128;
+  String JSON_TABLE_SCAN_SIZE_MB = "format-maprdb.json.scanSizeMB";
+  int JSON_TABLE_SCAN_SIZE_MB_DEFAULT = 128;
 
-  public static final String JSON_TABLE_RESTRICTED_SCAN_SIZE_MB = "format-maprdb.json.restrictedScanSizeMB";
-  public static final int JSON_TABLE_RESTRICTED_SCAN_SIZE_MB_DEFAULT = 4096;
+  String JSON_TABLE_RESTRICTED_SCAN_SIZE_MB = "format-maprdb.json.restrictedScanSizeMB";
+  int JSON_TABLE_RESTRICTED_SCAN_SIZE_MB_DEFAULT = 4096;
 
-  public static final String JSON_TABLE_USE_NUM_REGIONS_FOR_DISTRIBUTION_PLANNING = "format-maprdb.json.useNumRegionsForDistribution";
-  public static final boolean JSON_TABLE_USE_NUM_REGIONS_FOR_DISTRIBUTION_PLANNING_DEFAULT = false;
+  String JSON_TABLE_USE_NUM_REGIONS_FOR_DISTRIBUTION_PLANNING = "format-maprdb.json.useNumRegionsForDistribution";
+  boolean JSON_TABLE_USE_NUM_REGIONS_FOR_DISTRIBUTION_PLANNING_DEFAULT = false;
 
-  public static final String JSON_TABLE_BLOCK_SIZE = "format-maprdb.json.pluginCost.blockSize";
-  public static final int JSON_TABLE_BLOCK_SIZE_DEFAULT = 8192;
+  String JSON_TABLE_BLOCK_SIZE = "format-maprdb.json.pluginCost.blockSize";
+  int JSON_TABLE_BLOCK_SIZE_DEFAULT = 8192;
 
-  public static final String JSON_TABLE_MEDIA_TYPE = "format-maprdb.json.mediaType";
-  public static final String JSON_TABLE_MEDIA_TYPE_DEFAULT = SSD;
+  String JSON_TABLE_MEDIA_TYPE = "format-maprdb.json.mediaType";
+  String JSON_TABLE_MEDIA_TYPE_DEFAULT = SSD;
 
-  public static final String JSON_TABLE_SSD_BLOCK_SEQ_READ_COST = "format-maprdb.json.pluginCost.ssdBlockSequentialReadCost";
-  public static final int JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT = 32 * DrillCostBase.BASE_CPU_COST * JSON_TABLE_BLOCK_SIZE_DEFAULT;
+  String JSON_TABLE_SSD_BLOCK_SEQ_READ_COST = "format-maprdb.json.pluginCost.ssdBlockSequentialReadCost";
+  int JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT = 32 * DrillCostBase.BASE_CPU_COST * JSON_TABLE_BLOCK_SIZE_DEFAULT;
 
   // for SSD random and sequential costs are the same
-  public static final String JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST = "format-maprdb.json.pluginCost.ssdBlockRandomReadCost";
-  public static final int JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT = JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
+  String JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST = "format-maprdb.json.pluginCost.ssdBlockRandomReadCost";
+  int JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT = JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
 
-  public static final String JSON_TABLE_AVERGE_COLUMN_SIZE = "format-maprdb.json.pluginCost.averageColumnSize";
-  public static final int JSON_TABLE_AVERGE_COLUMN_SIZE_DEFAULT = 10;
+  String JSON_TABLE_AVERGE_COLUMN_SIZE = "format-maprdb.json.pluginCost.averageColumnSize";
+  int JSON_TABLE_AVERGE_COLUMN_SIZE_DEFAULT = 10;
 
-  public static final int TABLE_BLOCK_SIZE_DEFAULT = 8192;
-  public static final int TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT = 32 * DrillCostBase.BASE_CPU_COST * TABLE_BLOCK_SIZE_DEFAULT;
-  public static final int TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT = TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
-  public static final int TABLE_AVERGE_COLUMN_SIZE_DEFAULT = 10;
-  public static final String JSON_TABLE_HDD_BLOCK_SEQ_READ_COST = "format-maprdb.json.pluginCost.hddBlockSequentialReadCost";
-  public static final int JSON_TABLE_HDD_BLOCK_SEQ_READ_COST_DEFAULT = 6 * JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
+  int TABLE_BLOCK_SIZE_DEFAULT = 8192;
+  int TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT = 32 * DrillCostBase.BASE_CPU_COST * TABLE_BLOCK_SIZE_DEFAULT;
+  int TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT = TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
+  int TABLE_AVERGE_COLUMN_SIZE_DEFAULT = 10;
+  String JSON_TABLE_HDD_BLOCK_SEQ_READ_COST = "format-maprdb.json.pluginCost.hddBlockSequentialReadCost";
+  int JSON_TABLE_HDD_BLOCK_SEQ_READ_COST_DEFAULT = 6 * JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
 
-  public static final String JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST = "format-maprdb.json.pluginCost.hddBlockRandomReadCost";
-  public static final int JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST_DEFAULT = 1000 * JSON_TABLE_HDD_BLOCK_SEQ_READ_COST_DEFAULT;
+  String JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST = "format-maprdb.json.pluginCost.hddBlockRandomReadCost";
+  int JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST_DEFAULT = 1000 * JSON_TABLE_HDD_BLOCK_SEQ_READ_COST_DEFAULT;
 }
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginErrorHandler.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginErrorHandler.java
index d106d6e..5163934 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginErrorHandler.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginErrorHandler.java
@@ -46,5 +46,4 @@ public final class PluginErrorHandler {
   public static SchemaChangeException schemaChangeException(Logger logger, Throwable t, String format, Object... args) {
     return new SchemaChangeException(format, t, args);
   }
-
 }
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java
index 1fca75c..b2ede05 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java
@@ -44,6 +44,7 @@ public abstract class TableFormatMatcher extends FormatMatcher {
     return false;
   }
 
+  @Override
   public DrillTable isReadable(DrillFileSystem fs,
       FileSelection selection, FileSystemPlugin fsPlugin,
       String storageEngineName, SchemaConfig schemaConfig) throws IOException {
@@ -73,5 +74,4 @@ public abstract class TableFormatMatcher extends FormatMatcher {
    * by this format plugin. The path must point to a MapR table.
    */
   protected abstract boolean isSupportedTable(MapRFileStatus status) throws IOException;
-
 }
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
index aeb117a..8f5717c 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
@@ -25,7 +25,6 @@ import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractWriter;
@@ -41,8 +40,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import com.mapr.fs.MapRFileSystem;
 
 public abstract class TableFormatPlugin implements FormatPlugin {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
-      .getLogger(TableFormatPlugin.class);
 
   private final StoragePluginConfig storageConfig;
   private final TableFormatPluginConfig config;
@@ -83,6 +80,7 @@ public abstract class TableFormatPlugin implements FormatPlugin {
     return false;
   }
 
+  @Override
   public Configuration getFsConf() {
     return fsConf;
   }
@@ -120,11 +118,8 @@ public abstract class TableFormatPlugin implements FormatPlugin {
 
   public synchronized AbstractStoragePlugin getStoragePlugin() {
     if (this.storagePlugin == null) {
-      try {
-        this.storagePlugin = (AbstractStoragePlugin) context.getStorage().getPlugin(storageConfig);
-      } catch (ExecutionSetupException e) {
-        throw new RuntimeException(e);
-      }
+      this.storagePlugin = context.getStorage().resolve(storageConfig,
+          AbstractStoragePlugin.class);
     }
     return storagePlugin;
   }
@@ -133,5 +128,4 @@ public abstract class TableFormatPlugin implements FormatPlugin {
   public MapRFileSystem getMaprFS() {
     return maprfs;
   }
-
 }
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java
index 72c69a6..f30d2b6 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java
@@ -34,5 +34,4 @@ public abstract class TableFormatPluginConfig implements FormatPluginConfig {
   }
 
   protected abstract boolean impEquals(Object obj);
-
 }
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
index d01a63b..92195f5 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
@@ -61,7 +61,7 @@ public class MapRDBSubScan extends AbstractDbSubScan {
                        @JsonProperty("tableType") String tableType,
                        @JsonProperty("schema") TupleMetadata schema) throws ExecutionSetupException {
     this(userName,
-        (MapRDBFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatPluginConfig),
+        engineRegistry.resolveFormat(storageConfig, formatPluginConfig, MapRDBFormatPlugin.class),
         regionScanSpecList,
         columns,
         maxRecordsToRead,
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java
index c1369a3..7fa8898 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java
@@ -50,7 +50,7 @@ public class RestrictedMapRDBSubScan extends MapRDBSubScan {
                                  @JsonProperty("tableType") String tableType,
                                  @JsonProperty("schema") TupleMetadata schema) throws ExecutionSetupException {
     this(userName,
-        (MapRDBFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatPluginConfig),
+        engineRegistry.resolveFormat(storageConfig, formatPluginConfig, MapRDBFormatPlugin.class),
         regionScanSpecList, columns, maxRecordsToRead, tableType, schema);
   }
 
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
index ec496e9..7061372 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
@@ -73,7 +73,7 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
 
   public static final String TABLE_BINARY = "binary";
 
-  private HBaseScanSpec hbaseScanSpec;
+  private final HBaseScanSpec hbaseScanSpec;
 
   private HTableDescriptor hTableDesc;
 
@@ -87,8 +87,8 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
                               @JsonProperty("columns") List<SchemaPath> columns,
                               @JsonProperty("schema") TupleMetadata schema,
                               @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException, IOException {
-    this(userName, (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
-        (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
+    this(userName, pluginRegistry.resolve(storagePluginConfig, AbstractStoragePlugin.class),
+        pluginRegistry.resolveFormat(storagePluginConfig, formatPluginConfig, MapRDBFormatPlugin.class),
         scanSpec, columns, null /* tableStats */, FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema));
   }
 
@@ -160,6 +160,7 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
   }
 
 
+  @Override
   protected MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
     HBaseScanSpec spec = hbaseScanSpec;
     return new MapRDBSubScanSpec(
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java
index f94e0a8..4cdf404 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java
@@ -50,7 +50,7 @@ public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void
 
   private boolean allExpressionsConverted = true;
 
-  private static Boolean nullComparatorSupported = null;
+  private static Boolean nullComparatorSupported;
 
   public MapRDBFilterBuilder(BinaryTableGroupScan groupScan, LogicalExpression le) {
     this.groupScan = groupScan;
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
index fd910a7..d720634 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
@@ -125,8 +125,8 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
                             @JsonProperty("columns") List<SchemaPath> columns,
                             @JsonProperty("schema") TupleMetadata schema,
                             @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException, IOException {
-    this(userName, (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
-        (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
+    this(userName, pluginRegistry.resolve(storagePluginConfig, AbstractStoragePlugin.class),
+        pluginRegistry.resolveFormat(storagePluginConfig, formatPluginConfig, MapRDBFormatPlugin.class),
         scanSpec, columns, new MapRDBStatistics(), FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema));
   }
 
@@ -207,6 +207,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
     }
   }
 
+  @Override
   protected NavigableMap<TabletFragmentInfo, String> getRegionsToScan() {
     return getRegionsToScan(formatPlugin.getScanRangeSizeMB());
   }
@@ -271,6 +272,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
     return doNotAccessRegionsToScan;
   }
 
+  @Override
   protected MapRDBSubScanSpec getSubScanSpec(final TabletFragmentInfo tfi) {
     // XXX/TODO check filter/Condition
     final JsonScanSpec spec = scanSpec;
@@ -438,6 +440,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
     return !formatPluginConfig.isEnablePushdown();
   }
 
+  @Override
   @JsonIgnore
   public boolean canPushdownProjects(List<SchemaPath> columns) {
     return formatPluginConfig.isEnablePushdown();
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
index b6d7bfb..c97d303 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
@@ -25,6 +25,8 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
 import org.apache.drill.exec.vector.ValueVector;
 import org.ojai.store.QueryCondition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -42,8 +44,7 @@ import com.mapr.org.apache.hadoop.hbase.util.Bytes;
 
 @JsonTypeName("jsontable-range-partition-function")
 public class JsonTableRangePartitionFunction extends AbstractRangePartitionFunction {
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonTableRangePartitionFunction.class);
+  private static final Logger logger = LoggerFactory.getLogger(JsonTableRangePartitionFunction.class);
 
   @JsonProperty("refList")
   protected List<FieldReference> refList;
diff --git a/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java b/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
index a45a07f..b195d51 100644
--- a/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
+++ b/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.exec.store.syslog;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -48,21 +48,18 @@ public class TestSyslogFormat extends ClusterTest {
   }
 
   private static void defineSyslogPlugin() throws ExecutionSetupException {
+    Map<String, FormatPluginConfig> formats = new HashMap<>();
     SyslogFormatConfig sampleConfig = new SyslogFormatConfig();
     sampleConfig.setExtension("syslog");
+    formats.put("sample", sampleConfig);
 
     SyslogFormatConfig flattenedDataConfig = new SyslogFormatConfig();
     flattenedDataConfig.setExtension("syslog1");
     flattenedDataConfig.setFlattenStructuredData(true);
+    formats.put("flat", flattenedDataConfig);
 
     // Define a temporary plugin for the "cp" storage plugin.
-    Drillbit drillbit = cluster.drillbit();
-    final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
-    final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin("cp");
-    final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
-    pluginConfig.getFormats().put("sample", sampleConfig);
-    pluginConfig.getFormats().put("flat", flattenedDataConfig);
-    pluginRegistry.put("cp", pluginConfig);
+    cluster.defineFormats("cp", formats);
   }
 
   @Test
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/AbstractHBaseDrillTable.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/AbstractHBaseDrillTable.java
index 32f4d43..93d8739 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/AbstractHBaseDrillTable.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/AbstractHBaseDrillTable.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,7 +37,7 @@ import java.util.Set;
 import static org.apache.drill.exec.store.hbase.DrillHBaseConstants.ROW_KEY;
 
 public abstract class AbstractHBaseDrillTable extends DrillTable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractHBaseDrillTable.class);
+  private static final Logger logger = LoggerFactory.getLogger(AbstractHBaseDrillTable.class);
 
   protected HTableDescriptor tableDesc;
 
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
index db1b1fa..1238407 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
@@ -23,19 +23,19 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 
 public interface DrillHBaseConstants {
-  public static final String ROW_KEY = "row_key";
+  String ROW_KEY = "row_key";
 
-  public static final SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
+  SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
 
-  public static final String HBASE_ZOOKEEPER_PORT = "hbase.zookeeper.property.clientPort";
+  String HBASE_ZOOKEEPER_PORT = "hbase.zookeeper.property.clientPort";
 
-  public static final MajorType ROW_KEY_TYPE = Types.required(MinorType.VARBINARY);
+  MajorType ROW_KEY_TYPE = Types.required(MinorType.VARBINARY);
 
-  public static final MajorType COLUMN_FAMILY_TYPE = Types.required(MinorType.MAP);
+  MajorType COLUMN_FAMILY_TYPE = Types.required(MinorType.MAP);
 
-  public static final MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY);
+  MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY);
 
-  public static final String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table";
+  String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table";
 
-  public static final String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config";
+  String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config";
 }
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java
index 16de5eb..501e06d 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java
@@ -18,12 +18,10 @@
 package org.apache.drill.exec.store.hbase;
 
 
-
 public class DrillHBaseTable extends AbstractHBaseDrillTable {
 
   public DrillHBaseTable(String storageEngineName, HBaseStoragePlugin plugin, HBaseScanSpec scanSpec) {
     super(storageEngineName, plugin, scanSpec);
     setTableDesc(plugin.getConnection(), scanSpec.getTableName());
   }
-
 }
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java
index a6fa6f2..bb3e2a8 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java
@@ -26,7 +26,8 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.store.hbase.HBaseStoragePlugin.HBaseConnectionKey;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
 import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
 import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
@@ -40,7 +41,7 @@ import org.apache.drill.shaded.guava.com.google.common.util.concurrent.Unchecked
  */
 public final class HBaseConnectionManager
     extends CacheLoader<HBaseConnectionKey, Connection> implements RemovalListener<HBaseConnectionKey, Connection> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseConnectionManager.class);
+  private static final Logger logger = LoggerFactory.getLogger(HBaseConnectionManager.class);
 
   public static final HBaseConnectionManager INSTANCE = new HBaseConnectionManager();
 
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index e1b41e8..cef696e 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionLocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -69,7 +71,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
 @JsonTypeName("hbase-scan")
 public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
+  private static final Logger logger = LoggerFactory.getLogger(HBaseGroupScan.class);
 
   private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = (list1, list2) -> list1.size() - list2.size();
 
@@ -83,7 +85,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
 
   private HBaseStoragePlugin storagePlugin;
 
-  private Stopwatch watch = Stopwatch.createUnstarted();
+  private final Stopwatch watch = Stopwatch.createUnstarted();
 
   private Map<Integer, List<HBaseSubScanSpec>> endpointFragmentMapping;
 
@@ -103,7 +105,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
                         @JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
                         @JsonProperty("columns") List<SchemaPath> columns,
                         @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
-    this (userName, (HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
+    this (userName, pluginRegistry.resolve(storagePluginConfig, HBaseStoragePlugin.class), hbaseScanSpec, columns);
   }
 
   public HBaseGroupScan(String userName, HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec,
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
index 3faa089..517baec 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -40,7 +40,10 @@ public abstract class HBasePushFilterIntoScan extends StoragePluginOptimizerRule
     super(operand, description);
   }
 
-  public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new HBasePushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "HBasePushFilterIntoScan:Filter_On_Scan") {
+  public static final StoragePluginOptimizerRule FILTER_ON_SCAN =
+      new HBasePushFilterIntoScan(RelOptHelper.some(
+          FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+          "HBasePushFilterIntoScan:Filter_On_Scan") {
 
     @Override
     public void onMatch(RelOptRuleCall call) {
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 2db1d02..f68088a 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -50,13 +50,14 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(HBaseRecordReader.class);
 
   // batch should not exceed this value to avoid OOM on a busy system
   private static final int MAX_ALLOCATED_MEMORY_PER_BATCH = 64 * 1024 * 1024; // 64 mb in bytes
@@ -72,10 +73,10 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
   private Table hTable;
   private ResultScanner resultScanner;
 
-  private TableName hbaseTableName;
-  private Scan hbaseScan;
+  private final TableName hbaseTableName;
+  private final Scan hbaseScan;
   // scan instance to capture columns for vector creation
-  private Scan hbaseScanColumnsOnly;
+  private final Scan hbaseScanColumnsOnly;
   private Set<String> completeFamilies;
   private OperatorContext operatorContext;
 
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRegexParser.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRegexParser.java
index 3d8f9c1..a2abf76 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRegexParser.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRegexParser.java
@@ -22,9 +22,11 @@ import java.util.regex.Pattern;
 
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HBaseRegexParser {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRegexParser.class);
+  private static final Logger logger = LoggerFactory.getLogger(HBaseRegexParser.class);
 
   /**
    * Regular expression pattern to parse the value operand of the SQL LIKE operator.
@@ -45,9 +47,9 @@ public class HBaseRegexParser {
 
   private final String escapeChar_;
 
-  private String regexString_ = null;
+  private String regexString_;
 
-  private String prefixString_ = null;
+  private String prefixString_;
 
   public HBaseRegexParser(FunctionCall call) {
     this(likeString(call), escapeString(call));
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 967b8e6..a0fe3bd 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan> {
+
   @Override
   public ScanBatch getBatch(ExecutorFragmentContext context, HBaseSubScan subScan, List<RecordBatch> children)
       throws ExecutionSetupException {
@@ -50,5 +51,4 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan> {
     }
     return new ScanBatch(subScan, context, readers);
   }
-
 }
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 7e4c1ca..21e6d72 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -28,11 +28,12 @@ import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Admin;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 public class HBaseSchemaFactory extends AbstractSchemaFactory {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSchemaFactory.class);
+  private static final Logger logger = LoggerFactory.getLogger(HBaseSchemaFactory.class);
 
   private final HBaseStoragePlugin plugin;
 
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
index 946f6f5..46e9720 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
@@ -23,6 +23,8 @@ import org.apache.drill.common.logical.StoragePluginConfigBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -34,7 +36,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
 @JsonTypeName(HBaseStoragePluginConfig.NAME)
 public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements DrillHBaseConstants {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePluginConfig.class);
+  private static final Logger logger = LoggerFactory.getLogger(HBaseStoragePluginConfig.class);
 
   private Map<String, String> config;
 
@@ -52,6 +54,8 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements
     if (config == null) {
       config = Maps.newHashMap();
     }
+    // TODO: Config-based information should reside in the
+    // storage plugin instance, not here.
     logger.debug("Initializing HBase StoragePlugin configuration with zookeeper quorum '{}', port '{}'.",
         config.get(HConstants.ZOOKEEPER_QUORUM), config.get(HBASE_ZOOKEEPER_PORT));
     if (sizeCalculatorEnabled == null) {
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 41aba6a..d4ea999 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -41,12 +41,10 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 /**
- * Contains information for reading a single HBase region
+ * Information for reading a single HBase region
  */
-
 @JsonTypeName("hbase-region-scan")
 public class HBaseSubScan extends AbstractBase implements SubScan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class);
 
   private final HBaseStoragePlugin hbaseStoragePlugin;
   private final List<HBaseSubScanSpec> regionScanSpecList;
@@ -59,7 +57,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
                       @JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList,
                       @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
     this(userName,
-        (HBaseStoragePlugin) registry.getPlugin(hbaseStoragePluginConfig),
+        registry.resolve(hbaseStoragePluginConfig, HBaseStoragePlugin.class),
         regionScanSpecList,
         columns);
   }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index d5436eb..6e6d469 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -62,7 +62,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
                                             @JsonProperty("filter") LogicalExpression filter,
                                             @JsonProperty("schema") TupleMetadata schema) throws ExecutionSetupException {
     this(userName,
-      (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
+      registry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
       rowGroupReadEntries,
       columns,
       hivePartitionHolder,
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 8b89488..9aae78f 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -65,7 +65,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
                                     @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                                     @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
     super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, filter);
-    this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig);
+    this.hiveStoragePlugin = engineRegistry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class);
     this.confProperties = confProperties;
 
     this.metadataProvider = new HiveParquetTableMetadataProvider(entries, hivePartitionHolder, hiveStoragePlugin, readerConfig);
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index c7d04a9..3fd1258 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -55,7 +57,7 @@ import static org.apache.drill.exec.store.hive.HiveUtilities.createPartitionWith
 
 @JsonTypeName("hive-scan")
 public class HiveScan extends AbstractGroupScan {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
+  private static final Logger logger = LoggerFactory.getLogger(HiveScan.class);
 
   private static int HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN = 20;
 
@@ -78,7 +80,7 @@ public class HiveScan extends AbstractGroupScan {
                   @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
     this(userName,
         hiveReadEntry,
-        (HiveStoragePlugin) pluginRegistry.getPlugin(hiveStoragePluginConfig),
+        pluginRegistry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
         columns,
         null, confProperties);
   }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index a8032cc..5efa505 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -73,7 +73,8 @@ public class HiveSubScan extends AbstractBase implements SubScan {
         hiveReadEntry,
         splitClasses,
         columns,
-        (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig), confProperties);
+        registry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
+        confProperties);
   }
 
   public HiveSubScan(final String userName,
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
index f7dedf3..017c3ed 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
@@ -28,6 +28,7 @@ import java.util.Map;
  * This class is wrapper of {@link Table} class and used for
  * storage of such additional information as column lists cache.
  */
+@SuppressWarnings("serial")
 public class HiveTableWithColumnCache extends Table {
 
   private ColumnListsCache columnListsCache;
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index f80bd80..f7c7099 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -91,6 +91,8 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
@@ -104,7 +106,7 @@ import java.util.stream.Collectors;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
 
 public class HiveUtilities {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveUtilities.class);
+  private static final Logger logger = LoggerFactory.getLogger(HiveUtilities.class);
 
   /**
    * Partition value is received in string format. Convert it into appropriate object based on the type.
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 31610db..36e32bd 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -47,11 +47,13 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
 
 public class HiveSchemaFactory extends AbstractSchemaFactory {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaFactory.class);
+  private static final Logger logger = LoggerFactory.getLogger(HiveSchemaFactory.class);
 
   // MetaStoreClient created using process user credentials
   private final DrillHiveMetaStoreClient processUserMetastoreClient;
@@ -59,7 +61,6 @@ public class HiveSchemaFactory extends AbstractSchemaFactory {
   private final LoadingCache<String, DrillHiveMetaStoreClient> metaStoreClientLoadingCache;
 
   private final HiveStoragePlugin plugin;
-  private final HiveConf hiveConf;
   private final boolean isDrillImpersonationEnabled;
   private final boolean isHS2DoAsSet;
 
@@ -67,7 +68,6 @@ public class HiveSchemaFactory extends AbstractSchemaFactory {
     super(name);
     this.plugin = plugin;
 
-    this.hiveConf = hiveConf;
     isHS2DoAsSet = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS);
     isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
 
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
index 05ce7ae..e8e60ad 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
@@ -75,5 +75,4 @@ public class HiveTestBase extends PlanTestBase {
       HIVE_TEST_FIXTURE.getPluginManager().removeHivePluginFrom(bits);
     }
   }
-
 }
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
index bff25ac..ad8c31c 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
@@ -26,9 +26,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.hive.HiveStoragePlugin;
 import org.apache.drill.exec.store.hive.HiveStoragePluginConfig;
 import org.apache.drill.test.BaseDirTestWatcher;
@@ -228,7 +228,7 @@ public class HiveTestFixture {
           pluginConfig.setEnabled(true);
           drillbit.getContext().getStorage().put(pluginName, pluginConfig);
         }
-      } catch (ExecutionSetupException e) {
+      } catch (PluginException e) {
         throw new RuntimeException("Failed to add Hive storage plugin to drillbits", e);
       }
     }
@@ -238,7 +238,13 @@ public class HiveTestFixture {
     }
 
     public void removeHivePluginFrom(Iterable<Drillbit> drillbits) {
-      drillbits.forEach(bit -> bit.getContext().getStorage().remove(pluginName));
+      try {
+        for (Drillbit drillbit : drillbits) {
+          drillbit.getContext().getStorage().remove(pluginName);
+        }
+      } catch (PluginException e) {
+        throw new RuntimeException("Failed to remove Hive storage plugin for drillbits", e);
+      }
     }
 
     public void updateHivePlugin(Iterable<Drillbit> drillbits,
@@ -254,14 +260,12 @@ public class HiveTestFixture {
           newPluginConfig.getConfigProps().putAll(configOverride);
           pluginRegistry.put(pluginName, newPluginConfig);
         }
-      } catch (ExecutionSetupException e) {
+      } catch (PluginException e) {
         throw new RuntimeException("Failed to update Hive storage plugin for drillbits", e);
       }
     }
-
   }
 
-
   /**
    * Implements method for initialization and passing
    * of Hive to consumer instances in order to be used
@@ -275,8 +279,7 @@ public class HiveTestFixture {
      *  {@link HiveTestFixture}'s constructor will create instance,
      *  and API users will get it via {@link HiveTestFixture#getDriverManager()}.
      */
-    private HiveDriverManager() {
-    }
+    private HiveDriverManager() { }
 
     public void runWithinSession(Consumer<Driver> driverConsumer) {
       final HiveConf hiveConf = new HiveConf(SessionState.class);
@@ -289,7 +292,5 @@ public class HiveTestFixture {
         throw new RuntimeException("Exception was thrown while closing SessionState", e);
       }
     }
-
   }
-
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
index 199d922..4fcd14d 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
@@ -53,7 +53,7 @@ public class JdbcGroupScan extends AbstractGroupScan {
     super("");
     this.sql = sql;
     this.columns = columns;
-    this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config);
+    this.plugin = plugins.resolve(config, JdbcStoragePlugin.class);
     this.rows = rows;
   }
 
@@ -92,6 +92,7 @@ public class JdbcGroupScan extends AbstractGroupScan {
     return sql;
   }
 
+  @Override
   public List<SchemaPath> getColumns() {
     return columns;
   }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
index 43bf909..9991f6f 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
@@ -48,7 +48,7 @@ public class JdbcSubScan extends AbstractSubScan {
     super("");
     this.sql = sql;
     this.columns = columns;
-    this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config);
+    this.plugin = plugins.resolve(config, JdbcStoragePlugin.class);
   }
 
   JdbcSubScan(String sql, List<SchemaPath> columns, JdbcStoragePlugin plugin) {
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index e4f255c..58c2f9f 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -64,7 +64,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
 @JsonTypeName("kafka-scan")
 public class KafkaGroupScan extends AbstractGroupScan {
-
   private static final Logger logger = LoggerFactory.getLogger(KafkaGroupScan.class);
 
   // Assuming default average topic message size as 1KB, which will be used to
@@ -87,7 +86,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
                         @JsonProperty("kafkaScanSpec") KafkaScanSpec scanSpec,
                         @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
     this(userName,
-        (KafkaStoragePlugin) pluginRegistry.getPlugin(kafkaStoragePluginConfig),
+        pluginRegistry.resolve(kafkaStoragePluginConfig, KafkaStoragePlugin.class),
         columns,
         scanSpec);
   }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
index 6ea9e1d..9341aa6 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -53,7 +53,7 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
                       @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaPartitionScanSpec> partitionSubScanSpecList)
       throws ExecutionSetupException {
     this(userName,
-        (KafkaStoragePlugin) registry.getPlugin(kafkaStoragePluginConfig),
+        registry.resolve(kafkaStoragePluginConfig, KafkaStoragePlugin.class),
         columns,
         partitionSubScanSpecList);
   }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java
index de95c49..4ed2394 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java
@@ -30,7 +30,6 @@ import org.apache.kudu.Type;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 public class DrillKuduTable extends DynamicDrillTable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillKuduTable.class);
 
   private final Schema schema;
 
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
index 593ca12..b66d684 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
@@ -54,7 +54,6 @@ import org.apache.kudu.client.LocatedTablet.Replica;
 
 @JsonTypeName("kudu-scan")
 public class KuduGroupScan extends AbstractGroupScan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduGroupScan.class);
   private static final long DEFAULT_TABLET_SIZE = 1000;
 
   private KuduStoragePlugin kuduStoragePlugin;
@@ -72,7 +71,7 @@ public class KuduGroupScan extends AbstractGroupScan {
                         @JsonProperty("kuduStoragePluginConfig") KuduStoragePluginConfig kuduStoragePluginConfig,
                         @JsonProperty("columns") List<SchemaPath> columns,
                         @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
-    this((KuduStoragePlugin) pluginRegistry.getPlugin(kuduStoragePluginConfig), kuduScanSpec, columns);
+    this(pluginRegistry.resolve(kuduStoragePluginConfig, KuduStoragePlugin.class), kuduScanSpec, columns);
   }
 
   public KuduGroupScan(KuduStoragePlugin kuduStoragePlugin,
@@ -112,9 +111,9 @@ public class KuduGroupScan extends AbstractGroupScan {
 
   private static class KuduWork implements CompleteWork {
 
-    private EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
-    private byte[] partitionKeyStart;
-    private byte[] partitionKeyEnd;
+    private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
+    private final byte[] partitionKeyStart;
+    private final byte[] partitionKeyEnd;
 
     public KuduWork(byte[] partitionKeyStart, byte[] partitionKeyEnd) {
       this.partitionKeyStart = partitionKeyStart;
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index 976b16d..abe6f0d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -61,12 +61,13 @@ import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.RowResult;
 import org.apache.kudu.client.RowResultIterator;
 import org.apache.kudu.client.shaded.com.google.common.collect.ImmutableMap;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 public class KuduRecordReader extends AbstractRecordReader {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduRecordReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(KuduRecordReader.class);
 
   private static final int TARGET_RECORD_COUNT = 4000;
 
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java
index 9b98ccd..27f5bd5 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java
@@ -34,6 +34,8 @@ import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.OperationResponse;
 import org.apache.kudu.client.SessionConfiguration.FlushMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,7 +43,7 @@ import java.util.List;
 import java.util.Map;
 
 public class KuduRecordWriterImpl extends KuduRecordWriter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduRecordWriterImpl.class);
+  private static final Logger logger = LoggerFactory.getLogger(KuduRecordWriterImpl.class);
 
   private static final int FLUSH_FREQUENCY = 100;
 
@@ -49,7 +51,7 @@ public class KuduRecordWriterImpl extends KuduRecordWriter {
   private final String name;
   private final OperatorContext context;
   private KuduTable table;
-  private KuduSession session;
+  private final KuduSession session;
 
   private Insert insert;
   private int recordsSinceFlush;
@@ -63,9 +65,7 @@ public class KuduRecordWriterImpl extends KuduRecordWriter {
   }
 
   @Override
-  public void init(Map<String, String> writerOptions) throws IOException {
-
-  }
+  public void init(Map<String, String> writerOptions) throws IOException { }
 
   @Override
   public void updateSchema(VectorAccessible batch) throws IOException {
@@ -149,8 +149,7 @@ public class KuduRecordWriterImpl extends KuduRecordWriter {
   }
 
   @Override
-  public void abort() throws IOException {
-  }
+  public void abort() throws IOException { }
 
   private void flush() throws IOException {
     try {
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
index a192a05..f2a95c6 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
@@ -32,7 +32,6 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduScanBatchCreator.class);
 
   @Override
   public ScanBatch getBatch(ExecutorFragmentContext context, KuduSubScan subScan, List<RecordBatch> children)
@@ -53,5 +52,4 @@ public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
     }
     return new ScanBatch(subScan, context, readers);
   }
-
 }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java
index 2e92369..371cf2b 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java
@@ -33,6 +33,4 @@ public class KuduScanSpec {
   public String getTableName() {
     return tableName;
   }
-
-
 }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
index 183849d..a4d7d3b 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
@@ -34,11 +34,12 @@ import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.ListTablesResponse;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 public class KuduSchemaFactory extends AbstractSchemaFactory {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSchemaFactory.class);
+  private static final Logger logger = LoggerFactory.getLogger(KuduSchemaFactory.class);
 
   private final KuduStoragePlugin plugin;
 
@@ -84,7 +85,6 @@ public class KuduSchemaFactory extends AbstractSchemaFactory {
         logger.warn("Failure while retrieving kudu table {}", name, e);
         return null;
       }
-
     }
 
     @Override
@@ -111,7 +111,6 @@ public class KuduSchemaFactory extends AbstractSchemaFactory {
         public List<String> getPartitionColumns() {
           return Collections.emptyList();
         }
-
       };
     }
 
@@ -136,7 +135,5 @@ public class KuduSchemaFactory extends AbstractSchemaFactory {
     public String getTypeName() {
       return KuduStoragePluginConfig.NAME;
     }
-
   }
-
 }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
index b1ae708..35c974d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class KuduStoragePlugin extends AbstractStoragePlugin {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduStoragePlugin.class);
 
   private final KuduStoragePluginConfig engineConfig;
   private final KuduSchemaFactory schemaFactory;
@@ -45,11 +44,6 @@ public class KuduStoragePlugin extends AbstractStoragePlugin {
     this.client = new KuduClient.KuduClientBuilder(configuration.getMasterAddresses()).build();
   }
 
-  @Override
-  public void start() throws IOException {
-
-  }
-
   public KuduClient getClient() {
     return client;
   }
@@ -84,5 +78,4 @@ public class KuduStoragePlugin extends AbstractStoragePlugin {
   public KuduStoragePluginConfig getConfig() {
     return engineConfig;
   }
-
 }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
index e3bab87..586d38a 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName(KuduStoragePluginConfig.NAME)
 public class KuduStoragePluginConfig extends StoragePluginConfigBase {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduStoragePluginConfig.class);
 
   public static final String NAME = "kudu";
 
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
index 3d35371..22e13a6 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
@@ -38,10 +38,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
-// Class containing information for reading a single Kudu tablet
+/**
+ * Information for reading a single Kudu tablet
+ */
 @JsonTypeName("kudu-sub-scan")
 public class KuduSubScan extends AbstractBase implements SubScan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSubScan.class);
 
   private final KuduStoragePlugin kuduStoragePlugin;
   private final List<KuduSubScanSpec> tabletScanSpecList;
@@ -53,7 +54,7 @@ public class KuduSubScan extends AbstractBase implements SubScan {
                       @JsonProperty("tabletScanSpecList") LinkedList<KuduSubScanSpec> tabletScanSpecList,
                       @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
     super((String) null);
-    kuduStoragePlugin = (KuduStoragePlugin) registry.getPlugin(kuduStoragePluginConfig);
+    kuduStoragePlugin = registry.resolve(kuduStoragePluginConfig, KuduStoragePlugin.class);
     this.tabletScanSpecList = tabletScanSpecList;
     this.columns = columns;
   }
@@ -118,23 +119,15 @@ public class KuduSubScan extends AbstractBase implements SubScan {
       this.endKey = endKey;
     }
 
-    public String getTableName() {
-      return tableName;
-    }
-
-    public byte[] getStartKey() {
-      return startKey;
-    }
+    public String getTableName() { return tableName; }
 
-    public byte[] getEndKey() {
-      return endKey;
-    }
+    public byte[] getStartKey() { return startKey; }
 
+    public byte[] getEndKey() { return endKey; }
   }
 
   @Override
   public int getOperatorType() {
     return CoreOperatorType.KUDU_SUB_SCAN_VALUE;
   }
-
 }
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
index 7611576..95f286e 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
@@ -32,7 +32,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class KuduWriter extends AbstractWriter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduWriter.class);
 
   private final KuduStoragePlugin plugin;
   private final String name;
@@ -44,7 +43,7 @@ public class KuduWriter extends AbstractWriter {
       @JsonProperty("storage") StoragePluginConfig storageConfig,
       @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
     super(child);
-    this.plugin = (KuduStoragePlugin) engineRegistry.getPlugin(storageConfig);
+    this.plugin = engineRegistry.resolve(storageConfig, KuduStoragePlugin.class);
     this.name = name;
   }
 
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
index 22e86ba..643a136 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
@@ -27,7 +27,6 @@ import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
 public class KuduWriterBatchCreator implements BatchCreator<KuduWriter> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduWriterBatchCreator.class);
 
   @Override
   public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KuduWriter config, List<RecordBatch> children)
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
index 8bdf2e0..10720e4 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 public class MongoFilterBuilder extends
     AbstractExprVisitor<MongoScanSpec, Void, RuntimeException> implements
     DrillMongoConstants {
-  static final Logger logger = LoggerFactory
+  private static final Logger logger = LoggerFactory
       .getLogger(MongoFilterBuilder.class);
   final MongoGroupScan groupScan;
   final LogicalExpression le;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index 46ea567..aedbc82 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -81,7 +81,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   private static final Integer select = Integer.valueOf(1);
 
-  static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
+  private static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
 
   private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MongoSubScanSpec>>() {
     @Override
@@ -110,7 +110,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   private Map<String, List<ChunkInfo>> chunksInverseMapping;
 
-  private Stopwatch watch = Stopwatch.createUnstarted();
+  private final Stopwatch watch = Stopwatch.createUnstarted();
 
   private boolean filterPushedDown = false;
 
@@ -122,7 +122,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
       @JsonProperty("columns") List<SchemaPath> columns,
       @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException,
       ExecutionSetupException {
-    this(userName, (MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
+    this(userName,
+        pluginRegistry.resolve(storagePluginConfig, MongoStoragePlugin.class),
         scanSpec, columns);
   }
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
index b065aa4..03acd05 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -31,15 +31,13 @@ import com.mongodb.MongoCredential;
 
 @JsonTypeName(MongoStoragePluginConfig.NAME)
 public class MongoStoragePluginConfig extends StoragePluginConfig {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
-      .getLogger(MongoStoragePluginConfig.class);
 
   public static final String NAME = "mongo";
 
-  private String connection;
+  private final String connection;
 
   @JsonIgnore
-  private MongoClientURI clientURI;
+  private final MongoClientURI clientURI;
 
   @JsonCreator
   public MongoStoragePluginConfig(@JsonProperty("connection") String connection) {
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index 32f5e75..c85dc61 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -66,8 +66,8 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     super(userName);
     this.columns = columns;
     this.mongoPluginConfig = (MongoStoragePluginConfig) mongoPluginConfig;
-    this.mongoStoragePlugin = (MongoStoragePlugin) registry
-        .getPlugin(mongoPluginConfig);
+    this.mongoStoragePlugin = registry.resolve(
+        mongoPluginConfig, MongoStoragePlugin.class);
     this.chunkScanSpecList = chunkScanSpecList;
   }
 
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java
index c812ff5..ea113c6 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java
@@ -21,12 +21,12 @@ public interface Constants {
   /**
    * openTSDB required constants for API call
    */
-  public static final String DEFAULT_TIME = "47y-ago";
-  public static final String SUM_AGGREGATOR = "sum";
+  String DEFAULT_TIME = "47y-ago";
+  String SUM_AGGREGATOR = "sum";
 
-  public static final String TIME_PARAM = "start";
-  public static final String END_TIME_PARAM = "end";
-  public static final String METRIC_PARAM = "metric";
-  public static final String AGGREGATOR_PARAM = "aggregator";
-  public static final String DOWNSAMPLE_PARAM = "downsample";
+  String TIME_PARAM = "start";
+  String END_TIME_PARAM = "end";
+  String METRIC_PARAM = "metric";
+  String AGGREGATOR_PARAM = "aggregator";
+  String DOWNSAMPLE_PARAM = "downsample";
 }
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java
index 8a09637..6800903 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java
@@ -47,9 +47,9 @@ import static org.apache.drill.exec.store.openTSDB.Util.fromRowData;
 @JsonTypeName("openTSDB-scan")
 public class OpenTSDBGroupScan extends AbstractGroupScan {
 
-  private OpenTSDBStoragePluginConfig storagePluginConfig;
-  private OpenTSDBScanSpec openTSDBScanSpec;
-  private OpenTSDBStoragePlugin storagePlugin;
+  private final OpenTSDBStoragePluginConfig storagePluginConfig;
+  private final OpenTSDBScanSpec openTSDBScanSpec;
+  private final OpenTSDBStoragePlugin storagePlugin;
 
   private List<SchemaPath> columns;
 
@@ -58,7 +58,8 @@ public class OpenTSDBGroupScan extends AbstractGroupScan {
                            @JsonProperty("storage") OpenTSDBStoragePluginConfig openTSDBStoragePluginConfig,
                            @JsonProperty("columns") List<SchemaPath> columns,
                            @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
-    this((OpenTSDBStoragePlugin) pluginRegistry.getPlugin(openTSDBStoragePluginConfig), openTSDBScanSpec, columns);
+    this(pluginRegistry.resolve(openTSDBStoragePluginConfig, OpenTSDBStoragePlugin.class),
+        openTSDBScanSpec, columns);
   }
 
   public OpenTSDBGroupScan(OpenTSDBStoragePlugin storagePlugin,
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java
index ec04790..01e418b 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java
@@ -31,8 +31,6 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.Iterator;
@@ -42,9 +40,6 @@ import java.util.List;
 @JsonTypeName("openTSDB-sub-scan")
 public class OpenTSDBSubScan extends AbstractBase implements SubScan {
 
-  private static final Logger log =
-      LoggerFactory.getLogger(OpenTSDBSubScan.class);
-
   public final OpenTSDBStoragePluginConfig storage;
 
   private final List<SchemaPath> columns;
@@ -57,7 +52,7 @@ public class OpenTSDBSubScan extends AbstractBase implements SubScan {
                          @JsonProperty("tabletScanSpecList") LinkedList<OpenTSDBSubScanSpec> tabletScanSpecList,
                          @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
     super((String) null);
-    openTSDBStoragePlugin = (OpenTSDBStoragePlugin) registry.getPlugin(storage);
+    openTSDBStoragePlugin = registry.resolve(storage, OpenTSDBStoragePlugin.class);
     this.tabletScanSpecList = tabletScanSpecList;
     this.storage = storage;
     this.columns = columns;
diff --git a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
index 98146f3..78421ab 100644
--- a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
@@ -24,13 +24,13 @@ import org.apache.calcite.linq4j.tree.Expressions;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.impl.AbstractSchema;
 import org.apache.calcite.util.BuiltInMethod;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserExceptionUtils;
 import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.SubSchemaWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,7 +117,7 @@ public class DynamicRootSchema extends DynamicSchema {
           schemaPlus.add(wrapper.getName(), wrapper);
         }
       }
-    } catch(ExecutionSetupException | IOException ex) {
+    } catch(PluginException | IOException ex) {
       logger.warn("Failed to load schema for \"" + schemaName + "\"!", ex);
       // We can't proceed further without a schema, throw a runtime exception.
       UserException.Builder exceptBuilder =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 2551b6d..dd3c541 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.opt;
 
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.PlanProperties;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -55,6 +54,7 @@ import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
 import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 
@@ -202,10 +202,10 @@ public class BasicOptimizer extends Optimizer {
                 scan.getStorageEngine()));
       }
       try {
-        final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config);
+        final StoragePlugin storagePlugin = queryContext.getStorage().getPluginByConfig(config);
         final String user = userSession.getSession().getCredentials().getUserName();
         return storagePlugin.getPhysicalScan(user, scan.getSelection(), userSession.getSession().getOptions());
-      } catch (IOException | ExecutionSetupException e) {
+      } catch (IOException | PluginException e) {
         throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 9e82ffc..d355798 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -71,7 +71,8 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
 
   @Override
   public GroupScan clone(List<SchemaPath> columns) {
-    throw new UnsupportedOperationException(String.format("%s does not implement clone(columns) method!", this.getClass().getCanonicalName()));
+    throw new UnsupportedOperationException(String.format(
+        "%s does not implement clone(columns) method!", this.getClass().getCanonicalName()));
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
index 23ea23f..70244bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
@@ -43,9 +43,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class FileSystemCreateTableEntry implements CreateTableEntry {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemCreateTableEntry.class);
 
-  private FileSystemConfig storageConfig;
-  private FormatPlugin formatPlugin;
-  private String location;
+  private final FileSystemConfig storageConfig;
+  private final FormatPlugin formatPlugin;
+  private final String location;
   private final List<String> partitionColumns;
   private final StorageStrategy storageStrategy;
 
@@ -58,7 +58,7 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
                                     @JacksonInject StoragePluginRegistry engineRegistry)
       throws ExecutionSetupException {
     this.storageConfig = storageConfig;
-    this.formatPlugin = engineRegistry.getFormatPlugin(storageConfig, formatConfig);
+    this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, FormatPlugin.class);
     this.location = location;
     this.partitionColumns = partitionColumns;
     this.storageStrategy = storageStrategy;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
index 92a07c5..24ebf561 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
@@ -23,18 +23,20 @@ import com.fasterxml.jackson.core.io.CharacterEscapes;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlDescribeSchema;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
@@ -45,13 +47,11 @@ import java.util.Map;
 import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
 
 public class DescribeSchemaHandler extends DefaultSqlHandler {
+  private static final Logger logger = LoggerFactory.getLogger(DescribeSchemaHandler.class);
 
-  public DescribeSchemaHandler(SqlHandlerConfig config) {
-    super(config);
-  }
-
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DescribeSchemaHandler.class);
-  private static final ObjectMapper mapper = new ObjectMapper(new ObjectMapper().getFactory().setCharacterEscapes(new CharacterEscapes() {
+  @SuppressWarnings("serial")
+  private static final ObjectMapper mapper = new ObjectMapper(
+      new ObjectMapper().getFactory().setCharacterEscapes(new CharacterEscapes() {
     @Override
     public int[] getEscapeCodesForAscii() {
       // add standard set of escaping characters
@@ -69,6 +69,10 @@ public class DescribeSchemaHandler extends DefaultSqlHandler {
   })).enable(INDENT_OUTPUT);
 
 
+  public DescribeSchemaHandler(SqlHandlerConfig config) {
+    super(config);
+  }
+
   @Override
   public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
     SqlIdentifier schema = unwrap(sqlNode, SqlDescribeSchema.class).getSchema();
@@ -88,7 +92,7 @@ public class DescribeSchemaHandler extends DefaultSqlHandler {
         throw new DrillRuntimeException(String.format("Unable to find storage plugin with the following name [%s].",
           drillSchema.getSchemaPath().get(0)));
       }
-    } catch (ExecutionSetupException e) {
+    } catch (PluginException e) {
       throw new DrillRuntimeException("Failure while retrieving storage plugin", e);
     }
 
@@ -131,5 +135,4 @@ public class DescribeSchemaHandler extends DefaultSqlHandler {
       this.properties = properties;
     }
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
index 43d2f2a..9773f37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.planner.sql.handlers;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexInputRef;
@@ -45,7 +47,7 @@ import java.util.HashSet;
 import java.util.List;
 
 public class SqlHandlerUtil {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SqlHandlerUtil.class);
+  private static final Logger logger = LoggerFactory.getLogger(SqlHandlerUtil.class);
 
   /**
    * Resolve final RelNode of the new table (or view) for given table field list and new table definition.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 344db1d..141c027 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -57,6 +57,8 @@ import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.server.ServerProperties;
 import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature;
 import org.glassfish.jersey.server.mvc.freemarker.FreemarkerMvcFeature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import javax.servlet.ServletContext;
@@ -72,7 +74,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class DrillRestServer extends ResourceConfig {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRestServer.class);
+  static final Logger logger = LoggerFactory.getLogger(DrillRestServer.class);
 
   public DrillRestServer(final WorkManager workManager, final ServletContext servletContext, final Drillbit drillbit) {
     register(DrillRoot.class);
@@ -121,12 +123,13 @@ public class DrillRestServer extends ResourceConfig {
     register(new AbstractBinder() {
       @Override
       protected void configure() {
+        DrillbitContext context = workManager.getContext();
         bind(drillbit).to(Drillbit.class);
         bind(workManager).to(WorkManager.class);
         bind(executor).to(EventExecutor.class);
-        bind(workManager.getContext().getLpPersistence().getMapper()).to(ObjectMapper.class);
-        bind(workManager.getContext().getStoreProvider()).to(PersistentStoreProvider.class);
-        bind(workManager.getContext().getStorage()).to(StoragePluginRegistry.class);
+        bind(context.getLpPersistence().getMapper()).to(ObjectMapper.class);
+        bind(context.getStoreProvider()).to(PersistentStoreProvider.class);
+        bind(context.getStorage()).to(StoragePluginRegistry.class);
         bind(new UserAuthEnabled(isAuthEnabled)).to(UserAuthEnabled.class);
         if (isAuthEnabled) {
           bindFactory(DrillUserPrincipalProvider.class).to(DrillUserPrincipal.class);
@@ -356,7 +359,7 @@ public class DrillRestServer extends ResourceConfig {
 
   // Returns whether auth is enabled or not in config
   public static class UserAuthEnabled {
-    private boolean value;
+    private final boolean value;
 
     public UserAuthEnabled(boolean value) {
       this.value = value;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
index c19fcc9..30c46b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
@@ -19,9 +19,9 @@ package org.apache.drill.exec.server.rest;
 
 import javax.xml.bind.annotation.XmlRootElement;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -31,48 +31,23 @@ public class PluginConfigWrapper {
 
   private final String name;
   private final StoragePluginConfig config;
-  private final boolean exists;
 
   @JsonCreator
-  public PluginConfigWrapper(@JsonProperty("name") String name, @JsonProperty("config") StoragePluginConfig config) {
+  public PluginConfigWrapper(@JsonProperty("name") String name,
+      @JsonProperty("config") StoragePluginConfig config) {
     this.name = name;
     this.config = config;
-    this.exists = config != null;
   }
 
-  public String getName() {
-    return name;
-  }
+  public String getName() { return name; }
 
-  public StoragePluginConfig getConfig() {
-    return config;
-  }
+  public StoragePluginConfig getConfig() { return config; }
 
   public boolean enabled() {
-    return exists && config.isEnabled();
-  }
-
-  public void createOrUpdateInStorage(StoragePluginRegistry storage) throws ExecutionSetupException {
-    storage.put(name, config);
-  }
-
-  public boolean setEnabledInStorage(StoragePluginRegistry storage, boolean enabled) throws ExecutionSetupException {
-    if (exists) {
-      config.setEnabled(enabled);
-      createOrUpdateInStorage(storage);
-    }
-    return exists;
-  }
-
-  public boolean exists() {
-    return exists;
+    return config.isEnabled();
   }
 
-  public boolean deleteFromStorage(StoragePluginRegistry storage) {
-    if (exists) {
-      storage.remove(name);
-      return true;
-    }
-    return false;
+  public void createOrUpdateInStorage(StoragePluginRegistry storage) throws PluginException {
+    storage.validatedPut(name, config);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index 35e164f..1ed9128 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -17,14 +17,11 @@
  */
 package org.apache.drill.exec.server.rest;
 
-import java.io.IOException;
-import java.io.StringReader;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.Spliterator;
 import java.util.Spliterators;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -45,22 +42,22 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.SecurityContext;
 import javax.xml.bind.annotation.XmlRootElement;
 
-import com.fasterxml.jackson.core.JsonParser;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginEncodingException;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginFilter;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginNotFoundException;
 import org.glassfish.jersey.server.mvc.Viewable;
 
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.drill.exec.server.rest.auth.DrillUserPrincipal.ADMIN_ROLE;
 
+// Serialization of plugins to JSON is handled by the Jersey framework
+// as configured in DrillRestServer.
 @Path("/")
 @RolesAllowed(ADMIN_ROLE)
 public class StorageResources {
@@ -73,9 +70,6 @@ public class StorageResources {
   StoragePluginRegistry storage;
 
   @Inject
-  ObjectMapper mapper;
-
-  @Inject
   SecurityContext sc;
 
   @Inject
@@ -91,11 +85,16 @@ public class StorageResources {
       Comparator.comparing(PluginConfigWrapper::getName);
 
   /**
-   * Regex allows the following paths:
+   * Regex allows the following paths:<pre><code>
    * /storage/{group}/plugins/export
-   * /storage/{group}/plugins/export/{format}
-   * Note: for the second case the format involves the leading slash, therefore it should be removed then
+   * /storage/{group}/plugins/export/{format}</code></pre>
+   * <p>
+   * Note: in the second case the captured format includes the leading
+   * slash, which must therefore be removed.
    */
+  // This code has a flaw: the Jackson ObjectMapper cannot serialize to
+  // HOCON format, though it can read HOCON. Thus, the only valid format
+  // is json.
   @GET
   @Path("/storage/{group}/plugins/export{format: (/[^/]+?)*}")
   @Produces(MediaType.APPLICATION_JSON)
@@ -139,7 +138,7 @@ public class StorageResources {
   @Produces(MediaType.APPLICATION_JSON)
   public PluginConfigWrapper getPluginConfig(@PathParam("name") String name) {
     try {
-      return new PluginConfigWrapper(name, storage.getConfig(name));
+      return new PluginConfigWrapper(name, storage.getStoredConfig(name));
     } catch (Exception e) {
       logger.error("Failure while trying to access storage config: {}", name, e);
     }
@@ -159,14 +158,14 @@ public class StorageResources {
   @Path("/storage/{name}/enable/{val}")
   @Produces(MediaType.APPLICATION_JSON)
   public JsonResult enablePlugin(@PathParam("name") String name, @PathParam("val") Boolean enable) {
-    PluginConfigWrapper plugin = getPluginConfig(name);
     try {
-      return plugin.setEnabledInStorage(storage, enable)
-          ? message("Success")
-          : message("Error (plugin does not exist)");
-    } catch (ExecutionSetupException e) {
+      storage.setEnabled(name, enable);
+      return message("Success");
+    } catch (PluginNotFoundException e) {
+      return message("No plugin exists with the given name: " + name);
+    } catch (PluginException e) {
       logger.debug("Error in enabling storage name: {} flag: {}",  name, enable);
-      return message("Error (unable to enable / disable storage)");
+      return message("Unable to enable/disable plugin:" + e.getMessage());
     }
   }
 
@@ -194,9 +193,12 @@ public class StorageResources {
   @Path("/storage/{name}.json")
   @Produces(MediaType.APPLICATION_JSON)
   public JsonResult deletePlugin(@PathParam("name") String name) {
-    return getPluginConfig(name).deleteFromStorage(storage)
-        ? message("Success")
-        : message("Error (unable to delete %s storage plugin)", name);
+    try {
+      storage.remove(name);
+      return message("Success");
+    } catch (PluginException e) {
+      return message(e.getMessage());
+    }
   }
 
   @POST
@@ -207,13 +209,17 @@ public class StorageResources {
     try {
       plugin.createOrUpdateInStorage(storage);
       return message("Success");
-    } catch (ExecutionSetupException e) {
+    } catch (PluginException e) {
       logger.error("Unable to create/ update plugin: " + plugin.getName(), e);
-      return message("Error while creating / updating storage : %s", e.getCause() == null ? e.getMessage() :
-          e.getCause().getMessage());
+      return message("Error while saving plugin: %s", e.getMessage());
     }
   }
 
+  // Allows JSON that includes comments. However, since the JSON is immediately
+  // serialized, the comments are lost. Would be better to validate the JSON,
+  // then write the original JSON directly to the persistent store, and read
+  // JSON from the store, rather than letting Jackson produce it. That way,
+  // comments are preserved.
   @POST
   @Path("/storage/create_update")
   @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@@ -224,18 +230,13 @@ public class StorageResources {
       return message("Error (a storage name cannot be empty)");
     }
     try {
-      mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
-      StoragePluginConfig config = mapper.readValue(new StringReader(storagePluginConfig), StoragePluginConfig.class);
-      return createOrUpdatePluginJSON(new PluginConfigWrapper(name, config));
-    } catch (JsonMappingException e) {
+      storage.putJson(name, storagePluginConfig);
+      return message("Success");
+    } catch (PluginEncodingException e) {
       logger.debug("Error in JSON mapping: {}", storagePluginConfig, e);
-      return message("Error (invalid JSON mapping)");
-    } catch (JsonParseException e) {
-      logger.debug("Error parsing JSON: {}", storagePluginConfig, e);
-      return message("Error (unable to parse JSON)");
-    } catch (IOException e) {
-      logger.debug("Failed to read: {}", storagePluginConfig, e);
-      return message("Error (unable to read)");
+      return message("Invalid JSON");
+    } catch (PluginException e) {
+      return message(e.getMessage());
     }
   }
 
@@ -248,19 +249,41 @@ public class StorageResources {
   }
 
   /**
-   * Regex allows the following paths:
+   * Regex allows the following paths:<pre><code>
    * /storage.json
-   * /storage/{group}-plugins.json
-   * Note: for the second case the group involves the leading slash, therefore it should be removed then
+   * /storage/{group}-plugins.json</code></pre>
+   * Allowable groups:
+   * <ul>
+   * <li>"all" {@link #ALL_PLUGINS}</li>
+   * <li>"enabled" {@link #ENABLED_PLUGINS}</li>
+   * <li>"disabled" {@link #DISABLED_PLUGINS}</li>
+   * </ul>
+   * Any other group value results in an empty list.
+   * <p>
+   * Note: in the second case the captured group includes the leading
+   * slash, which must therefore be removed.
    */
   @GET
   @Path("/storage{group: (/[^/]+?)*}-plugins.json")
   @Produces(MediaType.APPLICATION_JSON)
   public List<PluginConfigWrapper> getConfigsFor(@PathParam("group") String pluginGroup) {
+    PluginFilter filter;
+    switch (pluginGroup.trim()) {
+    case ALL_PLUGINS:
+      filter = PluginFilter.ALL;
+      break;
+    case ENABLED_PLUGINS:
+      filter = PluginFilter.ENABLED;
+      break;
+    case DISABLED_PLUGINS:
+      filter = PluginFilter.DISABLED;
+      break;
+    default:
+      return Collections.emptyList();
+    }
     pluginGroup = StringUtils.isNotEmpty(pluginGroup) ? pluginGroup.replace("/", "") : ALL_PLUGINS;
     return StreamSupport.stream(
-        Spliterators.spliteratorUnknownSize(storage.storedConfigs().entrySet().iterator(), Spliterator.ORDERED), false)
-            .filter(byPluginGroup(pluginGroup))
+        Spliterators.spliteratorUnknownSize(storage.storedConfigs(filter).entrySet().iterator(), Spliterator.ORDERED), false)
             .map(entry -> new PluginConfigWrapper(entry.getKey(), entry.getValue()))
             .sorted(PLUGIN_COMPARATOR)
             .collect(Collectors.toList());
@@ -286,21 +309,7 @@ public class StorageResources {
   @Produces(MediaType.APPLICATION_JSON)
   @Deprecated
   public JsonResult deletePluginViaGet(@PathParam("name") String name) {
-    return getPluginConfig(name).deleteFromStorage(storage)
-        ? message("Success")
-        : message("Error (unable to delete %s storage plugin)", name);
-  }
-
-  private Predicate<Map.Entry<String, StoragePluginConfig>> byPluginGroup(String pluginGroup) {
-    if (ALL_PLUGINS.equalsIgnoreCase(pluginGroup)) {
-      return entry -> true;
-    } else if (ENABLED_PLUGINS.equalsIgnoreCase(pluginGroup)) {
-      return entry -> entry.getValue().isEnabled();
-    } else if (DISABLED_PLUGINS.equalsIgnoreCase(pluginGroup)) {
-      return entry -> !entry.getValue().isEnabled();
-    } else {
-      return entry -> false;
-    }
+    return deletePlugin(name);
   }
 
   @XmlRootElement
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
index 7f18983..0b2694b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
@@ -280,7 +280,8 @@ public class ClassicConnectorLocator implements ConnectorLocator {
         throw ((ExecutionSetupException) t);
       }
       throw new ExecutionSetupException(String.format(
-          "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
+          "Failure setting up new storage plugin configuration for config %s",
+          pluginConfig.getClass().getSimpleName()), t);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/DrillbitPluginRegistryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/DrillbitPluginRegistryContext.java
index 5a5187a..fc75f82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/DrillbitPluginRegistryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/DrillbitPluginRegistryContext.java
@@ -33,16 +33,19 @@ public class DrillbitPluginRegistryContext implements PluginRegistryContext {
 
   private final DrillbitContext drillbitContext;
   private final ObjectMapper mapper;
+  private final ObjectMapper hoconMapper;
 
   public DrillbitPluginRegistryContext(DrillbitContext drillbitContext) {
     this.drillbitContext = drillbitContext;
 
+    mapper = drillbitContext.getLpPersistence().getMapper();
+
     // Specialized form of the persistence mechanism
     // to handle HOCON format in the override file
     LogicalPlanPersistence persistence = new LogicalPlanPersistence(drillbitContext.getConfig(),
         drillbitContext.getClasspathScan(),
         new ObjectMapper(new HoconFactory()));
-    mapper = persistence.getMapper();
+    hoconMapper = persistence.getMapper();
   }
 
   @Override
@@ -56,6 +59,11 @@ public class DrillbitPluginRegistryContext implements PluginRegistryContext {
   }
 
   @Override
+  public ObjectMapper hoconMapper() {
+    return hoconMapper;
+  }
+
+  @Override
   public ScanResult classpathScan() {
     return drillbitContext.getClasspathScan();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginBootstrapLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginBootstrapLoaderImpl.java
index aebe45e..8f27869 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginBootstrapLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginBootstrapLoaderImpl.java
@@ -227,7 +227,7 @@ public class PluginBootstrapLoaderImpl implements PluginBootstrapLoader {
     }
     pluginsOverrideFileUrl = urlSet.iterator().next();
     try (InputStream is = pluginsOverrideFileUrl.openStream();) {
-      return context.mapper().readValue(is, StoragePlugins.class);
+      return context.hoconMapper().readValue(is, StoragePlugins.class);
     } catch (IOException e) {
       logger.error("Failures are obtained while loading file: '{}'. Proceeding without update.",
           upgradeFileName, e);
@@ -332,6 +332,6 @@ public class PluginBootstrapLoaderImpl implements PluginBootstrapLoader {
 
   private StoragePlugins getPluginsFromResource(URL resource) throws IOException {
     String pluginsData = Resources.toString(resource, Charsets.UTF_8);
-    return context.mapper().readValue(pluginsData, StoragePlugins.class);
+    return context.hoconMapper().readValue(pluginsData, StoragePlugins.class);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginHandle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginHandle.java
index 4a80e66..9b0d0ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginHandle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginHandle.java
@@ -141,6 +141,8 @@ public class PluginHandle {
     logger.info("Creating storage plugin for {}", name);
     try {
       plugin = connector.newInstance(name, config);
+    } catch (UserException e) {
+      throw e;
     } catch (Exception e) {
       throw UserException.internalError(e)
         .addContext("Plugin name", name)
@@ -189,6 +191,9 @@ public class PluginHandle {
    * from/to ephemeral storage and the plugin cache, since those two
    * caches are not synchronized as a whole. Ensures that only one of the
    * threads in a race condition will transfer the actual plugin instance.
+   * <p>
+   * By definition, a plugin becomes disabled when it moves to ephemeral
+   * status, and enabled when it moves from ephemeral to stored status.
    */
   public synchronized PluginHandle transfer(PluginType type) {
     if (plugin == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRegistryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRegistryContext.java
index 3c94bf3..b249b66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRegistryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRegistryContext.java
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 public interface PluginRegistryContext {
   DrillConfig config();
   ObjectMapper mapper();
+  ObjectMapper hoconMapper();
   ScanResult classpathScan();
 
   // TODO: Remove this here and from StoragePlugin constructors.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
index 65b1e96..9968a0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
@@ -23,12 +23,10 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Holds maps to storage plugins. Supports name => plugin and config => plugin
@@ -56,7 +54,6 @@ import org.slf4j.LoggerFactory;
  * close. (The one exception is final close, which is done here.)
  */
 class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
-  private static final Logger logger = LoggerFactory.getLogger(StoragePluginMap.class);
 
   private final Map<String, PluginHandle> nameMap = CaseInsensitiveMap.newHashMap();
   private final Map<StoragePluginConfig, PluginHandle> configMap = new HashMap<>();
@@ -66,8 +63,9 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
    * the same plugin twice. Also safe for putting a different
    *
    * @return the replaced entry, if any, which the caller should close
+   * @throws PluginException for an attempt to replace a system plugin
    */
-  public synchronized PluginHandle put(PluginHandle plugin) {
+  public synchronized PluginHandle put(PluginHandle plugin) throws PluginException {
     PluginHandle oldPlugin = nameMap.put(plugin.name(), plugin);
     if (oldPlugin != null) {
       if (oldPlugin == plugin || oldPlugin.config().equals(plugin.config())) {
@@ -76,12 +74,7 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
       if (oldPlugin.isIntrinsic()) {
         // Put the old one back
         nameMap.put(oldPlugin.name(), oldPlugin);
-        throw UserException.permissionError()
-          .message("Attempt to replace a system plugin.")
-          .addContext("Plugin name", oldPlugin.name())
-          .addContext("Intrinsic plugin class", oldPlugin.config().getClass().getName())
-          .addContext("Attempted replacement", plugin.config().getClass().getName())
-          .build(logger);
+        throw PluginException.systemPluginException("replace", plugin.name());
       }
       configMap.remove(oldPlugin.config());
     }
@@ -127,19 +120,16 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
    * @param newPlugin the new plugin to insert
    * @return true if the new plugin was inserted, false if not because
    * the old plugin was not found in the map
+   * @throws PluginException for an attempt to replace a system plugin
    */
-  public synchronized boolean replace(PluginHandle oldPlugin, PluginHandle newPlugin) {
+  public synchronized boolean replace(PluginHandle oldPlugin, PluginHandle newPlugin)
+      throws PluginException {
     Preconditions.checkArgument(oldPlugin != null);
     Preconditions.checkArgument(newPlugin != null);
     Preconditions.checkArgument(oldPlugin.name().equalsIgnoreCase(newPlugin.name()));
     Preconditions.checkArgument(oldPlugin != newPlugin);
     if (oldPlugin.isIntrinsic()) {
-      throw UserException.permissionError()
-        .message("Attempt to replace a system plugin.")
-        .addContext("Plugin name", oldPlugin.name())
-        .addContext("Intrinsic plugin class", oldPlugin.config().getClass().getName())
-        .addContext("Attempted replacement", newPlugin.config().getClass().getName())
-        .build(logger);
+      throw PluginException.systemPluginException("replace", oldPlugin.name());
     }
     boolean ok = nameMap.replace(oldPlugin.name(), oldPlugin, newPlugin);
     if (ok) {
@@ -157,19 +147,16 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
    *
    * @return the doomed plugin if the plugin was removed, null if there was
    * no entry by the given name
+   * @throws PluginException for an attempt to remove a system plugin
    * @see {@link #remove(PluginHandle)
    */
-  public synchronized PluginHandle remove(String name) {
+  public synchronized PluginHandle remove(String name) throws PluginException {
     PluginHandle plugin = get(name);
     if (plugin == null) {
       return null;
     }
     if (plugin.isIntrinsic()) {
-      throw UserException.permissionError()
-        .message("Attempt to remove a system plugin.")
-        .addContext("Plugin name", plugin.name())
-        .addContext("Intrinsic plugin class", plugin.config().getClass().getName())
-        .build(logger);
+      throw PluginException.systemPluginException("remove", name);
     }
     nameMap.remove(name);
     configMap.remove(plugin.config(), plugin);
@@ -177,29 +164,6 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
   }
 
   /**
-   * Removes the plugin, but only if it is in the map. That is,
-   * resolves the name and removes the plugin only if it resolves
-   * to the given plugin.
-   *
-   * @return true if the plugin was removed and the caller
-   * should close it, false otherwise
-   */
-  public synchronized boolean remove(PluginHandle oldPlugin) {
-    if (oldPlugin.isIntrinsic()) {
-      throw UserException.permissionError()
-        .message("Attempt to remove a system plugin.")
-        .addContext("Plugin name", oldPlugin.name())
-        .addContext("Intrinsic plugin class", oldPlugin.config().getClass().getName())
-        .build(logger);
-    }
-    boolean ok = nameMap.remove(oldPlugin.name(), oldPlugin);
-    if (ok) {
-      configMap.remove(oldPlugin.config(), oldPlugin);
-    }
-    return ok;
-  }
-
-  /**
    * Given a name and a config (which is presumed to have become disabled),
    * remove and return any existing plugin. Only matches if the name is found and the
    * named plugin has the same config as the one to remove to enforce
@@ -208,21 +172,23 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
    * @param name plugin name
    * @param oldConfig expected config of the doomed plugin
    * @return true if the plugin was removed and closed, false otherwise
+   * @throws PluginException for an attempt to remove a system plugin
    */
-  public synchronized PluginHandle remove(String name, StoragePluginConfig oldConfig) {
+  public synchronized PluginHandle remove(String name, StoragePluginConfig oldConfig) throws PluginException {
     PluginHandle oldEntry = nameMap.get(name);
     if (oldEntry == null || !oldEntry.config().equals(oldConfig)) {
       return null;
     }
     if (oldEntry.isIntrinsic()) {
-      throw UserException.permissionError()
-        .message("Attempt to remove a system plugin.")
-        .addContext("Plugin name", oldEntry.name())
-        .addContext("Intrinsic plugin class", oldEntry.config().getClass().getName())
-        .build(logger);
+      throw PluginException.systemPluginException("remove", name);
     }
     nameMap.remove(oldEntry.name());
-    configMap.remove(oldEntry.config());
+    if (configMap.remove(oldEntry.config()) != oldEntry) {
+      // This is a programming error.
+      throw new IllegalStateException(String.format(
+        "Config entry was modified while in the plugin cache: '%s', class %s",
+        name, oldConfig.getClass().getName()));
+    }
     return oldEntry;
   }
 
@@ -261,4 +227,8 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
     configMap.clear();
     nameMap.clear();
   }
+
+  public Set<String> names() {
+    return nameMap.keySet();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 7ed1663..f1a75ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -17,71 +17,214 @@
  */
 package org.apache.drill.exec.store;
 
+import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 public interface StoragePluginRegistry extends Iterable<Map.Entry<String, StoragePlugin>>, AutoCloseable {
   String PSTORE_NAME = "sys.storage_plugins";
 
+  @SuppressWarnings("serial")
+  public static class PluginException extends Exception {
+    public PluginException(String msg) {
+      super(msg);
+    }
+
+    public PluginException(String msg, Throwable e) {
+      super(msg, e);
+    }
+
+    public static PluginException systemPluginException(String operation, String name) {
+      return new PluginException(String.format(
+          "Cannot %s a system plugin: `%s`", operation, name));
+    }
+  }
+
+  /**
+   * Indicates the requested plugin was not found.
+   */
+  @SuppressWarnings("serial")
+  public static class PluginNotFoundException extends PluginException {
+    public PluginNotFoundException(String name) {
+      super("No storage plugin exists with name: `" + name + "`");
+    }
+  }
+
+  /**
+   * Indicates an error when decoding a plugin from JSON.
+   */
+  @SuppressWarnings("serial")
+  public static class PluginEncodingException extends PluginException {
+    public PluginEncodingException(String msg, Exception e) {
+      super(msg, e);
+    }
+  }
+
   /**
-   * Initialize the storage plugin registry. Must be called before the registry is used.
+   * Initialize the storage plugin registry. Must be called before the registry
+   * is used.
    */
   void init();
 
   /**
-   * Store a plugin by name and configuration. If the plugin already exists, update the plugin
+   * Store a plugin by name and configuration. If the plugin already exists,
+   * update the plugin. This form directly updates persistent storage. The
+   * in-memory cache is updated on the next refresh. This form will accept an
+   * invalid plugin, which will be disabled upon refresh. Since Drill is
+   * distributed, and plugins work with external systems, the external system
+   * can become invalid at any moment (not just when plugins are updated), so the
+   * model used for {@code put()} mimics normal runtime operation.
    *
    * @param name The name of the plugin
    * @param config The plugin configuration
    * @return The StoragePlugin instance.
-   * @throws ExecutionSetupException if plugin cannot be created
+   * @throws PluginException if plugin cannot be created
+   */
+  void put(String name, StoragePluginConfig config) throws PluginException;
+
+  /**
+   * Like {@link #put(String, StoragePluginConfig)}, but forces instantiation of the
+   * plugin to verify that the configuration is valid at this moment in time.
+   */
+  void validatedPut(String name, StoragePluginConfig config) throws PluginException;
+
+  /**
+   * Set the plugin to the requested enabled state. Does nothing if the plugin
+   * is already in the requested state. If a formerly enabled plugin is
+   * disabled, moves the plugin from the in-memory cache to the ephemeral
+   * store. If a formerly disabled plugin is enabled, verifies that the plugin
+   * can be instantiated as for {@link #validatedPut(String, StoragePluginConfig)}.
+   * <p>
+   * Use this method when changing state. Do not obtain the config and change
+   * the state directly, doing so will make the plugin config inconsistent
+   * with the internal state.
+   *
+   * @param name name of the plugin
+   * @param enabled {@code true} to enable the plugin, {@code false} to disable
+   * @throws PluginNotFoundException if the plugin is not found
+   * @throws PluginException if the plugin name is not valid or
+   * if enabling a plugin and the plugin is not valid
    */
-  void put(String name, StoragePluginConfig config) throws ExecutionSetupException;
+  void setEnabled(String name, boolean enabled) throws PluginException;
 
   /**
    * Get a plugin by name. Create it based on the PStore saved definition if it doesn't exist.
    *
    * @param name The name of the plugin
    * @return The StoragePlugin instance.
-   * @throws ExecutionSetupException if plugin cannot be obtained
+   * @throws PluginException if plugin cannot be obtained
    */
-  StoragePlugin getPlugin(String name) throws ExecutionSetupException;
+  StoragePlugin getPlugin(String name) throws PluginException, UserException;
 
   /**
    * Get a plugin by configuration. If it doesn't exist, create it.
    *
    * @param config The configuration for the plugin.
    * @return The StoragePlugin instance.
-   * @throws ExecutionSetupException if plugin cannot be obtained
+   * @throws PluginException if plugin cannot be obtained
+   */
+  StoragePlugin getPluginByConfig(StoragePluginConfig config) throws PluginException;
+
+  /**
+   * @deprecated use {@link #resolve(StoragePluginConfig, Class)} which provides
+   * type safety. Retained for compatibility with older plugins
    */
+  @Deprecated
   StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException;
 
   /**
-   * Retrieve a stored configuration by name. Returns only defined
-   * plugins (not system plugins). Returns both enabled and disabled
-   * plugins. Use this to obtain a plugin for editing (rather than
+   * Return a plugin from persistent storage. Returns both enabled and
+   * disabled stored plugins, but does not return system plugins.
+   * Use this to obtain a plugin for editing (rather than
    * for planning or executing a query.)
    */
-  StoragePluginConfig getConfig(String name);
+  StoragePluginConfig getStoredConfig(String name);
+
+  /**
+   * Return a config encoded as JSON.
+   * @throws PluginException if the plugin is undefined
+   */
+  String encode(String name) throws PluginException;
+  String encode(StoragePluginConfig config);
+
+  /**
+   * Return a config decoded from JSON.
+   * @throws PluginEncodingException if the JSON is invalid
+   */
+  StoragePluginConfig decode(String json) throws PluginEncodingException;
+
+  /**
+   * Put a storage plugin config from JSON. Validates the JSON and the
+   * resulting storage plugin. Use this form for JSON received from
+   * the UI or other external source.
+   *
+   * @throws PluginEncodingException if the JSON is invalid
+   * @throws PluginException if the underlying
+   * {@link #validatedPut(String, StoragePluginConfig)} fails
+   */
+  void putJson(String name, String json) throws PluginException;
+
+  /**
+   * Copy a stored config so that it can be modified.
+   * <p>
+   * <i><b>Never modify a config stored in the registry!</b></i>
+   * Configs are keyed by name and value; getting a config, then
+   * modifying it, will cause the value maps to become out of sync.
+   *
+   * @param name name of the storage plugin config to copy
+   * @return a copy of the config
+   * @throws PluginException if the name is undefined
+   */
+  StoragePluginConfig copyConfig(String name) throws PluginException;
+
+  /**
+   * Copy the given storage plugin config so it may be modified.
+   *
+   * @param config the storage plugin config to copy
+   * @return the copy
+   */
+  StoragePluginConfig copyConfig(StoragePluginConfig config);
+
+  /**
+   * Retrieve an available configuration. Returns enabled stored plugins
+   * and system plugins. These configs are those that can be used to
+   * plan a query.
+   */
+  StoragePluginConfig getDefinedConfig(String name);
 
   /**
    * Remove a plugin by name
    *
    * @param name The name of the storage plugin to remove
+   * @throws PluginException if the name is invalid or refers to a system plugin
    */
-  void remove(String name);
+  void remove(String name) throws PluginException;
 
   /**
-   * Returns a copy of the set of all stored plugin configurations,
-   * directly from the persistent store.
+   * Returns a set of all stored plugin configurations,
+   * directly from the persistent store. Note: the actual
+   * configs may reside in the cache; make a copy before
+   * making any changes.
    * @return map of stored plugin configurations
    */
   Map<String, StoragePluginConfig> storedConfigs();
 
+  enum PluginFilter { ALL, ENABLED, DISABLED };
+
+  /**
+   * Return a possibly-filtered set of plugins from the persistent
+   * store.
+   */
+  Map<String, StoragePluginConfig> storedConfigs(PluginFilter filter);
+
   /**
    * Returns a copy of the set of enabled stored plugin configurations.
    * The registry is refreshed against the persistent store prior
@@ -91,14 +234,50 @@ public interface StoragePluginRegistry extends Iterable<Map.Entry<String, Storag
   Map<String, StoragePluginConfig> enabledConfigs();
 
   /**
-   * Get the Format plugin for the FileSystemPlugin associated with the provided storage config and format config.
+   * Returns the set of available plugin names.
+   * Includes system plugins and enabled stored plugins.
+   */
+  Set<String> availablePlugins();
+
+  /**
+   * Safe way to add or remove a format plugin config from a stored file
+   * system configuration. Makes a copy of the config, adds/removes the
+   * format plugin, and updates the persistent store with the copy.
    *
-   * @param storageConfig The storage config for the associated FileSystemPlugin
-   * @param formatConfig The format config for the associated FormatPlugin
+   * @param pluginName name of the file system storage plugin config to
+   * modify
+   * @param formatName name of the format plugin to modify
+   * @param formatConfig if null, removes the plugin, if non-null updates
+   * the format plugin config with this value
+   * @throws PluginException if the storage plugin is undefined or
+   * is not a file format plugin
+   */
+  void putFormatPlugin(String pluginName, String formatName,
+      FormatPluginConfig formatConfig) throws PluginException;
+
+  /**
+   * Get the Format plugin for the FileSystemPlugin associated with the provided
+   * storage config and format config.
+   *
+   * @param storageConfig
+   *          The storage config for the associated FileSystemPlugin
+   * @param formatConfig
+   *          The format config for the associated FormatPlugin
    * @return A FormatPlugin instance
-   * @throws ExecutionSetupException if plugin cannot be obtained
+   * @throws PluginException
+   *           if plugin cannot be obtained
    */
-  FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException;
+  FormatPlugin getFormatPluginByConfig(StoragePluginConfig storageConfig,
+      FormatPluginConfig formatConfig) throws PluginException;
+
+  /**
+   * @deprecated use
+   * {@link #resolveFormat(StoragePluginConfig, FormatPluginConfig, Class)}
+   * which provides type safety. Retained for compatibility with older plugins
+   */
+  @Deprecated
+  FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig,
+      FormatPluginConfig formatConfig) throws ExecutionSetupException;
 
   /**
    * Get the Schema factory associated with this storage plugin registry.
@@ -106,4 +285,29 @@ public interface StoragePluginRegistry extends Iterable<Map.Entry<String, Storag
    * @return A SchemaFactory that can register the schemas associated with this plugin registry.
    */
   SchemaFactory getSchemaFactory();
+
+  /**
+   * Object mapper to read/write the JSON form of a plugin
+   * config.
+   */
+  ObjectMapper mapper();
+
+  <T extends StoragePlugin> T resolve(
+      StoragePluginConfig storageConfig, Class<T> desired);
+
+  /**
+   * Resolve a format plugin given a storage plugin config and a format
+   * plugin config. Call from within a file storage plugin to resolve the plugin.
+   *
+   * @param <T> the required type of the plugin
+   * @param storageConfig storage plugin config
+   * @param formatConfig format plugin config
+   * @param desired desired target class
+   * @return the format plugin
+   * @throws IllegalStateException if the plugin is unknown or of the wrong
+   * format - errors which should never occur in normal operation
+   */
+  <T extends FormatPlugin> T resolveFormat(
+      StoragePluginConfig storageConfig,
+      FormatPluginConfig formatConfig, Class<T> desired);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 8898b68..6ca2732 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -39,6 +39,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.PluginHandle.PluginType;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
 import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
@@ -47,6 +48,8 @@ import org.apache.drill.shaded.guava.com.google.common.cache.RemovalListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 /**
  * Plugin registry. Caches plugin instances which correspond to configurations
  * stored in persistent storage. Synchronizes the instances and storage.
@@ -213,18 +216,34 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
   @Override
   public void init() {
     locators.stream().forEach(loc -> loc.init());
-    loadIntrinsicPlugins();
+    try {
+      loadIntrinsicPlugins();
+    } catch (PluginException e) {
+      // Should only occur for a programming error
+      throw new IllegalStateException("Failed to load system plugins", e);
+    }
     defineConnectors();
     prepareStore();
   }
 
-  private void loadIntrinsicPlugins() {
+  private void loadIntrinsicPlugins() throws PluginException {
     for (ConnectorLocator locator : locators) {
       Collection<StoragePlugin> intrinsicPlugins = locator.intrinsicPlugins();
       if (intrinsicPlugins == null) {
         continue;
       }
       for (StoragePlugin sysPlugin : intrinsicPlugins) {
+        // Enforce lower case names. Since the name of a system plugin
+        // is "hard coded", we can't adjust it if it is not already
+        // lower case. All we can do is fail, which tells the developer that
+        // something is wrong.
+        String origName = sysPlugin.getName();
+        String lcName = sysPlugin.getName().toLowerCase();
+        if (!origName.equals(lcName)) {
+          throw new IllegalStateException(String.format(
+              "Plugin names must be in lower case but system plugin name `%s` is not",
+              origName));
+        }
         ConnectorHandle connector = ConnectorHandle.intrinsicConnector(locator, sysPlugin);
         defineConnector(connector);
         pluginCache.put(new PluginHandle(sysPlugin, connector, PluginType.INTRINSIC));
@@ -298,7 +317,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
       return;
     }
     for (Map.Entry<String, StoragePluginConfig> newPlugin : upgraded) {
-      StoragePluginConfig oldPluginConfig = pluginStore.get(newPlugin.getKey());
+      StoragePluginConfig oldPluginConfig = getStoredConfig(newPlugin.getKey());
       if (oldPluginConfig != null) {
         copyPluginStatus(oldPluginConfig, newPlugin.getValue());
       }
@@ -360,82 +379,176 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
   }
 
   @Override
-  public void put(String name, StoragePluginConfig config) throws ExecutionSetupException {
+  public void put(String name, StoragePluginConfig config) throws PluginException {
+    name = validateName(name);
 
     // Do not allow overwriting system plugins
     // This same check is done later. However, we want to do this check
     // before writing to the persistent store, which we must do before
     // putting the plugin into the cache (where the second check is done.)
-    PluginHandle oldEntry = pluginCache.get(name);
-    if (oldEntry != null && oldEntry.isIntrinsic()) {
-      throw UserException.permissionError()
-        .message("Attempt to replace a system plugin.")
-        .addContext("Plugin name", name)
-        .addContext("Intrinsic plugin class", oldEntry.config().getClass().getName())
-        .addContext("Attempted replacement", config.getClass().getName())
-        .build(logger);
+    PluginHandle currentEntry = pluginCache.get(name);
+    if (currentEntry != null && currentEntry.isIntrinsic()) {
+      throw PluginException.systemPluginException(
+          "replace", name);
     }
 
-    // Write to the store first. We are now in a race to see which
-    // thread will update the cache: might be us or might the another
-    // thread.
+    // Write to the store. We don't bother to update the cache; we could
+    // only update our own cache, not those of other Drillbits. We rely
+    // on the cache refresh mechanism to kick in when the Drillbit asks
+    // for the plugin instance.
     pluginStore.put(name, config);
+  }
+
+  private String validateName(String name) throws PluginException {
+    if (name == null) {
+      throw new PluginException("Plugin name cannot be null");
+    }
+    name = name.trim().toLowerCase();
+    if (name.isEmpty()) {
+      throw new PluginException("Plugin name cannot be null");
+    }
+    return name;
+  }
 
-    // Will fail on an attempt to update a system plugin. Update
-    // will be rejected if another thread beats us to it.
+  @Override
+  public void validatedPut(String name, StoragePluginConfig config)
+      throws PluginException {
+
+    name = validateName(name);
+    PluginHandle oldEntry;
     if (config.isEnabled()) {
-      PluginHandle newHandle = restoreFromEphemeral(name, config);
-      oldEntry = pluginCache.put(newHandle);
+      PluginHandle entry = restoreFromEphemeral(name, config);
+      try {
+        entry.plugin();
+      } catch (Exception e) {
+        throw new PluginException(String.format(
+            "Invalid plugin config for '%s'", name), e);
+      }
+      oldEntry = pluginCache.put(entry);
     } else {
-      oldEntry = pluginCache.remove(name, config);
+      oldEntry = pluginCache.remove(name);
     }
-
-    // Let's optimistically assume that running queries may still use
-    // the old config, so transfer the possibly-created instance to the
-    // ephemeral store.
     moveToEphemeral(oldEntry);
+    pluginStore.put(name, config);
+  }
+
+  @Override
+  public void setEnabled(String name, boolean enable) throws PluginException {
+
+    // Works only with the stored config. (Some odd persistent stores do not
+    // actually serialize the config; they just cache a copy.) If we change
+    // anything, the next request will do a resync to pick up the change.
+    name = validateName(name);
+    StoragePluginConfig config = requireStoredConfig(name);
+    if (config.isEnabled() == enable) {
+      return;
+    }
+    StoragePluginConfig copy = copyConfig(config);
+    copy.setEnabled(enable);
+    validatedPut(name, copy);
   }
 
   /**
-   * If there is an ephemeral plugin of this (name, config), pair,
-   * transfer that plugin out of ephemeral storage for reuse. Else
-   * create a new handle.
-   *
-   * @param name plugin name
-   * @param config plugin config
-   * @return a handle for the plugin which may have been retrieved from
-   * ephemeral storage
+   * Configs are obtained from the persistent store. This method is
+   * called only by the UI to edit a stored plugin; so no benefit to
+   * using the cache. We also want a plugin even if it is disabled,
+   * and disabled plugins do not reside in the cache.
+   * <p>
+   * Note that each call (depending on the store implementation)
+   * may return a distinct instance of the config. The instance will
+   * be equal (unless the stored version changes.) However, other
+   * versions of the store may return the same instance as is in
+   * the cache. So, <b>do not</b> modify the returned config.
+   * To modify the config, call {@link #copyConfig(String)} instead.
    */
-  private PluginHandle restoreFromEphemeral(String name,
-      StoragePluginConfig config) {
+  @Override
+  public StoragePluginConfig getStoredConfig(String name) {
+    return pluginStore.get(name);
+  }
 
-    // Benign race condition between check and invalidate.
-    PluginHandle ephemeralEntry = ephemeralPlugins.getIfPresent(config);
-    if (ephemeralEntry == null || !name.equalsIgnoreCase(ephemeralEntry.name())) {
-      return createPluginEntry(name, config, PluginType.STORED);
-    } else {
+  @Override
+  public StoragePluginConfig copyConfig(String name) throws PluginException {
+    return copyConfig(requireStoredConfig(name));
+  }
 
-      // Transfer the instance to a new handle, then invalidate the
-      // cache entry. The transfer ensures that the invalidate will
-      // not close the plugin instance
-      PluginHandle newHandle = ephemeralEntry.transfer(PluginType.STORED);
-      ephemeralPlugins.invalidate(config);
-      return newHandle;
+  private StoragePluginConfig requireStoredConfig(String name) throws PluginException {
+    StoragePluginConfig config = getStoredConfig(name);
+    if (config == null) {
+      throw new PluginNotFoundException(name);
     }
+    return config;
   }
 
   @Override
-  public StoragePluginConfig getConfig(String name) {
-    PluginHandle plugin = pluginCache.get(name);
-    if (plugin != null) {
-      return plugin.isIntrinsic() ? null : plugin.config();
+  public String encode(StoragePluginConfig config) {
+    ObjectMapper mapper = context.mapper();
+    try {
+      return mapper.writer()
+          .forType(config.getClass())
+          .writeValueAsString(config);
+    } catch (IOException e) {
+      // We control serialization, so no errors should occur.
+      throw new IllegalStateException("Serialize failed", e);
     }
-    return pluginStore.get(name);
+  }
+
+  @Override
+  public String encode(String name) throws PluginException {
+    return encode(requireStoredConfig(validateName(name)));
+  }
+
+  @Override
+  public StoragePluginConfig decode(String json) throws PluginEncodingException {
+
+    // We don't control the format of the input JSON, so an
+    // error could occur.
+    try {
+      return context.mapper().reader()
+          .forType(StoragePluginConfig.class)
+          .readValue(json);
+    } catch (IOException e) {
+      throw new PluginEncodingException("Failure when decoding plugin JSON", e);
+    }
+  }
+
+  @Override
+  public void putJson(String name, String json) throws PluginException {
+    validatedPut(name, decode(json));
+  }
+
+  @Override
+  public StoragePluginConfig copyConfig(StoragePluginConfig orig) {
+    try {
+
+      // TODO: Storage plugin configs don't define a "clone" or "copy"
+      // method, so use a round-trip to JSON to accomplish the same task.
+      return decode(encode(orig));
+    } catch (PluginEncodingException e) {
+      throw new IllegalStateException("De/serialize failed", e);
+    }
+  }
+
+  @Override
+  public StoragePluginConfig getDefinedConfig(String name) {
+    try {
+      name = validateName(name);
+    } catch (PluginException e) {
+      // Name is not valid, so no plugin matches the name.
+      return null;
+    }
+    PluginHandle entry = getEntry(name);
+    return entry == null ? null : entry.config();
   }
 
   // Gets a plugin with the named configuration
   @Override
-  public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
+  public StoragePlugin getPlugin(String name) throws PluginException {
+    try {
+      name = validateName(name);
+    } catch (PluginException e) {
+      // Name is not valid, so no plugin matches the name.
+      return null;
+    }
     PluginHandle entry = getEntry(name);
 
     // Lazy instantiation: the first call to plugin() creates the
@@ -448,7 +561,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
     if (plugin != null && plugin.isIntrinsic()) {
       return plugin;
     }
-    StoragePluginConfig config = pluginStore.get(name);
+    StoragePluginConfig config = getStoredConfig(name);
     if (plugin == null) {
       return refresh(name, config);
     } else {
@@ -477,10 +590,16 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
     if (config == null || !config.isEnabled()) {
 
       // Move the old config to the ephemeral store.
-      if (pluginCache.remove(entry)) {
-        moveToEphemeral(entry);
+      try {
+        if (pluginCache.remove(entry.name()) == entry) {
+          moveToEphemeral(entry);
+        }
+        return null;
+      } catch (PluginException e) {
+        // Should never occur, only if the persistent store were to
+        // somehow contain an entry with the same name as a system plugin.
+        throw new IllegalStateException("Plugin refresh failed", e);
       }
-      return null;
     }
     // Unchanged?
     if (entry.config().equals(config)) {
@@ -489,11 +608,17 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
 
     // Plugin changed. Handle race condition on replacement.
     PluginHandle newEntry = restoreFromEphemeral(entry.name(), config);
-    if (pluginCache.replace(entry, newEntry)) {
-      moveToEphemeral(entry);
-      return newEntry;
-    } else {
-      return pluginCache.get(entry.name());
+    try {
+      if (pluginCache.replace(entry, newEntry)) {
+        moveToEphemeral(entry);
+        return newEntry;
+      } else {
+        return pluginCache.get(entry.name());
+      }
+    } catch (PluginException e) {
+      // Should never occur, only if the persistent store were to
+      // somehow contain an entry with the same name as a system plugin.
+      throw new IllegalStateException("Plugin refresh failed", e);
     }
   }
 
@@ -509,6 +634,23 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
 
   @Override
   public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
+    try {
+      return getPluginByConfig(config);
+    } catch (PluginException e) {
+      throw translateException(e);
+    }
+  }
+
+  private ExecutionSetupException translateException(PluginException e) {
+    Throwable cause = e.getCause();
+    if (cause != null && cause instanceof ExecutionSetupException) {
+      return (ExecutionSetupException) cause;
+    }
+    return new ExecutionSetupException(e);
+  }
+
+  @Override
+  public StoragePlugin getPluginByConfig(StoragePluginConfig config) throws PluginException {
     // Try to lookup plugin by configuration
     PluginHandle plugin = pluginCache.get(config);
     if (plugin != null) {
@@ -521,11 +663,11 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
       return ephemeralPlugins.get(config).plugin();
     } catch (ExecutionException e) {
       Throwable cause = e.getCause();
-      if (cause instanceof ExecutionSetupException) {
-        throw (ExecutionSetupException) cause;
+      if (cause instanceof PluginException) {
+        throw (PluginException) cause;
       } else {
         // this shouldn't happen. here for completeness.
-        throw new ExecutionSetupException(
+        throw new PluginException(
             "Failure while trying to create ephemeral plugin.", cause);
       }
     }
@@ -536,22 +678,55 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
   // the old one and added a new one of the same name.
   // TODO: Fix this
   @Override
-  public void remove(String name) {
-    PluginHandle entry  = pluginCache.remove(name);
-    if (entry != null) {
-      moveToEphemeral(entry);
-    }
+  public void remove(String name) throws PluginException {
+    name = validateName(name);
+
+    // Removing here allows us to check for system plugins
+    moveToEphemeral(pluginCache.remove(name));
 
     // Must tell store to delete even if not known locally because
     // the store might hold a disabled version
     pluginStore.delete(name);
   }
 
+  /**
+   * If there is an ephemeral plugin of this (name, config) pair,
+   * transfer that plugin out of ephemeral storage for reuse. Else
+   * create a new handle.
+   *
+   * @param name plugin name
+   * @param config plugin config
+   * @return a handle for the plugin which may have been retrieved from
+   * ephemeral storage
+   */
+  private PluginHandle restoreFromEphemeral(String name,
+      StoragePluginConfig config) {
+
+    // Benign race condition between check and invalidate.
+    PluginHandle ephemeralEntry = ephemeralPlugins.getIfPresent(config);
+    if (ephemeralEntry == null || !name.equalsIgnoreCase(ephemeralEntry.name())) {
+      return createPluginEntry(name, config, PluginType.STORED);
+    } else {
+
+      // Transfer the instance to a new handle, then invalidate the
+      // cache entry. The transfer ensures that the invalidate will
+      // not close the plugin instance
+      PluginHandle newHandle = ephemeralEntry.transfer(PluginType.STORED);
+      ephemeralPlugins.invalidate(config);
+      return newHandle;
+    }
+  }
+
   private void moveToEphemeral(PluginHandle handle) {
     if (handle == null) {
       return;
     }
 
+    // No need to move if no instance.
+    if (!handle.hasInstance()) {
+      return;
+    }
+
     // If already in the ephemeral store, don't replace.
     // Race condition is benign: two threads both doing the put
     // will cause the first handle to be closed when the second hits.
@@ -564,11 +739,29 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
 
   @Override
   public Map<String, StoragePluginConfig> storedConfigs() {
+    return storedConfigs(PluginFilter.ALL);
+  }
+
+  @Override
+  public Map<String, StoragePluginConfig> storedConfigs(PluginFilter filter) {
     Map<String, StoragePluginConfig> result = new HashMap<>();
     Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginStore.load();
     while (allPlugins.hasNext()) {
       Entry<String, StoragePluginConfig> plugin = allPlugins.next();
-      result.put(plugin.getKey(), plugin.getValue());
+      boolean include;
+      switch (filter) {
+      case ENABLED:
+        include = plugin.getValue().isEnabled();
+        break;
+      case DISABLED:
+        include = !plugin.getValue().isEnabled();
+        break;
+      default:
+        include = true;
+      }
+      if (include) {
+        result.put(plugin.getKey(), plugin.getValue());
+      }
     }
     return result;
   }
@@ -586,20 +779,60 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
   }
 
   @Override
-  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException {
-    StoragePlugin storagePlugin = getPlugin(storageConfig);
+  public void putFormatPlugin(String pluginName, String formatName,
+      FormatPluginConfig formatConfig) throws PluginException {
+    pluginName = validateName(pluginName);
+    formatName = validateName(formatName);
+    StoragePluginConfig orig = requireStoredConfig(pluginName);
+    if (!(orig instanceof FileSystemConfig)) {
+      throw new PluginException(
+        "Format plugins can be added only to the file system plugin: " + pluginName);
+    }
+    FileSystemConfig copy = (FileSystemConfig) copyConfig(orig);
+    if (formatConfig == null) {
+      copy.getFormats().remove(formatName);
+    } else {
+      copy.getFormats().put(formatName, formatConfig);
+    }
+    put(pluginName, copy);
+  }
+
+  @Override
+  public FormatPlugin getFormatPluginByConfig(StoragePluginConfig storageConfig,
+      FormatPluginConfig formatConfig) throws PluginException {
+    StoragePlugin storagePlugin = getPluginByConfig(storageConfig);
     return storagePlugin.getFormatPlugin(formatConfig);
   }
 
   @Override
+  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig,
+      FormatPluginConfig formatConfig) throws ExecutionSetupException {
+    try {
+      return getFormatPluginByConfig(storageConfig, formatConfig);
+    } catch (PluginException e) {
+      throw translateException(e);
+    }
+  }
+
+  @Override
   public SchemaFactory getSchemaFactory() {
     return schemaFactory;
   }
 
   // TODO: Remove this: it will force plugins to be instantiated
   // unnecessarily
+  // This is a bit of a hack. The planner calls this to get rules
+  // for queries. If even one plugin has issues, then all queries
+  // will fail, even those that don't use the invalid plugin.
+  //
+  // This hack may result in a delay (such as a timeout) again and
+  // again as each query tries to create the plugin. The solution is
+  // to disable the plugin, or fix the external system. This solution
+  // is more stable than, say, marking the plugin failed since we have
+  // no way to show or reset failed plugins.
   private static class PluginIterator implements Iterator<Entry<String, StoragePlugin>> {
     private final Iterator<PluginHandle> base;
+    private PluginHandle entry;
 
     public PluginIterator(Iterator<PluginHandle> base) {
       this.base = base;
@@ -607,18 +840,27 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
 
     @Override
     public boolean hasNext() {
-      return base.hasNext();
+      while (base.hasNext()) {
+        entry = base.next();
+        try {
+          entry.plugin();
+          return true;
+        } catch (Exception e) {
+          // Skip this one to avoid failing the query
+        }
+      }
+      return false;
     }
 
     @Override
     public Entry<String, StoragePlugin> next() {
-      PluginHandle entry = base.next();
       return new ImmutableEntry<>(entry.name(), entry.plugin());
     }
   }
 
   @Override
   public Iterator<Entry<String, StoragePlugin>> iterator() {
+    refresh();
     return new PluginIterator(pluginCache.iterator());
   }
 
@@ -672,8 +914,51 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
       try {
         plugin.plugin().registerSchemas(schemaConfig, parent);
       } catch (Exception e) {
-        logger.warn("Error during `{}` schema initialization: {}", plugin.name(), e.getMessage(), e.getCause());
+        logger.warn("Error during `{}` schema initialization: {}",
+            plugin.name(), e.getMessage(), e.getCause());
       }
     }
   }
+
+  @Override
+  public ObjectMapper mapper() {
+    return context.mapper();
+  }
+
+  @Override
+  public <T extends StoragePlugin> T resolve(
+      StoragePluginConfig storageConfig, Class<T> desired) {
+    try {
+      return desired.cast(getPluginByConfig(storageConfig));
+    } catch (PluginException|ClassCastException e) {
+      // Should never occur
+      throw new IllegalStateException(String.format(
+          "Unable to load stroage plugin %s for provided config " +
+          "class %s", desired.getName(),
+          storageConfig.getClass().getName()), e);
+    }
+  }
+
+  @Override
+  public <T extends FormatPlugin> T resolveFormat(
+      StoragePluginConfig storageConfig,
+      FormatPluginConfig formatConfig, Class<T> desired) {
+    try {
+      return desired.cast(getFormatPluginByConfig(storageConfig, formatConfig));
+    } catch (PluginException|ClassCastException e) {
+      // Should never occur
+      throw new IllegalStateException(String.format(
+          "Unable to load format plugin %s for provided plugin " +
+          "config class %s and format config class %s",
+          desired.getName(),
+          storageConfig.getClass().getName(),
+          formatConfig.getClass().getName()), e);
+    }
+  }
+
+  @Override
+  public Set<String> availablePlugins() {
+    refresh();
+    return pluginCache.names();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 8f011ce..bc78d5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -31,7 +31,10 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
+
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap.Builder;
 
 @JsonTypeName(FileSystemConfig.NAME)
 public class FileSystemConfig extends StoragePluginConfig {
@@ -49,7 +52,13 @@ public class FileSystemConfig extends StoragePluginConfig {
                           @JsonProperty("workspaces") Map<String, WorkspaceConfig> workspaces,
                           @JsonProperty("formats") Map<String, FormatPluginConfig> formats) {
     this.connection = connection;
-    this.config = config;
+
+    // Force creation of an empty map so that configs compare equal
+    Builder<String, String> builder = ImmutableMap.builder();
+    if (config != null) {
+      builder.putAll(config);
+    }
+    this.config = builder.build();
     Map<String, WorkspaceConfig> caseInsensitiveWorkspaces = CaseInsensitiveMap.newHashMap();
     Optional.ofNullable(workspaces).ifPresent(caseInsensitiveWorkspaces::putAll);
     this.workspaces = caseInsensitiveWorkspaces;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index e7c4128..88ebe4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory;
  * references to the FileSystem configuration and path management.
  */
 public class FileSystemPlugin extends AbstractStoragePlugin {
-
   private static final Logger logger = LoggerFactory.getLogger(FileSystemPlugin.class);
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 3c032c3..84d161d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.store.dfs.easy;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -90,12 +89,10 @@ public class EasyGroupScan extends AbstractFileGroupScan {
       @JsonProperty("columns") List<SchemaPath> columns,
       @JsonProperty("selectionRoot") Path selectionRoot,
       @JsonProperty("schema") TupleMetadata schema
-      ) throws IOException, ExecutionSetupException {
+      ) throws IOException {
     super(ImpersonationUtil.resolveUserName(userName));
     this.selection = FileSelection.create(null, files, selectionRoot);
-    this.formatPlugin = Preconditions.checkNotNull((EasyFormatPlugin<?>)
-        engineRegistry.getFormatPlugin(storageConfig, formatConfig),
-        "Unable to load format plugin for provided format config.");
+    this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, EasyFormatPlugin.class);
     this.columns = columns == null ? ALL_COLUMNS : columns;
     this.selectionRoot = selectionRoot;
     SimpleFileTableMetadataProviderBuilder builder =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index 82e5cac..86a1292 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -33,7 +33,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.Path;
 
 @JsonTypeName("fs-sub-scan")
@@ -59,8 +58,7 @@ public class EasySubScan extends AbstractSubScan {
     @JsonProperty("schema") TupleMetadata schema
     ) throws ExecutionSetupException {
     super(userName);
-    this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
-    Preconditions.checkNotNull(this.formatPlugin);
+    this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, EasyFormatPlugin.class);
     this.files = files;
     this.columns = columns;
     this.selectionRoot = selectionRoot;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index 379e2c9..2dec8c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -17,10 +17,8 @@
  */
 package org.apache.drill.exec.store.dfs.easy;
 
-import java.io.IOException;
 import java.util.List;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractWriter;
@@ -33,11 +31,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @JsonTypeName("fs-writer")
 public class EasyWriter extends AbstractWriter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriter.class);
+  static final Logger logger = LoggerFactory.getLogger(EasyWriter.class);
 
   private final String location;
   private final List<String> partitionColumns;
@@ -51,11 +50,10 @@ public class EasyWriter extends AbstractWriter {
       @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
       @JsonProperty("storage") StoragePluginConfig storageConfig,
       @JsonProperty("format") FormatPluginConfig formatConfig,
-      @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
+      @JacksonInject StoragePluginRegistry engineRegistry) {
 
     super(child);
-    this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
-    Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
+    this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, EasyFormatPlugin.class);
     this.location = location;
     this.partitionColumns = partitionColumns;
     setStorageStrategy(storageStrategy);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
index 1e4d586..e9f84ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
@@ -20,35 +20,42 @@ package org.apache.drill.exec.store.easy.sequencefile;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
 
 import java.util.List;
+import java.util.Objects;
 
 @JsonTypeName("sequencefile") @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class SequenceFileFormatConfig implements FormatPluginConfig {
 
   public List<String> extensions = ImmutableList.of();
 
-  @Override
-  public int hashCode() {
-    return (extensions == null)? 0 : extensions.hashCode();
-  }
-
   public List<String> getExtensions() {
     return extensions;
   }
 
   @Override
+  public int hashCode() {
+    return Objects.hash(extensions);
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (this == obj) {
       return true;
-    } else if (obj == null) {
+    }
+    if (obj == null || getClass() != obj.getClass()) {
       return false;
-    } else if (getClass() == obj.getClass()) {
-      SequenceFileFormatConfig other = (SequenceFileFormatConfig) obj;
-      return (extensions == null)? (other.extensions == null) : extensions.equals(other.extensions);
     }
-    return false;
+    SequenceFileFormatConfig other = (SequenceFileFormatConfig) obj;
+    return Objects.equals(extensions, other.extensions);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+        .field("extensions", extensions)
+        .toString();
   }
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index 63eeb7a..7bcb0a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -61,7 +61,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig> {
-
   private static final Logger logger = LoggerFactory.getLogger(HttpdLogFormatPlugin.class);
 
   private static final String PLUGIN_EXTENSION = "httpd";
@@ -90,8 +89,7 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
   }
 
   /**
-   * This class performs the work for the plugin. This is where all logic goes to read records. In this case httpd logs
-   * are lines terminated with a new line character.
+   * Reads httpd log lines terminated with a newline character.
    */
   private class HttpdLogRecordReader extends AbstractRecordReader {
 
@@ -111,10 +109,11 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
     }
 
     /**
-     * The query fields passed in are formatted in a way that Drill requires. Those must be cleaned up to work with the
-     * parser.
+     * The query fields passed in are formatted in a way that Drill requires.
+     * Those must be cleaned up to work with the parser.
      *
-     * @return Map with Drill field names as a key and Parser Field names as a value
+     * @return Map with Drill field names as a key and Parser Field names as a
+     *         value
      */
     private Map<String, String> makeParserFields() {
       Map<String, String> fieldMapping = new HashMap<>();
@@ -219,8 +218,9 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
   }
 
   /**
-   * This plugin supports pushing down into the parser. Only fields specifically asked for within the configuration will
-   * be parsed. If no fields are asked for then all possible fields will be returned.
+   * This plugin supports pushing project down into the parser. Only fields
+   * specifically asked for within the configuration will be parsed. If no
+   * fields are asked for then all possible fields will be returned.
    *
    * @return true
    */
@@ -230,7 +230,8 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
   }
 
   @Override
-  public RecordReader getRecordReader(final FragmentContext context, final DrillFileSystem dfs, final FileWork fileWork, final List<SchemaPath> columns, final String userName) {
+  public RecordReader getRecordReader(final FragmentContext context, final DrillFileSystem dfs,
+      final FileWork fileWork, final List<SchemaPath> columns, final String userName) {
     return new HttpdLogRecordReader(context, dfs, fileWork, columns);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatConfig.java
index b9e310c..aa12580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatConfig.java
@@ -19,7 +19,9 @@
 package org.apache.drill.exec.store.image;
 
 import java.util.List;
+import java.util.Objects;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
@@ -53,45 +55,31 @@ public class ImageFormatConfig implements FormatPluginConfig {
 
   @Override
   public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((extensions == null) ? 0 : extensions.hashCode());
-    result = prime * result + (fileSystemMetadata ? 1231 : 1237);
-    result = prime * result + (descriptive ? 1231 : 1237);
-    result = prime * result + ((timeZone == null) ? 0 : timeZone.hashCode());
-    return result;
+    return Objects.hash(extensions, fileSystemMetadata, descriptive, timeZone);
   }
 
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
       return true;
-    } else if (obj == null) {
-      return false;
-    } else if (getClass() != obj.getClass()) {
-      return false;
-    }
-    ImageFormatConfig other = (ImageFormatConfig) obj;
-    if (extensions == null) {
-      if (other.extensions != null) {
-        return false;
-      }
-    } else if (!extensions.equals(other.extensions)) {
-      return false;
-    }
-    if (fileSystemMetadata != other.fileSystemMetadata) {
-      return false;
     }
-    if (descriptive != other.descriptive) {
+    if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    if (timeZone == null) {
-      if (other.timeZone != null) {
-        return false;
-      }
-    } else if (!timeZone.equals(other.timeZone)) {
-      return false;
-    }
-    return true;
+    ImageFormatConfig other = (ImageFormatConfig) obj;
+    return Objects.equals(extensions, other.extensions) &&
+           Objects.equals(fileSystemMetadata, other.fileSystemMetadata) &&
+           Objects.equals(descriptive, other.descriptive) &&
+           Objects.equals(timeZone, other.timeZone);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+        .field("extensions", extensions)
+        .field("fileSystemMetadata", fileSystemMetadata)
+        .field("descriptive", descriptive)
+        .field("timeZone", timeZone)
+        .toString();
   }
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
index 2ff8415..c18d7bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -19,6 +19,10 @@ package org.apache.drill.exec.store.parquet;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
+
+import java.util.Objects;
+
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -27,7 +31,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class ParquetFormatConfig implements FormatPluginConfig {
 
   public boolean autoCorrectCorruptDates = true;
-  public boolean enableStringsSignedMinMax = false;
+  public boolean enableStringsSignedMinMax;
 
   /**
    * @return true if auto correction of corrupt dates is enabled, false otherwise
@@ -62,26 +66,20 @@ public class ParquetFormatConfig implements FormatPluginConfig {
     }
 
     ParquetFormatConfig that = (ParquetFormatConfig) o;
-
-    if (autoCorrectCorruptDates != that.autoCorrectCorruptDates) {
-      return false;
-    }
-
-    return enableStringsSignedMinMax == that.enableStringsSignedMinMax;
+    return Objects.equals(autoCorrectCorruptDates, that.autoCorrectCorruptDates) &&
+           Objects.equals(enableStringsSignedMinMax, that.enableStringsSignedMinMax);
   }
 
   @Override
   public int hashCode() {
-    int result = (autoCorrectCorruptDates ? 1231 : 1237);
-    result = 31 * result + (enableStringsSignedMinMax ? 1231 : 1237);
-    return result;
+    return Objects.hash(autoCorrectCorruptDates, enableStringsSignedMinMax);
   }
 
   @Override
   public String toString() {
-    return "ParquetFormatConfig{"
-      + "autoCorrectCorruptDates=" + autoCorrectCorruptDates
-      + ", enableStringsSignedMinMax=" + enableStringsSignedMinMax
-      + '}';
+    return new PlanStringBuilder(this)
+        .field("autoCorrectCorruptDates", autoCorrectCorruptDates)
+        .field("enableStringsSignedMinMax", enableStringsSignedMinMax)
+        .toString();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 5c0e2e4..7d3cecf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -88,8 +88,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
     Preconditions.checkNotNull(formatConfig);
 
     this.cacheFileRoot = cacheFileRoot;
-    this.formatPlugin =
-        Preconditions.checkNotNull((ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig));
+    this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, ParquetFormatPlugin.class);
     this.formatConfig = this.formatPlugin.getConfig();
     DrillFileSystem fs =
         ImpersonationUtil.createFileSystem(ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index 0bdc266..75cb302 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -59,7 +59,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
                              @JsonProperty("filter") LogicalExpression filter,
                              @JsonProperty("schema") TupleMetadata schema) throws ExecutionSetupException {
     this(userName,
-        (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)),
+        registry.resolveFormat(storageConfig, formatConfig, ParquetFormatPlugin.class),
         rowGroupReadEntries,
         columns,
         readerConfig,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 2bad852..61bd44f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -26,6 +26,8 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.store.StorageStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
@@ -34,11 +36,10 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 @JsonTypeName("parquet-writer")
 public class ParquetWriter extends AbstractWriter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriter.class);
+  static final Logger logger = LoggerFactory.getLogger(ParquetWriter.class);
 
 /** Version of Drill's Parquet writer. Increment this version (by 1) any time we make any format change to the file.
  * Format changes include:
@@ -66,8 +67,7 @@ public class ParquetWriter extends AbstractWriter {
           @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
 
     super(child);
-    this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, new ParquetFormatConfig());
-    Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
+    this.formatPlugin = engineRegistry.resolveFormat(storageConfig, new ParquetFormatConfig(), ParquetFormatPlugin.class);
     this.location = location;
     this.partitionColumns = partitionColumns;
     setStorageStrategy(storageStrategy);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
index 133b5d7..adf6379 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
@@ -19,31 +19,30 @@ package org.apache.drill.exec.store.pcap;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.shaded.guava.com.google.common.base.Objects;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
-import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 
 @JsonTypeName(PcapFormatPlugin.PLUGIN_NAME)
 public class PcapFormatConfig implements FormatPluginConfig {
 
-  private static final List<String> DEFAULT_EXTS = ImmutableList.of(PcapFormatPlugin.PLUGIN_NAME);
-
-  public List<String> extensions;
+  public List<String> extensions = ImmutableList.of(PcapFormatPlugin.PLUGIN_NAME);
 
   @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-  public boolean sessionizeTCPStreams = false;
+  public boolean sessionizeTCPStreams;
 
   @JsonInclude(JsonInclude.Include.NON_DEFAULT)
   public List<String> getExtensions() {
-    return extensions == null ? DEFAULT_EXTS : extensions;
+    return extensions;
   }
 
   @Override
   public int hashCode() {
-    return Arrays.hashCode(new Object[]{extensions, sessionizeTCPStreams});
+    return Objects.hash(extensions, sessionizeTCPStreams);
   }
 
   @Override
@@ -55,7 +54,15 @@ public class PcapFormatConfig implements FormatPluginConfig {
       return false;
     }
     PcapFormatConfig other = (PcapFormatConfig) obj;
-    return Objects.equal(extensions, other.extensions)
-      && Objects.equal(sessionizeTCPStreams, other.sessionizeTCPStreams);
+    return Objects.equals(extensions, other.extensions) &&
+           Objects.equals(sessionizeTCPStreams, other.sessionizeTCPStreams);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+        .field("extensions", extensions)
+        .field("sessionizeTCPStreams", sessionizeTCPStreams)
+        .toString();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
index 7ff875a..8ded7ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.pcapng;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
 
 import java.util.Collections;
@@ -49,4 +51,11 @@ public class PcapngFormatConfig implements FormatPluginConfig {
   public int hashCode() {
     return Objects.hash(extensions);
   }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+        .field("extensions", extensions)
+        .toString();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index 50d65a2..b2e37fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -45,13 +45,14 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
 
   private final SystemTable table;
   private final SystemTablePlugin plugin;
-  private int maxRecordsToRead;
+  private final int maxRecordsToRead;
 
   @JsonCreator
   public SystemTableScan(@JsonProperty("table") SystemTable table,
                          @JsonProperty("maxRecordsToRead") int maxRecordsToRead,
                          @JacksonInject StoragePluginRegistry engineRegistry) throws ExecutionSetupException {
-    this(table, maxRecordsToRead, (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE));
+    this(table, maxRecordsToRead,
+        engineRegistry.resolve(SystemTablePluginConfig.INSTANCE, SystemTablePlugin.class));
   }
 
   public SystemTableScan(SystemTable table, SystemTablePlugin plugin) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
index f8bd2c4..ebbd87d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
@@ -24,9 +24,9 @@ import java.util.Optional;
 
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 
@@ -64,8 +64,8 @@ public class StoragePluginTestUtils {
   public static void updateSchemaLocation(final String pluginName,
                                           final StoragePluginRegistry pluginRegistry,
                                           final File tmpDirPath,
-                                          String... schemas) throws ExecutionSetupException {
-    final FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getConfig(pluginName);
+                                          String... schemas) throws PluginException {
+    final FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getStoredConfig(pluginName);
 
     Map<String, WorkspaceConfig> newWorkspaces = new HashMap<>();
     Optional.ofNullable(pluginConfig.getWorkspaces())
@@ -91,13 +91,13 @@ public class StoragePluginTestUtils {
     pluginRegistry.put(pluginName, newPluginConfig);
   }
 
-  public static void configureFormatPlugins(StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+  public static void configureFormatPlugins(StoragePluginRegistry pluginRegistry) throws PluginException {
     configureFormatPlugins(pluginRegistry, CP_PLUGIN_NAME);
     configureFormatPlugins(pluginRegistry, DFS_PLUGIN_NAME);
   }
 
-  public static void configureFormatPlugins(StoragePluginRegistry pluginRegistry, String storagePlugin) throws ExecutionSetupException {
-    FileSystemConfig fileSystemConfig = (FileSystemConfig) pluginRegistry.getConfig(storagePlugin);
+  public static void configureFormatPlugins(StoragePluginRegistry pluginRegistry, String storagePlugin) throws PluginException {
+    FileSystemConfig fileSystemConfig = (FileSystemConfig) pluginRegistry.getStoredConfig(storagePlugin);
 
     Map<String, FormatPluginConfig> newFormats = new HashMap<>();
     Optional.ofNullable(fileSystemConfig.getFormats())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
index ce879cb..b9b1919 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
@@ -24,6 +24,7 @@ import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.dotdrill.DotDrillType;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -406,7 +407,7 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
   }
 
   @AfterClass
-  public static void removeMiniDfsBasedStorage() {
+  public static void removeMiniDfsBasedStorage() throws PluginException {
     getDrillbitContext().getStorage().remove(MINI_DFS_STORAGE_PLUGIN_NAME);
     stopMiniDfsCluster();
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
index b6af23f..03f52c1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.impersonation;
 
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.avro.AvroDataGenerator;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.drill.categories.SecurityTest;
@@ -304,7 +305,7 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
   }
 
   @AfterClass
-  public static void removeMiniDfsBasedStorage() {
+  public static void removeMiniDfsBasedStorage() throws PluginException {
     getDrillbitContext().getStorage().remove(MINI_DFS_STORAGE_PLUGIN_NAME);
     stopMiniDfsCluster();
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java
index d41bdf7..f4d6981 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java
@@ -77,6 +77,9 @@ public class BasePluginRegistryTest extends BaseTest {
     // for these tests.
     @Override
     public DrillbitContext drillbitContext() { return null; }
+
+    @Override
+    public ObjectMapper hoconMapper() { return mapper; }
   }
 
   public static class StoragePluginFixtureConfig extends StoragePluginConfig {
@@ -107,8 +110,8 @@ public class BasePluginRegistryTest extends BaseTest {
     public int hashCode() {
       return Objects.hash(mode);
     }
-
   }
+
   @PrivatePlugin
   public static class StoragePluginFixture extends AbstractStoragePlugin {
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
index 8da2016..d3c42e1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store;
 
+import static org.apache.drill.exec.util.StoragePluginTestUtils.CP_PLUGIN_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -26,19 +27,28 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.BasePluginRegistryTest.StoragePluginFixture;
+import org.apache.drill.exec.store.BasePluginRegistryTest.StoragePluginFixtureConfig;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginFilter;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.util.StoragePluginTestUtils;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.BaseTest;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.junit.After;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 /**
@@ -47,142 +57,361 @@ import org.junit.Test;
  * a Drillbit per tests to ensure each test works from a clean,
  * known registry.
  * <p>
+ * Tests are somewhat large because each needs a running
+ * Drillbit.
  * This is several big tests because of the setup cost of
  * starting the Drillbits in the needed config.
  */
-public class TestPluginRegistry extends BasePluginRegistryTest {
+public class TestPluginRegistry extends BaseTest {
+
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+  private static final String SYS_PLUGIN_NAME = "sys";
+  private static final String S3_PLUGIN_NAME = "s3";
+
+  // Mixed-case name used to verify that names are forced to lower case.
+  private static final String MY_PLUGIN_NAME = "myPlugin";
+
+  // Lower-case form after insertion into the registry.
+  private static final String MY_PLUGIN_KEY = MY_PLUGIN_NAME.toLowerCase();
 
   @After
   public void cleanup() throws Exception {
     FileUtils.cleanDirectory(dirTestWatcher.getStoreDir());
   }
 
+  private FileSystemConfig myConfig1() {
+    FileSystemConfig config = new FileSystemConfig("myConn",
+        new HashMap<>(), new HashMap<>(), new HashMap<>());
+    config.setEnabled(true);
+    return config;
+  }
+
+  private FileSystemConfig myConfig2() {
+    Map<String, String> props = new HashMap<>();
+    props.put("foo", "bar");
+    FileSystemConfig config = new FileSystemConfig("myConn",
+        props, new HashMap<>(), new HashMap<>());
+    config.setEnabled(true);
+    return config;
+  }
+
   @Test
-  public void testBasicLifecycle() throws Exception {
+  public void testLifecycle() throws Exception {
     ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
-    try (ClusterFixture cluster = builder.build();) {
+    try (ClusterFixture cluster = builder.build()) {
       StoragePluginRegistry registry = cluster.storageRegistry();
 
       // Bootstrap file loaded.
-      assertNotNull(registry.getPlugin(StoragePluginTestUtils.CP_PLUGIN_NAME)); // Normal
-      assertNotNull(registry.getPlugin("sys")); // System
-      assertNull(registry.getConfig("sys")); // Not editable
+      assertNotNull(registry.getPlugin(CP_PLUGIN_NAME)); // Normal
+      assertNotNull(registry.getPlugin(SYS_PLUGIN_NAME)); // System
+      assertNull(registry.getStoredConfig(SYS_PLUGIN_NAME)); // Not editable
 
       assertNull(registry.getPlugin("bogus"));
 
       // Enabled plugins
       Map<String, StoragePluginConfig> configMap = registry.enabledConfigs();
-      assertTrue(configMap.containsKey(StoragePluginTestUtils.CP_PLUGIN_NAME));
-      assertFalse(configMap.containsKey("s3")); // Disabled, but still appears
-      assertFalse(configMap.containsKey("sys"));
+      assertTrue(configMap.containsKey(CP_PLUGIN_NAME));
+      assertFalse(configMap.containsKey(S3_PLUGIN_NAME)); // Disabled, so does not appear
+      assertFalse(configMap.containsKey(SYS_PLUGIN_NAME));
+
+      assertNotNull(registry.getDefinedConfig(CP_PLUGIN_NAME));
+      assertNull(registry.getDefinedConfig(S3_PLUGIN_NAME));
+      assertNotNull(registry.getDefinedConfig(SYS_PLUGIN_NAME));
 
       // All stored plugins, including disabled
       configMap = registry.storedConfigs();
-      assertTrue(configMap.containsKey(StoragePluginTestUtils.CP_PLUGIN_NAME));
-      assertTrue(configMap.containsKey("s3")); // Disabled, but still appears
-      assertNotNull(configMap.get("s3"));
-      assertSame(registry.getConfig("s3"), configMap.get("s3"));
-      assertFalse(configMap.containsKey("sys"));
+      assertTrue(configMap.containsKey(CP_PLUGIN_NAME));
+      assertTrue(configMap.containsKey(S3_PLUGIN_NAME)); // Disabled, but still appears
+      assertNotNull(configMap.get(S3_PLUGIN_NAME));
+      assertSame(registry.getStoredConfig(S3_PLUGIN_NAME), configMap.get(S3_PLUGIN_NAME));
+      assertFalse(configMap.containsKey(SYS_PLUGIN_NAME));
       int bootstrapCount = configMap.size();
 
+      // Enabled only
+      configMap = registry.storedConfigs(PluginFilter.ENABLED);
+      assertTrue(configMap.containsKey(CP_PLUGIN_NAME));
+      assertFalse(configMap.containsKey(S3_PLUGIN_NAME));
+
+      // Disabled only
+      configMap = registry.storedConfigs(PluginFilter.DISABLED);
+      assertFalse(configMap.containsKey(CP_PLUGIN_NAME));
+      assertTrue(configMap.containsKey(S3_PLUGIN_NAME));
+
       // Create a new plugin
-      FileSystemConfig pConfig1 = new FileSystemConfig("myConn",
-          new HashMap<>(), new HashMap<>(), new HashMap<>());
-      pConfig1.setEnabled(true);
-      registry.put("myPlugin", pConfig1);
-      StoragePlugin plugin1 = registry.getPlugin("myPlugin");
+      FileSystemConfig pConfig1 = myConfig1();
+      registry.put(MY_PLUGIN_NAME, pConfig1);
+      StoragePlugin plugin1 = registry.getPlugin(MY_PLUGIN_NAME);
       assertNotNull(plugin1);
-      assertSame(plugin1, registry.getPlugin(pConfig1));
+      assertSame(plugin1, registry.getPluginByConfig(pConfig1));
       configMap = registry.storedConfigs();
 
       // Names converted to lowercase in persistent storage
-      assertTrue(configMap.containsKey("myplugin"));
+      assertTrue(configMap.containsKey(MY_PLUGIN_KEY));
       assertEquals(bootstrapCount + 1, configMap.size());
 
       // Names are case-insensitive
-      assertSame(plugin1, registry.getPlugin("myplugin"));
-      assertSame(plugin1, registry.getPlugin("MYPLUGIN"));
+      assertSame(plugin1, registry.getPlugin(MY_PLUGIN_KEY));
+      assertSame(plugin1, registry.getPlugin(MY_PLUGIN_NAME.toUpperCase()));
 
       // Update the plugin
-      Map<String, String> props = new HashMap<>();
-      props.put("foo", "bar");
-      FileSystemConfig pConfig2 = new FileSystemConfig("myConn",
-          props, new HashMap<>(), new HashMap<>());
+      FileSystemConfig pConfig2 = myConfig2();
+      registry.put(MY_PLUGIN_NAME, pConfig2);
+      StoragePlugin plugin2 = registry.getPlugin(MY_PLUGIN_NAME);
+      assertNotSame(plugin1, plugin2);
+      assertTrue(plugin2 instanceof FileSystemPlugin);
+      FileSystemPlugin fsStorage = (FileSystemPlugin) plugin2;
+      assertSame(pConfig2, fsStorage.getConfig());
+      assertSame(plugin2, registry.getPluginByConfig(pConfig2));
+
+      // Cannot create/update a plugin with null or blank name
+
+      FileSystemConfig pConfig3 = myConfig1();
+      try {
+        registry.put(null, pConfig3);
+        fail();
+      } catch (PluginException e) {
+        // Expected
+      }
+      try {
+        registry.put("  ", pConfig3);
+        fail();
+      } catch (PluginException e) {
+        // Expected
+      }
+    }
+  }
+
+  /**
+   * Tests the dedicated setEnabled() method to cleanly update the
+   * enabled status of a plugin by name.
+   */
+  @Test
+  public void testEnabledState() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    try (ClusterFixture cluster = builder.build()) {
+      StoragePluginRegistry registry = cluster.storageRegistry();
+
+      // Disable an enabled plugin
+      StoragePluginConfig config = registry.getStoredConfig(CP_PLUGIN_NAME);
+      assertTrue(config.isEnabled());
+
+      // Enable/disable has traditionally been done by changing
+      // the plugin outside of the registry, which leads to obvious
+      // race conditions.
+      // Tests synchronized version.
+      registry.setEnabled(CP_PLUGIN_NAME, true);
+      StoragePluginConfig savedConfig = registry.getStoredConfig(CP_PLUGIN_NAME);
+      assertEquals(config, savedConfig);
+      assertTrue(savedConfig.isEnabled());
+
+      registry.setEnabled(CP_PLUGIN_NAME, false);
+      savedConfig = registry.getStoredConfig(CP_PLUGIN_NAME);
+      assertEquals(config, savedConfig);
+      assertFalse(savedConfig.isEnabled());
+
+      // OK to disable twice
+      registry.setEnabled(CP_PLUGIN_NAME, false);
+      savedConfig = registry.getStoredConfig(CP_PLUGIN_NAME);
+      assertEquals(config, savedConfig);
+      assertFalse(savedConfig.isEnabled());
+
+      // Disabled plugins appear in the stored config map
+      Map<String, StoragePluginConfig> configMap = registry.storedConfigs();
+      assertTrue(configMap.containsKey(CP_PLUGIN_NAME));
+      assertEquals(config, configMap.get(CP_PLUGIN_NAME));
+
+      // Re-enable
+      registry.setEnabled(CP_PLUGIN_NAME, true);
+      savedConfig = registry.getStoredConfig(CP_PLUGIN_NAME);
+      assertEquals(config, savedConfig);
+      assertTrue(savedConfig.isEnabled());
+
+      // Error if plugin does not exist
+      try {
+        registry.setEnabled("foo", true);
+        fail();
+      } catch (PluginException e) {
+        // expected
+      }
+      try {
+        registry.setEnabled("foo", false);
+        fail();
+      } catch (PluginException e) {
+        // expected
+      }
+
+      // Error to mess with a system plugin
+      try {
+        registry.setEnabled(SYS_PLUGIN_NAME, true);
+        fail();
+      } catch (PluginException e) {
+        // expected
+      }
+      try {
+        registry.setEnabled(SYS_PLUGIN_NAME, false);
+        fail();
+      } catch (PluginException e) {
+        // expected
+      }
+    }
+  }
+
+  /**
+   * Tests the other way to enable/disable plugins: make a **COPY** of the
+   * config and set the enable/disable status. Note: race conditions happen
+   * if a client modifies a stored config. Old code would do that, but the
+   * results are undefined. Either use setEnabled(), or make a copy of the
+   * config. This case also models where the user edits the config by hand
+   * in the Web Console to disable the plugin: the deserialize/serialize
+   * steps make the copy.
+   */
+  @Test
+  public void testEnableWithPut() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    try (ClusterFixture cluster = builder.build()) {
+      StoragePluginRegistry registry = cluster.storageRegistry();
+
+      FileSystemConfig pConfig1 = myConfig1();
+      registry.put(MY_PLUGIN_NAME, pConfig1);
+      assertTrue(registry.getStoredConfig(MY_PLUGIN_NAME).isEnabled());
+
+      // Enable the same plugin using a different, but equal, config.
+      // The original config remains in place.
+      StoragePluginConfig pConfig2 = registry.copyConfig(MY_PLUGIN_NAME);
       pConfig2.setEnabled(true);
-      registry.put("myPlugin", pConfig2);
-      StoragePlugin plugin2 = registry.getPlugin("myPlugin");
+      assertEquals(pConfig1, pConfig2);
+      registry.put(MY_PLUGIN_NAME, pConfig2);
+      StoragePluginConfig savedConfig = registry.getStoredConfig(MY_PLUGIN_NAME);
+      assertEquals(pConfig1, savedConfig);
+      assertTrue(savedConfig.isEnabled());
+
+      // Force resolution of the plugin so there is something to cache
+      StoragePlugin plugin = registry.getPlugin(MY_PLUGIN_NAME);
+      assertNotNull(plugin);
+
+      // Disable an enabled plugin. The old plugin lives in ephemeral
+      // store, but is not visible by name. If requested, the
+      // registry obtains a new copy from persistent storage.
+      StoragePluginConfig pConfig3 = registry.copyConfig(MY_PLUGIN_NAME);
+      pConfig3.setEnabled(false);
+      registry.put(MY_PLUGIN_NAME, pConfig3);
+      savedConfig = registry.getStoredConfig(MY_PLUGIN_NAME);
+      assertEquals(pConfig1, savedConfig);
+      assertFalse(savedConfig.isEnabled());
+
+      // OK to disable twice
+      StoragePluginConfig pConfig4 = registry.copyConfig(MY_PLUGIN_NAME);
+      pConfig4.setEnabled(false);
+      registry.put(MY_PLUGIN_NAME, pConfig4);
+      savedConfig = registry.getStoredConfig(MY_PLUGIN_NAME);
+      assertEquals(pConfig1, savedConfig);
+      assertFalse(savedConfig.isEnabled());
+
+      // Disabled plugins appear in the stored config map
+      Map<String, StoragePluginConfig> configMap = registry.storedConfigs();
+      assertTrue(configMap.containsKey(MY_PLUGIN_KEY));
+      assertEquals(pConfig3, configMap.get(MY_PLUGIN_KEY));
+
+      // Re-enable, the original plugin instance reappears.
+      StoragePluginConfig pConfig5 = registry.copyConfig(MY_PLUGIN_NAME);
+      pConfig5.setEnabled(true);
+      registry.put(MY_PLUGIN_NAME, pConfig5);
+      assertSame(plugin, registry.getPlugin(MY_PLUGIN_NAME));
+      assertTrue(plugin.getConfig().isEnabled());
+    }
+  }
+
+  /**
+   * Test the ephemeral cache where plugin instances live after their
+   * configs are changed or disabled so that any running queries see
+   * the prior plugin config and instance.
+   * <p>
+   * Note that each call to update a plugin must work with a copy of
+   * a config. Results are undefined if a client changes a stored
+   * config.
+   */
+  @Test
+  public void testEphemeralLifecycle() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    try (ClusterFixture cluster = builder.build()) {
+      StoragePluginRegistry registry = cluster.storageRegistry();
+
+      // Create a plugin
+      FileSystemConfig pConfig1 = myConfig1();
+      registry.put(MY_PLUGIN_NAME, pConfig1);
+      StoragePlugin plugin1 = registry.getPlugin(MY_PLUGIN_NAME);
+
+      // Update the plugin
+      FileSystemConfig pConfig2 = myConfig2();
+      registry.put(MY_PLUGIN_NAME, pConfig2);
+      StoragePlugin plugin2 = registry.getPlugin(MY_PLUGIN_NAME);
       assertNotSame(plugin1, plugin2);
       assertTrue(plugin2 instanceof FileSystemPlugin);
       FileSystemPlugin fsStorage = (FileSystemPlugin) plugin2;
       assertSame(pConfig2, fsStorage.getConfig());
-      assertSame(plugin2, registry.getPlugin(pConfig2));
+      assertSame(plugin2, registry.getPluginByConfig(pConfig2));
 
       // Suppose a query was planned with plugin1 and now starts
       // to execute. Plugin1 has been replaced with plugin2. However
       // the registry moved the old plugin to ephemeral storage where
       // it can still be found by configuration.
-      StoragePlugin ePlugin1 = registry.getPlugin(pConfig1);
+      FileSystemConfig pConfig1a = myConfig1();
+      StoragePlugin ePlugin1 = registry.getPluginByConfig(pConfig1a);
       assertSame(plugin1, ePlugin1);
       assertNotSame(plugin2, ePlugin1);
 
-      // Now, another thread does the same. It gets the same
-      // ephemeral plugin.
-      assertSame(plugin1, registry.getPlugin(pConfig1));
-
       // Change the stored plugin back to the first config.
-      registry.put("myPlugin", pConfig1);
+      FileSystemConfig pConfig1b = myConfig1();
+      registry.put(MY_PLUGIN_NAME, pConfig1b);
 
       // Now, lets suppose thread 3 starts to execute. It sees the original plugin
-      assertSame(plugin1, registry.getPlugin("myPlugin"));
+      assertSame(plugin1, registry.getPlugin(MY_PLUGIN_NAME));
 
       // But, the ephemeral plugin lives on. Go back to the second
       // config.
-      registry.put("myPlugin", pConfig2);
-      assertSame(plugin2, registry.getPlugin("myPlugin"));
+      FileSystemConfig pConfig2b = myConfig2();
+      registry.put(MY_PLUGIN_NAME, pConfig2b);
+      assertSame(plugin2, registry.getPlugin(MY_PLUGIN_NAME));
 
       // Thread 4, using the first config from planning in thread 3,
       // still sees the first plugin.
-      assertSame(plugin1, registry.getPlugin(pConfig1));
+      assertSame(plugin1, registry.getPluginByConfig(pConfig1b));
 
       // Disable
-      pConfig2.setEnabled(false);
-      assertNull(registry.getPlugin("myPlugin"));
+      registry.setEnabled(MY_PLUGIN_NAME, false);
+      assertNull(registry.getPlugin(MY_PLUGIN_NAME));
 
       // Though disabled, a running query will create an ephemeral
       // plugin for the config.
-      assertSame(plugin2, registry.getPlugin(pConfig2));
-
-      // Disabling an ephemeral plugin neither makes sense
-      // nor will have any effect.
-      ePlugin1.getConfig().setEnabled(false);
-      assertSame(ePlugin1, registry.getPlugin(pConfig1));
-      assertTrue(registry.storedConfigs().containsKey("myplugin"));
-      assertFalse(registry.enabledConfigs().containsKey("myplugin"));
+      assertSame(plugin1, registry.getPluginByConfig(pConfig1b));
+      assertSame(plugin2, registry.getPluginByConfig(pConfig2b));
 
-      // Enable. The config is retrieved from the persistent store.
-      // We notice the config is in the ephemeral store and
+      // Enable. We notice the config is in the ephemeral store and
       // so we restore it.
-      pConfig2.setEnabled(true);
-      assertSame(plugin2, registry.getPlugin("myPlugin"));
-      assertSame(plugin2, registry.getPlugin(pConfig2));
-      assertTrue(registry.storedConfigs().containsKey("myplugin"));
-      assertTrue(registry.enabledConfigs().containsKey("myplugin"));
+      registry.setEnabled(MY_PLUGIN_NAME, true);
+      assertSame(plugin2, registry.getPlugin(MY_PLUGIN_NAME));
+      assertSame(plugin2, registry.getPluginByConfig(pConfig2b));
+      assertTrue(registry.storedConfigs().containsKey(MY_PLUGIN_KEY));
+      assertTrue(registry.enabledConfigs().containsKey(MY_PLUGIN_KEY));
 
       // Delete the plugin
-      registry.remove("myPlugin");
-      assertNull(registry.getPlugin("myPlugin"));
+      registry.remove(MY_PLUGIN_NAME);
+      assertNull(registry.getPlugin(MY_PLUGIN_NAME));
 
       // Again a running query will retrieve the plugin from ephemeral storage
-      assertSame(plugin1, registry.getPlugin(pConfig1));
-      assertSame(plugin2, registry.getPlugin(pConfig2));
+      assertSame(plugin1, registry.getPluginByConfig(pConfig1));
+      assertSame(plugin2, registry.getPluginByConfig(pConfig2));
 
       // Delete again, no-op
-      registry.remove("myPlugin");
+      registry.remove(MY_PLUGIN_NAME);
 
       // The retrieve-from-ephemeral does not kick in if we create
       // a new plugin with the same config but a different name.
-      pConfig1.setEnabled(true);
-      registry.put("alias", pConfig1);
+      FileSystemConfig pConfig1c = myConfig1();
+      pConfig1c.setEnabled(true);
+      registry.put("alias", pConfig1c);
       StoragePlugin plugin4 = registry.getPlugin("alias");
       assertNotNull(plugin4);
       assertNotSame(plugin1, plugin4);
@@ -192,39 +421,96 @@ public class TestPluginRegistry extends BasePluginRegistryTest {
       // be returned on subsequent queries.
       registry.remove("alias");
       assertNull(registry.getPlugin("alias"));
-      assertSame(plugin1, registry.getPlugin(pConfig1));
+      assertSame(plugin1, registry.getPluginByConfig(pConfig1));
 
       // Try to change a system plugin
-      StoragePlugin sysPlugin = registry.getPlugin("sys");
+      StoragePlugin sysPlugin = registry.getPlugin(SYS_PLUGIN_NAME);
       assertNotNull(sysPlugin);
-      FileSystemConfig pConfig3 = new FileSystemConfig("myConn",
-          props, new HashMap<>(), new HashMap<>());
-      pConfig3.setEnabled(true);
+      FileSystemConfig pConfig3 = myConfig2();
       try {
-        registry.put("sys", pConfig3);
+        registry.put(SYS_PLUGIN_NAME, pConfig3);
         fail();
-      } catch (UserException e) {
+      } catch (PluginException e) {
         // Expected
       }
       pConfig3.setEnabled(false);
       try {
-        registry.put("sys", pConfig3);
+        registry.put(SYS_PLUGIN_NAME, pConfig3);
         fail();
-      } catch (UserException e) {
+      } catch (PluginException e) {
         // Expected
       }
-      assertSame(sysPlugin, registry.getPlugin("sys"));
+      assertSame(sysPlugin, registry.getPlugin(SYS_PLUGIN_NAME));
 
       // Try to delete a system plugin
       try {
-        registry.remove("sys");
+        registry.remove(SYS_PLUGIN_NAME);
         fail();
-      } catch (UserException e) {
+      } catch (PluginException e) {
         // Expected
       }
+    }
+  }
+
+  @Test
+  public void testEphemeralWithoutInstance() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    try (ClusterFixture cluster = builder.build()) {
+      StoragePluginRegistry registry = cluster.storageRegistry();
 
-      // There is no protection for disabling a system plugin because
-      // there is no code that will allow that at present.
+      // Create a plugin
+      // Since we've created no plugin instance, the configs come from
+      // the persistent store, there is no guarantee we get the same
+      // instance on each retrieval.
+      FileSystemConfig pConfig1 = myConfig1();
+      registry.put(MY_PLUGIN_NAME, pConfig1);
+      StoragePluginConfig savedConfig = registry.getStoredConfig(MY_PLUGIN_NAME);
+      assertEquals(pConfig1, savedConfig);
+      assertTrue(savedConfig.isEnabled());
+      assertEquals(pConfig1, registry.getDefinedConfig(MY_PLUGIN_NAME));
+
+      // Do not refer to the instance. As a result, no reason to
+      // cache the plugin in ephemeral cache.
+
+      // Change config
+      FileSystemConfig pConfig1b = myConfig2();
+      registry.put(MY_PLUGIN_NAME, pConfig1b);
+      assertEquals(pConfig1b, registry.getDefinedConfig(MY_PLUGIN_NAME));
+
+      // Put it back
+      FileSystemConfig pConfig1c = myConfig1();
+      registry.put(MY_PLUGIN_NAME, pConfig1c);
+      assertEquals(pConfig1c, registry.getDefinedConfig(MY_PLUGIN_NAME));
+      assertEquals(pConfig1c, registry.getPlugin(MY_PLUGIN_NAME).getConfig());
+
+      // Odd case. Some thread refers to the config while not in
+      // the store which forces an instance which later reappears.
+      FileSystemConfig pConfig2a = myConfig1();
+      registry.put("myplugin2", pConfig2a);
+
+      // Didn't instantiate. Change
+      FileSystemConfig pConfig2b = myConfig2();
+      registry.put("myplugin2", pConfig2b);
+
+      // Force a resync
+      assertEquals(pConfig2b, registry.getDefinedConfig("myplugin2"));
+
+      // Refer by original config. We didn't cache the original config
+      // because there was no plugin instance. Must make up a new plugin,
+      // which goes into the ephemeral cache.
+      FileSystemConfig pConfig2c = myConfig1();
+      StoragePlugin plugin2 = registry.getPluginByConfig(pConfig2c);
+      assertEquals(pConfig2c, plugin2.getConfig());
+
+      // Put the original config into the persistent store and local cache.
+      // Should not dredge up the ephemeral version to reuse since that
+      // version had an unknown name.
+      // It is unfortunate that we have two instances with the same config,
+      // but different names. But, since the name is immutable, and not
+      // known above, the two-instance situation is the least bad option.
+      FileSystemConfig pConfig2d = myConfig1();
+      registry.put("myplugin2", pConfig2d);
+      assertEquals(pConfig2d, registry.getPlugin("myplugin2").getConfig());
     }
   }
 
@@ -240,43 +526,191 @@ public class TestPluginRegistry extends BasePluginRegistryTest {
       .put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
       .put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH,
           dirTestWatcher.getStoreDir().getAbsolutePath());
-    try (ClusterFixture cluster = builder.build();) {
+    try (ClusterFixture cluster = builder.build()) {
       StoragePluginRegistry registry1 = cluster.storageRegistry("bit1");
       StoragePluginRegistry registry2 = cluster.storageRegistry("bit2");
 
       // Define a plugin in Drillbit 1
-      FileSystemConfig pConfig1 = new FileSystemConfig("myConn",
-          new HashMap<>(), new HashMap<>(), new HashMap<>());
-      pConfig1.setEnabled(true);
-      registry1.put("myPlugin", pConfig1);
-      StoragePlugin plugin1 = registry1.getPlugin("myPlugin");
+      FileSystemConfig pConfig1 = myConfig1();
+      registry1.put(MY_PLUGIN_NAME, pConfig1);
+      StoragePlugin plugin1 = registry1.getPlugin(MY_PLUGIN_NAME);
       assertNotNull(plugin1);
 
       // Should appear in Drillbit 2
-      StoragePlugin plugin2 = registry2.getPlugin("myPlugin");
+      assertTrue(registry2.storedConfigs().containsKey(MY_PLUGIN_KEY));
+      StoragePlugin plugin2 = registry2.getPlugin(MY_PLUGIN_NAME);
       assertNotNull(plugin2);
-      assertEquals(pConfig1, plugin1.getConfig());
+      assertEquals(pConfig1, plugin2.getConfig());
 
       // Change in Drillbit 1
-      Map<String, String> props = new HashMap<>();
-      props.put("foo", "bar");
-      FileSystemConfig pConfig3 = new FileSystemConfig("myConn",
-          props, new HashMap<>(), new HashMap<>());
-      pConfig3.setEnabled(true);
-      registry1.put("myPlugin", pConfig3);
-      plugin1 = registry1.getPlugin("myPlugin");
-      assertSame(pConfig3, plugin1.getConfig());
+      FileSystemConfig pConfig3 = myConfig2();
+      registry1.put(MY_PLUGIN_NAME, pConfig3);
+      plugin1 = registry1.getPlugin(MY_PLUGIN_NAME);
+      assertEquals(pConfig3, plugin1.getConfig());
 
       // Change should appear in Drillbit 2
-      plugin2 = registry2.getPlugin("myPlugin");
+      assertTrue(registry2.storedConfigs().containsValue(pConfig3));
+      plugin2 = registry2.getPlugin(MY_PLUGIN_NAME);
       assertNotNull(plugin2);
       assertEquals(pConfig3, plugin1.getConfig());
 
       // Delete in Drillbit 2
-      registry2.remove("myPlugin");
+      registry2.remove(MY_PLUGIN_NAME);
 
       // Should not be available in Drillbit 1
-      assertNull(registry1.getPlugin("myPlugin"));
+      assertFalse(registry1.storedConfigs().containsKey(MY_PLUGIN_KEY));
+      assertNull(registry1.getPlugin(MY_PLUGIN_NAME));
+    }
+  }
+
+  @Test
+  public void testFormatPlugin() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    try (ClusterFixture cluster = builder.build()) {
+      StoragePluginRegistry registry = cluster.storageRegistry();
+
+      StoragePluginConfig config = registry.getStoredConfig(CP_PLUGIN_NAME);
+      FileSystemConfig fsConfig = (FileSystemConfig) config;
+      assertFalse(fsConfig.getFormats().containsKey("bsv"));
+
+      // Add a new format
+      TextFormatConfig bsv = new TextFormatConfig();
+      bsv.fieldDelimiter = '!';
+      registry.putFormatPlugin(CP_PLUGIN_NAME, "bsv", bsv);
+
+      config = registry.getStoredConfig(CP_PLUGIN_NAME);
+      fsConfig = (FileSystemConfig) config;
+      assertTrue(fsConfig.getFormats().containsKey("bsv"));
+      assertSame(bsv, fsConfig.getFormats().get("bsv"));
+
+      // Remove the format
+      registry.putFormatPlugin(CP_PLUGIN_NAME, "bsv", null);
+      config = registry.getStoredConfig(CP_PLUGIN_NAME);
+      fsConfig = (FileSystemConfig) config;
+      assertFalse(fsConfig.getFormats().containsKey("bsv"));
+
+      // Undefined plugin
+      try {
+        registry.putFormatPlugin("bogus", "bsv", bsv);
+        fail();
+      } catch (PluginException e) {
+        // Expected
+      }
+      // Try to set a non-FS plugin
+      try {
+        registry.putFormatPlugin(SYS_PLUGIN_NAME, "bsv", bsv);
+        fail();
+      } catch (PluginException e) {
+        // Expected
+      }
+    }
+  }
+
+  /**
+   * Test to illustrate problems discussed in DRILL-7624
+   */
+  @Test
+  public void testBadPlugin() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    builder.configBuilder().put(ExecConstants.PRIVATE_CONNECTORS,
+        Collections.singletonList(StoragePluginFixture.class.getName()));
+    try (ClusterFixture cluster = builder.build()) {
+      StoragePluginRegistry registry = cluster.storageRegistry();
+
+      // Create a config that causes a crash because the plugin
+      // is not created on update.
+      StoragePluginFixtureConfig badConfig = new StoragePluginFixtureConfig("crash-ctor");
+      badConfig.setEnabled(true);
+
+      // Use the validated put to catch and reject errors at the cost of
+      // instantiating the plugin.
+      try {
+        registry.validatedPut("bad", badConfig);
+        fail();
+      } catch (PluginException e) {
+        // Expected
+      }
+      assertNull(registry.getStoredConfig("bad"));
+      assertFalse(registry.availablePlugins().contains("bad"));
+
+      // Try the same with JSON
+      String json = registry.encode(badConfig);
+      try {
+        registry.putJson("bad", json);
+        fail();
+      } catch (PluginException e) {
+        // Expected
+      }
+      assertFalse(registry.availablePlugins().contains("bad"));
+
+      // Now, let's pretend the plugin was valid when we did the above,
+      // but later the external system failed.
+      registry.put("bad", badConfig);
+      assertEquals(badConfig, registry.getStoredConfig("bad"));
+      assertTrue(registry.availablePlugins().contains("bad"));
+
+      // Ask for the actual plugin. Now will fail.
+      try {
+        registry.getPlugin("bad");
+        fail();
+      } catch (UserException e) {
+        assertTrue(e.getMessage().contains("bad"));
+      }
+      assertTrue(registry.availablePlugins().contains("bad"));
+
+      // No plugin created. Will fail the next time also.
+      try {
+        registry.getPlugin("bad");
+        fail();
+      } catch (UserException e) {
+        // Expected
+      }
+      assertTrue(registry.availablePlugins().contains("bad"));
+
+      // The iterator used to find planning rules will skip the failed
+      // plugin. (That the planner uses all rules is, itself, a bug.)
+      int n = registry.availablePlugins().size();
+      int count = 0;
+      for (@SuppressWarnings("unused") Entry<String, StoragePlugin> entry : registry) {
+        count++;
+      }
+      assertEquals(n - 1, count);
+
+      // Reset to known good state
+      registry.remove("bad");
+
+      // Get tricky. Create a good plugin, then replace with
+      // a disabled bad one.
+      StoragePluginFixtureConfig goodConfig = new StoragePluginFixtureConfig("ok");
+      goodConfig.setEnabled(true);
+      json = registry.encode(goodConfig);
+      registry.putJson("test", json);
+      assertTrue(registry.availablePlugins().contains("test"));
+      assertEquals(goodConfig, registry.getPlugin("test").getConfig());
+
+      // Replace with a disabled bad plugin
+      badConfig = new StoragePluginFixtureConfig("crash-ctor");
+      badConfig.setEnabled(false);
+      json = registry.encode(badConfig);
+      registry.putJson("test", json);
+      assertFalse(registry.availablePlugins().contains("test"));
+      assertNull(registry.getPlugin("test"));
+      assertNotNull(registry.getStoredConfig("test"));
+      assertEquals(badConfig, registry.getStoredConfig("test"));
+
+      // Attempt to disable a disabled plugin. Should be OK.
+      registry.setEnabled("test", false);
+
+      // But, can't use this as a back door to enable an
+      // invalid plugin. (If the problem is due to an external
+      // system, fix that system first.)
+      try {
+        registry.setEnabled("test", true);
+        fail();
+      } catch (PluginException e) {
+        // Expected
+      }
+      assertFalse(registry.availablePlugins().contains("test"));
     }
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginsMap.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginsMap.java
index 5413b7c..3ac0eb5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginsMap.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginsMap.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.store.PluginHandle.PluginType;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.test.OperatorFixture;
 import org.junit.Test;
 
@@ -119,7 +120,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
   }
 
   @Test
-  public void testBasics() {
+  public void testBasics() throws PluginException {
     ConnectorHandle connector = fixtureConnector();
 
     StoragePluginFixtureConfig config1a = new StoragePluginFixtureConfig("ok1");
@@ -168,12 +169,11 @@ public class TestPluginsMap extends BasePluginRegistryTest {
     assertTrue(configs.contains(entry1.config()));
     assertTrue(configs.contains(entry2.config()));
 
-    // Convenience (but not optimistically locked) remove
-    map.remove(entry1);
+    map.remove(entry1.name());
     assertNull(map.get(entry1.name()));
     assertNull(map.get(entry1.config()));
 
-    map.remove(entry2);
+    map.remove(entry2.name());
 
     assertTrue(map.getNames().isEmpty());
     assertTrue(map.plugins().isEmpty());
@@ -183,7 +183,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
   }
 
   @Test
-  public void testRemoveByName() {
+  public void testRemoveByName() throws PluginException {
     ConnectorHandle connector = fixtureConnector();
 
     StoragePluginMap map = new StoragePluginMap();
@@ -208,7 +208,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
   }
 
   @Test
-  public void testSafePutRemove() {
+  public void testSafePutRemove() throws PluginException {
     ConnectorHandle connector = fixtureConnector();
 
     StoragePluginMap map = new StoragePluginMap();
@@ -232,10 +232,9 @@ public class TestPluginsMap extends BasePluginRegistryTest {
     assertSame(entry3, map.putIfAbsent(entry4));
 
     // Remove
-    assertFalse(map.remove(entry1)); // Already replaced
-    assertTrue(map.remove(entry2)); // currently in map
-    assertTrue(map.remove(entry3));
-    assertFalse(map.remove(entry4));
+    assertSame(entry2, map.remove(entry2.name())); // currently in map
+    assertSame(entry3, map.remove(entry3.name()));
+    assertNull(map.remove(entry4.name()));
 
     assertTrue(map.getNames().isEmpty());
     assertTrue(map.plugins().isEmpty());
@@ -245,7 +244,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
   }
 
   @Test
-  public void testReplace() {
+  public void testReplace() throws PluginException {
     ConnectorHandle connector = fixtureConnector();
 
     StoragePluginMap map = new StoragePluginMap();
@@ -274,7 +273,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
   }
 
   @Test
-  public void testSafeRemove() {
+  public void testSafeRemove() throws PluginException {
     ConnectorHandle connector = fixtureConnector();
 
     StoragePluginMap map = new StoragePluginMap();
@@ -349,9 +348,9 @@ public class TestPluginsMap extends BasePluginRegistryTest {
 
       // Remove by entry fails for the same reasons as above.
       try {
-        map.remove(sysEntry);
+        map.remove(sysEntry.name());
         fail();
-      } catch (IllegalArgumentException e) {
+      } catch (PluginException e) {
         // Expected
       }
       assertSame(sysEntry, map.get(sysPlugin.getName()));
@@ -375,7 +374,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
     }
   }
 
-  public void testClose() {
+  public void testClose() throws PluginException {
     ConnectorHandle connector = fixtureConnector();
 
     StoragePluginMap map = new StoragePluginMap();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java
index 8b95fbf..d6eb06b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java
@@ -17,15 +17,10 @@
  */
 package org.apache.drill.exec.store.httpd;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
@@ -48,25 +43,11 @@ public class TestHTTPDLogReader extends ClusterTest {
   @BeforeClass
   public static void setup() throws Exception {
     ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
-    defineHTTPDPlugin();
-  }
-
-  private static void defineHTTPDPlugin() throws ExecutionSetupException {
 
-    // Create an instance of the regex config.
-    // Note: we can"t use the ".log" extension; the Drill .gitignore
-    // file ignores such files, so they"ll never get committed. Instead,
-    // make up a fake suffix.
+    // Define a temporary format plugin for the "cp" storage plugin.
     HttpdLogFormatConfig sampleConfig = new HttpdLogFormatConfig();
     sampleConfig.setLogFormat("%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-agent}i\"");
-
-    // Define a temporary format plugin for the "cp" storage plugin.
-    Drillbit drillbit = cluster.drillbit();
-    final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
-    final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin("cp");
-    final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
-    pluginConfig.getFormats().put("sample", sampleConfig);
-    pluginRegistry.put("cp", pluginConfig);
+    cluster.defineFormat("cp", "sample", sampleConfig);
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
index 3ea374c..0e47cf0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
@@ -20,19 +20,18 @@ package org.apache.drill.exec.store.log;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
@@ -79,6 +78,7 @@ public class TestLogReader extends ClusterTest {
     // Note: we can't use the ".log" extension; the Drill .gitignore
     // file ignores such files, so they'll never get committed. Instead,
     // make up a fake suffix.
+    Map<String, FormatPluginConfig> formats = new HashMap<>();
     LogFormatConfig sampleConfig = new LogFormatConfig();
     sampleConfig.setExtension("log1");
     sampleConfig.setRegex(DATE_ONLY_PATTERN);
@@ -87,6 +87,7 @@ public class TestLogReader extends ClusterTest {
     sampleConfig.getSchema().add(new LogFormatField("year", "INT"));
     sampleConfig.getSchema().add(new LogFormatField("month", "INT"));
     sampleConfig.getSchema().add(new LogFormatField("day", "INT"));
+    formats.put("sample", sampleConfig);
 
     // Full Drill log parser definition.
     LogFormatConfig logConfig = new LogFormatConfig();
@@ -106,6 +107,7 @@ public class TestLogReader extends ClusterTest {
     logConfig.getSchema().add(new LogFormatField("level"));
     logConfig.getSchema().add(new LogFormatField("module"));
     logConfig.getSchema().add(new LogFormatField("message"));
+    formats.put("drill-log", logConfig);
 
     //Set up additional configs to check the time/date formats
     LogFormatConfig logDateConfig = new LogFormatConfig();
@@ -120,10 +122,12 @@ public class TestLogReader extends ClusterTest {
     logDateConfig.getSchema().add(new LogFormatField("message"));
 
     logDateConfig.setMaxErrors(3);
+    formats.put("date-log",logDateConfig);
 
     LogFormatConfig mysqlLogConfig = new LogFormatConfig();
     mysqlLogConfig.setExtension("sqllog");
     mysqlLogConfig.setRegex("(\\d{6})\\s(\\d{2}:\\d{2}:\\d{2})\\s+(\\d+)\\s(\\w+)\\s+(.+)");
+    formats.put("mysql-log", mysqlLogConfig);
 
     // Firewall log file that requires date parsing
     LogFormatConfig firewallConfig = new LogFormatConfig();
@@ -136,18 +140,10 @@ public class TestLogReader extends ClusterTest {
     firewallConfig.getSchema().add(new LogFormatField("pid", "INT"));
     firewallConfig.getSchema().add(new LogFormatField("message"));
     firewallConfig.getSchema().add(new LogFormatField("src_ip"));
+    formats.put("ssdlog", firewallConfig);
 
     // Define a temporary format plugin for the "cp" storage plugin.
-    Drillbit drillbit = cluster.drillbit();
-    final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
-    final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin("cp");
-    final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
-    pluginConfig.getFormats().put("sample", sampleConfig);
-    pluginConfig.getFormats().put("drill-log", logConfig);
-    pluginConfig.getFormats().put("date-log",logDateConfig);
-    pluginConfig.getFormats().put("mysql-log", mysqlLogConfig);
-    pluginConfig.getFormats().put("ssdlog", firewallConfig);
-    pluginRegistry.put("cp", pluginConfig);
+    cluster.defineFormats("cp", formats);
 
     // Config similar to the above, but with no type info. Types
     // will be provided via the provided schema mechanism. Column names
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
index 5441670..bc6eff9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
@@ -19,11 +19,6 @@
 package org.apache.drill.exec.store.pcap;
 
 import org.apache.drill.categories.RowSetTests;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.junit.BeforeClass;
@@ -39,19 +34,7 @@ public class TestPcapEVFReader extends ClusterTest {
   @BeforeClass
   public static void setup() throws Exception {
     ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
-    definePlugin();
-  }
-
-  private static void definePlugin() throws ExecutionSetupException {
-    PcapFormatConfig sampleConfig = new PcapFormatConfig();
-
-    // Define a temporary plugin for the "cp" storage plugin.
-    Drillbit drillbit = cluster.drillbit();
-    final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
-    final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin("cp");
-    final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
-    pluginConfig.getFormats().put("sample", sampleConfig);
-    pluginRegistry.put("cp", pluginConfig);
+    cluster.defineFormat("cp", "sample", new PcapFormatConfig());
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index b1f8aed..80e2a26 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -38,7 +38,6 @@ import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.drill.common.config.DrillProperties;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
@@ -53,6 +52,7 @@ import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.StoragePluginRegistryImpl;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
@@ -467,7 +467,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
         StoragePluginRegistryImpl registry = (StoragePluginRegistryImpl) drillbit.getContext().getStorage();
         registry.put(name, config);
       }
-    } catch (ExecutionSetupException e) {
+    } catch (PluginException e) {
       throw new IllegalStateException("Plugin definition failed", e);
     }
   }
@@ -491,7 +491,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
     for (Drillbit bit : drillbits()) {
       try {
         defineWorkspace(bit, pluginName, schemaName, path, defaultFormat, format);
-      } catch (ExecutionSetupException e) {
+      } catch (PluginException e) {
         // This functionality is supposed to work in tests. Change
         // exception to unchecked to make test code simpler.
 
@@ -504,9 +504,9 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
 
   private void defineWorkspace(Drillbit drillbit, String pluginName,
       String schemaName, String path, String defaultFormat, FormatPluginConfig format)
-      throws ExecutionSetupException {
+      throws PluginException {
     final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
-    final FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getConfig(pluginName);
+    final FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getStoredConfig(pluginName);
     final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(path, true, defaultFormat, false);
 
     Map<String, WorkspaceConfig> newWorkspaces = new HashMap<>();
@@ -531,7 +531,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
     for (Drillbit bit : drillbits()) {
       try {
         defineFormats(bit, pluginName, formats);
-      } catch (ExecutionSetupException e) {
+      } catch (PluginException e) {
         throw new IllegalStateException(e);
       }
     }
@@ -539,10 +539,10 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
 
   private void defineFormats(Drillbit drillbit,
                              String pluginName,
-                             Map<String, FormatPluginConfig> formats) throws ExecutionSetupException {
+                             Map<String, FormatPluginConfig> formats) throws PluginException {
     StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
-    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getConfig(pluginName);
-    pluginConfig = pluginConfig.copyWithFormats(formats);
+    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.copyConfig(pluginName);
+    pluginConfig.getFormats().putAll(formats);
     pluginRegistry.put(pluginName, pluginConfig);
   }
 
@@ -550,7 +550,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
                             String pluginName,
                             FileSystemConfig pluginConfig,
                             Map<String, WorkspaceConfig> newWorkspaces,
-                            Map<String, FormatPluginConfig> newFormats) throws ExecutionSetupException {
+                            Map<String, FormatPluginConfig> newFormats) throws PluginException {
     FileSystemConfig newPluginConfig = new FileSystemConfig(
       pluginConfig.getConnection(),
       pluginConfig.getConfig(),
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
index 9588ea9..f2d6ade 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
@@ -17,9 +17,9 @@
  */
 package org.apache.drill.test;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.mock.MockBreakageStorage;
 import org.apache.drill.exec.store.mock.MockBreakageStorage.MockBreakageStorageEngineConfig;
 
@@ -44,7 +44,7 @@ public class ClusterMockStorageFixture extends ClusterFixture {
         config.setEnabled(true);
         pluginRegistry.put(name, config);
         plugin = (MockBreakageStorage) pluginRegistry.getPlugin(name);
-      } catch (ExecutionSetupException e) {
+      } catch (PluginException e) {
         throw new IllegalStateException(e);
       }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
index 029d259..9c83657 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
@@ -64,7 +64,7 @@ public class TestBuilder {
   private Boolean ordered;
   private boolean approximateEquality;
   private double tolerance;
-  private TestServices services;
+  private final TestServices services;
   // Used to pass the type information associated with particular column names rather than relying on the
   // ordering of the columns in the CSV file, or the default type inferences when reading JSON, this is used for the
   // case where results of the test query are adding type casts to the baseline queries, this saves a little bit of
@@ -408,10 +408,6 @@ public class TestBuilder {
     return this;
   }
 
-  private boolean singleExplicitBaselineRecord() {
-    return baselineRecords != null;
-  }
-
   /**
    * Provide a SQL query to validate against.
    * @param baselineQuery
@@ -558,7 +554,7 @@ public class TestBuilder {
   }
 
   public class SchemaTestBuilder extends TestBuilder {
-    private List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema;
+    private final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema;
     SchemaTestBuilder(TestServices services, Object query, UserBitShared.QueryType queryType,
         String baselineOptionSettingQueries, String testOptionSettingQueries, List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema) {
       super(services, query, queryType, false, false, null, baselineOptionSettingQueries, testOptionSettingQueries, false, -1);
@@ -598,7 +594,7 @@ public class TestBuilder {
   public class JSONTestBuilder extends TestBuilder {
 
     // path to the baseline file that will be inserted into the validation query
-    private String baselineFilePath;
+    private final String baselineFilePath;
 
     JSONTestBuilder(String baselineFile, TestServices services, Object query, UserBitShared.QueryType queryType, Boolean ordered,
                     boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
@@ -627,8 +623,8 @@ public class TestBuilder {
     /**
      * Baseline query. Type of object depends on {@link #baselineQueryType}
      */
-    private Object baselineQuery;
-    private UserBitShared.QueryType baselineQueryType;
+    private final Object baselineQuery;
+    private final UserBitShared.QueryType baselineQueryType;
 
     BaselineQueryTestBuilder(Object baselineQuery, UserBitShared.QueryType baselineQueryType, TestServices services,
                              Object query, UserBitShared.QueryType queryType, Boolean ordered,
diff --git a/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java b/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
index 2ac5b8a..91afaed 100644
--- a/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
+++ b/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonParser.Feature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
@@ -46,7 +47,10 @@ public class LogicalPlanPersistence {
   }
 
   public LogicalPlanPersistence(DrillConfig conf, ScanResult scanResult, ObjectMapper mapper) {
-    this.mapper = mapper;
+    // The UI allows comments in JSON. Since we use the mapper here to
+    // serialize plugin configs to/from the persistent store, allow comments too.
+    this.mapper = mapper
+        .configure(JsonParser.Feature.ALLOW_COMMENTS, true);
 
     SimpleModule deserModule = new SimpleModule("LogicalExpressionDeserializationModule")
         .addDeserializer(LogicalExpression.class, new LogicalExpression.De(conf))
diff --git a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
index 970b58e..2548c1c 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
@@ -23,6 +23,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 public abstract class StoragePluginConfig {
+
+  // DO NOT include enabled status in equality and hash
+  // comparisons; doing so will break the plugin registry.
   private Boolean enabled;
 
   /**