You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/03 17:23:06 UTC

[GitHub] vdiravka closed pull request #1345: DRILL-6494: Drill Plugins Handler

vdiravka closed pull request #1345: DRILL-6494: Drill Plugins Handler
URL: https://github.com/apache/drill/pull/1345
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/pom.xml b/common/pom.xml
index a7fba2bf16..a8cab1075d 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -53,13 +53,11 @@
     <dependency>
       <groupId>com.typesafe</groupId>
       <artifactId>config</artifactId>
-      <version>1.0.0</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
-      <version>3.1</version>
     </dependency>
 
     <dependency>
diff --git a/common/src/main/java/org/apache/drill/common/config/CommonConstants.java b/common/src/main/java/org/apache/drill/common/config/CommonConstants.java
index 1b5fb29c01..e203972b8e 100644
--- a/common/src/main/java/org/apache/drill/common/config/CommonConstants.java
+++ b/common/src/main/java/org/apache/drill/common/config/CommonConstants.java
@@ -31,4 +31,7 @@
   /** Override configuration file name.  (Classpath resource pathname.) */
   String CONFIG_OVERRIDE_RESOURCE_PATHNAME = "drill-override.conf";
 
+  /** Override plugins configs file name.  (Classpath resource pathname.) */
+  String STORAGE_PLUGINS_OVERRIDE_CONF = "storage-plugins-override.conf";
+
 }
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index 66058643da..7211f19363 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -261,7 +261,7 @@ private static DrillConfig create(String overrideFileResourcePathname,
     final String className = getString(location);
     if (className == null) {
       throw new DrillConfigurationException(String.format(
-          "No class defined at location '%s'. Expected a definition of the class []",
+          "No class defined at location '%s'. Expected a definition of the class [%s]",
           location, clazz.getCanonicalName()));
     }
 
diff --git a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
index 13a5eade9a..909e8110df 100644
--- a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
+++ b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
@@ -51,7 +51,6 @@
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
 
 import javassist.bytecode.AccessFlag;
 import javassist.bytecode.AnnotationsAttribute;
@@ -320,15 +319,12 @@ public void scan(final Object cls) {
    *           to scan for (relative to specified class loaders' classpath roots)
    * @param  returnRootPathname  whether to collect classpath root portion of
    *           URL for each resource instead of full URL of each resource
-   * @param  classLoaders  set of class loaders in which to look up resource;
-   *           none (empty array) to specify to use current thread's context
-   *           class loader and {@link Reflections}'s class loader
    * @returns  ...; empty set if none
    */
   public static Set<URL> forResource(final String resourcePathname, final boolean returnRootPathname) {
     logger.debug("Scanning classpath for resources with pathname \"{}\".",
                  resourcePathname);
-    final Set<URL> resultUrlSet = Sets.newHashSet();
+    final Set<URL> resultUrlSet = new HashSet<>();
     final ClassLoader classLoader = ClassPathScanner.class.getClassLoader();
     try {
       final Enumeration<URL> resourceUrls = classLoader.getResources(resourcePathname);
diff --git a/common/src/main/java/org/apache/drill/exec/util/ActionOnFile.java b/common/src/main/java/org/apache/drill/exec/util/ActionOnFile.java
new file mode 100644
index 0000000000..cca1e771d7
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/exec/util/ActionOnFile.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Defines the possible actions on a file and performs the selected action.
+ */
+public enum ActionOnFile {
+
+  /**
+   * No action will be performed
+   */
+  NONE {
+    @Override
+    public void action(URL url) { }
+  },
+
+  /**
+   * Renames the file by inserting the current timestamp, in "yyyyMMdd_HHmmss" format, before the last dot of the original file name<p>
+   * Example:<br>
+   * Original file name: "storage-plugins-override.conf"<br>
+   * New file name: "storage-plugins-override-20180703_033354.conf"
+   */
+  RENAME {
+    @Override
+    public void action(URL url) {
+      String fileName = url.getFile();
+      File file = new File(url.getPath());
+      String currentDateTime = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date());
+      String newFileName = new StringBuilder(fileName)
+          .insert(fileName.lastIndexOf("."), "-" + currentDateTime)
+          .toString();
+      Path filePath = file.toPath();
+      try {
+        Files.move(filePath, filePath.resolveSibling(newFileName));
+      } catch (IOException e) {
+        logger.error("There was an error during file {} rename.", fileName, e);
+      }
+    }
+  },
+
+  /**
+   * It removes the file
+   */
+  REMOVE {
+    @Override
+    public void action(URL url) {
+      File file = new File(url.getPath());
+      try {
+        Files.delete(file.toPath());
+      } catch (IOException e) {
+        logger.error("There was an error during file {} removing.", url.getFile(), e);
+      }
+    }
+  };
+
+  private static final org.slf4j.Logger logger =  org.slf4j.LoggerFactory.getLogger(ActionOnFile.class);
+
+  /**
+   * This is an action which should be performed on the file
+   * @param url the file URL
+   */
+  public abstract void action(URL url);
+}
diff --git a/contrib/storage-hbase/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-hbase/src/main/resources/bootstrap-storage-plugins.json
index 3e0e8c0417..530a407c84 100644
--- a/contrib/storage-hbase/src/main/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-hbase/src/main/resources/bootstrap-storage-plugins.json
@@ -2,11 +2,11 @@
   "storage":{
     hbase : {
       type:"hbase",
-      enabled: false,
       config : {
         "hbase.zookeeper.quorum" : "localhost",
         "hbase.zookeeper.property.clientPort" : 2181
-      }
+      },
+      enabled: false
     }
   }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index d3115b8a6e..e3cb3a2dd7 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -27,7 +27,6 @@
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -72,6 +71,7 @@ public HiveSchemaFactory(final HiveStoragePlugin plugin, final String name, fina
     isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
 
     try {
+      // TODO: DRILL-6412. Clients for the plugin should be instantiated only when the plugin is enabled
       processUserMetastoreClient =
           DrillHiveMetaStoreClient.createCloseableClientWithCaching(hiveConf);
     } catch (MetaException e) {
@@ -82,12 +82,9 @@ public HiveSchemaFactory(final HiveStoragePlugin plugin, final String name, fina
         .newBuilder()
         .expireAfterAccess(10, TimeUnit.MINUTES)
         .maximumSize(5) // Up to 5 clients for impersonation-enabled.
-        .removalListener(new RemovalListener<String, DrillHiveMetaStoreClient>() {
-          @Override
-          public void onRemoval(RemovalNotification<String, DrillHiveMetaStoreClient> notification) {
-            DrillHiveMetaStoreClient client = notification.getValue();
-            client.close();
-          }
+        .removalListener((RemovalListener<String, DrillHiveMetaStoreClient>) notification -> {
+          DrillHiveMetaStoreClient client = notification.getValue();
+          client.close();
         })
         .build(new CacheLoader<String, DrillHiveMetaStoreClient>() {
           @Override
diff --git a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
index 5c7174e253..d06220fe7c 100644
--- a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
@@ -2,14 +2,16 @@
   "storage":{
     hive : {
       type:"hive",
-      enabled: false,
       config : {
         "hive.metastore.uris" : "",
         "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../sample-data/drill_hive_db;create=true",
         "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
         "fs.default.name" : "file:///",
-        "hive.metastore.sasl.enabled" : "false"
-      }
+        "hive.metastore.sasl.enabled" : "false",
+        "hive.metastore.schema.verification": "false",
+        "datanucleus.schema.autoCreateAll": "true"
+      },
+      enabled: false
     }
   }
 }
diff --git a/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json
index add9808474..4018d92478 100755
--- a/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json
@@ -2,15 +2,16 @@
     "storage" : {
         derby : {
           type    : "jdbc",
-          enabled : true,
           driver  : "org.apache.derby.jdbc.ClientDriver",
-          url     : "jdbc:derby://localhost:${derby.reserved.port}/memory:${derby.database.name};user=root;password=root"
+          url     : "jdbc:derby://localhost:${derby.reserved.port}/memory:${derby.database.name};user=root;password=root",
+          enabled : true
         },
         mysql : {
           type    : "jdbc",
           enabled : true,
           driver  : "com.mysql.jdbc.Driver",
-          url     : "jdbc:mysql://localhost:${mysql.reserved.port}/${mysql.database.name}?user=root&password=root&useJDBCCompliantTimezoneShift=true"
+          url     : "jdbc:mysql://localhost:${mysql.reserved.port}/${mysql.database.name}?user=root&password=root&useJDBCCompliantTimezoneShift=true",
+          enabled : true
       }
     }
 }
diff --git a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
index 406c030608..18a1df5641 100644
--- a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
@@ -2,8 +2,8 @@
   "storage":{
     kafka : {
       type:"kafka",
-      enabled: false,
-      kafkaConsumerProps: {"bootstrap.servers":"localhost:9092", "group.id" : "drill-consumer"}
+      kafkaConsumerProps: {"bootstrap.servers":"localhost:9092", "group.id" : "drill-consumer"},
+      enabled: false
     }
   }
 }
diff --git a/contrib/storage-mongo/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-mongo/src/main/resources/bootstrap-storage-plugins.json
index b7d34f2fb3..9983596d35 100644
--- a/contrib/storage-mongo/src/main/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-mongo/src/main/resources/bootstrap-storage-plugins.json
@@ -2,8 +2,8 @@
   "storage":{
     mongo : {
       type:"mongo",
-      enabled: false,
-      connection:"mongodb://localhost:27017/"
+      connection:"mongodb://localhost:27017/",
+      enabled: false
     }
   }
 }
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index e044801f9b..712f3ec596 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -420,6 +420,11 @@
       <source>src/resources/drill-on-yarn-example.conf</source>
       <outputDirectory>conf</outputDirectory>
       <fileMode>0640</fileMode>
-   </file>
+    </file>
+    <file>
+      <source>src/resources/storage-plugins-override-example.conf</source>
+      <outputDirectory>conf</outputDirectory>
+      <fileMode>0640</fileMode>
+    </file>
   </files>
 </assembly>
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index fa2e395559..296cd8b60e 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -58,17 +58,10 @@ drill.exec: {
         batch.size: 4000
       },
       partition.column.label: "dir"
-    }
-  },
-  metrics : {
-    context: "drillbit",
-    jmx: {
-      enabled : true
     },
-    log: {
-      enabled : false,
-      interval : 60
-    }
+    # The action to perform on the storage-plugins-override.conf file after its use.
+    # Possible values are "none" (default), "rename", "remove"
+    action_on_plugins_override_file: "none"
   },
   zk: {
 	connect: "localhost:2181",
@@ -252,6 +245,15 @@ drill.exec: {
     #ssl provider. May be "JDK" or "OPENSSL". Default is "JDK"
     provider: "JDK"
   }
-}
-
+},
 
+drill.metrics : {
+  context: "drillbit",
+  jmx: {
+    enabled : true
+  },
+  log: {
+    enabled : false,
+    interval : 60
+  }
+}
diff --git a/distribution/src/resources/storage-plugins-override-example.conf b/distribution/src/resources/storage-plugins-override-example.conf
new file mode 100644
index 0000000000..360ba2c0a4
--- /dev/null
+++ b/distribution/src/resources/storage-plugins-override-example.conf
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file contains storage plugin configs, which can be updated on Drill start-up.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+  "storage":{
+    cp: {
+      type: "file",
+      connection: "classpath:///",
+      formats: {
+        "csv" : {
+          type: "text",
+          extensions: [ "csv" ],
+          delimiter: ","
+        }
+      }
+    }
+  }
+  "storage":{
+    dfs: {
+      type: "file",
+      connection: "hdfs:///",
+      workspaces: {
+        "root": {
+          "location": "/",
+          "writable": false,
+          "defaultInputFormat": null,
+          "allowAccessOutsideWorkspace": false
+        }
+      },
+      formats: {
+        "parquet": {
+          "type": "parquet"
+        }
+      },
+      enabled: false
+    }
+  }
+  "storage":{
+    mongo : {
+      type:"mongo",
+      connection:"mongodb://test_host:27017/",
+      enabled: true
+    }
+  }
+  "storage": {
+    openTSDB: {
+      type: "openTSDB",
+      connection: "http://localhost:8888",
+      enabled: true
+    }
+  }
diff --git a/drill-yarn/pom.xml b/drill-yarn/pom.xml
index 6bad97a3d3..b6ada46b2b 100644
--- a/drill-yarn/pom.xml
+++ b/drill-yarn/pom.xml
@@ -106,7 +106,6 @@
     <dependency>
       <groupId>com.typesafe</groupId>
       <artifactId>config</artifactId>
-      <version>1.0.0</version>
     </dependency>
 
     <!-- Logging -->
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 7701e76165..94c345953d 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -205,6 +205,10 @@
       <artifactId>jackson-module-afterburner</artifactId>
       <version>${jackson.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.honton.chas.hocon</groupId>
+      <artifactId>jackson-dataformat-hocon</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.glassfish.jersey.ext</groupId>
       <artifactId>jersey-mvc-freemarker</artifactId>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
index d234854221..1493a923c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
@@ -23,6 +23,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.LogicalPlanPersistence;
@@ -35,7 +36,7 @@
 import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
 
-public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginConfig>>{
+public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginConfig>> {
 
   private Map<String, StoragePluginConfig> storage;
 
@@ -95,4 +96,45 @@ public boolean equals(Object obj) {
     return storage.equals(((StoragePlugins) obj).getStorage());
   }
 
+  /**
+   * Put one plugin into current storage plugins map
+   *
+   * @param name storage plugin name
+   * @param config storage plugin config
+   */
+  public void put(String name, StoragePluginConfig config) {
+    storage.put(name, config);
+  }
+
+  /**
+   * Put other storage plugins into current storage plugins map
+   *
+   * @param plugins storage plugins
+   */
+  public void putAll(StoragePlugins plugins) {
+    Optional.ofNullable(plugins)
+        .ifPresent(p -> storage.putAll(p.getStorage()));
+  }
+
+  /**
+   * Put one plugin into current storage plugins map, if it was absent
+   *
+   * @param name storage plugin name
+   * @param config storage plugin config
+   * @return the previous storage plugin config, null if it was absent or it had null value
+   */
+  public StoragePluginConfig putIfAbsent(String name,  StoragePluginConfig config) {
+    return storage.putIfAbsent(name, config);
+  }
+
+  /**
+   * Return storage plugin config for certain plugin name
+   *
+   * @param pluginName storage plugin name
+   * @return storage plugin config
+   */
+  public StoragePluginConfig getConfig(String pluginName) {
+    return storage.get(pluginName);
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index ca108606ca..9f08957d40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -99,6 +99,7 @@ public Viewable getStoragePlugins() {
   @Produces(MediaType.APPLICATION_JSON)
   public PluginConfigWrapper getStoragePluginJSON(@PathParam("name") String name) {
     try {
+      // TODO: DRILL-6412: No need to get StoragePlugin. It is enough to have plugin name and config here
       StoragePlugin plugin = storage.getPlugin(name);
       if (plugin != null) {
         return new PluginConfigWrapper(name, plugin.getConfig());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java
index 9b2c928717..41db97b3e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java
@@ -17,22 +17,53 @@
  */
 package org.apache.drill.exec.store;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
-@JsonTypeName("named")
+@JsonTypeName(NamedStoragePluginConfig.NAME)
 public class NamedStoragePluginConfig extends StoragePluginConfig {
-  public String name;
+
+  public static final String NAME = "named";
+
+  private final String name;
+
+  @JsonCreator
+  public NamedStoragePluginConfig(@JsonProperty("name") String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return name;
+  }
 
   @Override
-  public boolean equals(Object o) {
-    return this == o;
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    NamedStoragePluginConfig other = (NamedStoragePluginConfig) obj;
+    if (name == null) {
+      return other.name == null;
+    } else {
+      return name.equals(other.name);
+    }
   }
 
   @Override
   public int hashCode() {
-    return name.hashCode();
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (name == null ? 0 : name.hashCode());
+    return result;
   }
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
index 28775c85cf..582791e739 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
@@ -21,13 +21,14 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 
@@ -40,7 +41,7 @@
 class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>>, AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginMap.class);
 
-  private final ConcurrentMap<String, StoragePlugin> nameMap = Maps.newConcurrentMap();
+  private final ConcurrentMap<String, StoragePlugin> nameMap = new ConcurrentHashMap<>();
 
   @SuppressWarnings("unchecked")
   private final Multimap<StoragePluginConfig, StoragePlugin> configMap =
@@ -111,7 +112,12 @@ public StoragePlugin get(String name) {
     return nameMap.entrySet().iterator();
   }
 
-  public Iterable<String> names() {
+  /**
+   * Returns set of plugin names of this {@link StoragePluginMap}
+   *
+   * @return plugin names
+   */
+  public Set<String> getNames() {
     return nameMap.keySet();
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 82f18f8d50..313d3b985e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -27,85 +27,84 @@
 import org.apache.drill.exec.store.sys.PersistentStore;
 
 public interface StoragePluginRegistry extends Iterable<Map.Entry<String, StoragePlugin>>, AutoCloseable {
-  final String SYS_PLUGIN = "sys";
-  final String INFORMATION_SCHEMA_PLUGIN = "INFORMATION_SCHEMA";
-  final String STORAGE_PLUGIN_REGISTRY_IMPL = "drill.exec.storage.registry";
-  final String PSTORE_NAME = "sys.storage_plugins";
+  String SYS_PLUGIN = "sys";
+  String INFORMATION_SCHEMA_PLUGIN = "INFORMATION_SCHEMA";
+  String STORAGE_PLUGIN_REGISTRY_IMPL = "drill.exec.storage.registry";
+  String ACTION_ON_STORAGE_PLUGINS_OVERRIDE_FILE = "drill.exec.storage.action_on_plugins_override_file";
+  String PSTORE_NAME = "sys.storage_plugins";
 
   /**
    * Initialize the storage plugin registry. Must be called before the registry is used.
    *
-   * @throws DrillbitStartupException
+   * @throws DrillbitStartupException if drillbit startup fails
    */
   void init() throws DrillbitStartupException;
 
   /**
    * Delete a plugin by name
-   * @param name
-   *          The name of the storage plugin to delete.
+   *
+   * @param name The name of the storage plugin to delete.
    */
   void deletePlugin(String name);
 
   /**
    * Create a plugin by name and configuration. If the plugin already exists, update the plugin
-   * @param name
-   *          The name of the plugin
-   * @param config
-   *          The plugin configuration
-   * @param persist
-   *          Whether to persist the plugin for later use or treat it as ephemeral.
+   *
+   * @param name The name of the plugin
+   * @param config The plugin configuration
+   * @param persist Whether to persist the plugin for later use or treat it as ephemeral.
    * @return The StoragePlugin instance.
-   * @throws ExecutionSetupException
+   * @throws ExecutionSetupException if plugin cannot be created
    */
   StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException;
 
   /**
    * Get a plugin by name. Create it based on the PStore saved definition if it doesn't exist.
-   * @param name
-   *          The name of the plugin
+   *
+   * @param name The name of the plugin
    * @return The StoragePlugin instance.
-   * @throws ExecutionSetupException
+   * @throws ExecutionSetupException if plugin cannot be obtained
    */
   StoragePlugin getPlugin(String name) throws ExecutionSetupException;
 
   /**
    * Get a plugin by configuration. If it doesn't exist, create it.
-   * @param config
-   *          The configuration for the plugin.
+   *
+   * @param config The configuration for the plugin.
    * @return The StoragePlugin instance.
-   * @throws ExecutionSetupException
+   * @throws ExecutionSetupException if plugin cannot be obtained
    */
   StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException;
 
   /**
    * Add a plugin to the registry using the provided name.
    *
-   * @param name
-   * @param plugin
+   * @param name The name of the plugin
+   * @param plugin The StoragePlugin instance
    */
-  void addPlugin(String name, StoragePlugin plugin);
+  void addEnabledPlugin(String name, StoragePlugin plugin);
 
   /**
    * Get the Format plugin for the FileSystemPlugin associated with the provided storage config and format config.
    *
-   * @param storageConfig
-   *          The storage config for the associated FileSystemPlugin
-   * @param formatConfig
-   *          The format config for the associated FormatPlugin
-   * @return A FormatPlugin
-   * @throws ExecutionSetupException
+   * @param storageConfig The storage config for the associated FileSystemPlugin
+   * @param formatConfig The format config for the associated FormatPlugin
+   * @return A FormatPlugin instance
+   * @throws ExecutionSetupException if plugin cannot be obtained
    */
   FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig)
       throws ExecutionSetupException;
 
   /**
    * Get the PStore for this StoragePluginRegistry. (Used in the management layer.)
+   *
    * @return PStore for StoragePlugin configuration objects.
    */
   PersistentStore<StoragePluginConfig> getStore();
 
   /**
    * Get the Schema factory associated with this storage plugin registry.
+   *
    * @return A SchemaFactory that can register the schemas associated with this plugin registry.
    */
   SchemaFactory getSchemaFactory();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index f2edf5e4b9..8e5fba449c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -23,9 +23,11 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +36,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.LogicalPlanPersistence;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -43,7 +46,6 @@
 import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.exception.StoreException;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -63,17 +65,13 @@
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.google.common.io.Resources;
 
 public class StoragePluginRegistryImpl implements StoragePluginRegistry {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistryImpl.class);
 
   private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = Collections.emptyMap();
-  private final StoragePluginMap plugins = new StoragePluginMap();
+  private final StoragePluginMap enabledPlugins = new StoragePluginMap();
 
   private DrillbitContext context;
   private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
@@ -87,11 +85,11 @@ public StoragePluginRegistryImpl(DrillbitContext context) {
     this.lpPersistence = checkNotNull(context.getLpPersistence());
     this.classpathScan = checkNotNull(context.getClasspathScan());
     try {
-      this.pluginSystemTable = context //
-          .getStoreProvider() //
-          .getOrCreateStore(PersistentStoreConfig //
-              .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class) //
-              .name(PSTORE_NAME) //
+      this.pluginSystemTable = context
+          .getStoreProvider()
+          .getOrCreateStore(PersistentStoreConfig
+              .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class)
+              .name(PSTORE_NAME)
               .build());
     } catch (StoreException | RuntimeException e) {
       logger.error("Failure while loading storage plugin registry.", e);
@@ -101,12 +99,8 @@ public StoragePluginRegistryImpl(DrillbitContext context) {
     ephemeralPlugins = CacheBuilder.newBuilder()
         .expireAfterAccess(24, TimeUnit.HOURS)
         .maximumSize(250)
-        .removalListener(new RemovalListener<StoragePluginConfig, StoragePlugin>() {
-          @Override
-          public void onRemoval(RemovalNotification<StoragePluginConfig, StoragePlugin> notification) {
-            closePlugin(notification.getValue());
-          }
-        })
+        .removalListener(
+            (RemovalListener<StoragePluginConfig, StoragePlugin>) notification -> closePlugin(notification.getValue()))
         .build(new CacheLoader<StoragePluginConfig, StoragePlugin>() {
           @Override
           public StoragePlugin load(StoragePluginConfig config) throws Exception {
@@ -121,69 +115,82 @@ public StoragePlugin load(StoragePluginConfig config) throws Exception {
   }
 
   @Override
-  public void init() throws DrillbitStartupException {
+  public void init() {
     availablePlugins = findAvailablePlugins(classpathScan);
+    try {
+      StoragePlugins bootstrapPlugins = pluginSystemTable.getAll().hasNext() ? null : loadBootstrapPlugins();
+
+      StoragePluginsHandler storagePluginsHandler = new StoragePluginsHandlerService(context);
+      storagePluginsHandler.loadPlugins(pluginSystemTable, bootstrapPlugins);
 
-    // create registered plugins defined in "storage-plugins.json"
-    plugins.putAll(createPlugins());
+      defineEnabledPlugins();
+    } catch (IOException e) {
+      logger.error("Failure setting up storage enabledPlugins.  Drillbit exiting.", e);
+      throw new IllegalStateException(e);
+    }
   }
 
-  @SuppressWarnings("resource")
-  private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException {
-    try {
-      /*
-       * Check if the storage plugins system table has any entries. If not, load the boostrap-storage-plugin file into
-       * the system table.
-       */
-      if (!pluginSystemTable.getAll().hasNext()) {
-        // bootstrap load the config since no plugins are stored.
-        logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
-        Collection<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
-        if (urls != null && !urls.isEmpty()) {
-          logger.info("Loading the storage plugin configs from URLs {}.", urls);
-          Map<String, URL> pluginURLMap = Maps.newHashMap();
-          for (URL url : urls) {
-            String pluginsData = Resources.toString(url, Charsets.UTF_8);
-            StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
-            for (Map.Entry<String, StoragePluginConfig> config : plugins) {
-              if (!definePluginConfig(config.getKey(), config.getValue())) {
-                logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
-                    config.getKey(), pluginURLMap.get(config.getKey()), url);
-                continue;
-              }
-              pluginURLMap.put(config.getKey(), url);
-            }
+  /**
+   * Reads the bootstrap storage plugins from the {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE} files.
+   * It is used when the persistent store does not contain any storage plugins yet (fresh Drill set up)
+   *
+   * @return bootstrap storage plugins
+   * @throws IOException if a read error occurs
+   */
+  private StoragePlugins loadBootstrapPlugins() throws IOException {
+    // bootstrap load the config since no plugins are stored.
+    logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
+    Set<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
+    if (urls != null && !urls.isEmpty()) {
+      logger.info("Loading the storage plugin configs from URLs {}.", urls);
+      StoragePlugins bootstrapPlugins = new StoragePlugins(new HashMap<>());
+      Map<String, URL> pluginURLMap = new HashMap<>();
+      for (URL url : urls) {
+        String pluginsData = Resources.toString(url, Charsets.UTF_8);
+        StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
+        for (Entry<String, StoragePluginConfig> plugin : plugins) {
+          StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue());
+          if (oldPluginConfig != null) {
+            logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
+                plugin.getKey(), pluginURLMap.get(plugin.getKey()), url);
+          } else {
+            pluginURLMap.put(plugin.getKey(), url);
           }
-        } else {
-          throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE);
         }
       }
+      return bootstrapPlugins;
+    } else {
+      throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE);
+    }
+  }
 
-      Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
-      for (Map.Entry<String, StoragePluginConfig> entry : Lists.newArrayList(pluginSystemTable.getAll())) {
-        String name = entry.getKey();
-        StoragePluginConfig config = entry.getValue();
-        if (config.isEnabled()) {
-          try {
-            StoragePlugin plugin = create(name, config);
-            activePlugins.put(name, plugin);
-          } catch (ExecutionSetupException e) {
-            logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e);
-            config.setEnabled(false);
-            pluginSystemTable.put(name, config);
-          }
+  /**
+   * It initializes {@link #enabledPlugins} with currently enabled plugins
+   */
+  private void defineEnabledPlugins() {
+    Map<String, StoragePlugin> activePlugins = new HashMap<>();
+    Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginSystemTable.getAll();
+    while (allPlugins.hasNext()) {
+      Entry<String, StoragePluginConfig> plugin = allPlugins.next();
+      String name = plugin.getKey();
+      StoragePluginConfig config = plugin.getValue();
+      if (config.isEnabled()) {
+        try {
+          StoragePlugin storagePlugin = create(name, config);
+          activePlugins.put(name, storagePlugin);
+        } catch (ExecutionSetupException e) {
+          logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e);
+          config.setEnabled(false);
+          pluginSystemTable.put(name, config);
         }
       }
+    }
 
-      activePlugins.put(INFORMATION_SCHEMA_PLUGIN, new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context,
-          INFORMATION_SCHEMA_PLUGIN));
-      activePlugins.put(SYS_PLUGIN, new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, SYS_PLUGIN));
+    activePlugins.put(INFORMATION_SCHEMA_PLUGIN, new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context,
+        INFORMATION_SCHEMA_PLUGIN));
+    activePlugins.put(SYS_PLUGIN, new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, SYS_PLUGIN));
 
-      return activePlugins;
-    } catch (IOException e) {
-      logger.error("Failure setting up storage plugins.  Drillbit exiting.", e);
-      throw new IllegalStateException(e);
-    }
+    enabledPlugins.putAll(activePlugins);
   }
 
   /**
@@ -194,25 +201,21 @@ public void init() throws DrillbitStartupException {
    * @param config plugin config
    * @param plugin plugin implementation
    */
-
-  public void definePlugin(String name, StoragePluginConfig config, StoragePlugin plugin) {
-    addPlugin(name, plugin);
-    definePluginConfig(name, config);
-  }
-
-  private boolean definePluginConfig(String name, StoragePluginConfig config) {
-    return pluginSystemTable.putIfAbsent(name, config);
+  @VisibleForTesting
+  public void addPluginToPersistentStoreIfAbsent(String name, StoragePluginConfig config, StoragePlugin plugin) {
+    addEnabledPlugin(name, plugin);
+    pluginSystemTable.putIfAbsent(name, config);
   }
 
   @Override
-  public void addPlugin(String name, StoragePlugin plugin) {
-    plugins.put(name, plugin);
+  public void addEnabledPlugin(String name, StoragePlugin plugin) {
+    enabledPlugins.put(name, plugin);
   }
 
   @Override
   public void deletePlugin(String name) {
     @SuppressWarnings("resource")
-    StoragePlugin plugin = plugins.remove(name);
+    StoragePlugin plugin = enabledPlugins.remove(name);
     closePlugin(plugin);
     pluginSystemTable.delete(name);
   }
@@ -234,21 +237,21 @@ private void closePlugin(StoragePlugin plugin) {
   public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist)
       throws ExecutionSetupException {
     for (;;) {
-      final StoragePlugin oldPlugin = plugins.get(name);
+      final StoragePlugin oldPlugin = enabledPlugins.get(name);
       final StoragePlugin newPlugin = create(name, config);
       boolean done = false;
       try {
         if (oldPlugin != null) {
           if (config.isEnabled()) {
-            done = plugins.replace(name, oldPlugin, newPlugin);
+            done = enabledPlugins.replace(name, oldPlugin, newPlugin);
           } else {
-            done = plugins.remove(name, oldPlugin);
+            done = enabledPlugins.remove(name, oldPlugin);
           }
           if (done) {
             closePlugin(oldPlugin);
           }
         } else if (config.isEnabled()) {
-          done = (null == plugins.putIfAbsent(name, newPlugin));
+          done = (null == enabledPlugins.putIfAbsent(name, newPlugin));
         } else {
           done = true;
         }
@@ -270,7 +273,7 @@ public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boo
 
   @Override
   public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
-    StoragePlugin plugin = plugins.get(name);
+    StoragePlugin plugin = enabledPlugins.get(name);
     if (name.equals(SYS_PLUGIN) || name.equals(INFORMATION_SCHEMA_PLUGIN)) {
       return plugin;
     }
@@ -279,7 +282,7 @@ public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
     StoragePluginConfig config = this.pluginSystemTable.get(name);
     if (config == null) {
       if (plugin != null) {
-        plugins.remove(name);
+        enabledPlugins.remove(name);
       }
       return null;
     } else {
@@ -296,10 +299,10 @@ public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
   @Override
   public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
     if (config instanceof NamedStoragePluginConfig) {
-      return getPlugin(((NamedStoragePluginConfig) config).name);
+      return getPlugin(((NamedStoragePluginConfig) config).getName());
     } else {
       // try to lookup plugin by configuration
-      StoragePlugin plugin = plugins.get(config);
+      StoragePlugin plugin = enabledPlugins.get(config);
       if (plugin != null) {
         return plugin;
       }
@@ -335,7 +338,9 @@ public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPlu
   }
 
   private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
-    StoragePlugin plugin = null;
+    // TODO: DRILL-6412: clients for storage plugins shouldn't be created, if storage plugin is disabled
+    // Creating of the StoragePlugin leads to instantiating storage clients
+    StoragePlugin plugin;
     Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass());
     if (c == null) {
       throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
@@ -358,7 +363,7 @@ private StoragePlugin create(String name, StoragePluginConfig pluginConfig) thro
 
   @Override
   public Iterator<Entry<String, StoragePlugin>> iterator() {
-    return plugins.iterator();
+    return enabledPlugins.iterator();
   }
 
   @Override
@@ -374,13 +379,15 @@ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws
       Stopwatch watch = Stopwatch.createStarted();
 
       try {
-        Set<String> currentPluginNames = Sets.newHashSet(plugins.names());
-        // iterate through the plugin instances in the persistence store adding
+        Set<String> currentPluginNames = new HashSet<>(enabledPlugins.getNames());
+        // iterate through the plugin instances in the persistent store adding
         // any new ones and refreshing those whose configuration has changed
-        for (Map.Entry<String, StoragePluginConfig> config : Lists.newArrayList(pluginSystemTable.getAll())) {
-          if (config.getValue().isEnabled()) {
-            getPlugin(config.getKey());
-            currentPluginNames.remove(config.getKey());
+        Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginSystemTable.getAll();
+        while (allPlugins.hasNext()) {
+          Entry<String, StoragePluginConfig> plugin = allPlugins.next();
+          if (plugin.getValue().isEnabled()) {
+            getPlugin(plugin.getKey());
+            currentPluginNames.remove(plugin.getKey());
           }
         }
         // remove those which are no longer in the registry
@@ -388,11 +395,11 @@ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws
           if (pluginName.equals(SYS_PLUGIN) || pluginName.equals(INFORMATION_SCHEMA_PLUGIN)) {
             continue;
           }
-          plugins.remove(pluginName);
+          enabledPlugins.remove(pluginName);
         }
 
         // finally register schemas with the refreshed plugins
-        for (StoragePlugin plugin : plugins.plugins()) {
+        for (StoragePlugin plugin : enabledPlugins.plugins()) {
           plugin.registerSchemas(schemaConfig, parent);
         }
       } catch (ExecutionSetupException e) {
@@ -424,7 +431,7 @@ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws
       // -- "hivedb1"
       // -- "hive.default"
       // -- "hive.hivedb1"
-      List<SchemaPlus> secondLevelSchemas = Lists.newArrayList();
+      List<SchemaPlus> secondLevelSchemas = new ArrayList<>();
       for (String firstLevelSchemaName : parent.getSubSchemaNames()) {
         SchemaPlus firstLevelSchema = parent.getSubSchema(firstLevelSchemaName);
         for (String secondLevelSchemaName : firstLevelSchema.getSubSchemaNames()) {
@@ -451,19 +458,18 @@ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws
   @Override
   public synchronized void close() throws Exception {
     ephemeralPlugins.invalidateAll();
-    plugins.close();
+    enabledPlugins.close();
     pluginSystemTable.close();
   }
 
   /**
    * Get a list of all available storage plugin class constructors.
-   * @param classpathScan
-   *          A classpath scan to use.
+   * @param classpathScan A classpath scan to use.
    * @return A Map of StoragePluginConfig => StoragePlugin.<init>() constructors.
    */
   @SuppressWarnings("unchecked")
   public static Map<Object, Constructor<? extends StoragePlugin>> findAvailablePlugins(final ScanResult classpathScan) {
-    Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>();
+    Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<>();
     final Collection<Class<? extends StoragePlugin>> pluginClasses =
         classpathScan.getImplementations(StoragePlugin.class);
     final String lineBrokenList =
@@ -494,6 +500,4 @@ public synchronized void close() throws Exception {
     return availablePlugins;
   }
 
-
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandler.java
new file mode 100644
index 0000000000..25b813c6b3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.store.sys.PersistentStore;
+
+
+/**
+ * Storage plugins handler is an additional service for updating storage plugins configs from the file
+ */
+public interface StoragePluginsHandler {
+
+  /**
+   * Update incoming storage plugins configs from persistence store if present, otherwise bootstrap plugins configs.
+   * Nothing is returned: the resulting storage plugins configs are loaded into the given persistence store.
+   *
+   * @param persistentStore the last storage plugins configs from persistence store
+   * @param bootstrapPlugins bootstrap storage plugins, which are used in case of first Drill start up
+   */
+  void loadPlugins(PersistentStore<StoragePluginConfig> persistentStore, StoragePlugins bootstrapPlugins);
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java
new file mode 100644
index 0000000000..599f4a3458
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+import com.jasonclawson.jackson.dataformat.hocon.HoconFactory;
+import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.LogicalPlanPersistence;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.util.ActionOnFile;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.drill.exec.store.StoragePluginRegistry.ACTION_ON_STORAGE_PLUGINS_OVERRIDE_FILE;
+
+/**
+ * Drill plugins handler, which allows to update storage plugins configs from the
+ * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} conf file
+ *
+ * TODO: DRILL-6564: It can be improved with configs versioning and service of creating
+ * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF}
+ */
+public class StoragePluginsHandlerService implements StoragePluginsHandler {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginsHandlerService.class);
+
+  private final LogicalPlanPersistence lpPersistence;
+  private final DrillbitContext context;
+  private URL pluginsOverrideFileUrl;
+
+  public StoragePluginsHandlerService(DrillbitContext context) {
+    this.context = context;
+    this.lpPersistence = new LogicalPlanPersistence(context.getConfig(), context.getClasspathScan(),
+        new ObjectMapper(new HoconFactory()));
+  }
+
+  @Override
+  public void loadPlugins(@NotNull PersistentStore<StoragePluginConfig> persistentStore,
+                          @Nullable StoragePlugins bootstrapPlugins) {
+    // if bootstrapPlugins is not null -- fresh Drill set up
+    StoragePlugins pluginsForPersistentStore;
+
+    StoragePlugins newPlugins = getNewStoragePlugins();
+
+    if (newPlugins != null) {
+      pluginsForPersistentStore = new StoragePlugins(new HashMap<>());
+      Optional.ofNullable(bootstrapPlugins)
+          .ifPresent(pluginsForPersistentStore::putAll);
+
+      for (Map.Entry<String, StoragePluginConfig> newPlugin : newPlugins) {
+        String pluginName = newPlugin.getKey();
+        StoragePluginConfig oldPluginConfig = Optional.ofNullable(bootstrapPlugins)
+            .map(plugins -> plugins.getConfig(pluginName))
+            .orElse(persistentStore.get(pluginName));
+        StoragePluginConfig updatedStatusPluginConfig = updatePluginStatus(oldPluginConfig, newPlugin.getValue());
+        pluginsForPersistentStore.put(pluginName, updatedStatusPluginConfig);
+      }
+    } else {
+      pluginsForPersistentStore = bootstrapPlugins;
+    }
+
+    // load pluginsForPersistentStore to Persistent Store
+    Optional.ofNullable(pluginsForPersistentStore)
+        .ifPresent(plugins -> plugins.forEach(plugin -> persistentStore.put(plugin.getKey(), plugin.getValue())));
+
+    if (newPlugins != null) {
+      String fileAction = context.getConfig().getString(ACTION_ON_STORAGE_PLUGINS_OVERRIDE_FILE);
+      Optional<ActionOnFile> actionOnFile = Arrays.stream(ActionOnFile.values())
+          .filter(action -> action.name().equalsIgnoreCase(fileAction))
+          .findFirst();
+      actionOnFile.ifPresent(action -> action.action(pluginsOverrideFileUrl));
+      // TODO: replace with ifPresentOrElse() once the project will be on Java9
+      if (!actionOnFile.isPresent()) {
+        logger.error("Unknown value {} for {} boot option. Nothing will be done with file.",
+            fileAction, ACTION_ON_STORAGE_PLUGINS_OVERRIDE_FILE);
+      }
+    }
+  }
+
+  /**
+   * Helper method to identify the enabled status for new storage plugins config. If this status is absent in the updater
+   * file, the status is kept from the configs, which are going to be updated
+   *
+   * @param oldPluginConfig current storage plugin config from Persistent Store or bootstrap config file
+   * @param newPluginConfig new storage plugin config
+   * @return new storage plugin config with updated enabled status
+   */
+  private StoragePluginConfig updatePluginStatus(@Nullable StoragePluginConfig oldPluginConfig,
+                                                 @NotNull StoragePluginConfig newPluginConfig) {
+    if (!newPluginConfig.isEnabledStatusPresent()) {
+      boolean newStatus = oldPluginConfig != null && oldPluginConfig.isEnabled();
+      newPluginConfig.setEnabled(newStatus);
+    }
+    return newPluginConfig;
+  }
+
+  /**
+   * Get the new storage plugins from the {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} file if it exists,
+   * null otherwise
+   *
+   * @return storage plugins
+   */
+  private StoragePlugins getNewStoragePlugins() {
+    Set<URL> urlSet = ClassPathScanner.forResource(CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, false);
+    if (!urlSet.isEmpty()) {
+      if (urlSet.size() != 1) {
+        DrillRuntimeException.format("More than one %s file is placed in Drill's classpath: %s",
+            CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, urlSet);
+      }
+      pluginsOverrideFileUrl = urlSet.iterator().next();
+      try {
+        String newPluginsData = Resources.toString(pluginsOverrideFileUrl, Charsets.UTF_8);
+        return lpPersistence.getMapper().readValue(newPluginsData, StoragePlugins.class);
+      } catch (IOException e) {
+        logger.error("Failures are obtained while loading %s file. Proceed without update",
+            CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, e);
+      }
+    }
+    logger.trace("The {} file is absent. Proceed without updating of the storage plugins configs",
+        CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF);
+    return null;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
index 16836c2956..f6a7a375c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
@@ -90,6 +90,7 @@ public static void updateSchemaLocation(final String pluginName,
         pluginConfig.getConfig(),
         newWorkspaces,
         pluginConfig.getFormats());
+    newPluginConfig.setEnabled(pluginConfig.isEnabled());
     pluginRegistry.createOrUpdate(pluginName, newPluginConfig, true);
   }
 
@@ -137,6 +138,7 @@ public static void configureFormatPlugins(StoragePluginRegistry pluginRegistry,
         fileSystemConfig.getConfig(),
         fileSystemConfig.getWorkspaces(),
         newFormats);
+    newFileSystemConfig.setEnabled(fileSystemConfig.isEnabled());
 
     pluginRegistry.createOrUpdate(storagePlugin, newFileSystemConfig, true);
   }
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 417635a0b2..42cddd8655 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -68,12 +68,13 @@
             "crw", "cr2", "nef", "orf", "raf", "rw2", "rwl", "srw", "x3f"
           ]
         }
-      }
+      },
+      enabled : true
     },
+
     s3: {
       type: "file",
       connection: "s3a://my.bucket.location.com",
-      enabled : false,
       config : {
         "fs.s3a.access.key": "ID",
         "fs.s3a.secret.key": "SECRET"
@@ -124,7 +125,8 @@
           delimiter: ",",
           extractHeader: true
         }
-      }
+      },
+      enabled : false
     },
 
     cp: {
@@ -166,7 +168,8 @@
             "crw", "cr2", "nef", "orf", "raf", "rw2", "rwl", "srw", "x3f"
           ]
         }
-      }
+      },
+      enabled : true
     }
   }
 }
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 23d59d3f80..c62dd069b5 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -103,7 +103,10 @@ drill.exec: {
         buffer.size: 262144,
         batch.size: 4000
       }
-    }
+    },
+    # The action on the storage-plugins-override.conf after its use.
+    # Possible values are "none" (default), "rename", "remove"
+    action_on_plugins_override_file: "none"
   },
   zk: {
     connect: "localhost:2181",
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
index 6a4452e63e..2b112e2cb4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
@@ -131,6 +131,7 @@ protected static void addMiniDfsBasedStorage(final Map<String, WorkspaceConfig>
     createAndAddWorkspace("tmp", "/tmp", (short) 0777, processUser, processUser, workspaces);
 
     FileSystemConfig miniDfsPluginConfig = new FileSystemConfig(connection, null, workspaces, lfsPluginConfig.getFormats());
+    miniDfsPluginConfig.setEnabled(true);
     pluginRegistry.createOrUpdate(MINIDFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true);
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
index ebf2cdd0be..c4ababf1c4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
@@ -66,6 +66,7 @@ public static void init() throws Exception {
         pluginConfig.getConfig(),
         newWorkspaces,
         pluginConfig.getFormats());
+    newPluginConfig.setEnabled(pluginConfig.isEnabled());
     pluginRegistry.createOrUpdate(DFS_PLUGIN_NAME, newPluginConfig, true);
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 096c8cdb72..77df0097b8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -277,7 +277,7 @@ private void configureStoragePlugins(Drillbit bit) throws Exception {
     MockStorageEngine plugin = new MockStorageEngine(
         MockStorageEngineConfig.INSTANCE, bit.getContext(),
         MockStorageEngineConfig.NAME);
-    ((StoragePluginRegistryImpl) pluginRegistry).definePlugin(MockStorageEngineConfig.NAME, config, plugin);
+    ((StoragePluginRegistryImpl) pluginRegistry).addPluginToPersistentStoreIfAbsent(MockStorageEngineConfig.NAME, config, plugin);
   }
 
   private void applyOptions() throws Exception {
@@ -515,6 +515,8 @@ public static void defineWorkspace(Drillbit drillbit, String pluginName,
         pluginConfig.getConfig(),
         newWorkspaces,
         newFormats);
+    newPluginConfig.setEnabled(pluginConfig.isEnabled());
+
 
     pluginRegistry.createOrUpdate(pluginName, newPluginConfig, true);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
index 54d7bf0c59..03b0828d9f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
@@ -42,7 +42,7 @@ public void insertMockStorage(String name, boolean breakRegisterSchema) {
       @SuppressWarnings("resource")
       MockBreakageStorage plugin = new MockBreakageStorage(
           MockStorageEngineConfig.INSTANCE, bit.getContext(), name);
-      ((StoragePluginRegistryImpl) pluginRegistry).definePlugin(name, config, plugin);
+      ((StoragePluginRegistryImpl) pluginRegistry).addPluginToPersistentStoreIfAbsent(name, config, plugin);
 
       plugin.setBreakRegister(breakRegisterSchema);
     }
diff --git a/logical/pom.xml b/logical/pom.xml
index 07a942b19e..fbc653654a 100644
--- a/logical/pom.xml
+++ b/logical/pom.xml
@@ -66,13 +66,11 @@
     <dependency>
       <groupId>com.typesafe</groupId>
       <artifactId>config</artifactId>
-      <version>1.0.0</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
-      <version>3.1</version>
     </dependency>
 
     <dependency>
diff --git a/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java b/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
index 6a3df3a630..e04f3adc77 100644
--- a/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
+++ b/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
@@ -35,14 +35,14 @@
 
 
 public class LogicalPlanPersistence {
-  private ObjectMapper mapper;
+  private final ObjectMapper mapper;
 
-  public ObjectMapper getMapper() {
-    return mapper;
+  public LogicalPlanPersistence(DrillConfig conf, ScanResult scanResult) {
+    this(conf, scanResult, new ObjectMapper());
   }
 
-  public LogicalPlanPersistence(DrillConfig conf, ScanResult scanResult) {
-    mapper = new ObjectMapper();
+  public LogicalPlanPersistence(DrillConfig conf, ScanResult scanResult, ObjectMapper mapper) {
+    this.mapper = mapper;
 
     SimpleModule deserModule = new SimpleModule("LogicalExpressionDeserializationModule")
         .addDeserializer(LogicalExpression.class, new LogicalExpression.De(conf))
@@ -59,6 +59,10 @@ public LogicalPlanPersistence(DrillConfig conf, ScanResult scanResult) {
     registerSubtypes(FormatPluginConfigBase.getSubTypes(scanResult));
   }
 
+  public ObjectMapper getMapper() {
+    return mapper;
+  }
+
   private <T> void registerSubtypes(Set<Class<? extends T>> types) {
     for (Class<? extends T> type : types) {
       mapper.registerSubtypes(type);
diff --git a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
index 96c4036a2e..49335f665e 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
@@ -18,22 +18,39 @@
 package org.apache.drill.common.logical;
 
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 public abstract class StoragePluginConfig{
 
-  private boolean enabled = true;
+  private Boolean enabled;
 
+  /**
+   * Check for enabled status of the plugin
+   *
+   * @return true, when enabled. False, when disabled or status is absent
+   */
   public boolean isEnabled() {
-    return enabled;
+    return enabled != null && enabled;
   }
 
-  public void setEnabled(boolean enabled) {
+
+  public void setEnabled(Boolean enabled) {
     this.enabled = enabled;
   }
 
+  /**
+   * Allows to check whether the enabled status is present in config
+   *
+   * @return true if enabled status is present, false otherwise
+   */
+  @JsonIgnore
+  public boolean isEnabledStatusPresent() {
+    return enabled != null;
+  }
+
   @Override
   public abstract boolean equals(Object o);
 
diff --git a/pom.xml b/pom.xml
index 30b9129d8b..091ee6b391 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1363,6 +1363,11 @@
         <artifactId>jackson-core</artifactId>
         <version>${jackson.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.honton.chas.hocon</groupId>
+        <artifactId>jackson-dataformat-hocon</artifactId>
+        <version>1.1.1</version>
+      </dependency>
       <dependency>
         <groupId>com.mapr.db</groupId>
         <artifactId>maprdb</artifactId>
@@ -1540,6 +1545,16 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+        <version>3.2</version>
+      </dependency>
+      <dependency>
+        <groupId>com.typesafe</groupId>
+        <artifactId>config</artifactId>
+        <version>1.0.0</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services