You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2019/05/09 01:58:51 UTC
[drill] 01/05: DRILL-7030: Make format plugins fully pluggable
This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit edbfd64c4b7d8b7af406a507e959e5856185eaea
Author: Anton Gozhiy <an...@gmail.com>
AuthorDate: Mon Apr 15 19:24:11 2019 +0300
DRILL-7030: Make format plugins fully pluggable
- Introduced bootstrap files for format plugins and added them to the existing format plugins in contrib.
- Formats from these files are added dynamically to the corresponding storage plugins when the bootstrap configuration is loaded.
closes #1780
---
.../main/resources/bootstrap-format-plugins.json | 26 +++++++
.../main/resources/bootstrap-format-plugins.json | 20 +++++
.../main/resources/bootstrap-format-plugins.json | 26 +++++++
.../java/org/apache/drill/exec/ExecConstants.java | 1 +
.../exec/store/StoragePluginRegistryImpl.java | 88 ++++++++++++++++++----
.../drill/exec/store/dfs/FileSystemConfig.java | 3 +-
6 files changed, 148 insertions(+), 16 deletions(-)
diff --git a/contrib/format-ltsv/src/main/resources/bootstrap-format-plugins.json b/contrib/format-ltsv/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000..3dda8cf
--- /dev/null
+++ b/contrib/format-ltsv/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,26 @@
+{
+ "storage":{
+ "dfs": {
+ "type": "file",
+ "formats": {
+ "ltsv": {
+ "type": "ltsv",
+ "extensions": [
+ "ltsv"
+ ]
+ }
+ }
+ },
+ "s3": {
+ "type": "file",
+ "formats": {
+ "ltsv": {
+ "type": "ltsv",
+ "extensions": [
+ "ltsv"
+ ]
+ }
+ }
+ }
+ }
+}
diff --git a/contrib/format-maprdb/src/main/resources/bootstrap-format-plugins.json b/contrib/format-maprdb/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000..a126709
--- /dev/null
+++ b/contrib/format-maprdb/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,20 @@
+{
+ "storage":{
+ "dfs": {
+ "type": "file",
+ "formats": {
+ "maprdb": {
+ "type": "maprdb"
+ }
+ }
+ },
+ "s3": {
+ "type": "file",
+ "formats": {
+ "maprdb": {
+ "type": "maprdb"
+ }
+ }
+ }
+ }
+}
diff --git a/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json b/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000..ee5a396
--- /dev/null
+++ b/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,26 @@
+{
+ "storage":{
+ "dfs": {
+ "type": "file",
+ "formats": {
+ "syslog": {
+ "type": "syslog",
+ "extensions": [
+ "syslog"
+ ]
+ }
+ }
+ },
+ "s3": {
+ "type": "file",
+ "formats": {
+ "syslog": {
+ "type": "syslog",
+ "extensions": [
+ "syslog"
+ ]
+ }
+ }
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 3503507..93c9902 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -726,6 +726,7 @@ public final class ExecConstants {
new OptionDescription("Min width for text readers, mostly for testing."));
public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json";
+ public static final String BOOTSTRAP_FORMAT_PLUGINS_FILE = "bootstrap-format-plugins.json";
public static final String SKIP_RUNTIME_ROWGROUP_PRUNING_KEY = "exec.storage.skip_runtime_rowgroup_pruning";
public static final OptionValidator SKIP_RUNTIME_ROWGROUP_PRUNING = new BooleanValidator(SKIP_RUNTIME_ROWGROUP_PRUNING_KEY,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index c5554f8..4718a20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.LogicalPlanPersistence;
@@ -300,7 +301,8 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
}
/**
- * Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE} files for the first fresh
+ * Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE}
+ * and format plugins {@link ExecConstants#BOOTSTRAP_FORMAT_PLUGINS_FILE} files for the first fresh
* instantiating of Drill
*
* @param lpPersistence deserialization mapper provider
@@ -310,22 +312,19 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
private StoragePlugins loadBootstrapPlugins(LogicalPlanPersistence lpPersistence) throws IOException {
// bootstrap load the config since no plugins are stored.
logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
- Set<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
- if (urls != null && !urls.isEmpty()) {
- logger.info("Loading the storage plugin configs from URLs {}.", urls);
+ Set<URL> storageUrls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
+ Set<URL> formatUrls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_FORMAT_PLUGINS_FILE, false);
+ if (storageUrls != null && !storageUrls.isEmpty()) {
+ logger.info("Loading the storage plugin configs from URLs {}.", storageUrls);
StoragePlugins bootstrapPlugins = new StoragePlugins(new HashMap<>());
Map<String, URL> pluginURLMap = new HashMap<>();
- for (URL url : urls) {
- String pluginsData = Resources.toString(url, Charsets.UTF_8);
- StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
- for (Entry<String, StoragePluginConfig> plugin : plugins) {
- StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue());
- if (oldPluginConfig != null) {
- logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
- plugin.getKey(), pluginURLMap.get(plugin.getKey()), url);
- } else {
- pluginURLMap.put(plugin.getKey(), url);
- }
+ for (URL url : storageUrls) {
+ loadStoragePlugins(url, bootstrapPlugins, pluginURLMap, lpPersistence);
+ }
+ if (formatUrls != null && !formatUrls.isEmpty()) {
+ logger.info("Loading the format plugin configs from URLs {}.", formatUrls);
+ for (URL url : formatUrls) {
+ loadFormatPlugins(url, bootstrapPlugins, pluginURLMap, lpPersistence);
}
}
return bootstrapPlugins;
@@ -335,6 +334,65 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
}
/**
+   * Loads the storage plugin configs declared in the given bootstrap file into {@code bootstrapPlugins}.
+   * When a plugin name is already present, the previously loaded config is kept, the new one is ignored,
+   * and a warning naming both source files is logged.
+   *
+   * @param url URL of the storage plugins bootstrap file
+   * @param bootstrapPlugins collection the loaded plugin configs are added to
+   * @param pluginURLMap map from storage plugin name to the bootstrap file that first defined it; an entry is
+   *                     added for every newly loaded plugin and is used only for diagnostics
+   * @param lpPersistence provides the object mapper used to deserialize the bootstrap file
+   * @throws IOException if the bootstrap file cannot be read or deserialized
+   */
+  private void loadStoragePlugins(URL url, StoragePlugins bootstrapPlugins, Map<String, URL> pluginURLMap,
+                                  LogicalPlanPersistence lpPersistence) throws IOException {
+    StoragePlugins plugins = getPluginsFromResource(url, lpPersistence);
+    plugins.forEach(plugin -> {
+      String pluginName = plugin.getKey();
+      StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(pluginName, plugin.getValue());
+      if (oldPluginConfig != null) {
+        // Same wording as the pre-refactoring message: the name is quoted once, not wrapped in brackets too.
+        logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
+            pluginName, pluginURLMap.get(pluginName), url);
+      } else {
+        pluginURLMap.put(pluginName, url);
+      }
+    });
+  }
+
+  /**
+   * Loads format plugin configs from the given bootstrap file and merges their formats into the matching
+   * storage plugin configs already present in {@code bootstrapPlugins}.
+   * <p>
+   * Each top-level entry of the file names a target storage plugin. Its formats are merged only when both the
+   * target config and the entry are {@link FileSystemConfig}s. A format whose name is already registered for
+   * the target is kept unchanged and the new definition is skipped with a warning. Entries naming an unknown
+   * storage plugin, or a non-file-system one, are skipped with a warning as well.
+   *
+   * @param url URL of the format plugins bootstrap file
+   * @param bootstrapPlugins storage plugin configs loaded so far; format maps of matching targets are modified in place
+   * @param pluginURLMap map from storage plugin name to the bootstrap file that defined it; not used here, because
+   *                     it cannot tell which file an already registered format came from
+   * @param lpPersistence provides the object mapper used to deserialize the bootstrap file
+   * @throws IOException if the bootstrap file cannot be read or deserialized
+   */
+  private void loadFormatPlugins(URL url, StoragePlugins bootstrapPlugins, Map<String, URL> pluginURLMap,
+                                 LogicalPlanPersistence lpPersistence) throws IOException {
+    StoragePlugins plugins = getPluginsFromResource(url, lpPersistence);
+    plugins.forEach(formatPlugin -> {
+      String targetStoragePluginName = formatPlugin.getKey();
+      StoragePluginConfig storagePlugin = bootstrapPlugins.getConfig(targetStoragePluginName);
+      StoragePluginConfig formatPluginValue = formatPlugin.getValue();
+      if (storagePlugin == null) {
+        logger.warn("No storage plugins with the given name are registered: '{}'", targetStoragePluginName);
+      } else if (storagePlugin instanceof FileSystemConfig && formatPluginValue instanceof FileSystemConfig) {
+        FileSystemConfig targetPlugin = (FileSystemConfig) storagePlugin;
+        ((FileSystemConfig) formatPluginValue).getFormats().forEach((formatName, formatValue) -> {
+          FormatPluginConfig oldPluginConfig = targetPlugin.getFormats().putIfAbsent(formatName, formatValue);
+          if (oldPluginConfig != null) {
+            // The earlier definition may come from the storage plugin bootstrap file or from another format
+            // bootstrap file, so only the file of the ignored definition is reported.
+            logger.warn("Duplicate format instance '{}' for storage plugin '{}' defined in [{}], ignoring the later one.",
+                formatName, targetStoragePluginName, url);
+          }
+        });
+      } else {
+        logger.warn("Formats are only supported by File System plugin type: '{}'", targetStoragePluginName);
+      }
+    });
+  }
+
+  /**
+   * Reads the whole resource as UTF-8 text and deserializes it into a {@link StoragePlugins} collection.
+   *
+   * @param resource URL of a bootstrap file
+   * @param lpPersistence provides the object mapper used for deserialization
+   * @return the plugin configs declared in the resource
+   * @throws IOException if the resource cannot be read or its content cannot be deserialized
+   */
+  private StoragePlugins getPluginsFromResource(URL resource, LogicalPlanPersistence lpPersistence) throws IOException {
+    String content = Resources.toString(resource, Charsets.UTF_8);
+    StoragePlugins parsed = lpPersistence.getMapper().readValue(content, StoragePlugins.class);
+    return parsed;
+  }
+
+ /**
* Dynamically loads system plugins annotated with {@link SystemPlugin}.
* Will skip plugin initialization if no matching constructor, incorrect class implementation, name absence are detected.
*
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 4b75e33..58f69a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.dfs;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
@@ -48,7 +49,7 @@ public class FileSystemConfig extends StoragePluginConfig {
Map<String, WorkspaceConfig> caseInsensitiveWorkspaces = CaseInsensitiveMap.newHashMap();
Optional.ofNullable(workspaces).ifPresent(caseInsensitiveWorkspaces::putAll);
this.workspaces = caseInsensitiveWorkspaces;
- this.formats = formats;
+ this.formats = formats != null ? formats : new LinkedHashMap<>();
}
@JsonProperty