You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2019/05/09 01:58:51 UTC

[drill] 01/05: DRILL-7030: Make format plugins fully pluggable

This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit edbfd64c4b7d8b7af406a507e959e5856185eaea
Author: Anton Gozhiy <an...@gmail.com>
AuthorDate: Mon Apr 15 19:24:11 2019 +0300

    DRILL-7030: Make format plugins fully pluggable
    
    - Bootstrap files for format plugins were introduced and added to the existing plugins in contrib.
    - Formats from these files are being added dynamically to the corresponding storage plugins.
    
    closes #1780
---
 .../main/resources/bootstrap-format-plugins.json   | 26 +++++++
 .../main/resources/bootstrap-format-plugins.json   | 20 +++++
 .../main/resources/bootstrap-format-plugins.json   | 26 +++++++
 .../java/org/apache/drill/exec/ExecConstants.java  |  1 +
 .../exec/store/StoragePluginRegistryImpl.java      | 88 ++++++++++++++++++----
 .../drill/exec/store/dfs/FileSystemConfig.java     |  3 +-
 6 files changed, 148 insertions(+), 16 deletions(-)

diff --git a/contrib/format-ltsv/src/main/resources/bootstrap-format-plugins.json b/contrib/format-ltsv/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000..3dda8cf
--- /dev/null
+++ b/contrib/format-ltsv/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,26 @@
+{
+  "storage":{
+    "dfs": {
+      "type": "file",
+      "formats": {
+        "ltsv": {
+          "type": "ltsv",
+          "extensions": [
+            "ltsv"
+          ]
+        }
+      }
+    },
+    "s3": {
+      "type": "file",
+      "formats": {
+        "ltsv": {
+          "type": "ltsv",
+          "extensions": [
+            "ltsv"
+          ]
+        }
+      }
+    }
+  }
+}
diff --git a/contrib/format-maprdb/src/main/resources/bootstrap-format-plugins.json b/contrib/format-maprdb/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000..a126709
--- /dev/null
+++ b/contrib/format-maprdb/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,20 @@
+{
+  "storage":{
+    "dfs": {
+      "type": "file",
+      "formats": {
+        "maprdb": {
+          "type": "maprdb"
+        }
+      }
+    },
+    "s3": {
+      "type": "file",
+      "formats": {
+        "maprdb": {
+          "type": "maprdb"
+        }
+      }
+    }
+  }
+}
diff --git a/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json b/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000..ee5a396
--- /dev/null
+++ b/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,26 @@
+{
+  "storage":{
+    "dfs": {
+      "type": "file",
+      "formats": {
+        "syslog": {
+          "type": "syslog",
+          "extensions": [
+            "syslog"
+          ]
+        }
+      }
+    },
+    "s3": {
+      "type": "file",
+      "formats": {
+        "syslog": {
+          "type": "syslog",
+          "extensions": [
+            "syslog"
+          ]
+        }
+      }
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 3503507..93c9902 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -726,6 +726,7 @@ public final class ExecConstants {
       new OptionDescription("Min width for text readers, mostly for testing."));
 
   public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json";
+  public static final String BOOTSTRAP_FORMAT_PLUGINS_FILE = "bootstrap-format-plugins.json";
 
   public static final String SKIP_RUNTIME_ROWGROUP_PRUNING_KEY = "exec.storage.skip_runtime_rowgroup_pruning";
   public static final OptionValidator SKIP_RUNTIME_ROWGROUP_PRUNING = new BooleanValidator(SKIP_RUNTIME_ROWGROUP_PRUNING_KEY,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index c5554f8..4718a20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.LogicalPlanPersistence;
@@ -300,7 +301,8 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
   }
 
   /**
-   * Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE} files for the first fresh
+   * Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE}
+   * and format plugins {@link ExecConstants#BOOTSTRAP_FORMAT_PLUGINS_FILE} files for the first fresh
    * instantiating of Drill
    *
    * @param lpPersistence deserialization mapper provider
@@ -310,22 +312,19 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
   private StoragePlugins loadBootstrapPlugins(LogicalPlanPersistence lpPersistence) throws IOException {
     // bootstrap load the config since no plugins are stored.
     logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
-    Set<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
-    if (urls != null && !urls.isEmpty()) {
-      logger.info("Loading the storage plugin configs from URLs {}.", urls);
+    Set<URL> storageUrls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
+    Set<URL> formatUrls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_FORMAT_PLUGINS_FILE, false);
+    if (storageUrls != null && !storageUrls.isEmpty()) {
+      logger.info("Loading the storage plugin configs from URLs {}.", storageUrls);
       StoragePlugins bootstrapPlugins = new StoragePlugins(new HashMap<>());
       Map<String, URL> pluginURLMap = new HashMap<>();
-      for (URL url : urls) {
-        String pluginsData = Resources.toString(url, Charsets.UTF_8);
-        StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
-        for (Entry<String, StoragePluginConfig> plugin : plugins) {
-          StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue());
-          if (oldPluginConfig != null) {
-            logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
-                plugin.getKey(), pluginURLMap.get(plugin.getKey()), url);
-          } else {
-            pluginURLMap.put(plugin.getKey(), url);
-          }
+      for (URL url : storageUrls) {
+        loadStoragePlugins(url, bootstrapPlugins, pluginURLMap, lpPersistence);
+      }
+      if (formatUrls != null && !formatUrls.isEmpty()) {
+        logger.info("Loading the format plugin configs from URLs {}.", formatUrls);
+        for (URL url : formatUrls) {
+          loadFormatPlugins(url, bootstrapPlugins, pluginURLMap, lpPersistence);
         }
       }
       return bootstrapPlugins;
@@ -335,6 +334,65 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
   }
 
   /**
+   * Loads storage plugins from the given URL
+   *
+   * @param url URL to the storage plugins bootstrap file
+   * @param bootstrapPlugins a collection where the plugins should be loaded to
+   * @param pluginURLMap a map to store correspondence between storage plugins and bootstrap files in which they are defined. Used for logging
+   * @param lpPersistence needed to get an object mapper for the bootstrap files
+   * @throws IOException if failed to retrieve a plugin from a bootstrap file
+   */
+  private void loadStoragePlugins(URL url, StoragePlugins bootstrapPlugins, Map<String, URL> pluginURLMap, LogicalPlanPersistence lpPersistence) throws IOException {
+    StoragePlugins plugins = getPluginsFromResource(url, lpPersistence); // deserialize the bootstrap file at this URL
+    plugins.forEach(plugin -> {
+      StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue()); // a non-null result is treated as a duplicate name
+      if (oldPluginConfig != null) {
+        logger.warn("Duplicate plugin instance '[{}]' defined in [{}, {}], ignoring the later one.",
+            plugin.getKey(), pluginURLMap.get(plugin.getKey()), url); // args: plugin name, URL recorded for that name, URL being read now
+      } else {
+        pluginURLMap.put(plugin.getKey(), url); // remember which bootstrap file defined this plugin; read back only for warnings
+      }
+    });
+  }
+
+  /**
+   * Loads format plugins from the given URL and adds the formats to the specified storage plugins
+   *
+   * @param url URL to the format plugins bootstrap file
+   * @param bootstrapPlugins a collection with loaded storage plugins. New formats will be added to them
+   * @param pluginURLMap a map to store correspondence between storage plugins and bootstrap files in which they are defined. Used for logging
+   * @param lpPersistence needed to get an object mapper for the bootstrap files
+   * @throws IOException if failed to retrieve a plugin from a bootstrap file
+   */
+  private void loadFormatPlugins(URL url, StoragePlugins bootstrapPlugins, Map<String, URL> pluginURLMap, LogicalPlanPersistence lpPersistence) throws IOException {
+    StoragePlugins plugins = getPluginsFromResource(url, lpPersistence); // format bootstrap files are deserialized as StoragePlugins as well
+    plugins.forEach(formatPlugin -> {
+      String targetStoragePluginName = formatPlugin.getKey(); // the entry key names the storage plugin that should receive the formats
+      StoragePluginConfig storagePlugin = bootstrapPlugins.getConfig(targetStoragePluginName);
+      StoragePluginConfig formatPluginValue = formatPlugin.getValue();
+      if (storagePlugin == null) {
+        logger.warn("No storage plugins with the given name are registered: '[{}]'", targetStoragePluginName); // warn and move on to the next entry
+      } else if (storagePlugin instanceof FileSystemConfig && formatPluginValue instanceof FileSystemConfig) {
+        FileSystemConfig targetPlugin = (FileSystemConfig) storagePlugin;
+        ((FileSystemConfig) formatPluginValue).getFormats().forEach((formatName, formatValue) -> {
+          FormatPluginConfig oldPluginConfig = targetPlugin.getFormats().putIfAbsent(formatName, formatValue); // a format already present under this name is kept
+          if (oldPluginConfig != null) {
+            logger.warn("Duplicate format instance '[{}]' defined in [{}, {}], ignoring the later one.",
+                formatName, pluginURLMap.get(targetStoragePluginName), url); // NOTE(review): first URL is the one recorded for the storage plugin, not necessarily the file that first defined this format
+          }
+        });
+      } else {
+        logger.warn("Formats are only supported by File System plugin type: '[{}]'", targetStoragePluginName); // reached when either config is not a FileSystemConfig
+      }
+    });
+  }
+
+  private StoragePlugins getPluginsFromResource(URL resource, LogicalPlanPersistence lpPersistence) throws IOException { // Reads the given resource and deserializes it into a StoragePlugins instance.
+    String pluginsData = Resources.toString(resource, Charsets.UTF_8); // load the whole resource as UTF-8 text
+    return lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class); // bind the text to StoragePlugins using the mapper from lpPersistence
+  }
+
+  /**
    * Dynamically loads system plugins annotated with {@link SystemPlugin}.
    * Will skip plugin initialization if no matching constructor, incorrect class implementation, name absence are detected.
    *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 4b75e33..58f69a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.dfs;
 
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 
@@ -48,7 +49,7 @@ public class FileSystemConfig extends StoragePluginConfig {
     Map<String, WorkspaceConfig> caseInsensitiveWorkspaces = CaseInsensitiveMap.newHashMap();
     Optional.ofNullable(workspaces).ifPresent(caseInsensitiveWorkspaces::putAll);
     this.workspaces = caseInsensitiveWorkspaces;
-    this.formats = formats;
+    this.formats = formats != null ? formats : new LinkedHashMap<>();
   }
 
   @JsonProperty