Posted to commits@drill.apache.org by ja...@apache.org on 2016/01/11 07:44:51 UTC

[3/3] drill git commit: DRILL-4257: Fix StoragePluginRegistry clean-up behavior and misc clean up.

DRILL-4257: Fix StoragePluginRegistry clean-up behavior and misc clean up.

- Create the storage plugin registry from a Drill configuration parameter so that the registry implementation can be replaced.
- Change StoragePluginRegistry into an interface and move the implementation to an impl class.
- Write documentation for StoragePluginRegistry.
- Make StoragePluginRegistry and StoragePluginMap AutoCloseable and ensure that Drillbit closes registry.
- Misc Drillbit code reorganization so that fields are at the top of the class (and static methods at the bottom).
- Update DrillConfig to support reflection-based creation of storage plugin objects (see the usage sketch below).
- Remove final from DrillConfig so that application developers can extend the config object.

This closes #321.
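
For orientation, a minimal sketch (not part of the commit) of how the pieces above fit together: the registry implementation is now named by the "drill.exec.storage.registry" configuration key and constructed reflectively through the new DrillConfig.getInstance(). Everything below other than the Drill classes and the config key is illustrative.

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginRegistry;

// Sketch only: mirrors how DrillbitContext now resolves the registry.
public class RegistryLookupSketch {

  public static StoragePluginRegistry resolve(DrillConfig config, DrillbitContext context) {
    // The value at "drill.exec.storage.registry" names the implementation class; that class
    // must implement StoragePluginRegistry and expose a public constructor taking a DrillbitContext.
    return config.getInstance(
        StoragePluginRegistry.STORAGE_PLUGIN_REGISTRY_IMPL,
        StoragePluginRegistry.class,
        context);
  }
}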


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f964908a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f964908a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f964908a

Branch: refs/heads/master
Commit: f964908ae371b8321ce550a95add26d1c2c080a7
Parents: 67d5cc6
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Jan 9 17:48:26 2016 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Jan 10 22:42:22 2016 -0800

----------------------------------------------------------------------
 .../apache/drill/common/config/DrillConfig.java |  36 +-
 .../org/apache/drill/exec/server/Drillbit.java  | 217 ++++----
 .../drill/exec/server/DrillbitContext.java      |  10 +-
 .../drill/exec/store/StoragePluginMap.java      |  20 +-
 .../drill/exec/store/StoragePluginRegistry.java | 500 +++----------------
 .../exec/store/StoragePluginRegistryImpl.java   | 497 ++++++++++++++++++
 .../src/main/resources/drill-module.conf        |   1 +
 .../java/org/apache/drill/PlanningBase.java     |   3 +-
 .../drill/exec/memory/TestAllocators.java       |   3 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   3 +-
 .../exec/physical/impl/join/TestMergeJoin.java  |  17 +-
 11 files changed, 762 insertions(+), 545 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index 507c4f0..fa5dd29 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -19,6 +19,7 @@ package org.apache.drill.common.config;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.Constructor;
 import java.net.URL;
 import java.util.Collection;
 import java.util.List;
@@ -27,17 +28,19 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.DrillConfigurationException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.scanner.ClassPathScanner;
 import org.reflections.util.ClasspathHelper;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
 
-public final class DrillConfig extends NestedConfig{
+public class DrillConfig extends NestedConfig {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
 
   private final ImmutableList<String> startupArguments;
@@ -58,6 +61,36 @@ public final class DrillConfig extends NestedConfig{
     logger.debug("DrillConfig object initialized.");
   }
 
+  /**
+   * Create an instance of the class named at the given configuration path, constructing it with the provided
+   * constructor arguments. The named class must implement (or extend) the given interface (or superclass).
+   * @param path
+   *          The configuration path holding the implementation class name.
+   * @param iface
+   *          The interface or superclass that the requested instance must implement or extend.
+   * @param constructorArgs
+   *          Any arguments required for constructing the requested type.
+   * @return The new instance, typed as the provided interface or superclass.
+   */
+  @SuppressWarnings("unchecked")
+  public <T> T getInstance(String path, Class<T> iface, Object... constructorArgs) {
+    try {
+      String className = this.getString(path);
+      Class<?> clazz = Class.forName(className);
+      Preconditions.checkArgument(iface.isAssignableFrom(clazz));
+      Class<?>[] argClasses = new Class[constructorArgs.length];
+      for (int i = 0; i < constructorArgs.length; i++) {
+        argClasses[i] = constructorArgs[i].getClass();
+      }
+      Constructor<?> constructor = clazz.getConstructor(argClasses);
+      return (T) constructor.newInstance(constructorArgs);
+    } catch (Exception e) {
+      throw UserException.unsupportedError(e)
+          .message("Failure while attempting to load instance of the class of type %s requested at path %s.",
+              iface.getName(), path).build(logger);
+    }
+  }
+
   public List<String> getStartupArguments() {
     return startupArguments;
   }
@@ -81,6 +114,7 @@ public final class DrillConfig extends NestedConfig{
     return create(null, false);
   }
 
+
   /**
    * <p>
    * DrillConfig loads up Drill configuration information. It does this utilizing a combination of classpath scanning

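A note on the getInstance() method added above: the constructor is looked up by the exact runtime classes of the trailing arguments (argClasses[i] = constructorArgs[i].getClass()), so the configured class needs a public constructor with exactly those parameter types. A self-contained sketch under that assumption; "Pluggable", "PluggableImpl", the package, and the config key are made up for illustration.

package com.example;

import org.apache.drill.common.config.DrillConfig;

// Sketch only; not part of the commit.
public class GetInstanceSketch {

  public interface Pluggable {
    String describe();
  }

  public static class PluggableImpl implements Pluggable {
    private final String tag;

    // getInstance() resolves the constructor from the runtime classes of its trailing
    // arguments, so a public (String) constructor must exist to match "demo" below.
    public PluggableImpl(String tag) {
      this.tag = tag;
    }

    @Override
    public String describe() {
      return "PluggableImpl(" + tag + ")";
    }
  }

  public static void main(String[] args) {
    // Assumes the loaded configuration contains something like:
    //   drill.example.pluggable: "com.example.GetInstanceSketch$PluggableImpl"
    DrillConfig config = DrillConfig.create();
    Pluggable p = config.getInstance("drill.example.pluggable", Pluggable.class, "demo");
    System.out.println(p.describe());
  }
}
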
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 99523d6..9734a38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.server.options.OptionValue.OptionType;
 import org.apache.drill.exec.server.rest.WebServer;
 import org.apache.drill.exec.service.ServiceEngine;
+import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.sys.CachingStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreRegistry;
@@ -51,117 +52,14 @@ import com.google.common.base.Stopwatch;
  */
 public class Drillbit implements AutoCloseable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
+
   static {
     Environment.logEnv("Drillbit environment: ", logger);
   }
 
-  private boolean isClosed = false;
-
-  public static Drillbit start(final StartupOptions options) throws DrillbitStartupException {
-    return start(DrillConfig.create(options.getConfigLocation()), null);
-  }
-
-  public static Drillbit start(final DrillConfig config) throws DrillbitStartupException {
-    return start(config, null);
-  }
-
-  public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet)
-      throws DrillbitStartupException {
-    logger.debug("Starting new Drillbit.");
-    // TODO: allow passing as a parameter
-    ScanResult classpathScan = ClassPathScanner.fromPrescan(config);
-    Drillbit bit;
-    try {
-      bit = new Drillbit(config, remoteServiceSet, classpathScan);
-    } catch (final Exception ex) {
-      throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
-    }
-
-    try {
-      bit.run();
-    } catch (final Exception e) {
-      bit.close();
-      throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
-    }
-    logger.debug("Started new Drillbit.");
-    return bit;
-  }
-
   private final static String SYSTEM_OPTIONS_NAME = "org.apache.drill.exec.server.Drillbit.system_options";
 
-  private static void throwInvalidSystemOption(final String systemProp, final String errorMessage) {
-    throw new IllegalStateException("Property \"" + SYSTEM_OPTIONS_NAME + "\" part \"" + systemProp
-        + "\" " + errorMessage + ".");
-  }
-
-  private static String stripQuotes(final String s, final String systemProp) {
-    if (s.isEmpty()) {
-      return s;
-    }
-
-    final char cFirst = s.charAt(0);
-    final char cLast = s.charAt(s.length() - 1);
-    if ((cFirst == '"') || (cFirst == '\'')) {
-      if (cLast != cFirst) {
-        throwInvalidSystemOption(systemProp, "quoted value does not have closing quote");
-      }
-
-      return s.substring(1, s.length() - 2); // strip the quotes
-    }
-
-    if ((cLast == '"') || (cLast == '\'')) {
-        throwInvalidSystemOption(systemProp, "value has unbalanced closing quote");
-    }
-
-    // return as-is
-    return s;
-  }
-
-  private void javaPropertiesToSystemOptions() {
-    // get the system options property
-    final String allSystemProps = System.getProperty(SYSTEM_OPTIONS_NAME);
-    if ((allSystemProps == null) || allSystemProps.isEmpty()) {
-      return;
-    }
-
-    final OptionManager optionManager = getContext().getOptionManager();
-
-    // parse out the properties, validate, and then set them
-    final String systemProps[] = allSystemProps.split(",");
-    for(final String systemProp : systemProps) {
-      final String keyValue[] = systemProp.split("=");
-      if (keyValue.length != 2) {
-        throwInvalidSystemOption(systemProp, "does not contain a key=value assignment");
-      }
-
-      final String optionName = keyValue[0].trim();
-      if (optionName.isEmpty()) {
-        throwInvalidSystemOption(systemProp, "does not contain a key before the assignment");
-      }
-
-      final String optionString = stripQuotes(keyValue[1].trim(), systemProp);
-      if (optionString.isEmpty()) {
-        throwInvalidSystemOption(systemProp, "does not contain a value after the assignment");
-      }
-
-      final OptionValue defaultValue = optionManager.getOption(optionName);
-      if (defaultValue == null) {
-        throwInvalidSystemOption(systemProp, "does not specify a valid option name");
-      }
-      if (defaultValue.type != OptionType.SYSTEM) {
-        throwInvalidSystemOption(systemProp, "does not specify a SYSTEM option ");
-      }
-
-      final OptionValue optionValue = OptionValue.createOption(
-          defaultValue.kind, OptionType.SYSTEM, optionName, optionString);
-      optionManager.setOption(optionValue);
-    }
-  }
-
-  public static void main(final String[] cli) throws DrillbitStartupException {
-    final StartupOptions options = StartupOptions.parse(cli);
-    start(options);
-  }
+  private boolean isClosed = false;
 
   private final ClusterCoordinator coord;
   private final ServiceEngine engine;
@@ -170,6 +68,7 @@ public class Drillbit implements AutoCloseable {
   private final BootStrapContext context;
   private final WebServer webServer;
   private RegistrationHandle registrationHandle;
+  private volatile StoragePluginRegistry storageRegistry;
 
   @VisibleForTesting
   public Drillbit(
@@ -210,7 +109,8 @@ public class Drillbit implements AutoCloseable {
     final DrillbitEndpoint md = engine.start();
     manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider);
     final DrillbitContext drillbitContext = manager.getContext();
-    drillbitContext.getStorage().init();
+    storageRegistry = drillbitContext.getStorage();
+    storageRegistry.init();
     drillbitContext.getOptionManager().init();
     javaPropertiesToSystemOptions();
     registrationHandle = coord.register(md);
@@ -252,6 +152,7 @@ public class Drillbit implements AutoCloseable {
           storeProvider,
           coord,
           manager,
+          storageRegistry,
           context);
     } catch(Exception e) {
       logger.warn("Failure on close()", e);
@@ -261,6 +162,47 @@ public class Drillbit implements AutoCloseable {
     isClosed = true;
   }
 
+  private void javaPropertiesToSystemOptions() {
+    // get the system options property
+    final String allSystemProps = System.getProperty(SYSTEM_OPTIONS_NAME);
+    if ((allSystemProps == null) || allSystemProps.isEmpty()) {
+      return;
+    }
+
+    final OptionManager optionManager = getContext().getOptionManager();
+
+    // parse out the properties, validate, and then set them
+    final String systemProps[] = allSystemProps.split(",");
+    for (final String systemProp : systemProps) {
+      final String keyValue[] = systemProp.split("=");
+      if (keyValue.length != 2) {
+        throwInvalidSystemOption(systemProp, "does not contain a key=value assignment");
+      }
+
+      final String optionName = keyValue[0].trim();
+      if (optionName.isEmpty()) {
+        throwInvalidSystemOption(systemProp, "does not contain a key before the assignment");
+      }
+
+      final String optionString = stripQuotes(keyValue[1].trim(), systemProp);
+      if (optionString.isEmpty()) {
+        throwInvalidSystemOption(systemProp, "does not contain a value after the assignment");
+      }
+
+      final OptionValue defaultValue = optionManager.getOption(optionName);
+      if (defaultValue == null) {
+        throwInvalidSystemOption(systemProp, "does not specify a valid option name");
+      }
+      if (defaultValue.type != OptionType.SYSTEM) {
+        throwInvalidSystemOption(systemProp, "does not specify a SYSTEM option ");
+      }
+
+      final OptionValue optionValue = OptionValue.createOption(
+          defaultValue.kind, OptionType.SYSTEM, optionName, optionString);
+      optionManager.setOption(optionValue);
+    }
+  }
+
   /**
    * Shutdown hook for Drillbit. Closes the drillbit, and reports on errors that
    * occur during closure, as well as the location the drillbit was started from.
@@ -310,4 +252,67 @@ public class Drillbit implements AutoCloseable {
     return manager.getContext();
   }
 
+  public static void main(final String[] cli) throws DrillbitStartupException {
+    final StartupOptions options = StartupOptions.parse(cli);
+    start(options);
+  }
+
+  public static Drillbit start(final StartupOptions options) throws DrillbitStartupException {
+    return start(DrillConfig.create(options.getConfigLocation()), null);
+  }
+
+  public static Drillbit start(final DrillConfig config) throws DrillbitStartupException {
+    return start(config, null);
+  }
+
+  public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet)
+      throws DrillbitStartupException {
+    logger.debug("Starting new Drillbit.");
+    // TODO: allow passing as a parameter
+    ScanResult classpathScan = ClassPathScanner.fromPrescan(config);
+    Drillbit bit;
+    try {
+      bit = new Drillbit(config, remoteServiceSet, classpathScan);
+    } catch (final Exception ex) {
+      throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
+    }
+
+    try {
+      bit.run();
+    } catch (final Exception e) {
+      bit.close();
+      throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
+    }
+    logger.debug("Started new Drillbit.");
+    return bit;
+  }
+
+  private static void throwInvalidSystemOption(final String systemProp, final String errorMessage) {
+    throw new IllegalStateException("Property \"" + SYSTEM_OPTIONS_NAME + "\" part \"" + systemProp
+        + "\" " + errorMessage + ".");
+  }
+
+  private static String stripQuotes(final String s, final String systemProp) {
+    if (s.isEmpty()) {
+      return s;
+    }
+
+    final char cFirst = s.charAt(0);
+    final char cLast = s.charAt(s.length() - 1);
+    if ((cFirst == '"') || (cFirst == '\'')) {
+      if (cLast != cFirst) {
+        throwInvalidSystemOption(systemProp, "quoted value does not have closing quote");
+      }
+
+      return s.substring(1, s.length() - 1); // strip the surrounding quotes
+    }
+
+    if ((cLast == '"') || (cLast == '\'')) {
+      throwInvalidSystemOption(systemProp, "value has unbalanced closing quote");
+    }
+
+    // return as-is
+    return s;
+  }
+
 }

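The relocated javaPropertiesToSystemOptions() above parses a single JVM property ("org.apache.drill.exec.server.Drillbit.system_options") into comma-separated key=value pairs and applies them as SYSTEM options at startup, and Drillbit stays AutoCloseable so the new storageRegistry field is torn down along with everything else. A hedged embedding sketch; the option name is only a plausible example and must correspond to an existing SYSTEM option.

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.server.Drillbit;

// Sketch only; not part of the commit.
public class EmbeddedDrillbitSketch {
  public static void main(String[] args) throws Exception {
    // Comma-separated key=value pairs; values may be single- or double-quoted.
    // "planner.slice_target" is used here as a plausible SYSTEM option name.
    System.setProperty(
        "org.apache.drill.exec.server.Drillbit.system_options",
        "planner.slice_target=50000");

    // Drillbit implements AutoCloseable, so try-with-resources guarantees close()
    // runs, which (after this commit) also closes the StoragePluginRegistry.
    try (Drillbit bit = Drillbit.start(DrillConfig.create())) {
      // ... submit work against the embedded Drillbit ...
    }
  }
}
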
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index c1f2e5b..aa6a0da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -37,8 +37,8 @@ import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
 import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 
 import com.codahale.metrics.MetricRegistry;
@@ -80,7 +80,11 @@ public class DrillbitContext {
     this.endpoint = checkNotNull(endpoint);
     this.provider = provider;
     this.lpPersistence = new LogicalPlanPersistence(context.getConfig(), classpathScan);
-    this.storagePlugins = new StoragePluginRegistry(this); // TODO change constructor
+
+    // TODO remove escaping "this".
+    this.storagePlugins = context.getConfig()
+        .getInstance(StoragePluginRegistry.STORAGE_PLUGIN_REGISTRY_IMPL, StoragePluginRegistry.class, this);
+
     this.reader = new PhysicalPlanReader(context.getConfig(), classpathScan, lpPersistence, endpoint, storagePlugins);
     this.operatorCreatorRegistry = new OperatorCreatorRegistry(classpathScan);
     this.systemOptions = new SystemOptionManager(lpPersistence, provider);
@@ -152,7 +156,7 @@ public class DrillbitContext {
     return provider;
   }
 
-  public DrillSchemaFactory getSchemaFactory() {
+  public SchemaFactory getSchemaFactory() {
     return storagePlugins.getSchemaFactory();
   }
 

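Since DrillbitContext now asks DrillConfig for the registry, a replacement only has to satisfy the reflective contract: implement StoragePluginRegistry and expose a public constructor whose single parameter is a DrillbitContext. One hedged way to do that is sketched below (the class, package, and override are illustrative; extending StoragePluginRegistryImpl simply reuses the default behavior).

package com.example;

import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.exec.store.StoragePluginRegistryImpl;

// Sketch only; not part of the commit.
public class AuditingStoragePluginRegistry extends StoragePluginRegistryImpl {

  // Required by DrillConfig.getInstance(): a public constructor taking a DrillbitContext.
  public AuditingStoragePluginRegistry(DrillbitContext context) {
    super(context);
  }

  @Override
  public void addPlugin(String name, StoragePlugin plugin) {
    // Illustrative override: record every programmatic registration, then delegate.
    System.out.println("Registering storage plugin: " + name);
    super.addPlugin(name, plugin);
  }
}

Activating such a class would presumably just be a matter of overriding the new key, e.g. drill.exec.storage.registry: "com.example.AuditingStoragePluginRegistry" in drill-override.conf.
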
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
index f12d195..2f4a929 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.google.common.collect.LinkedListMultimap;
@@ -36,7 +37,7 @@ import com.google.common.collect.Multimaps;
  * This is inspired by ConcurrentMap but provides a secondary key mapping that allows an alternative lookup mechanism.
  * The class is responsible for internally managing consistency between the two maps. This class is threadsafe.
  */
-class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>> {
+class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>>, AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginMap.class);
 
   private final ConcurrentMap<String, StoragePlugin> nameMap = Maps.newConcurrentMap();
@@ -73,6 +74,18 @@ class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>> {
     return ok;
   }
 
+  public void put(String name, StoragePlugin plugin) {
+    StoragePlugin oldPlugin = nameMap.put(name, plugin);
+    configMap.put(plugin.getConfig(), plugin);
+    if (oldPlugin != null) {
+      try {
+        oldPlugin.close();
+      } catch (Exception e) {
+        logger.warn("Failure while closing plugin replaced by injection.", e);
+      }
+    }
+  }
+
   public StoragePlugin putIfAbsent(String name, StoragePlugin plugin) {
     StoragePlugin oldPlugin = nameMap.putIfAbsent(name, plugin);
     if (oldPlugin == null) {
@@ -117,4 +130,9 @@ class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>> {
     return nameMap.values();
   }
 
+  @Override
+  public void close() throws Exception {
+    AutoCloseables.close(configMap.values());
+  }
+
 }

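The StoragePluginMap changes above do two things: put() closes any plugin it displaces, and close() tears down every registered plugin (the real class delegates to AutoCloseables.close(configMap.values())). A stripped-down sketch of that pattern, with the secondary config-keyed multimap omitted for brevity; the class name is illustrative.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only; not part of the commit.
class CloseableRegistrySketch implements AutoCloseable {
  private final Map<String, AutoCloseable> byName = new ConcurrentHashMap<>();

  // put() replaces any existing entry and closes the value it displaced, so a plugin
  // injected over an old one does not leak resources.
  void put(String name, AutoCloseable plugin) {
    AutoCloseable old = byName.put(name, plugin);
    if (old != null) {
      try {
        old.close();
      } catch (Exception e) {
        // Mirror the real code: warn and continue rather than fail the caller.
        System.err.println("Failure while closing replaced plugin: " + e);
      }
    }
  }

  // Closing the map closes every registered value; this is what lets the Drillbit
  // shut the whole registry down through a single close() call.
  @Override
  public void close() throws Exception {
    for (AutoCloseable plugin : byName.values()) {
      plugin.close();
    }
  }
}
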
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 9d7dc57..c7d364b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -17,455 +17,107 @@
  */
 package org.apache.drill.exec.store;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.RuleSet;
-import org.apache.drill.common.config.LogicalPlanPersistence;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.scanner.ClassPathScanner;
-import org.apache.drill.common.scanner.persistence.ScanResult;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
-import org.apache.drill.exec.planner.logical.DrillRuleSets;
-import org.apache.drill.exec.planner.logical.StoragePlugins;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
-import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
 import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.SystemTablePlugin;
-import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Stopwatch;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSet.Builder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.io.Resources;
-
-public class StoragePluginRegistry implements Iterable<Map.Entry<String, StoragePlugin>> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistry.class);
-
-  public static final String SYS_PLUGIN = "sys";
-
-  public static final String INFORMATION_SCHEMA_PLUGIN = "INFORMATION_SCHEMA";
-
-  private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>();
-  private final StoragePluginMap plugins = new StoragePluginMap();
-
-  private DrillbitContext context;
-  private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
-  private final PStore<StoragePluginConfig> pluginSystemTable;
-  private final LogicalPlanPersistence lpPersistence;
-  private final ScanResult classpathScan;
-  private final LoadingCache<StoragePluginConfig, StoragePlugin> ephemeralPlugins;
-
-  public StoragePluginRegistry(DrillbitContext context) {
-    this.context = checkNotNull(context);
-    this.lpPersistence = checkNotNull(context.getLpPersistence());
-    this.classpathScan = checkNotNull(context.getClasspathScan());
-    try {
-      this.pluginSystemTable = context //
-          .getPersistentStoreProvider() //
-          .getStore(PStoreConfig //
-              .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class) //
-              .name("sys.storage_plugins") //
-              .build());
-    } catch (IOException | RuntimeException e) {
-      logger.error("Failure while loading storage plugin registry.", e);
-      throw new RuntimeException("Failure while reading and loading storage plugin configuration.", e);
-    }
-
-    ephemeralPlugins = CacheBuilder.newBuilder()
-        .expireAfterAccess(24, TimeUnit.HOURS)
-        .maximumSize(250)
-        .removalListener(new RemovalListener<StoragePluginConfig, StoragePlugin>() {
-          @Override
-          public void onRemoval(RemovalNotification<StoragePluginConfig, StoragePlugin> notification) {
-            closePlugin(notification.getValue());
-          }
-        })
-        .build(new CacheLoader<StoragePluginConfig, StoragePlugin>() {
-          @Override
-          public StoragePlugin load(StoragePluginConfig config) throws Exception {
-            return create(null, config);
-          }
-        });
-  }
-
-  public PStore<StoragePluginConfig> getStore() {
-    return pluginSystemTable;
-  }
-
-  @SuppressWarnings("unchecked")
-  public void init() throws DrillbitStartupException {
-    final Collection<Class<? extends StoragePlugin>> pluginClasses =
-        classpathScan.getImplementations(StoragePlugin.class);
-    final String lineBrokenList =
-        pluginClasses.size() == 0
-        ? "" : "\n\t- " + Joiner.on("\n\t- ").join(pluginClasses);
-    logger.debug("Found {} storage plugin configuration classes: {}.",
-                 pluginClasses.size(), lineBrokenList);
-    for (Class<? extends StoragePlugin> plugin : pluginClasses) {
-      int i = 0;
-      for (Constructor<?> c : plugin.getConstructors()) {
-        Class<?>[] params = c.getParameterTypes();
-        if(params.length != 3
-            || params[1] != DrillbitContext.class
-            || !StoragePluginConfig.class.isAssignableFrom(params[0])
-            || params[2] != String.class) {
-          logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a "
-              + "[constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
-          continue;
-        }
-        availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
-        i++;
-      }
-      if (i == 0) {
-        logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters "
-            + "of (StorangePluginConfig, Config)", plugin.getCanonicalName());
-      }
-    }
-
-    // create registered plugins defined in "storage-plugins.json"
-    this.plugins.putAll(createPlugins());
-
-  }
 
-  private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException {
-    try {
-      /*
-       * Check if the storage plugins system table has any entries.  If not, load the boostrap-storage-plugin file into the system table.
-       */
-      if (!pluginSystemTable.iterator().hasNext()) {
-        // bootstrap load the config since no plugins are stored.
-        logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
-        Collection<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
-        if (urls != null && ! urls.isEmpty()) {
-          logger.info("Loading the storage plugin configs from URLs {}.", urls);
-          Map<String, URL> pluginURLMap = Maps.newHashMap();
-          for (URL url :urls) {
-            String pluginsData = Resources.toString(url, Charsets.UTF_8);
-            StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
-            for (Map.Entry<String, StoragePluginConfig> config : plugins) {
-              if (!pluginSystemTable.putIfAbsent(config.getKey(), config.getValue())) {
-                logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
-                            config.getKey(), pluginURLMap.get(config.getKey()), url);
-                continue;
-              }
-              pluginURLMap.put(config.getKey(), url);
-            }
-          }
-        } else {
-          throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE);
-        }
-      }
+public interface StoragePluginRegistry extends Iterable<Map.Entry<String, StoragePlugin>>, AutoCloseable {
+  final String SYS_PLUGIN = "sys";
+  final String INFORMATION_SCHEMA_PLUGIN = "INFORMATION_SCHEMA";
+  final String STORAGE_PLUGIN_REGISTRY_IMPL = "drill.exec.storage.registry";
+  final String PSTORE_NAME = "sys.storage_plugins";
 
-      Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
-      for (Map.Entry<String, StoragePluginConfig> entry : pluginSystemTable) {
-        String name = entry.getKey();
-        StoragePluginConfig config = entry.getValue();
-        if (config.isEnabled()) {
-          try {
-            StoragePlugin plugin = create(name, config);
-            activePlugins.put(name, plugin);
-          } catch (ExecutionSetupException e) {
-            logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e);
-            config.setEnabled(false);
-            pluginSystemTable.put(name, config);
-          }
-        }
-      }
-
-      activePlugins.put(INFORMATION_SCHEMA_PLUGIN, new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, INFORMATION_SCHEMA_PLUGIN));
-      activePlugins.put(SYS_PLUGIN, new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, SYS_PLUGIN));
-
-      return activePlugins;
-    } catch (IOException e) {
-      logger.error("Failure setting up storage plugins.  Drillbit exiting.", e);
-      throw new IllegalStateException(e);
-    }
-  }
-
-  public void deletePlugin(String name) {
-    StoragePlugin plugin = plugins.remove(name);
-    closePlugin(plugin);
-    pluginSystemTable.delete(name);
-  }
-
-  private void closePlugin(StoragePlugin plugin) {
-    if (plugin == null) {
-      return;
-    }
-
-    try {
-      plugin.close();
-    } catch (Exception e) {
-      logger.warn("Exception while shutting down storage plugin.");
-    }
-  }
-
-  public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException {
-    for (;;) {
-      final StoragePlugin oldPlugin = plugins.get(name);
-      final StoragePlugin newPlugin = create(name, config);
-      boolean done = false;
-      try {
-        if (oldPlugin != null) {
-          if (config.isEnabled()) {
-            done = plugins.replace(name, oldPlugin, newPlugin);
-          } else {
-            done = plugins.remove(name, oldPlugin);
-          }
-          if (done) {
-            closePlugin(oldPlugin);
-          }
-        } else if (config.isEnabled()) {
-          done = (null == plugins.putIfAbsent(name, newPlugin));
-        } else {
-          done = true;
-        }
-      } finally {
-        if (!done) {
-          closePlugin(newPlugin);
-        }
-      }
-
-      if (done) {
-        if (persist) {
-          pluginSystemTable.put(name, config);
-        }
-
-        return newPlugin;
-      }
-    }
-  }
+  /**
+   * Initialize the storage plugin registry. Must be called before the registry is used.
+   *
+   * @throws DrillbitStartupException
+   */
+  void init() throws DrillbitStartupException;
 
-  public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
-    StoragePlugin plugin = plugins.get(name);
-    if (name.equals(SYS_PLUGIN) || name.equals(INFORMATION_SCHEMA_PLUGIN)) {
-      return plugin;
-    }
+  /**
+   * Delete a plugin by name.
+   * @param name
+   *          The name of the storage plugin to delete.
+   */
+  void deletePlugin(String name);
 
-    // since we lazily manage the list of plugins per server, we need to update this once we know that it is time.
-    StoragePluginConfig config = this.pluginSystemTable.get(name);
-    if (config == null) {
-      if (plugin != null) {
-        plugins.remove(name);
-      }
-      return null;
-    } else {
-      if (plugin == null
-          || !plugin.getConfig().equals(config)
-          || plugin.getConfig().isEnabled() != config.isEnabled()) {
-        plugin = createOrUpdate(name, config, false);
-      }
-      return plugin;
-    }
-  }
+  /**
+   * Create a plugin by name and configuration. If the plugin already exists, update it.
+   * @param name
+   *          The name of the plugin
+   * @param config
+   *          The plugin configuration
+   * @param persist
+   *          Whether to persist the plugin for later use or treat it as ephemeral.
+   * @return The StoragePlugin instance.
+   * @throws ExecutionSetupException
+   */
+  StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException;
 
-  public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
-    if (config instanceof NamedStoragePluginConfig) {
-      return getPlugin(((NamedStoragePluginConfig) config).name);
-    } else {
-      // try to lookup plugin by configuration
-      StoragePlugin plugin = plugins.get(config);
-      if (plugin != null) {
-        return plugin;
-      }
+  /**
+   * Get a plugin by name. If the plugin is not currently active, create it from the definition saved in the PStore.
+   * @param name
+   *          The name of the plugin
+   * @return The StoragePlugin instance.
+   * @throws ExecutionSetupException
+   */
+  StoragePlugin getPlugin(String name) throws ExecutionSetupException;
 
-      // no named plugin matches the desired configuration, let's create an
-      // ephemeral storage plugin (or get one from the cache)
-      try {
-        return ephemeralPlugins.get(config);
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        if (cause instanceof ExecutionSetupException) {
-          throw (ExecutionSetupException) cause;
-        } else {
-          // this shouldn't happen. here for completeness.
-          throw new ExecutionSetupException("Failure while trying to create ephemeral plugin.", cause);
-        }
-      }
-    }
-  }
+  /**
+   * Get a plugin by configuration. If it doesn't exist, create it.
+   * @param config
+   *          The configuration for the plugin.
+   * @return The StoragePlugin instance.
+   * @throws ExecutionSetupException
+   */
+  StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException;
 
-  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException {
-    StoragePlugin p = getPlugin(storageConfig);
-    if (!(p instanceof FileSystemPlugin)) {
-      throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage plugin that wasn't of type FileSystemPlugin.  The actual type of plugin was %s.", p.getClass().getName()));
-    }
-    FileSystemPlugin storage = (FileSystemPlugin) p;
-    return storage.getFormatPlugin(formatConfig);
-  }
+  /**
+   * Add a plugin to the registry using the provided name.
+   *
+   * @param name
+   *          The name under which to register the plugin.
+   * @param plugin
+   *          The already-constructed StoragePlugin instance to register.
+   */
+  void addPlugin(String name, StoragePlugin plugin);
 
-  private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
-    StoragePlugin plugin = null;
-    Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass());
-    if (c == null) {
-      throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
-          pluginConfig));
-    }
-    try {
-      plugin = c.newInstance(pluginConfig, context, name);
-      plugin.start();
-      return plugin;
-    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
-        | IOException e) {
-      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
-      if (t instanceof ExecutionSetupException) {
-        throw ((ExecutionSetupException) t);
-      }
-      throw new ExecutionSetupException(String.format(
-          "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
-    }
-  }
+  /**
+   * Get the Format plugin for the FileSystemPlugin associated with the provided storage config and format config.
+   *
+   * @param storageConfig
+   *          The storage config for the associated FileSystemPlugin
+   * @param formatConfig
+   *          The format config for the associated FormatPlugin
+   * @return A FormatPlugin
+   * @throws ExecutionSetupException
+   */
+  FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig)
+      throws ExecutionSetupException;
 
-  @Override
-  public Iterator<Entry<String, StoragePlugin>> iterator() {
-    return plugins.iterator();
-  }
+  /**
+   * Get the PStore for this StoragePluginRegistry. (Used in the management layer.)
+   * @return PStore for StoragePlugin configuration objects.
+   */
+  PStore<StoragePluginConfig> getStore();
 
   /**
    * Return StoragePlugin rule sets.
+   *
    * @param optimizerRulesContext
    * @return Array of logical and physical rule sets.
    */
-  public RuleSet[] getStoragePluginRuleSet(OptimizerRulesContext optimizerRulesContext) {
-    // query registered engines for optimizer rules and build the storage plugin RuleSet
-    Builder<RelOptRule> logicalRulesBuilder = ImmutableSet.builder();
-    Builder<RelOptRule> physicalRulesBuilder = ImmutableSet.builder();
-    for (StoragePlugin plugin : this.plugins.plugins()) {
-      Set<? extends RelOptRule> rules = plugin.getLogicalOptimizerRules(optimizerRulesContext);
-      if (rules != null && rules.size() > 0) {
-        logicalRulesBuilder.addAll(rules);
-      }
-      rules = plugin.getPhysicalOptimizerRules(optimizerRulesContext);
-      if (rules != null && rules.size() > 0) {
-        physicalRulesBuilder.addAll(rules);
-      }
-    }
-
-    return new RuleSet[] {
-        DrillRuleSets.create(logicalRulesBuilder.build()),
-        DrillRuleSets.create(physicalRulesBuilder.build()) };
-  }
-
-  public DrillSchemaFactory getSchemaFactory() {
-    return schemaFactory;
-  }
-
-  public class DrillSchemaFactory implements SchemaFactory {
-
-    @Override
-    public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-      Stopwatch watch = new Stopwatch();
-      watch.start();
-
-      try {
-        Set<String> currentPluginNames = Sets.newHashSet(plugins.names());
-        // iterate through the plugin instances in the persistence store adding
-        // any new ones and refreshing those whose configuration has changed
-        for (Map.Entry<String, StoragePluginConfig> config : pluginSystemTable) {
-          if (config.getValue().isEnabled()) {
-            getPlugin(config.getKey());
-            currentPluginNames.remove(config.getKey());
-          }
-        }
-        // remove those which are no longer in the registry
-        for (String pluginName : currentPluginNames) {
-          if (pluginName.equals(SYS_PLUGIN) || pluginName.equals(INFORMATION_SCHEMA_PLUGIN)) {
-            continue;
-          }
-          plugins.remove(pluginName);
-        }
-
-        // finally register schemas with the refreshed plugins
-        for (StoragePlugin plugin : plugins.plugins()) {
-          plugin.registerSchemas(schemaConfig, parent);
-        }
-      } catch (ExecutionSetupException e) {
-        throw new DrillRuntimeException("Failure while updating storage plugins", e);
-      }
-
-      // Add second level schema as top level schema with name qualified with parent schema name
-      // Ex: "dfs" schema has "default" and "tmp" as sub schemas. Add following extra schemas "dfs.default" and
-      // "dfs.tmp" under root schema.
-      //
-      // Before change, schema tree looks like below:
-      // "root"
-      //    -- "dfs"
-      //          -- "default"
-      //          -- "tmp"
-      //    -- "hive"
-      //          -- "default"
-      //          -- "hivedb1"
-      //
-      // After the change, the schema tree looks like below:
-      // "root"
-      //    -- "dfs"
-      //          -- "default"
-      //          -- "tmp"
-      //    -- "dfs.default"
-      //    -- "dfs.tmp"
-      //    -- "hive"
-      //          -- "default"
-      //          -- "hivedb1"
-      //    -- "hive.default"
-      //    -- "hive.hivedb1"
-      List<SchemaPlus> secondLevelSchemas = Lists.newArrayList();
-      for (String firstLevelSchemaName : parent.getSubSchemaNames()) {
-        SchemaPlus firstLevelSchema = parent.getSubSchema(firstLevelSchemaName);
-        for (String secondLevelSchemaName : firstLevelSchema.getSubSchemaNames()) {
-          secondLevelSchemas.add(firstLevelSchema.getSubSchema(secondLevelSchemaName));
-        }
-      }
-
-      for (SchemaPlus schema : secondLevelSchemas) {
-        AbstractSchema drillSchema;
-        try {
-          drillSchema = schema.unwrap(AbstractSchema.class);
-        } catch (ClassCastException e) {
-          throw new RuntimeException(String.format("Schema '%s' is not expected under root schema", schema.getName()));
-        }
-        SubSchemaWrapper wrapper = new SubSchemaWrapper(drillSchema);
-        parent.add(wrapper.getName(), wrapper);
-      }
-
-      logger.debug("Took {} ms to register schemas.", watch.elapsed(TimeUnit.MILLISECONDS));
-    }
-
-  }
+  RuleSet[] getStoragePluginRuleSet(OptimizerRulesContext optimizerRulesContext);
 
+  /**
+   * Get the Schema factory associated with this storage plugin registry.
+   * @return A SchemaFactory that can register the schemas associated with this plugin registry.
+   */
+  SchemaFactory getSchemaFactory();
 
 }

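With StoragePluginRegistry reduced to the interface above, callers code against these methods regardless of which implementation the config selects. A small usage sketch; the helper name is illustrative, and the registry is assumed to come from DrillbitContext.getStorage().

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.exec.store.StoragePluginRegistry;

// Sketch only; not part of the commit.
public class RegistryUsageSketch {

  public static StoragePlugin refreshPlugin(StoragePluginRegistry registry,
      String name, StoragePluginConfig config) throws ExecutionSetupException {
    // createOrUpdate() starts the plugin if it is new or its configuration changed; with
    // persist=true the config is also written to the "sys.storage_plugins" PStore, while
    // persist=false keeps the change in memory only.
    registry.createOrUpdate(name, config, true);

    // getPlugin(String) re-checks the stored configuration, so the returned instance
    // reflects whatever is currently persisted under this name.
    return registry.getPlugin(name);
  }
}
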
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
new file mode 100644
index 0000000..a8628c7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.drill.common.config.LogicalPlanPersistence;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
+import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.sys.SystemTablePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Stopwatch;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Resources;
+
+public class StoragePluginRegistryImpl implements StoragePluginRegistry {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistryImpl.class);
+
+  private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = Collections.emptyMap();
+  private final StoragePluginMap plugins = new StoragePluginMap();
+
+  private DrillbitContext context;
+  private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
+  private final PStore<StoragePluginConfig> pluginSystemTable;
+  private final LogicalPlanPersistence lpPersistence;
+  private final ScanResult classpathScan;
+  private final LoadingCache<StoragePluginConfig, StoragePlugin> ephemeralPlugins;
+
+  public StoragePluginRegistryImpl(DrillbitContext context) {
+    this.context = checkNotNull(context);
+    this.lpPersistence = checkNotNull(context.getLpPersistence());
+    this.classpathScan = checkNotNull(context.getClasspathScan());
+    try {
+      this.pluginSystemTable = context //
+          .getPersistentStoreProvider() //
+          .getStore(PStoreConfig //
+              .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class) //
+              .name(PSTORE_NAME) //
+              .build());
+    } catch (IOException | RuntimeException e) {
+      logger.error("Failure while loading storage plugin registry.", e);
+      throw new RuntimeException("Failure while reading and loading storage plugin configuration.", e);
+    }
+
+    ephemeralPlugins = CacheBuilder.newBuilder()
+        .expireAfterAccess(24, TimeUnit.HOURS)
+        .maximumSize(250)
+        .removalListener(new RemovalListener<StoragePluginConfig, StoragePlugin>() {
+          @Override
+          public void onRemoval(RemovalNotification<StoragePluginConfig, StoragePlugin> notification) {
+            closePlugin(notification.getValue());
+          }
+        })
+        .build(new CacheLoader<StoragePluginConfig, StoragePlugin>() {
+          @Override
+          public StoragePlugin load(StoragePluginConfig config) throws Exception {
+            return create(null, config);
+          }
+        });
+  }
+
+  public PStore<StoragePluginConfig> getStore() {
+    return pluginSystemTable;
+  }
+
+  public void init() throws DrillbitStartupException {
+    availablePlugins = findAvailablePlugins(classpathScan);
+
+    // create registered plugins defined in "storage-plugins.json"
+    this.plugins.putAll(createPlugins());
+  }
+
+  private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException {
+    try {
+      /*
+       * Check if the storage plugins system table has any entries. If not, load the bootstrap-storage-plugins file into
+       * the system table.
+       */
+      if (!pluginSystemTable.iterator().hasNext()) {
+        // bootstrap load the config since no plugins are stored.
+        logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
+        Collection<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
+        if (urls != null && !urls.isEmpty()) {
+          logger.info("Loading the storage plugin configs from URLs {}.", urls);
+          Map<String, URL> pluginURLMap = Maps.newHashMap();
+          for (URL url : urls) {
+            String pluginsData = Resources.toString(url, Charsets.UTF_8);
+            StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
+            for (Map.Entry<String, StoragePluginConfig> config : plugins) {
+              if (!pluginSystemTable.putIfAbsent(config.getKey(), config.getValue())) {
+                logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
+                    config.getKey(), pluginURLMap.get(config.getKey()), url);
+                continue;
+              }
+              pluginURLMap.put(config.getKey(), url);
+            }
+          }
+        } else {
+          throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE);
+        }
+      }
+
+      Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
+      for (Map.Entry<String, StoragePluginConfig> entry : pluginSystemTable) {
+        String name = entry.getKey();
+        StoragePluginConfig config = entry.getValue();
+        if (config.isEnabled()) {
+          try {
+            StoragePlugin plugin = create(name, config);
+            activePlugins.put(name, plugin);
+          } catch (ExecutionSetupException e) {
+            logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e);
+            config.setEnabled(false);
+            pluginSystemTable.put(name, config);
+          }
+        }
+      }
+
+      activePlugins.put(INFORMATION_SCHEMA_PLUGIN, new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context,
+          INFORMATION_SCHEMA_PLUGIN));
+      activePlugins.put(SYS_PLUGIN, new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, SYS_PLUGIN));
+
+      return activePlugins;
+    } catch (IOException e) {
+      logger.error("Failure setting up storage plugins.  Drillbit exiting.", e);
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public void addPlugin(String name, StoragePlugin plugin) {
+    plugins.put(name, plugin);
+  }
+
+  public void deletePlugin(String name) {
+    StoragePlugin plugin = plugins.remove(name);
+    closePlugin(plugin);
+    pluginSystemTable.delete(name);
+  }
+
+  private void closePlugin(StoragePlugin plugin) {
+    if (plugin == null) {
+      return;
+    }
+
+    try {
+      plugin.close();
+    } catch (Exception e) {
+      logger.warn("Exception while shutting down storage plugin.");
+    }
+  }
+
+  public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist)
+      throws ExecutionSetupException {
+    for (;;) {
+      final StoragePlugin oldPlugin = plugins.get(name);
+      final StoragePlugin newPlugin = create(name, config);
+      boolean done = false;
+      try {
+        if (oldPlugin != null) {
+          if (config.isEnabled()) {
+            done = plugins.replace(name, oldPlugin, newPlugin);
+          } else {
+            done = plugins.remove(name, oldPlugin);
+          }
+          if (done) {
+            closePlugin(oldPlugin);
+          }
+        } else if (config.isEnabled()) {
+          done = (null == plugins.putIfAbsent(name, newPlugin));
+        } else {
+          done = true;
+        }
+      } finally {
+        if (!done) {
+          closePlugin(newPlugin);
+        }
+      }
+
+      if (done) {
+        if (persist) {
+          pluginSystemTable.put(name, config);
+        }
+
+        return newPlugin;
+      }
+    }
+  }
+
+  public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
+    StoragePlugin plugin = plugins.get(name);
+    if (name.equals(SYS_PLUGIN) || name.equals(INFORMATION_SCHEMA_PLUGIN)) {
+      return plugin;
+    }
+
+    // since we lazily manage the list of plugins per server, we need to update this once we know that it is time.
+    StoragePluginConfig config = this.pluginSystemTable.get(name);
+    if (config == null) {
+      if (plugin != null) {
+        plugins.remove(name);
+      }
+      return null;
+    } else {
+      if (plugin == null
+          || !plugin.getConfig().equals(config)
+          || plugin.getConfig().isEnabled() != config.isEnabled()) {
+        plugin = createOrUpdate(name, config, false);
+      }
+      return plugin;
+    }
+  }
+
+
+  public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
+    if (config instanceof NamedStoragePluginConfig) {
+      return getPlugin(((NamedStoragePluginConfig) config).name);
+    } else {
+      // try to lookup plugin by configuration
+      StoragePlugin plugin = plugins.get(config);
+      if (plugin != null) {
+        return plugin;
+      }
+
+      // no named plugin matches the desired configuration, let's create an
+      // ephemeral storage plugin (or get one from the cache)
+      try {
+        return ephemeralPlugins.get(config);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof ExecutionSetupException) {
+          throw (ExecutionSetupException) cause;
+        } else {
+          // this shouldn't happen. here for completeness.
+          throw new ExecutionSetupException("Failure while trying to create ephemeral plugin.", cause);
+        }
+      }
+    }
+  }
+
+  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig)
+      throws ExecutionSetupException {
+    StoragePlugin p = getPlugin(storageConfig);
+    if (!(p instanceof FileSystemPlugin)) {
+      throw new ExecutionSetupException(
+          String.format("You tried to request a format plugin for a storage plugin that wasn't of type "
+              + "FileSystemPlugin. The actual type of plugin was %s.", p.getClass().getName()));
+    }
+    FileSystemPlugin storage = (FileSystemPlugin) p;
+    return storage.getFormatPlugin(formatConfig);
+  }
+
+  private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
+    StoragePlugin plugin = null;
+    Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass());
+    if (c == null) {
+      throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
+          pluginConfig));
+    }
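+    // Instantiate the plugin through its (StoragePluginConfig, DrillbitContext, String)
+    // constructor (see findAvailablePlugins below) and start it before handing it out.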
+    try {
+      plugin = c.newInstance(pluginConfig, context, name);
+      plugin.start();
+      return plugin;
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
+        | IOException e) {
+      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
+      if (t instanceof ExecutionSetupException) {
+        throw ((ExecutionSetupException) t);
+      }
+      throw new ExecutionSetupException(String.format(
+          "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
+    }
+  }
+
+  @Override
+  public Iterator<Entry<String, StoragePlugin>> iterator() {
+    return plugins.iterator();
+  }
+
+  /**
+   * Collect the optimizer rule sets contributed by all registered storage plugins.
+   *
+   * @param optimizerRulesContext context the plugins use to construct their optimizer rules
+   * @return a two-element array containing the logical rule set followed by the physical rule set
+   */
+  public RuleSet[] getStoragePluginRuleSet(OptimizerRulesContext optimizerRulesContext) {
+    // query registered engines for optimizer rules and build the storage plugin RuleSet
+    Builder<RelOptRule> logicalRulesBuilder = ImmutableSet.builder();
+    Builder<RelOptRule> physicalRulesBuilder = ImmutableSet.builder();
+    for (StoragePlugin plugin : this.plugins.plugins()) {
+      Set<? extends RelOptRule> rules = plugin.getLogicalOptimizerRules(optimizerRulesContext);
+      if (rules != null && rules.size() > 0) {
+        logicalRulesBuilder.addAll(rules);
+      }
+      rules = plugin.getPhysicalOptimizerRules(optimizerRulesContext);
+      if (rules != null && rules.size() > 0) {
+        physicalRulesBuilder.addAll(rules);
+      }
+    }
+
+    return new RuleSet[] {
+        DrillRuleSets.create(logicalRulesBuilder.build()),
+        DrillRuleSets.create(physicalRulesBuilder.build()) };
+  }
+
+  public SchemaFactory getSchemaFactory() {
+    return schemaFactory;
+  }
+
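+  /**
+   * Schema factory that refreshes the in-memory plugin map from the persistent plugin store and
+   * then asks every registered plugin to register its schemas under the given parent schema. It
+   * also exposes second-level schemas (e.g. "dfs.tmp") as additional top-level schemas.
+   */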
+  public class DrillSchemaFactory implements SchemaFactory {
+
+    @Override
+    public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+      Stopwatch watch = new Stopwatch();
+      watch.start();
+
+      try {
+        Set<String> currentPluginNames = Sets.newHashSet(plugins.names());
+        // iterate through the plugin instances in the persistence store adding
+        // any new ones and refreshing those whose configuration has changed
+        for (Map.Entry<String, StoragePluginConfig> config : pluginSystemTable) {
+          if (config.getValue().isEnabled()) {
+            getPlugin(config.getKey());
+            currentPluginNames.remove(config.getKey());
+          }
+        }
+        // remove those which are no longer in the registry
+        for (String pluginName : currentPluginNames) {
+          if (pluginName.equals(SYS_PLUGIN) || pluginName.equals(INFORMATION_SCHEMA_PLUGIN)) {
+            continue;
+          }
+          plugins.remove(pluginName);
+        }
+
+        // finally register schemas with the refreshed plugins
+        for (StoragePlugin plugin : plugins.plugins()) {
+          plugin.registerSchemas(schemaConfig, parent);
+        }
+      } catch (ExecutionSetupException e) {
+        throw new DrillRuntimeException("Failure while updating storage plugins", e);
+      }
+
+      // Add each second-level schema as a top-level schema whose name is qualified with its parent schema's name.
+      // Ex: the "dfs" schema has "default" and "tmp" as sub-schemas, so the extra schemas "dfs.default" and
+      // "dfs.tmp" are added under the root schema.
+      //
+      // Before change, schema tree looks like below:
+      // "root"
+      // -- "dfs"
+      //    -- "default"
+      //    -- "tmp"
+      // -- "hive"
+      //    -- "default"
+      //    -- "hivedb1"
+      //
+      // After the change, the schema tree looks like below:
+      // "root"
+      // -- "dfs"
+      //    -- "default"
+      //    -- "tmp"
+      // -- "dfs.default"
+      // -- "dfs.tmp"
+      // -- "hive"
+      //    -- "default"
+      //    -- "hivedb1"
+      // -- "hive.default"
+      // -- "hive.hivedb1"
+      List<SchemaPlus> secondLevelSchemas = Lists.newArrayList();
+      for (String firstLevelSchemaName : parent.getSubSchemaNames()) {
+        SchemaPlus firstLevelSchema = parent.getSubSchema(firstLevelSchemaName);
+        for (String secondLevelSchemaName : firstLevelSchema.getSubSchemaNames()) {
+          secondLevelSchemas.add(firstLevelSchema.getSubSchema(secondLevelSchemaName));
+        }
+      }
+
+      for (SchemaPlus schema : secondLevelSchemas) {
+        AbstractSchema drillSchema;
+        try {
+          drillSchema = schema.unwrap(AbstractSchema.class);
+        } catch (ClassCastException e) {
+          throw new RuntimeException(String.format("Schema '%s' is not expected under root schema", schema.getName()));
+        }
+        SubSchemaWrapper wrapper = new SubSchemaWrapper(drillSchema);
+        parent.add(wrapper.getName(), wrapper);
+      }
+
+      logger.debug("Took {} ms to register schemas.", watch.elapsed(TimeUnit.MILLISECONDS));
+    }
+
+  }
+
+  @Override
+  public synchronized void close() throws Exception {
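+    // Drop every cached ephemeral plugin and close all registered named plugins.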
+    ephemeralPlugins.invalidateAll();
+    plugins.close();
+  }
+
+  /**
+   * Get a list of all available storage plugin class constructors.
+   * @param classpathScan
+   *          A classpath scan to use.
+   * @return A Map of StoragePluginConfig => StoragePlugin.<init>() constructors.
+   */
+  @SuppressWarnings("unchecked")
+  public static Map<Object, Constructor<? extends StoragePlugin>> findAvailablePlugins(final ScanResult classpathScan) {
+    Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>();
+    final Collection<Class<? extends StoragePlugin>> pluginClasses =
+        classpathScan.getImplementations(StoragePlugin.class);
+    final String lineBrokenList =
+        pluginClasses.size() == 0
+            ? "" : "\n\t- " + Joiner.on("\n\t- ").join(pluginClasses);
+    logger.debug("Found {} storage plugin configuration classes: {}.",
+        pluginClasses.size(), lineBrokenList);
+    for (Class<? extends StoragePlugin> plugin : pluginClasses) {
+      int i = 0;
+      for (Constructor<?> c : plugin.getConstructors()) {
+        Class<?>[] params = c.getParameterTypes();
+        if (params.length != 3
+            || params[1] != DrillbitContext.class
+            || !StoragePluginConfig.class.isAssignableFrom(params[0])
+            || params[2] != String.class) {
+          logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't match the "
+              + "expected signature [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
+          continue;
+        }
+        availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
+        i++;
+      }
+      if (i == 0) {
+        logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters "
+            + "(StoragePluginConfig, DrillbitContext, String)", plugin.getCanonicalName());
+      }
+    }
+    return availablePlugins;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 6b5d9fe..6901a5f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -75,6 +75,7 @@ drill.exec: {
     implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
   },
   storage: {
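+    # Fully qualified class name of the StoragePluginRegistry implementation. Drill creates the
+    # registry reflectively from this value, so a custom implementation can be substituted here.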
+    registry: "org.apache.drill.exec.store.StoragePluginRegistryImpl",
     file: {
       text: {
         buffer.size: 262144,

http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index 79245b2..24c8c63 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
 import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
 import org.apache.drill.exec.testing.ExecutionControls;
 import org.junit.Rule;
@@ -105,7 +106,7 @@ public class PlanningBase extends ExecTest{
       }
     };
 
-    final StoragePluginRegistry registry = new StoragePluginRegistry(dbContext);
+    final StoragePluginRegistry registry = new StoragePluginRegistryImpl(dbContext);
     registry.init();
     final FunctionImplementationRegistry functionRegistry = new FunctionImplementationRegistry(config);
     final DrillOperatorTable table = new DrillOperatorTable(functionRegistry);

http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index 322e54a..288e78d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
 import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.test.DrillTest;
@@ -182,7 +183,7 @@ public class TestAllocators extends DrillTest {
       bit.run();
       final DrillbitContext bitContext = bit.getContext();
       FunctionImplementationRegistry functionRegistry = bitContext.getFunctionImplementationRegistry();
-      StoragePluginRegistry storageRegistry = new StoragePluginRegistry(bitContext);
+      StoragePluginRegistry storageRegistry = new StoragePluginRegistryImpl(bitContext);
 
       // Create a few Fragment Contexts
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 7207bf2..ef102b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -54,6 +54,7 @@ import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
 import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
@@ -314,7 +315,7 @@ public class TestOptiqPlans extends ExecTest {
       }
     };
 
-    final StoragePluginRegistry reg = new StoragePluginRegistry(bitContext);
+    final StoragePluginRegistry reg = new StoragePluginRegistryImpl(bitContext);
 
     final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(config, reg);
     final PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));

http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
index 3b363bd..a533620 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.util.FileUtils;
@@ -46,7 +49,7 @@ import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
 import org.apache.drill.exec.vector.ValueVector;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -56,9 +59,6 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
-import mockit.Injectable;
-import mockit.NonStrictExpectations;
-
 
 public class TestMergeJoin extends PopUnitTestBase {
   //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestMergeJoin.class);
@@ -130,7 +130,8 @@ public class TestMergeJoin extends PopUnitTestBase {
       bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
     }};
 
-    final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c, new StoragePluginRegistry(bitContext));
+    final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c,
+        new StoragePluginRegistryImpl(bitContext));
     final PhysicalPlan plan = reader.readPhysicalPlan(
         Files.toString(
             FileUtils.getResourceAsFile("/join/merge_single_batch.json"), Charsets.UTF_8)
@@ -186,7 +187,8 @@ public class TestMergeJoin extends PopUnitTestBase {
       bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
     }};
 
-    final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c, new StoragePluginRegistry(bitContext));
+    final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c,
+        new StoragePluginRegistryImpl(bitContext));
     final PhysicalPlan plan = reader.readPhysicalPlan(
         Files.toString(
             FileUtils.getResourceAsFile("/join/merge_inner_single_batch.json"), Charsets.UTF_8)
@@ -242,7 +244,8 @@ public class TestMergeJoin extends PopUnitTestBase {
       bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
     }};
 
-    final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c, new StoragePluginRegistry(bitContext));
+    final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c,
+        new StoragePluginRegistryImpl(bitContext));
     final PhysicalPlan plan = reader.readPhysicalPlan(
         Files.toString(
             FileUtils.getResourceAsFile("/join/merge_multi_batch.json"), Charsets.UTF_8)