You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2016/01/11 07:44:51 UTC
[3/3] drill git commit: DRILL-4257: Fix StoragePluginRegistry
clean-up behavior and misc clean up.
DRILL-4257: Fix StoragePluginRegistry clean-up behavior and misc clean up.
- Create the storage plugin registry using a Drill configuration parameter to be able to replace the registry implementation.
- Change StoragePluginRegistry into an interface and move the implementation to an impl class.
- Write documentation for StoragePluginRegistry.
- Make StoragePluginRegistry and StoragePluginMap AutoCloseable and ensure that Drillbit closes registry.
- Misc Drillbit code reorganization so that fields are at top of class (and static methods at bottom).
- Update DrillConfig to support reflection-based creation of storage plugin objects
- Remove final from DrillConfig so that application developers can extend config object.
This closes #321.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f964908a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f964908a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f964908a
Branch: refs/heads/master
Commit: f964908ae371b8321ce550a95add26d1c2c080a7
Parents: 67d5cc6
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Jan 9 17:48:26 2016 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Jan 10 22:42:22 2016 -0800
----------------------------------------------------------------------
.../apache/drill/common/config/DrillConfig.java | 36 +-
.../org/apache/drill/exec/server/Drillbit.java | 217 ++++----
.../drill/exec/server/DrillbitContext.java | 10 +-
.../drill/exec/store/StoragePluginMap.java | 20 +-
.../drill/exec/store/StoragePluginRegistry.java | 500 +++----------------
.../exec/store/StoragePluginRegistryImpl.java | 497 ++++++++++++++++++
.../src/main/resources/drill-module.conf | 1 +
.../java/org/apache/drill/PlanningBase.java | 3 +-
.../drill/exec/memory/TestAllocators.java | 3 +-
.../exec/physical/impl/TestOptiqPlans.java | 3 +-
.../exec/physical/impl/join/TestMergeJoin.java | 17 +-
11 files changed, 762 insertions(+), 545 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index 507c4f0..fa5dd29 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -19,6 +19,7 @@ package org.apache.drill.common.config;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.Constructor;
import java.net.URL;
import java.util.Collection;
import java.util.List;
@@ -27,17 +28,19 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.DrillConfigurationException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.scanner.ClassPathScanner;
import org.reflections.util.ClasspathHelper;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
-public final class DrillConfig extends NestedConfig{
+public class DrillConfig extends NestedConfig {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
private final ImmutableList<String> startupArguments;
@@ -58,6 +61,36 @@ public final class DrillConfig extends NestedConfig{
logger.debug("DrillConfig object initialized.");
}
+ /**
+ * Get an instance of the provided interface using the configuration path provided. Construct the object based on the
+ * provided constructor arguments.
+ * @param path
+ * The configuration path to use.
+ * @param iface
+ * The Interface or Superclass of the instance you requested.
+ * @param constructorArgs
+ * Any arguments required for constructing the requested type.
+ * @return The new Object instance that implements the provided Interface
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T getInstance(String path, Class<T> iface, Object... constructorArgs) {
+ try{
+ String className = this.getString(path);
+ Class<?> clazz = Class.forName(className);
+ Preconditions.checkArgument(iface.isAssignableFrom(clazz));
+ Class<?>[] argClasses = new Class[constructorArgs.length];
+ for (int i = 0; i < constructorArgs.length; i++) {
+ argClasses[i] = constructorArgs[i].getClass();
+ }
+ Constructor<?> constructor = clazz.getConstructor(argClasses);
+ return (T) constructor.newInstance(constructorArgs);
+ }catch(Exception e){
+ throw UserException.unsupportedError(e)
+ .message("Failure while attempting to load instance of the class of type %s requested at path %s.",
+ iface.getName(), path).build(logger);
+ }
+ }
+
public List<String> getStartupArguments() {
return startupArguments;
}
@@ -81,6 +114,7 @@ public final class DrillConfig extends NestedConfig{
return create(null, false);
}
+
/**
* <p>
* DrillConfig loads up Drill configuration information. It does this utilizing a combination of classpath scanning
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 99523d6..9734a38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.server.options.OptionValue.OptionType;
import org.apache.drill.exec.server.rest.WebServer;
import org.apache.drill.exec.service.ServiceEngine;
+import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.sys.CachingStoreProvider;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.store.sys.PStoreRegistry;
@@ -51,117 +52,14 @@ import com.google.common.base.Stopwatch;
*/
public class Drillbit implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
+
static {
Environment.logEnv("Drillbit environment: ", logger);
}
- private boolean isClosed = false;
-
- public static Drillbit start(final StartupOptions options) throws DrillbitStartupException {
- return start(DrillConfig.create(options.getConfigLocation()), null);
- }
-
- public static Drillbit start(final DrillConfig config) throws DrillbitStartupException {
- return start(config, null);
- }
-
- public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet)
- throws DrillbitStartupException {
- logger.debug("Starting new Drillbit.");
- // TODO: allow passing as a parameter
- ScanResult classpathScan = ClassPathScanner.fromPrescan(config);
- Drillbit bit;
- try {
- bit = new Drillbit(config, remoteServiceSet, classpathScan);
- } catch (final Exception ex) {
- throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
- }
-
- try {
- bit.run();
- } catch (final Exception e) {
- bit.close();
- throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
- }
- logger.debug("Started new Drillbit.");
- return bit;
- }
-
private final static String SYSTEM_OPTIONS_NAME = "org.apache.drill.exec.server.Drillbit.system_options";
- private static void throwInvalidSystemOption(final String systemProp, final String errorMessage) {
- throw new IllegalStateException("Property \"" + SYSTEM_OPTIONS_NAME + "\" part \"" + systemProp
- + "\" " + errorMessage + ".");
- }
-
- private static String stripQuotes(final String s, final String systemProp) {
- if (s.isEmpty()) {
- return s;
- }
-
- final char cFirst = s.charAt(0);
- final char cLast = s.charAt(s.length() - 1);
- if ((cFirst == '"') || (cFirst == '\'')) {
- if (cLast != cFirst) {
- throwInvalidSystemOption(systemProp, "quoted value does not have closing quote");
- }
-
- return s.substring(1, s.length() - 2); // strip the quotes
- }
-
- if ((cLast == '"') || (cLast == '\'')) {
- throwInvalidSystemOption(systemProp, "value has unbalanced closing quote");
- }
-
- // return as-is
- return s;
- }
-
- private void javaPropertiesToSystemOptions() {
- // get the system options property
- final String allSystemProps = System.getProperty(SYSTEM_OPTIONS_NAME);
- if ((allSystemProps == null) || allSystemProps.isEmpty()) {
- return;
- }
-
- final OptionManager optionManager = getContext().getOptionManager();
-
- // parse out the properties, validate, and then set them
- final String systemProps[] = allSystemProps.split(",");
- for(final String systemProp : systemProps) {
- final String keyValue[] = systemProp.split("=");
- if (keyValue.length != 2) {
- throwInvalidSystemOption(systemProp, "does not contain a key=value assignment");
- }
-
- final String optionName = keyValue[0].trim();
- if (optionName.isEmpty()) {
- throwInvalidSystemOption(systemProp, "does not contain a key before the assignment");
- }
-
- final String optionString = stripQuotes(keyValue[1].trim(), systemProp);
- if (optionString.isEmpty()) {
- throwInvalidSystemOption(systemProp, "does not contain a value after the assignment");
- }
-
- final OptionValue defaultValue = optionManager.getOption(optionName);
- if (defaultValue == null) {
- throwInvalidSystemOption(systemProp, "does not specify a valid option name");
- }
- if (defaultValue.type != OptionType.SYSTEM) {
- throwInvalidSystemOption(systemProp, "does not specify a SYSTEM option ");
- }
-
- final OptionValue optionValue = OptionValue.createOption(
- defaultValue.kind, OptionType.SYSTEM, optionName, optionString);
- optionManager.setOption(optionValue);
- }
- }
-
- public static void main(final String[] cli) throws DrillbitStartupException {
- final StartupOptions options = StartupOptions.parse(cli);
- start(options);
- }
+ private boolean isClosed = false;
private final ClusterCoordinator coord;
private final ServiceEngine engine;
@@ -170,6 +68,7 @@ public class Drillbit implements AutoCloseable {
private final BootStrapContext context;
private final WebServer webServer;
private RegistrationHandle registrationHandle;
+ private volatile StoragePluginRegistry storageRegistry;
@VisibleForTesting
public Drillbit(
@@ -210,7 +109,8 @@ public class Drillbit implements AutoCloseable {
final DrillbitEndpoint md = engine.start();
manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider);
final DrillbitContext drillbitContext = manager.getContext();
- drillbitContext.getStorage().init();
+ storageRegistry = drillbitContext.getStorage();
+ storageRegistry.init();
drillbitContext.getOptionManager().init();
javaPropertiesToSystemOptions();
registrationHandle = coord.register(md);
@@ -252,6 +152,7 @@ public class Drillbit implements AutoCloseable {
storeProvider,
coord,
manager,
+ storageRegistry,
context);
} catch(Exception e) {
logger.warn("Failure on close()", e);
@@ -261,6 +162,47 @@ public class Drillbit implements AutoCloseable {
isClosed = true;
}
+ private void javaPropertiesToSystemOptions() {
+ // get the system options property
+ final String allSystemProps = System.getProperty(SYSTEM_OPTIONS_NAME);
+ if ((allSystemProps == null) || allSystemProps.isEmpty()) {
+ return;
+ }
+
+ final OptionManager optionManager = getContext().getOptionManager();
+
+ // parse out the properties, validate, and then set them
+ final String systemProps[] = allSystemProps.split(",");
+ for (final String systemProp : systemProps) {
+ final String keyValue[] = systemProp.split("=");
+ if (keyValue.length != 2) {
+ throwInvalidSystemOption(systemProp, "does not contain a key=value assignment");
+ }
+
+ final String optionName = keyValue[0].trim();
+ if (optionName.isEmpty()) {
+ throwInvalidSystemOption(systemProp, "does not contain a key before the assignment");
+ }
+
+ final String optionString = stripQuotes(keyValue[1].trim(), systemProp);
+ if (optionString.isEmpty()) {
+ throwInvalidSystemOption(systemProp, "does not contain a value after the assignment");
+ }
+
+ final OptionValue defaultValue = optionManager.getOption(optionName);
+ if (defaultValue == null) {
+ throwInvalidSystemOption(systemProp, "does not specify a valid option name");
+ }
+ if (defaultValue.type != OptionType.SYSTEM) {
+ throwInvalidSystemOption(systemProp, "does not specify a SYSTEM option ");
+ }
+
+ final OptionValue optionValue = OptionValue.createOption(
+ defaultValue.kind, OptionType.SYSTEM, optionName, optionString);
+ optionManager.setOption(optionValue);
+ }
+ }
+
/**
* Shutdown hook for Drillbit. Closes the drillbit, and reports on errors that
* occur during closure, as well as the location the drillbit was started from.
@@ -310,4 +252,67 @@ public class Drillbit implements AutoCloseable {
return manager.getContext();
}
+ public static void main(final String[] cli) throws DrillbitStartupException {
+ final StartupOptions options = StartupOptions.parse(cli);
+ start(options);
+ }
+
+ public static Drillbit start(final StartupOptions options) throws DrillbitStartupException {
+ return start(DrillConfig.create(options.getConfigLocation()), null);
+ }
+
+ public static Drillbit start(final DrillConfig config) throws DrillbitStartupException {
+ return start(config, null);
+ }
+
+ public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet)
+ throws DrillbitStartupException {
+ logger.debug("Starting new Drillbit.");
+ // TODO: allow passing as a parameter
+ ScanResult classpathScan = ClassPathScanner.fromPrescan(config);
+ Drillbit bit;
+ try {
+ bit = new Drillbit(config, remoteServiceSet, classpathScan);
+ } catch (final Exception ex) {
+ throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
+ }
+
+ try {
+ bit.run();
+ } catch (final Exception e) {
+ bit.close();
+ throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
+ }
+ logger.debug("Started new Drillbit.");
+ return bit;
+ }
+
+ private static void throwInvalidSystemOption(final String systemProp, final String errorMessage) {
+ throw new IllegalStateException("Property \"" + SYSTEM_OPTIONS_NAME + "\" part \"" + systemProp
+ + "\" " + errorMessage + ".");
+ }
+
+ private static String stripQuotes(final String s, final String systemProp) {
+ if (s.isEmpty()) {
+ return s;
+ }
+
+ final char cFirst = s.charAt(0);
+ final char cLast = s.charAt(s.length() - 1);
+ if ((cFirst == '"') || (cFirst == '\'')) {
+ if (cLast != cFirst) {
+ throwInvalidSystemOption(systemProp, "quoted value does not have closing quote");
+ }
+
+ return s.substring(1, s.length() - 2); // strip the quotes
+ }
+
+ if ((cLast == '"') || (cLast == '\'')) {
+ throwInvalidSystemOption(systemProp, "value has unbalanced closing quote");
+ }
+
+ // return as-is
+ return s;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index c1f2e5b..aa6a0da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -37,8 +37,8 @@ import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
import org.apache.drill.exec.store.sys.PStoreProvider;
import com.codahale.metrics.MetricRegistry;
@@ -80,7 +80,11 @@ public class DrillbitContext {
this.endpoint = checkNotNull(endpoint);
this.provider = provider;
this.lpPersistence = new LogicalPlanPersistence(context.getConfig(), classpathScan);
- this.storagePlugins = new StoragePluginRegistry(this); // TODO change constructor
+
+ // TODO remove escaping "this".
+ this.storagePlugins = context.getConfig()
+ .getInstance(StoragePluginRegistry.STORAGE_PLUGIN_REGISTRY_IMPL, StoragePluginRegistry.class, this);
+
this.reader = new PhysicalPlanReader(context.getConfig(), classpathScan, lpPersistence, endpoint, storagePlugins);
this.operatorCreatorRegistry = new OperatorCreatorRegistry(classpathScan);
this.systemOptions = new SystemOptionManager(lpPersistence, provider);
@@ -152,7 +156,7 @@ public class DrillbitContext {
return provider;
}
- public DrillSchemaFactory getSchemaFactory() {
+ public SchemaFactory getSchemaFactory() {
return storagePlugins.getSchemaFactory();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
index f12d195..2f4a929 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.logical.StoragePluginConfig;
import com.google.common.collect.LinkedListMultimap;
@@ -36,7 +37,7 @@ import com.google.common.collect.Multimaps;
* This is inspired by ConcurrentMap but provides a secondary key mapping that allows an alternative lookup mechanism.
* The class is responsible for internally managing consistency between the two maps. This class is threadsafe.
*/
-class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>> {
+class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>>, AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginMap.class);
private final ConcurrentMap<String, StoragePlugin> nameMap = Maps.newConcurrentMap();
@@ -73,6 +74,18 @@ class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>> {
return ok;
}
+ public void put(String name, StoragePlugin plugin) {
+ StoragePlugin oldPlugin = nameMap.put(name, plugin);
+ configMap.put(plugin.getConfig(), plugin);
+ if (oldPlugin != null) {
+ try {
+ oldPlugin.close();
+ } catch (Exception e) {
+ logger.warn("Failure while closing plugin replaced by injection.", e);
+ }
+ }
+ }
+
public StoragePlugin putIfAbsent(String name, StoragePlugin plugin) {
StoragePlugin oldPlugin = nameMap.putIfAbsent(name, plugin);
if (oldPlugin == null) {
@@ -117,4 +130,9 @@ class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>> {
return nameMap.values();
}
+ @Override
+ public void close() throws Exception {
+ AutoCloseables.close(configMap.values());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 9d7dc57..c7d364b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -17,455 +17,107 @@
*/
package org.apache.drill.exec.store;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.RuleSet;
-import org.apache.drill.common.config.LogicalPlanPersistence;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.scanner.ClassPathScanner;
-import org.apache.drill.common.scanner.persistence.ScanResult;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.ops.OptimizerRulesContext;
-import org.apache.drill.exec.planner.logical.DrillRuleSets;
-import org.apache.drill.exec.planner.logical.StoragePlugins;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
-import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.SystemTablePlugin;
-import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Stopwatch;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSet.Builder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.io.Resources;
-
-public class StoragePluginRegistry implements Iterable<Map.Entry<String, StoragePlugin>> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistry.class);
-
- public static final String SYS_PLUGIN = "sys";
-
- public static final String INFORMATION_SCHEMA_PLUGIN = "INFORMATION_SCHEMA";
-
- private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>();
- private final StoragePluginMap plugins = new StoragePluginMap();
-
- private DrillbitContext context;
- private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
- private final PStore<StoragePluginConfig> pluginSystemTable;
- private final LogicalPlanPersistence lpPersistence;
- private final ScanResult classpathScan;
- private final LoadingCache<StoragePluginConfig, StoragePlugin> ephemeralPlugins;
-
- public StoragePluginRegistry(DrillbitContext context) {
- this.context = checkNotNull(context);
- this.lpPersistence = checkNotNull(context.getLpPersistence());
- this.classpathScan = checkNotNull(context.getClasspathScan());
- try {
- this.pluginSystemTable = context //
- .getPersistentStoreProvider() //
- .getStore(PStoreConfig //
- .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class) //
- .name("sys.storage_plugins") //
- .build());
- } catch (IOException | RuntimeException e) {
- logger.error("Failure while loading storage plugin registry.", e);
- throw new RuntimeException("Failure while reading and loading storage plugin configuration.", e);
- }
-
- ephemeralPlugins = CacheBuilder.newBuilder()
- .expireAfterAccess(24, TimeUnit.HOURS)
- .maximumSize(250)
- .removalListener(new RemovalListener<StoragePluginConfig, StoragePlugin>() {
- @Override
- public void onRemoval(RemovalNotification<StoragePluginConfig, StoragePlugin> notification) {
- closePlugin(notification.getValue());
- }
- })
- .build(new CacheLoader<StoragePluginConfig, StoragePlugin>() {
- @Override
- public StoragePlugin load(StoragePluginConfig config) throws Exception {
- return create(null, config);
- }
- });
- }
-
- public PStore<StoragePluginConfig> getStore() {
- return pluginSystemTable;
- }
-
- @SuppressWarnings("unchecked")
- public void init() throws DrillbitStartupException {
- final Collection<Class<? extends StoragePlugin>> pluginClasses =
- classpathScan.getImplementations(StoragePlugin.class);
- final String lineBrokenList =
- pluginClasses.size() == 0
- ? "" : "\n\t- " + Joiner.on("\n\t- ").join(pluginClasses);
- logger.debug("Found {} storage plugin configuration classes: {}.",
- pluginClasses.size(), lineBrokenList);
- for (Class<? extends StoragePlugin> plugin : pluginClasses) {
- int i = 0;
- for (Constructor<?> c : plugin.getConstructors()) {
- Class<?>[] params = c.getParameterTypes();
- if(params.length != 3
- || params[1] != DrillbitContext.class
- || !StoragePluginConfig.class.isAssignableFrom(params[0])
- || params[2] != String.class) {
- logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a "
- + "[constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
- continue;
- }
- availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
- i++;
- }
- if (i == 0) {
- logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters "
- + "of (StorangePluginConfig, Config)", plugin.getCanonicalName());
- }
- }
-
- // create registered plugins defined in "storage-plugins.json"
- this.plugins.putAll(createPlugins());
-
- }
- private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException {
- try {
- /*
- * Check if the storage plugins system table has any entries. If not, load the boostrap-storage-plugin file into the system table.
- */
- if (!pluginSystemTable.iterator().hasNext()) {
- // bootstrap load the config since no plugins are stored.
- logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
- Collection<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
- if (urls != null && ! urls.isEmpty()) {
- logger.info("Loading the storage plugin configs from URLs {}.", urls);
- Map<String, URL> pluginURLMap = Maps.newHashMap();
- for (URL url :urls) {
- String pluginsData = Resources.toString(url, Charsets.UTF_8);
- StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
- for (Map.Entry<String, StoragePluginConfig> config : plugins) {
- if (!pluginSystemTable.putIfAbsent(config.getKey(), config.getValue())) {
- logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
- config.getKey(), pluginURLMap.get(config.getKey()), url);
- continue;
- }
- pluginURLMap.put(config.getKey(), url);
- }
- }
- } else {
- throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE);
- }
- }
+public interface StoragePluginRegistry extends Iterable<Map.Entry<String, StoragePlugin>>, AutoCloseable {
+ final String SYS_PLUGIN = "sys";
+ final String INFORMATION_SCHEMA_PLUGIN = "INFORMATION_SCHEMA";
+ final String STORAGE_PLUGIN_REGISTRY_IMPL = "drill.exec.storage.registry";
+ final String PSTORE_NAME = "sys.storage_plugins";
- Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
- for (Map.Entry<String, StoragePluginConfig> entry : pluginSystemTable) {
- String name = entry.getKey();
- StoragePluginConfig config = entry.getValue();
- if (config.isEnabled()) {
- try {
- StoragePlugin plugin = create(name, config);
- activePlugins.put(name, plugin);
- } catch (ExecutionSetupException e) {
- logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e);
- config.setEnabled(false);
- pluginSystemTable.put(name, config);
- }
- }
- }
-
- activePlugins.put(INFORMATION_SCHEMA_PLUGIN, new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, INFORMATION_SCHEMA_PLUGIN));
- activePlugins.put(SYS_PLUGIN, new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, SYS_PLUGIN));
-
- return activePlugins;
- } catch (IOException e) {
- logger.error("Failure setting up storage plugins. Drillbit exiting.", e);
- throw new IllegalStateException(e);
- }
- }
-
- public void deletePlugin(String name) {
- StoragePlugin plugin = plugins.remove(name);
- closePlugin(plugin);
- pluginSystemTable.delete(name);
- }
-
- private void closePlugin(StoragePlugin plugin) {
- if (plugin == null) {
- return;
- }
-
- try {
- plugin.close();
- } catch (Exception e) {
- logger.warn("Exception while shutting down storage plugin.");
- }
- }
-
- public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException {
- for (;;) {
- final StoragePlugin oldPlugin = plugins.get(name);
- final StoragePlugin newPlugin = create(name, config);
- boolean done = false;
- try {
- if (oldPlugin != null) {
- if (config.isEnabled()) {
- done = plugins.replace(name, oldPlugin, newPlugin);
- } else {
- done = plugins.remove(name, oldPlugin);
- }
- if (done) {
- closePlugin(oldPlugin);
- }
- } else if (config.isEnabled()) {
- done = (null == plugins.putIfAbsent(name, newPlugin));
- } else {
- done = true;
- }
- } finally {
- if (!done) {
- closePlugin(newPlugin);
- }
- }
-
- if (done) {
- if (persist) {
- pluginSystemTable.put(name, config);
- }
-
- return newPlugin;
- }
- }
- }
+ /**
+ * Initialize the storage plugin registry. Must be called before the registry is used.
+ *
+ * @throws DrillbitStartupException
+ */
+ void init() throws DrillbitStartupException;
- public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
- StoragePlugin plugin = plugins.get(name);
- if (name.equals(SYS_PLUGIN) || name.equals(INFORMATION_SCHEMA_PLUGIN)) {
- return plugin;
- }
+ /**
+ * Delete a plugin by name
+ * @param name
+ * The name of the storage plugin to delete.
+ */
+ void deletePlugin(String name);
- // since we lazily manage the list of plugins per server, we need to update this once we know that it is time.
- StoragePluginConfig config = this.pluginSystemTable.get(name);
- if (config == null) {
- if (plugin != null) {
- plugins.remove(name);
- }
- return null;
- } else {
- if (plugin == null
- || !plugin.getConfig().equals(config)
- || plugin.getConfig().isEnabled() != config.isEnabled()) {
- plugin = createOrUpdate(name, config, false);
- }
- return plugin;
- }
- }
+ /**
+ * Create a plugin by name and configuration. If the plugin already exists, update the plugin
+ * @param name
+ * The name of the plugin
+ * @param config
+ * The plugin confgiruation
+ * @param persist
+ * Whether to persist the plugin for later use or treat it as ephemeral.
+ * @return The StoragePlugin instance.
+ * @throws ExecutionSetupException
+ */
+ StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException;
- public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
- if (config instanceof NamedStoragePluginConfig) {
- return getPlugin(((NamedStoragePluginConfig) config).name);
- } else {
- // try to lookup plugin by configuration
- StoragePlugin plugin = plugins.get(config);
- if (plugin != null) {
- return plugin;
- }
+ /**
+ * Get a plugin by name. Create it based on the PStore saved definition if it doesn't exist.
+ * @param name
+ * The name of the plugin
+ * @return The StoragePlugin instance.
+ * @throws ExecutionSetupException
+ */
+ StoragePlugin getPlugin(String name) throws ExecutionSetupException;
- // no named plugin matches the desired configuration, let's create an
- // ephemeral storage plugin (or get one from the cache)
- try {
- return ephemeralPlugins.get(config);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof ExecutionSetupException) {
- throw (ExecutionSetupException) cause;
- } else {
- // this shouldn't happen. here for completeness.
- throw new ExecutionSetupException("Failure while trying to create ephemeral plugin.", cause);
- }
- }
- }
- }
+ /**
+ * Get a plugin by configuration. If it doesn't exist, create it.
+ * @param config
+ * The configuration for the plugin.
+ * @return The StoragePlugin instance.
+ * @throws ExecutionSetupException
+ */
+ StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException;
- public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException {
- StoragePlugin p = getPlugin(storageConfig);
- if (!(p instanceof FileSystemPlugin)) {
- throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage plugin that wasn't of type FileSystemPlugin. The actual type of plugin was %s.", p.getClass().getName()));
- }
- FileSystemPlugin storage = (FileSystemPlugin) p;
- return storage.getFormatPlugin(formatConfig);
- }
+ /**
+ * Add a plugin to the registry using the provided name.
+ *
+ * @param name
+ * @param plugin
+ */
+ void addPlugin(String name, StoragePlugin plugin);
- private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
- StoragePlugin plugin = null;
- Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass());
- if (c == null) {
- throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
- pluginConfig));
- }
- try {
- plugin = c.newInstance(pluginConfig, context, name);
- plugin.start();
- return plugin;
- } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
- | IOException e) {
- Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
- if (t instanceof ExecutionSetupException) {
- throw ((ExecutionSetupException) t);
- }
- throw new ExecutionSetupException(String.format(
- "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
- }
- }
+ /**
+ * Get the Format plugin for the FileSystemPlugin associated with the provided storage config and format config.
+ *
+ * @param storageConfig
+ * The storage config for the associated FileSystemPlugin
+ * @param formatConfig
+ * The format config for the associated FormatPlugin
+ * @return A FormatPlugin
+ * @throws ExecutionSetupException
+ */
+ FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig)
+ throws ExecutionSetupException;
- @Override
- public Iterator<Entry<String, StoragePlugin>> iterator() {
- return plugins.iterator();
- }
+ /**
+ * Get the PStore for this StoragePluginRegistry. (Used in the management layer.)
+ * @return PStore for StoragePlugin configuration objects.
+ */
+ PStore<StoragePluginConfig> getStore();
/**
* Return StoragePlugin rule sets.
+ *
* @param optimizerRulesContext
* @return Array of logical and physical rule sets.
*/
- public RuleSet[] getStoragePluginRuleSet(OptimizerRulesContext optimizerRulesContext) {
- // query registered engines for optimizer rules and build the storage plugin RuleSet
- Builder<RelOptRule> logicalRulesBuilder = ImmutableSet.builder();
- Builder<RelOptRule> physicalRulesBuilder = ImmutableSet.builder();
- for (StoragePlugin plugin : this.plugins.plugins()) {
- Set<? extends RelOptRule> rules = plugin.getLogicalOptimizerRules(optimizerRulesContext);
- if (rules != null && rules.size() > 0) {
- logicalRulesBuilder.addAll(rules);
- }
- rules = plugin.getPhysicalOptimizerRules(optimizerRulesContext);
- if (rules != null && rules.size() > 0) {
- physicalRulesBuilder.addAll(rules);
- }
- }
-
- return new RuleSet[] {
- DrillRuleSets.create(logicalRulesBuilder.build()),
- DrillRuleSets.create(physicalRulesBuilder.build()) };
- }
-
- public DrillSchemaFactory getSchemaFactory() {
- return schemaFactory;
- }
-
- public class DrillSchemaFactory implements SchemaFactory {
-
- @Override
- public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
- Stopwatch watch = new Stopwatch();
- watch.start();
-
- try {
- Set<String> currentPluginNames = Sets.newHashSet(plugins.names());
- // iterate through the plugin instances in the persistence store adding
- // any new ones and refreshing those whose configuration has changed
- for (Map.Entry<String, StoragePluginConfig> config : pluginSystemTable) {
- if (config.getValue().isEnabled()) {
- getPlugin(config.getKey());
- currentPluginNames.remove(config.getKey());
- }
- }
- // remove those which are no longer in the registry
- for (String pluginName : currentPluginNames) {
- if (pluginName.equals(SYS_PLUGIN) || pluginName.equals(INFORMATION_SCHEMA_PLUGIN)) {
- continue;
- }
- plugins.remove(pluginName);
- }
-
- // finally register schemas with the refreshed plugins
- for (StoragePlugin plugin : plugins.plugins()) {
- plugin.registerSchemas(schemaConfig, parent);
- }
- } catch (ExecutionSetupException e) {
- throw new DrillRuntimeException("Failure while updating storage plugins", e);
- }
-
- // Add second level schema as top level schema with name qualified with parent schema name
- // Ex: "dfs" schema has "default" and "tmp" as sub schemas. Add following extra schemas "dfs.default" and
- // "dfs.tmp" under root schema.
- //
- // Before change, schema tree looks like below:
- // "root"
- // -- "dfs"
- // -- "default"
- // -- "tmp"
- // -- "hive"
- // -- "default"
- // -- "hivedb1"
- //
- // After the change, the schema tree looks like below:
- // "root"
- // -- "dfs"
- // -- "default"
- // -- "tmp"
- // -- "dfs.default"
- // -- "dfs.tmp"
- // -- "hive"
- // -- "default"
- // -- "hivedb1"
- // -- "hive.default"
- // -- "hive.hivedb1"
- List<SchemaPlus> secondLevelSchemas = Lists.newArrayList();
- for (String firstLevelSchemaName : parent.getSubSchemaNames()) {
- SchemaPlus firstLevelSchema = parent.getSubSchema(firstLevelSchemaName);
- for (String secondLevelSchemaName : firstLevelSchema.getSubSchemaNames()) {
- secondLevelSchemas.add(firstLevelSchema.getSubSchema(secondLevelSchemaName));
- }
- }
-
- for (SchemaPlus schema : secondLevelSchemas) {
- AbstractSchema drillSchema;
- try {
- drillSchema = schema.unwrap(AbstractSchema.class);
- } catch (ClassCastException e) {
- throw new RuntimeException(String.format("Schema '%s' is not expected under root schema", schema.getName()));
- }
- SubSchemaWrapper wrapper = new SubSchemaWrapper(drillSchema);
- parent.add(wrapper.getName(), wrapper);
- }
-
- logger.debug("Took {} ms to register schemas.", watch.elapsed(TimeUnit.MILLISECONDS));
- }
-
- }
+ RuleSet[] getStoragePluginRuleSet(OptimizerRulesContext optimizerRulesContext);
+ /**
+ * Get the Schema factory associated with this storage plugin registry.
+ * @return A SchemaFactory that can register the schemas associated with this plugin registry.
+ */
+ SchemaFactory getSchemaFactory();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
new file mode 100644
index 0000000..a8628c7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.drill.common.config.LogicalPlanPersistence;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
+import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.sys.SystemTablePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Stopwatch;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Resources;
+
+public class StoragePluginRegistryImpl implements StoragePluginRegistry {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistryImpl.class);
+
+ private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = Collections.emptyMap();
+ private final StoragePluginMap plugins = new StoragePluginMap();
+
+ private DrillbitContext context;
+ private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
+ private final PStore<StoragePluginConfig> pluginSystemTable;
+ private final LogicalPlanPersistence lpPersistence;
+ private final ScanResult classpathScan;
+ private final LoadingCache<StoragePluginConfig, StoragePlugin> ephemeralPlugins;
+
+ public StoragePluginRegistryImpl(DrillbitContext context) {
+ this.context = checkNotNull(context);
+ this.lpPersistence = checkNotNull(context.getLpPersistence());
+ this.classpathScan = checkNotNull(context.getClasspathScan());
+ try {
+ this.pluginSystemTable = context //
+ .getPersistentStoreProvider() //
+ .getStore(PStoreConfig //
+ .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class) //
+ .name(PSTORE_NAME) //
+ .build());
+ } catch (IOException | RuntimeException e) {
+ logger.error("Failure while loading storage plugin registry.", e);
+ throw new RuntimeException("Failure while reading and loading storage plugin configuration.", e);
+ }
+
+ ephemeralPlugins = CacheBuilder.newBuilder()
+ .expireAfterAccess(24, TimeUnit.HOURS)
+ .maximumSize(250)
+ .removalListener(new RemovalListener<StoragePluginConfig, StoragePlugin>() {
+ @Override
+ public void onRemoval(RemovalNotification<StoragePluginConfig, StoragePlugin> notification) {
+ closePlugin(notification.getValue());
+ }
+ })
+ .build(new CacheLoader<StoragePluginConfig, StoragePlugin>() {
+ @Override
+ public StoragePlugin load(StoragePluginConfig config) throws Exception {
+ return create(null, config);
+ }
+ });
+ }
+
+ public PStore<StoragePluginConfig> getStore() {
+ return pluginSystemTable;
+ }
+
+ public void init() throws DrillbitStartupException {
+ availablePlugins = findAvailablePlugins(classpathScan);
+
+ // create registered plugins defined in "storage-plugins.json"
+ this.plugins.putAll(createPlugins());
+ }
+
+ private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException {
+ try {
+ /*
+ * Check if the storage plugins system table has any entries. If not, load the boostrap-storage-plugin file into
+ * the system table.
+ */
+ if (!pluginSystemTable.iterator().hasNext()) {
+ // bootstrap load the config since no plugins are stored.
+ logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
+ Collection<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
+ if (urls != null && !urls.isEmpty()) {
+ logger.info("Loading the storage plugin configs from URLs {}.", urls);
+ Map<String, URL> pluginURLMap = Maps.newHashMap();
+ for (URL url : urls) {
+ String pluginsData = Resources.toString(url, Charsets.UTF_8);
+ StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
+ for (Map.Entry<String, StoragePluginConfig> config : plugins) {
+ if (!pluginSystemTable.putIfAbsent(config.getKey(), config.getValue())) {
+ logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
+ config.getKey(), pluginURLMap.get(config.getKey()), url);
+ continue;
+ }
+ pluginURLMap.put(config.getKey(), url);
+ }
+ }
+ } else {
+ throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE);
+ }
+ }
+
+ Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
+ for (Map.Entry<String, StoragePluginConfig> entry : pluginSystemTable) {
+ String name = entry.getKey();
+ StoragePluginConfig config = entry.getValue();
+ if (config.isEnabled()) {
+ try {
+ StoragePlugin plugin = create(name, config);
+ activePlugins.put(name, plugin);
+ } catch (ExecutionSetupException e) {
+ logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e);
+ config.setEnabled(false);
+ pluginSystemTable.put(name, config);
+ }
+ }
+ }
+
+ activePlugins.put(INFORMATION_SCHEMA_PLUGIN, new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context,
+ INFORMATION_SCHEMA_PLUGIN));
+ activePlugins.put(SYS_PLUGIN, new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, SYS_PLUGIN));
+
+ return activePlugins;
+ } catch (IOException e) {
+ logger.error("Failure setting up storage plugins. Drillbit exiting.", e);
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void addPlugin(String name, StoragePlugin plugin) {
+ plugins.put(name, plugin);
+ }
+
+ public void deletePlugin(String name) {
+ StoragePlugin plugin = plugins.remove(name);
+ closePlugin(plugin);
+ pluginSystemTable.delete(name);
+ }
+
+ private void closePlugin(StoragePlugin plugin) {
+ if (plugin == null) {
+ return;
+ }
+
+ try {
+ plugin.close();
+ } catch (Exception e) {
+ logger.warn("Exception while shutting down storage plugin.");
+ }
+ }
+
+ public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist)
+ throws ExecutionSetupException {
+ for (;;) {
+ final StoragePlugin oldPlugin = plugins.get(name);
+ final StoragePlugin newPlugin = create(name, config);
+ boolean done = false;
+ try {
+ if (oldPlugin != null) {
+ if (config.isEnabled()) {
+ done = plugins.replace(name, oldPlugin, newPlugin);
+ } else {
+ done = plugins.remove(name, oldPlugin);
+ }
+ if (done) {
+ closePlugin(oldPlugin);
+ }
+ } else if (config.isEnabled()) {
+ done = (null == plugins.putIfAbsent(name, newPlugin));
+ } else {
+ done = true;
+ }
+ } finally {
+ if (!done) {
+ closePlugin(newPlugin);
+ }
+ }
+
+ if (done) {
+ if (persist) {
+ pluginSystemTable.put(name, config);
+ }
+
+ return newPlugin;
+ }
+ }
+ }
+
+ public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
+ StoragePlugin plugin = plugins.get(name);
+ if (name.equals(SYS_PLUGIN) || name.equals(INFORMATION_SCHEMA_PLUGIN)) {
+ return plugin;
+ }
+
+ // since we lazily manage the list of plugins per server, we need to update this once we know that it is time.
+ StoragePluginConfig config = this.pluginSystemTable.get(name);
+ if (config == null) {
+ if (plugin != null) {
+ plugins.remove(name);
+ }
+ return null;
+ } else {
+ if (plugin == null
+ || !plugin.getConfig().equals(config)
+ || plugin.getConfig().isEnabled() != config.isEnabled()) {
+ plugin = createOrUpdate(name, config, false);
+ }
+ return plugin;
+ }
+ }
+
+
+ public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
+ if (config instanceof NamedStoragePluginConfig) {
+ return getPlugin(((NamedStoragePluginConfig) config).name);
+ } else {
+ // try to lookup plugin by configuration
+ StoragePlugin plugin = plugins.get(config);
+ if (plugin != null) {
+ return plugin;
+ }
+
+ // no named plugin matches the desired configuration, let's create an
+ // ephemeral storage plugin (or get one from the cache)
+ try {
+ return ephemeralPlugins.get(config);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof ExecutionSetupException) {
+ throw (ExecutionSetupException) cause;
+ } else {
+ // this shouldn't happen. here for completeness.
+ throw new ExecutionSetupException("Failure while trying to create ephemeral plugin.", cause);
+ }
+ }
+ }
+ }
+
+ public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig)
+ throws ExecutionSetupException {
+ StoragePlugin p = getPlugin(storageConfig);
+ if (!(p instanceof FileSystemPlugin)) {
+ throw new ExecutionSetupException(
+ String.format("You tried to request a format plugin for a storage plugin that wasn't of type "
+ + "FileSystemPlugin. The actual type of plugin was %s.", p.getClass().getName()));
+ }
+ FileSystemPlugin storage = (FileSystemPlugin) p;
+ return storage.getFormatPlugin(formatConfig);
+ }
+
+ private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
+ StoragePlugin plugin = null;
+ Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass());
+ if (c == null) {
+ throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
+ pluginConfig));
+ }
+ try {
+ plugin = c.newInstance(pluginConfig, context, name);
+ plugin.start();
+ return plugin;
+ } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
+ | IOException e) {
+ Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
+ if (t instanceof ExecutionSetupException) {
+ throw ((ExecutionSetupException) t);
+ }
+ throw new ExecutionSetupException(String.format(
+ "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
+ }
+ }
+
+ @Override
+ public Iterator<Entry<String, StoragePlugin>> iterator() {
+ return plugins.iterator();
+ }
+
+ /**
+ * Return StoragePlugin rule sets.
+ *
+ * @param optimizerRulesContext
+ * @return Array of logical and physical rule sets.
+ */
+ public RuleSet[] getStoragePluginRuleSet(OptimizerRulesContext optimizerRulesContext) {
+ // query registered engines for optimizer rules and build the storage plugin RuleSet
+ Builder<RelOptRule> logicalRulesBuilder = ImmutableSet.builder();
+ Builder<RelOptRule> physicalRulesBuilder = ImmutableSet.builder();
+ for (StoragePlugin plugin : this.plugins.plugins()) {
+ Set<? extends RelOptRule> rules = plugin.getLogicalOptimizerRules(optimizerRulesContext);
+ if (rules != null && rules.size() > 0) {
+ logicalRulesBuilder.addAll(rules);
+ }
+ rules = plugin.getPhysicalOptimizerRules(optimizerRulesContext);
+ if (rules != null && rules.size() > 0) {
+ physicalRulesBuilder.addAll(rules);
+ }
+ }
+
+ return new RuleSet[] {
+ DrillRuleSets.create(logicalRulesBuilder.build()),
+ DrillRuleSets.create(physicalRulesBuilder.build()) };
+ }
+
+ public SchemaFactory getSchemaFactory() {
+ return schemaFactory;
+ }
+
+ public class DrillSchemaFactory implements SchemaFactory {
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+
+ try {
+ Set<String> currentPluginNames = Sets.newHashSet(plugins.names());
+ // iterate through the plugin instances in the persistence store adding
+ // any new ones and refreshing those whose configuration has changed
+ for (Map.Entry<String, StoragePluginConfig> config : pluginSystemTable) {
+ if (config.getValue().isEnabled()) {
+ getPlugin(config.getKey());
+ currentPluginNames.remove(config.getKey());
+ }
+ }
+ // remove those which are no longer in the registry
+ for (String pluginName : currentPluginNames) {
+ if (pluginName.equals(SYS_PLUGIN) || pluginName.equals(INFORMATION_SCHEMA_PLUGIN)) {
+ continue;
+ }
+ plugins.remove(pluginName);
+ }
+
+ // finally register schemas with the refreshed plugins
+ for (StoragePlugin plugin : plugins.plugins()) {
+ plugin.registerSchemas(schemaConfig, parent);
+ }
+ } catch (ExecutionSetupException e) {
+ throw new DrillRuntimeException("Failure while updating storage plugins", e);
+ }
+
+ // Add second level schema as top level schema with name qualified with parent schema name
+ // Ex: "dfs" schema has "default" and "tmp" as sub schemas. Add following extra schemas "dfs.default" and
+ // "dfs.tmp" under root schema.
+ //
+ // Before change, schema tree looks like below:
+ // "root"
+ // -- "dfs"
+ // -- "default"
+ // -- "tmp"
+ // -- "hive"
+ // -- "default"
+ // -- "hivedb1"
+ //
+ // After the change, the schema tree looks like below:
+ // "root"
+ // -- "dfs"
+ // -- "default"
+ // -- "tmp"
+ // -- "dfs.default"
+ // -- "dfs.tmp"
+ // -- "hive"
+ // -- "default"
+ // -- "hivedb1"
+ // -- "hive.default"
+ // -- "hive.hivedb1"
+ List<SchemaPlus> secondLevelSchemas = Lists.newArrayList();
+ for (String firstLevelSchemaName : parent.getSubSchemaNames()) {
+ SchemaPlus firstLevelSchema = parent.getSubSchema(firstLevelSchemaName);
+ for (String secondLevelSchemaName : firstLevelSchema.getSubSchemaNames()) {
+ secondLevelSchemas.add(firstLevelSchema.getSubSchema(secondLevelSchemaName));
+ }
+ }
+
+ for (SchemaPlus schema : secondLevelSchemas) {
+ AbstractSchema drillSchema;
+ try {
+ drillSchema = schema.unwrap(AbstractSchema.class);
+ } catch (ClassCastException e) {
+ throw new RuntimeException(String.format("Schema '%s' is not expected under root schema", schema.getName()));
+ }
+ SubSchemaWrapper wrapper = new SubSchemaWrapper(drillSchema);
+ parent.add(wrapper.getName(), wrapper);
+ }
+
+ logger.debug("Took {} ms to register schemas.", watch.elapsed(TimeUnit.MILLISECONDS));
+ }
+
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ ephemeralPlugins.invalidateAll();
+ plugins.close();
+ }
+
+ /**
+ * Get a list of all available storage plugin class constructors.
+ * @param classpathScan
+ * A classpath scan to use.
+ * @return A Map of StoragePluginConfig => StoragePlugin.<init>() constructors.
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<Object, Constructor<? extends StoragePlugin>> findAvailablePlugins(final ScanResult classpathScan) {
+ Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>();
+ final Collection<Class<? extends StoragePlugin>> pluginClasses =
+ classpathScan.getImplementations(StoragePlugin.class);
+ final String lineBrokenList =
+ pluginClasses.size() == 0
+ ? "" : "\n\t- " + Joiner.on("\n\t- ").join(pluginClasses);
+ logger.debug("Found {} storage plugin configuration classes: {}.",
+ pluginClasses.size(), lineBrokenList);
+ for (Class<? extends StoragePlugin> plugin : pluginClasses) {
+ int i = 0;
+ for (Constructor<?> c : plugin.getConstructors()) {
+ Class<?>[] params = c.getParameterTypes();
+ if (params.length != 3
+ || params[1] != DrillbitContext.class
+ || !StoragePluginConfig.class.isAssignableFrom(params[0])
+ || params[2] != String.class) {
+ logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a "
+ + "[constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
+ continue;
+ }
+ availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
+ i++;
+ }
+ if (i == 0) {
+ logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters "
+ + "of (StorangePluginConfig, Config)", plugin.getCanonicalName());
+ }
+ }
+ return availablePlugins;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 6b5d9fe..6901a5f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -75,6 +75,7 @@ drill.exec: {
implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
},
storage: {
+ registry: "org.apache.drill.exec.store.StoragePluginRegistryImpl",
file: {
text: {
buffer.size: 262144,
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index 79245b2..24c8c63 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
import org.apache.drill.exec.testing.ExecutionControls;
import org.junit.Rule;
@@ -105,7 +106,7 @@ public class PlanningBase extends ExecTest{
}
};
- final StoragePluginRegistry registry = new StoragePluginRegistry(dbContext);
+ final StoragePluginRegistry registry = new StoragePluginRegistryImpl(dbContext);
registry.init();
final FunctionImplementationRegistry functionRegistry = new FunctionImplementationRegistry(config);
final DrillOperatorTable table = new DrillOperatorTable(functionRegistry);
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index 322e54a..288e78d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
import org.apache.drill.exec.vector.BitVector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.test.DrillTest;
@@ -182,7 +183,7 @@ public class TestAllocators extends DrillTest {
bit.run();
final DrillbitContext bitContext = bit.getContext();
FunctionImplementationRegistry functionRegistry = bitContext.getFunctionImplementationRegistry();
- StoragePluginRegistry storageRegistry = new StoragePluginRegistry(bitContext);
+ StoragePluginRegistry storageRegistry = new StoragePluginRegistryImpl(bitContext);
// Create a few Fragment Contexts
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 7207bf2..ef102b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -54,6 +54,7 @@ import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
@@ -314,7 +315,7 @@ public class TestOptiqPlans extends ExecTest {
}
};
- final StoragePluginRegistry reg = new StoragePluginRegistry(bitContext);
+ final StoragePluginRegistry reg = new StoragePluginRegistryImpl(bitContext);
final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(config, reg);
final PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
http://git-wip-us.apache.org/repos/asf/drill/blob/f964908a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
index 3b363bd..a533620 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.scanner.ClassPathScanner;
import org.apache.drill.common.util.FileUtils;
@@ -46,7 +49,7 @@ import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
import org.apache.drill.exec.vector.ValueVector;
import org.junit.Ignore;
import org.junit.Test;
@@ -56,9 +59,6 @@ import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
-import mockit.Injectable;
-import mockit.NonStrictExpectations;
-
public class TestMergeJoin extends PopUnitTestBase {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestMergeJoin.class);
@@ -130,7 +130,8 @@ public class TestMergeJoin extends PopUnitTestBase {
bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
}};
- final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c, new StoragePluginRegistry(bitContext));
+ final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c,
+ new StoragePluginRegistryImpl(bitContext));
final PhysicalPlan plan = reader.readPhysicalPlan(
Files.toString(
FileUtils.getResourceAsFile("/join/merge_single_batch.json"), Charsets.UTF_8)
@@ -186,7 +187,8 @@ public class TestMergeJoin extends PopUnitTestBase {
bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
}};
- final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c, new StoragePluginRegistry(bitContext));
+ final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c,
+ new StoragePluginRegistryImpl(bitContext));
final PhysicalPlan plan = reader.readPhysicalPlan(
Files.toString(
FileUtils.getResourceAsFile("/join/merge_inner_single_batch.json"), Charsets.UTF_8)
@@ -242,7 +244,8 @@ public class TestMergeJoin extends PopUnitTestBase {
bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
}};
- final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c, new StoragePluginRegistry(bitContext));
+ final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c,
+ new StoragePluginRegistryImpl(bitContext));
final PhysicalPlan plan = reader.readPhysicalPlan(
Files.toString(
FileUtils.getResourceAsFile("/join/merge_multi_batch.json"), Charsets.UTF_8)