You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2020/03/13 16:15:33 UTC
[drill] branch master updated: DRILL-7620: Fix plugin mutability issues
This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 63e64c2 DRILL-7620: Fix plugin mutability issues
63e64c2 is described below
commit 63e64c2a8156a66ba1b1c8c1ec62e8da467bbbc9
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Tue Mar 3 15:03:49 2020 -0800
DRILL-7620: Fix plugin mutability issues
A recent commit made the plugin registry more strict about
the rule that, once a plugin is registered, it must be
immutable. A flaw in enforcing that rule in the UI put the
registry in an inconsistent state.
Also:
* Registry-specific errors
* Push more operations from UI layer into registry
* Clean up semantics of "resolve" for plugins
* Add more unit tests
* Better handling of "bad" plugins
* Force plugin names to lower case
* Fix comparison bugs in some format plugins
---
.../drill/exec/store/mapr/PluginConstants.java | 74 +--
.../drill/exec/store/mapr/PluginErrorHandler.java | 1 -
.../drill/exec/store/mapr/TableFormatMatcher.java | 2 +-
.../drill/exec/store/mapr/TableFormatPlugin.java | 12 +-
.../exec/store/mapr/TableFormatPluginConfig.java | 1 -
.../drill/exec/store/mapr/db/MapRDBSubScan.java | 2 +-
.../store/mapr/db/RestrictedMapRDBSubScan.java | 2 +-
.../store/mapr/db/binary/BinaryTableGroupScan.java | 7 +-
.../store/mapr/db/binary/MapRDBFilterBuilder.java | 2 +-
.../store/mapr/db/json/JsonTableGroupScan.java | 7 +-
.../db/json/JsonTableRangePartitionFunction.java | 5 +-
.../drill/exec/store/syslog/TestSyslogFormat.java | 19 +-
.../exec/store/hbase/AbstractHBaseDrillTable.java | 4 +-
.../exec/store/hbase/DrillHBaseConstants.java | 16 +-
.../drill/exec/store/hbase/DrillHBaseTable.java | 2 -
.../exec/store/hbase/HBaseConnectionManager.java | 5 +-
.../drill/exec/store/hbase/HBaseGroupScan.java | 8 +-
.../exec/store/hbase/HBasePushFilterIntoScan.java | 5 +-
.../drill/exec/store/hbase/HBaseRecordReader.java | 11 +-
.../drill/exec/store/hbase/HBaseRegexParser.java | 8 +-
.../exec/store/hbase/HBaseScanBatchCreator.java | 2 +-
.../drill/exec/store/hbase/HBaseSchemaFactory.java | 5 +-
.../exec/store/hbase/HBaseStoragePluginConfig.java | 6 +-
.../drill/exec/store/hbase/HBaseSubScan.java | 6 +-
.../hive/HiveDrillNativeParquetRowGroupScan.java | 2 +-
.../store/hive/HiveDrillNativeParquetScan.java | 2 +-
.../org/apache/drill/exec/store/hive/HiveScan.java | 6 +-
.../apache/drill/exec/store/hive/HiveSubScan.java | 3 +-
.../exec/store/hive/HiveTableWithColumnCache.java | 1 +
.../drill/exec/store/hive/HiveUtilities.java | 4 +-
.../exec/store/hive/schema/HiveSchemaFactory.java | 6 +-
.../org/apache/drill/exec/hive/HiveTestBase.java | 1 -
.../apache/drill/exec/hive/HiveTestFixture.java | 21 +-
.../drill/exec/store/jdbc/JdbcGroupScan.java | 3 +-
.../apache/drill/exec/store/jdbc/JdbcSubScan.java | 2 +-
.../drill/exec/store/kafka/KafkaGroupScan.java | 3 +-
.../drill/exec/store/kafka/KafkaSubScan.java | 2 +-
.../drill/exec/store/kudu/DrillKuduTable.java | 1 -
.../drill/exec/store/kudu/KuduGroupScan.java | 9 +-
.../drill/exec/store/kudu/KuduRecordReader.java | 5 +-
.../exec/store/kudu/KuduRecordWriterImpl.java | 13 +-
.../exec/store/kudu/KuduScanBatchCreator.java | 2 -
.../apache/drill/exec/store/kudu/KuduScanSpec.java | 2 -
.../drill/exec/store/kudu/KuduSchemaFactory.java | 9 +-
.../drill/exec/store/kudu/KuduStoragePlugin.java | 7 -
.../exec/store/kudu/KuduStoragePluginConfig.java | 1 -
.../apache/drill/exec/store/kudu/KuduSubScan.java | 21 +-
.../apache/drill/exec/store/kudu/KuduWriter.java | 3 +-
.../exec/store/kudu/KuduWriterBatchCreator.java | 1 -
.../drill/exec/store/mongo/MongoFilterBuilder.java | 2 +-
.../drill/exec/store/mongo/MongoGroupScan.java | 7 +-
.../exec/store/mongo/MongoStoragePluginConfig.java | 6 +-
.../drill/exec/store/mongo/MongoSubScan.java | 4 +-
.../drill/exec/store/openTSDB/Constants.java | 14 +-
.../exec/store/openTSDB/OpenTSDBGroupScan.java | 9 +-
.../drill/exec/store/openTSDB/OpenTSDBSubScan.java | 7 +-
.../org/apache/calcite/jdbc/DynamicRootSchema.java | 4 +-
.../org/apache/drill/exec/opt/BasicOptimizer.java | 6 +-
.../exec/physical/base/AbstractGroupScan.java | 3 +-
.../logical/FileSystemCreateTableEntry.java | 8 +-
.../sql/handlers/DescribeSchemaHandler.java | 21 +-
.../exec/planner/sql/handlers/SqlHandlerUtil.java | 4 +-
.../drill/exec/server/rest/DrillRestServer.java | 13 +-
.../exec/server/rest/PluginConfigWrapper.java | 41 +-
.../drill/exec/server/rest/StorageResources.java | 129 +++--
.../drill/exec/store/ClassicConnectorLocator.java | 3 +-
.../exec/store/DrillbitPluginRegistryContext.java | 10 +-
.../exec/store/PluginBootstrapLoaderImpl.java | 4 +-
.../org/apache/drill/exec/store/PluginHandle.java | 5 +
.../drill/exec/store/PluginRegistryContext.java | 1 +
.../apache/drill/exec/store/StoragePluginMap.java | 78 +--
.../drill/exec/store/StoragePluginRegistry.java | 242 +++++++-
.../exec/store/StoragePluginRegistryImpl.java | 433 +++++++++++---
.../drill/exec/store/dfs/FileSystemConfig.java | 11 +-
.../drill/exec/store/dfs/FileSystemPlugin.java | 1 -
.../drill/exec/store/dfs/easy/EasyGroupScan.java | 7 +-
.../drill/exec/store/dfs/easy/EasySubScan.java | 4 +-
.../drill/exec/store/dfs/easy/EasyWriter.java | 12 +-
.../sequencefile/SequenceFileFormatConfig.java | 31 +-
.../exec/store/httpd/HttpdLogFormatPlugin.java | 19 +-
.../drill/exec/store/image/ImageFormatConfig.java | 50 +-
.../exec/store/parquet/ParquetFormatConfig.java | 26 +-
.../drill/exec/store/parquet/ParquetGroupScan.java | 3 +-
.../exec/store/parquet/ParquetRowGroupScan.java | 2 +-
.../drill/exec/store/parquet/ParquetWriter.java | 8 +-
.../drill/exec/store/pcap/PcapFormatConfig.java | 27 +-
.../exec/store/pcapng/PcapngFormatConfig.java | 9 +
.../drill/exec/store/sys/SystemTableScan.java | 5 +-
.../drill/exec/util/StoragePluginTestUtils.java | 12 +-
.../impersonation/TestImpersonationMetadata.java | 3 +-
.../impersonation/TestImpersonationQueries.java | 3 +-
.../drill/exec/store/BasePluginRegistryTest.java | 5 +-
.../drill/exec/store/TestPluginRegistry.java | 630 +++++++++++++++++----
.../apache/drill/exec/store/TestPluginsMap.java | 29 +-
.../drill/exec/store/httpd/TestHTTPDLogReader.java | 23 +-
.../apache/drill/exec/store/log/TestLogReader.java | 24 +-
.../drill/exec/store/pcap/TestPcapEVFReader.java | 19 +-
.../java/org/apache/drill/test/ClusterFixture.java | 20 +-
.../drill/test/ClusterMockStorageFixture.java | 4 +-
.../java/org/apache/drill/test/TestBuilder.java | 14 +-
.../common/config/LogicalPlanPersistence.java | 6 +-
.../drill/common/logical/StoragePluginConfig.java | 3 +
102 files changed, 1634 insertions(+), 772 deletions(-)
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java
index 7a175a2..f355812 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java
@@ -22,19 +22,21 @@ import org.apache.drill.common.expression.SchemaPath;
import com.mapr.db.DBConstants;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.cost.PluginCost.CheckValid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class PluginConstants {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PluginConstants.class);
+public interface PluginConstants {
+ Logger logger = LoggerFactory.getLogger(PluginConstants.class);
- public final static CheckValid alwaysValid = new CheckValid<Integer>() {
+ public final static CheckValid<Integer> alwaysValid = new CheckValid<Integer>() {
@Override
public boolean isValid(Integer parameter) {
return true;
}
};
- public final static CheckValid isNonNegative = new CheckValid<Integer>() {
- @Override
+ public final static CheckValid<Integer> isNonNegative = new CheckValid<Integer>() {
+ @Override
public boolean isValid(Integer paramValue) {
if (paramValue > 0 && paramValue <= Integer.MAX_VALUE) {
return true;
@@ -45,49 +47,49 @@ public class PluginConstants {
}
};
- public static final String SSD = "SSD";
- public static final String HDD = "HDD";
- public static final SchemaPath ID_SCHEMA_PATH = SchemaPath.getSimplePath(ID_KEY);
+ String SSD = "SSD";
+ String HDD = "HDD";
+ SchemaPath ID_SCHEMA_PATH = SchemaPath.getSimplePath(ID_KEY);
- public static final SchemaPath DOCUMENT_SCHEMA_PATH = SchemaPath.getSimplePath(DBConstants.DOCUMENT_FIELD);
+ SchemaPath DOCUMENT_SCHEMA_PATH = SchemaPath.getSimplePath(DBConstants.DOCUMENT_FIELD);
- public static final int JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT = 32;
+ int JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT = 32;
- public static final int JSON_TABLE_SCAN_SIZE_MB_MIN = 32;
- public static final int JSON_TABLE_SCAN_SIZE_MB_MAX = 8192;
+ int JSON_TABLE_SCAN_SIZE_MB_MIN = 32;
+ int JSON_TABLE_SCAN_SIZE_MB_MAX = 8192;
- public static final String JSON_TABLE_SCAN_SIZE_MB = "format-maprdb.json.scanSizeMB";
- public static final int JSON_TABLE_SCAN_SIZE_MB_DEFAULT = 128;
+ String JSON_TABLE_SCAN_SIZE_MB = "format-maprdb.json.scanSizeMB";
+ int JSON_TABLE_SCAN_SIZE_MB_DEFAULT = 128;
- public static final String JSON_TABLE_RESTRICTED_SCAN_SIZE_MB = "format-maprdb.json.restrictedScanSizeMB";
- public static final int JSON_TABLE_RESTRICTED_SCAN_SIZE_MB_DEFAULT = 4096;
+ String JSON_TABLE_RESTRICTED_SCAN_SIZE_MB = "format-maprdb.json.restrictedScanSizeMB";
+ int JSON_TABLE_RESTRICTED_SCAN_SIZE_MB_DEFAULT = 4096;
- public static final String JSON_TABLE_USE_NUM_REGIONS_FOR_DISTRIBUTION_PLANNING = "format-maprdb.json.useNumRegionsForDistribution";
- public static final boolean JSON_TABLE_USE_NUM_REGIONS_FOR_DISTRIBUTION_PLANNING_DEFAULT = false;
+ String JSON_TABLE_USE_NUM_REGIONS_FOR_DISTRIBUTION_PLANNING = "format-maprdb.json.useNumRegionsForDistribution";
+ boolean JSON_TABLE_USE_NUM_REGIONS_FOR_DISTRIBUTION_PLANNING_DEFAULT = false;
- public static final String JSON_TABLE_BLOCK_SIZE = "format-maprdb.json.pluginCost.blockSize";
- public static final int JSON_TABLE_BLOCK_SIZE_DEFAULT = 8192;
+ String JSON_TABLE_BLOCK_SIZE = "format-maprdb.json.pluginCost.blockSize";
+ int JSON_TABLE_BLOCK_SIZE_DEFAULT = 8192;
- public static final String JSON_TABLE_MEDIA_TYPE = "format-maprdb.json.mediaType";
- public static final String JSON_TABLE_MEDIA_TYPE_DEFAULT = SSD;
+ String JSON_TABLE_MEDIA_TYPE = "format-maprdb.json.mediaType";
+ String JSON_TABLE_MEDIA_TYPE_DEFAULT = SSD;
- public static final String JSON_TABLE_SSD_BLOCK_SEQ_READ_COST = "format-maprdb.json.pluginCost.ssdBlockSequentialReadCost";
- public static final int JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT = 32 * DrillCostBase.BASE_CPU_COST * JSON_TABLE_BLOCK_SIZE_DEFAULT;
+ String JSON_TABLE_SSD_BLOCK_SEQ_READ_COST = "format-maprdb.json.pluginCost.ssdBlockSequentialReadCost";
+ int JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT = 32 * DrillCostBase.BASE_CPU_COST * JSON_TABLE_BLOCK_SIZE_DEFAULT;
// for SSD random and sequential costs are the same
- public static final String JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST = "format-maprdb.json.pluginCost.ssdBlockRandomReadCost";
- public static final int JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT = JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
+ String JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST = "format-maprdb.json.pluginCost.ssdBlockRandomReadCost";
+ int JSON_TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT = JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
- public static final String JSON_TABLE_AVERGE_COLUMN_SIZE = "format-maprdb.json.pluginCost.averageColumnSize";
- public static final int JSON_TABLE_AVERGE_COLUMN_SIZE_DEFAULT = 10;
+ String JSON_TABLE_AVERGE_COLUMN_SIZE = "format-maprdb.json.pluginCost.averageColumnSize";
+ int JSON_TABLE_AVERGE_COLUMN_SIZE_DEFAULT = 10;
- public static final int TABLE_BLOCK_SIZE_DEFAULT = 8192;
- public static final int TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT = 32 * DrillCostBase.BASE_CPU_COST * TABLE_BLOCK_SIZE_DEFAULT;
- public static final int TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT = TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
- public static final int TABLE_AVERGE_COLUMN_SIZE_DEFAULT = 10;
- public static final String JSON_TABLE_HDD_BLOCK_SEQ_READ_COST = "format-maprdb.json.pluginCost.hddBlockSequentialReadCost";
- public static final int JSON_TABLE_HDD_BLOCK_SEQ_READ_COST_DEFAULT = 6 * JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
+ int TABLE_BLOCK_SIZE_DEFAULT = 8192;
+ int TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT = 32 * DrillCostBase.BASE_CPU_COST * TABLE_BLOCK_SIZE_DEFAULT;
+ int TABLE_SSD_BLOCK_RANDOM_READ_COST_DEFAULT = TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
+ int TABLE_AVERGE_COLUMN_SIZE_DEFAULT = 10;
+ String JSON_TABLE_HDD_BLOCK_SEQ_READ_COST = "format-maprdb.json.pluginCost.hddBlockSequentialReadCost";
+ int JSON_TABLE_HDD_BLOCK_SEQ_READ_COST_DEFAULT = 6 * JSON_TABLE_SSD_BLOCK_SEQ_READ_COST_DEFAULT;
- public static final String JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST = "format-maprdb.json.pluginCost.hddBlockRandomReadCost";
- public static final int JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST_DEFAULT = 1000 * JSON_TABLE_HDD_BLOCK_SEQ_READ_COST_DEFAULT;
+ String JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST = "format-maprdb.json.pluginCost.hddBlockRandomReadCost";
+ int JSON_TABLE_HDD_BLOCK_RANDOM_READ_COST_DEFAULT = 1000 * JSON_TABLE_HDD_BLOCK_SEQ_READ_COST_DEFAULT;
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginErrorHandler.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginErrorHandler.java
index d106d6e..5163934 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginErrorHandler.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginErrorHandler.java
@@ -46,5 +46,4 @@ public final class PluginErrorHandler {
public static SchemaChangeException schemaChangeException(Logger logger, Throwable t, String format, Object... args) {
return new SchemaChangeException(format, t, args);
}
-
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java
index 1fca75c..b2ede05 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java
@@ -44,6 +44,7 @@ public abstract class TableFormatMatcher extends FormatMatcher {
return false;
}
+ @Override
public DrillTable isReadable(DrillFileSystem fs,
FileSelection selection, FileSystemPlugin fsPlugin,
String storageEngineName, SchemaConfig schemaConfig) throws IOException {
@@ -73,5 +74,4 @@ public abstract class TableFormatMatcher extends FormatMatcher {
* by this format plugin. The path must point to a MapR table.
*/
protected abstract boolean isSupportedTable(MapRFileStatus status) throws IOException;
-
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
index aeb117a..8f5717c 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java
@@ -25,7 +25,6 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.Set;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractWriter;
@@ -41,8 +40,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import com.mapr.fs.MapRFileSystem;
public abstract class TableFormatPlugin implements FormatPlugin {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
- .getLogger(TableFormatPlugin.class);
private final StoragePluginConfig storageConfig;
private final TableFormatPluginConfig config;
@@ -83,6 +80,7 @@ public abstract class TableFormatPlugin implements FormatPlugin {
return false;
}
+ @Override
public Configuration getFsConf() {
return fsConf;
}
@@ -120,11 +118,8 @@ public abstract class TableFormatPlugin implements FormatPlugin {
public synchronized AbstractStoragePlugin getStoragePlugin() {
if (this.storagePlugin == null) {
- try {
- this.storagePlugin = (AbstractStoragePlugin) context.getStorage().getPlugin(storageConfig);
- } catch (ExecutionSetupException e) {
- throw new RuntimeException(e);
- }
+ this.storagePlugin = context.getStorage().resolve(storageConfig,
+ AbstractStoragePlugin.class);
}
return storagePlugin;
}
@@ -133,5 +128,4 @@ public abstract class TableFormatPlugin implements FormatPlugin {
public MapRFileSystem getMaprFS() {
return maprfs;
}
-
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java
index 72c69a6..f30d2b6 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java
@@ -34,5 +34,4 @@ public abstract class TableFormatPluginConfig implements FormatPluginConfig {
}
protected abstract boolean impEquals(Object obj);
-
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
index d01a63b..92195f5 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
@@ -61,7 +61,7 @@ public class MapRDBSubScan extends AbstractDbSubScan {
@JsonProperty("tableType") String tableType,
@JsonProperty("schema") TupleMetadata schema) throws ExecutionSetupException {
this(userName,
- (MapRDBFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatPluginConfig),
+ engineRegistry.resolveFormat(storageConfig, formatPluginConfig, MapRDBFormatPlugin.class),
regionScanSpecList,
columns,
maxRecordsToRead,
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java
index c1369a3..7fa8898 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java
@@ -50,7 +50,7 @@ public class RestrictedMapRDBSubScan extends MapRDBSubScan {
@JsonProperty("tableType") String tableType,
@JsonProperty("schema") TupleMetadata schema) throws ExecutionSetupException {
this(userName,
- (MapRDBFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatPluginConfig),
+ engineRegistry.resolveFormat(storageConfig, formatPluginConfig, MapRDBFormatPlugin.class),
regionScanSpecList, columns, maxRecordsToRead, tableType, schema);
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
index ec496e9..7061372 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
@@ -73,7 +73,7 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
public static final String TABLE_BINARY = "binary";
- private HBaseScanSpec hbaseScanSpec;
+ private final HBaseScanSpec hbaseScanSpec;
private HTableDescriptor hTableDesc;
@@ -87,8 +87,8 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("schema") TupleMetadata schema,
@JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException, IOException {
- this(userName, (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
- (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
+ this(userName, pluginRegistry.resolve(storagePluginConfig, AbstractStoragePlugin.class),
+ pluginRegistry.resolveFormat(storagePluginConfig, formatPluginConfig, MapRDBFormatPlugin.class),
scanSpec, columns, null /* tableStats */, FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema));
}
@@ -160,6 +160,7 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
}
+ @Override
protected MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
HBaseScanSpec spec = hbaseScanSpec;
return new MapRDBSubScanSpec(
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java
index f94e0a8..4cdf404 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/MapRDBFilterBuilder.java
@@ -50,7 +50,7 @@ public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void
private boolean allExpressionsConverted = true;
- private static Boolean nullComparatorSupported = null;
+ private static Boolean nullComparatorSupported;
public MapRDBFilterBuilder(BinaryTableGroupScan groupScan, LogicalExpression le) {
this.groupScan = groupScan;
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
index fd910a7..d720634 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
@@ -125,8 +125,8 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("schema") TupleMetadata schema,
@JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException, IOException {
- this(userName, (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
- (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
+ this(userName, pluginRegistry.resolve(storagePluginConfig, AbstractStoragePlugin.class),
+ pluginRegistry.resolveFormat(storagePluginConfig, formatPluginConfig, MapRDBFormatPlugin.class),
scanSpec, columns, new MapRDBStatistics(), FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema));
}
@@ -207,6 +207,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
}
}
+ @Override
protected NavigableMap<TabletFragmentInfo, String> getRegionsToScan() {
return getRegionsToScan(formatPlugin.getScanRangeSizeMB());
}
@@ -271,6 +272,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
return doNotAccessRegionsToScan;
}
+ @Override
protected MapRDBSubScanSpec getSubScanSpec(final TabletFragmentInfo tfi) {
// XXX/TODO check filter/Condition
final JsonScanSpec spec = scanSpec;
@@ -438,6 +440,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
return !formatPluginConfig.isEnablePushdown();
}
+ @Override
@JsonIgnore
public boolean canPushdownProjects(List<SchemaPath> columns) {
return formatPluginConfig.isEnablePushdown();
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
index b6d7bfb..c97d303 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
@@ -25,6 +25,8 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.vector.ValueVector;
import org.ojai.store.QueryCondition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -42,8 +44,7 @@ import com.mapr.org.apache.hadoop.hbase.util.Bytes;
@JsonTypeName("jsontable-range-partition-function")
public class JsonTableRangePartitionFunction extends AbstractRangePartitionFunction {
-
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonTableRangePartitionFunction.class);
+ private static final Logger logger = LoggerFactory.getLogger(JsonTableRangePartitionFunction.class);
@JsonProperty("refList")
protected List<FieldReference> refList;
diff --git a/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java b/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
index a45a07f..b195d51 100644
--- a/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
+++ b/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
@@ -17,14 +17,14 @@
*/
package org.apache.drill.exec.store.syslog;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -48,21 +48,18 @@ public class TestSyslogFormat extends ClusterTest {
}
private static void defineSyslogPlugin() throws ExecutionSetupException {
+ Map<String, FormatPluginConfig> formats = new HashMap<>();
SyslogFormatConfig sampleConfig = new SyslogFormatConfig();
sampleConfig.setExtension("syslog");
+ formats.put("sample", sampleConfig);
SyslogFormatConfig flattenedDataConfig = new SyslogFormatConfig();
flattenedDataConfig.setExtension("syslog1");
flattenedDataConfig.setFlattenStructuredData(true);
+ formats.put("flat", flattenedDataConfig);
// Define a temporary plugin for the "cp" storage plugin.
- Drillbit drillbit = cluster.drillbit();
- final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
- final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin("cp");
- final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
- pluginConfig.getFormats().put("sample", sampleConfig);
- pluginConfig.getFormats().put("flat", flattenedDataConfig);
- pluginRegistry.put("cp", pluginConfig);
+ cluster.defineFormats("cp", formats);
}
@Test
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/AbstractHBaseDrillTable.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/AbstractHBaseDrillTable.java
index 32f4d43..93d8739 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/AbstractHBaseDrillTable.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/AbstractHBaseDrillTable.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,7 +37,7 @@ import java.util.Set;
import static org.apache.drill.exec.store.hbase.DrillHBaseConstants.ROW_KEY;
public abstract class AbstractHBaseDrillTable extends DrillTable {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractHBaseDrillTable.class);
+ private static final Logger logger = LoggerFactory.getLogger(AbstractHBaseDrillTable.class);
protected HTableDescriptor tableDesc;
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
index db1b1fa..1238407 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
@@ -23,19 +23,19 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
public interface DrillHBaseConstants {
- public static final String ROW_KEY = "row_key";
+ String ROW_KEY = "row_key";
- public static final SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
+ SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
- public static final String HBASE_ZOOKEEPER_PORT = "hbase.zookeeper.property.clientPort";
+ String HBASE_ZOOKEEPER_PORT = "hbase.zookeeper.property.clientPort";
- public static final MajorType ROW_KEY_TYPE = Types.required(MinorType.VARBINARY);
+ MajorType ROW_KEY_TYPE = Types.required(MinorType.VARBINARY);
- public static final MajorType COLUMN_FAMILY_TYPE = Types.required(MinorType.MAP);
+ MajorType COLUMN_FAMILY_TYPE = Types.required(MinorType.MAP);
- public static final MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY);
+ MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY);
- public static final String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table";
+ String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table";
- public static final String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config";
+ String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config";
}
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java
index 16de5eb..501e06d 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseTable.java
@@ -18,12 +18,10 @@
package org.apache.drill.exec.store.hbase;
-
public class DrillHBaseTable extends AbstractHBaseDrillTable {
public DrillHBaseTable(String storageEngineName, HBaseStoragePlugin plugin, HBaseScanSpec scanSpec) {
super(storageEngineName, plugin, scanSpec);
setTableDesc(plugin.getConnection(), scanSpec.getTableName());
}
-
}
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java
index a6fa6f2..bb3e2a8 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseConnectionManager.java
@@ -26,7 +26,8 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.store.hbase.HBaseStoragePlugin.HBaseConnectionKey;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
@@ -40,7 +41,7 @@ import org.apache.drill.shaded.guava.com.google.common.util.concurrent.Unchecked
*/
public final class HBaseConnectionManager
extends CacheLoader<HBaseConnectionKey, Connection> implements RemovalListener<HBaseConnectionKey, Connection> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseConnectionManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(HBaseConnectionManager.class);
public static final HBaseConnectionManager INSTANCE = new HBaseConnectionManager();
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index e1b41e8..cef696e 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -69,7 +71,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@JsonTypeName("hbase-scan")
public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
+ private static final Logger logger = LoggerFactory.getLogger(HBaseGroupScan.class);
private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = (list1, list2) -> list1.size() - list2.size();
@@ -83,7 +85,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
private HBaseStoragePlugin storagePlugin;
- private Stopwatch watch = Stopwatch.createUnstarted();
+ private final Stopwatch watch = Stopwatch.createUnstarted();
private Map<Integer, List<HBaseSubScanSpec>> endpointFragmentMapping;
@@ -103,7 +105,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
@JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
- this (userName, (HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
+ this (userName, pluginRegistry.resolve(storagePluginConfig, HBaseStoragePlugin.class), hbaseScanSpec, columns);
}
public HBaseGroupScan(String userName, HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec,
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
index 3faa089..517baec 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -40,7 +40,10 @@ public abstract class HBasePushFilterIntoScan extends StoragePluginOptimizerRule
super(operand, description);
}
- public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new HBasePushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "HBasePushFilterIntoScan:Filter_On_Scan") {
+ public static final StoragePluginOptimizerRule FILTER_ON_SCAN =
+ new HBasePushFilterIntoScan(RelOptHelper.some(
+ FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+ "HBasePushFilterIntoScan:Filter_On_Scan") {
@Override
public void onMatch(RelOptRuleCall call) {
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 2db1d02..f68088a 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -50,13 +50,14 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
+ private static final Logger logger = LoggerFactory.getLogger(HBaseRecordReader.class);
// batch should not exceed this value to avoid OOM on a busy system
private static final int MAX_ALLOCATED_MEMORY_PER_BATCH = 64 * 1024 * 1024; // 64 mb in bytes
@@ -72,10 +73,10 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
private Table hTable;
private ResultScanner resultScanner;
- private TableName hbaseTableName;
- private Scan hbaseScan;
+ private final TableName hbaseTableName;
+ private final Scan hbaseScan;
// scan instance to capture columns for vector creation
- private Scan hbaseScanColumnsOnly;
+ private final Scan hbaseScanColumnsOnly;
private Set<String> completeFamilies;
private OperatorContext operatorContext;
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRegexParser.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRegexParser.java
index 3d8f9c1..a2abf76 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRegexParser.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRegexParser.java
@@ -22,9 +22,11 @@ import java.util.regex.Pattern;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HBaseRegexParser {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRegexParser.class);
+ private static final Logger logger = LoggerFactory.getLogger(HBaseRegexParser.class);
/**
* Regular expression pattern to parse the value operand of the SQL LIKE operator.
@@ -45,9 +47,9 @@ public class HBaseRegexParser {
private final String escapeChar_;
- private String regexString_ = null;
+ private String regexString_;
- private String prefixString_ = null;
+ private String prefixString_;
public HBaseRegexParser(FunctionCall call) {
this(likeString(call), escapeString(call));
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 967b8e6..a0fe3bd 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan> {
+
@Override
public ScanBatch getBatch(ExecutorFragmentContext context, HBaseSubScan subScan, List<RecordBatch> children)
throws ExecutionSetupException {
@@ -50,5 +51,4 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan> {
}
return new ScanBatch(subScan, context, readers);
}
-
}
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 7e4c1ca..21e6d72 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -28,11 +28,12 @@ import org.apache.drill.exec.store.AbstractSchemaFactory;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Admin;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
public class HBaseSchemaFactory extends AbstractSchemaFactory {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSchemaFactory.class);
+ private static final Logger logger = LoggerFactory.getLogger(HBaseSchemaFactory.class);
private final HBaseStoragePlugin plugin;
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
index 946f6f5..46e9720 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
@@ -23,6 +23,8 @@ import org.apache.drill.common.logical.StoragePluginConfigBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -34,7 +36,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@JsonTypeName(HBaseStoragePluginConfig.NAME)
public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements DrillHBaseConstants {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePluginConfig.class);
+ private static final Logger logger = LoggerFactory.getLogger(HBaseStoragePluginConfig.class);
private Map<String, String> config;
@@ -52,6 +54,8 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements
if (config == null) {
config = Maps.newHashMap();
}
+ // TODO: Config-based information should reside in the
+ // storage plugin instance, not here.
logger.debug("Initializing HBase StoragePlugin configuration with zookeeper quorum '{}', port '{}'.",
config.get(HConstants.ZOOKEEPER_QUORUM), config.get(HBASE_ZOOKEEPER_PORT));
if (sizeCalculatorEnabled == null) {
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 41aba6a..d4ea999 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -41,12 +41,10 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
/**
- * Contains information for reading a single HBase region
+ * Information for reading a single HBase region
*/
-
@JsonTypeName("hbase-region-scan")
public class HBaseSubScan extends AbstractBase implements SubScan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class);
private final HBaseStoragePlugin hbaseStoragePlugin;
private final List<HBaseSubScanSpec> regionScanSpecList;
@@ -59,7 +57,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
@JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
this(userName,
- (HBaseStoragePlugin) registry.getPlugin(hbaseStoragePluginConfig),
+ registry.resolve(hbaseStoragePluginConfig, HBaseStoragePlugin.class),
regionScanSpecList,
columns);
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index d5436eb..6e6d469 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -62,7 +62,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
@JsonProperty("filter") LogicalExpression filter,
@JsonProperty("schema") TupleMetadata schema) throws ExecutionSetupException {
this(userName,
- (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
+ registry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
rowGroupReadEntries,
columns,
hivePartitionHolder,
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 8b89488..9aae78f 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -65,7 +65,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
@JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
@JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, filter);
- this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig);
+ this.hiveStoragePlugin = engineRegistry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class);
this.confProperties = confProperties;
this.metadataProvider = new HiveParquetTableMetadataProvider(entries, hivePartitionHolder, hiveStoragePlugin, readerConfig);
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index c7d04a9..3fd1258 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -55,7 +57,7 @@ import static org.apache.drill.exec.store.hive.HiveUtilities.createPartitionWith
@JsonTypeName("hive-scan")
public class HiveScan extends AbstractGroupScan {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
+ private static final Logger logger = LoggerFactory.getLogger(HiveScan.class);
private static int HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN = 20;
@@ -78,7 +80,7 @@ public class HiveScan extends AbstractGroupScan {
@JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
this(userName,
hiveReadEntry,
- (HiveStoragePlugin) pluginRegistry.getPlugin(hiveStoragePluginConfig),
+ pluginRegistry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
columns,
null, confProperties);
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index a8032cc..5efa505 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -73,7 +73,8 @@ public class HiveSubScan extends AbstractBase implements SubScan {
hiveReadEntry,
splitClasses,
columns,
- (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig), confProperties);
+ registry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
+ confProperties);
}
public HiveSubScan(final String userName,
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
index f7dedf3..017c3ed 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTableWithColumnCache.java
@@ -28,6 +28,7 @@ import java.util.Map;
* This class is wrapper of {@link Table} class and used for
* storage of such additional information as column lists cache.
*/
+@SuppressWarnings("serial")
public class HiveTableWithColumnCache extends Table {
private ColumnListsCache columnListsCache;
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index f80bd80..f7c7099 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -91,6 +91,8 @@ import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.RoundingMode;
@@ -104,7 +106,7 @@ import java.util.stream.Collectors;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
public class HiveUtilities {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveUtilities.class);
+ private static final Logger logger = LoggerFactory.getLogger(HiveUtilities.class);
/**
* Partition value is received in string format. Convert it into appropriate object based on the type.
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 31610db..36e32bd 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -47,11 +47,13 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.drill.exec.util.ImpersonationUtil.getProcessUserName;
public class HiveSchemaFactory extends AbstractSchemaFactory {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaFactory.class);
+ private static final Logger logger = LoggerFactory.getLogger(HiveSchemaFactory.class);
// MetaStoreClient created using process user credentials
private final DrillHiveMetaStoreClient processUserMetastoreClient;
@@ -59,7 +61,6 @@ public class HiveSchemaFactory extends AbstractSchemaFactory {
private final LoadingCache<String, DrillHiveMetaStoreClient> metaStoreClientLoadingCache;
private final HiveStoragePlugin plugin;
- private final HiveConf hiveConf;
private final boolean isDrillImpersonationEnabled;
private final boolean isHS2DoAsSet;
@@ -67,7 +68,6 @@ public class HiveSchemaFactory extends AbstractSchemaFactory {
super(name);
this.plugin = plugin;
- this.hiveConf = hiveConf;
isHS2DoAsSet = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS);
isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
index 05ce7ae..e8e60ad 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
@@ -75,5 +75,4 @@ public class HiveTestBase extends PlanTestBase {
HIVE_TEST_FIXTURE.getPluginManager().removeHivePluginFrom(bits);
}
}
-
}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
index bff25ac..ad8c31c 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
@@ -26,9 +26,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.hive.HiveStoragePlugin;
import org.apache.drill.exec.store.hive.HiveStoragePluginConfig;
import org.apache.drill.test.BaseDirTestWatcher;
@@ -228,7 +228,7 @@ public class HiveTestFixture {
pluginConfig.setEnabled(true);
drillbit.getContext().getStorage().put(pluginName, pluginConfig);
}
- } catch (ExecutionSetupException e) {
+ } catch (PluginException e) {
throw new RuntimeException("Failed to add Hive storage plugin to drillbits", e);
}
}
@@ -238,7 +238,13 @@ public class HiveTestFixture {
}
public void removeHivePluginFrom(Iterable<Drillbit> drillbits) {
- drillbits.forEach(bit -> bit.getContext().getStorage().remove(pluginName));
+ try {
+ for (Drillbit drillbit : drillbits) {
+ drillbit.getContext().getStorage().remove(pluginName);
+ }
+ } catch (PluginException e) {
+ throw new RuntimeException("Failed to remove Hive storage plugin for drillbits", e);
+ }
}
public void updateHivePlugin(Iterable<Drillbit> drillbits,
@@ -254,14 +260,12 @@ public class HiveTestFixture {
newPluginConfig.getConfigProps().putAll(configOverride);
pluginRegistry.put(pluginName, newPluginConfig);
}
- } catch (ExecutionSetupException e) {
+ } catch (PluginException e) {
throw new RuntimeException("Failed to update Hive storage plugin for drillbits", e);
}
}
-
}
-
/**
* Implements method for initialization and passing
* of Hive to consumer instances in order to be used
@@ -275,8 +279,7 @@ public class HiveTestFixture {
* {@link HiveTestFixture}'s constructor will create instance,
* and API users will get it via {@link HiveTestFixture#getDriverManager()}.
*/
- private HiveDriverManager() {
- }
+ private HiveDriverManager() { }
public void runWithinSession(Consumer<Driver> driverConsumer) {
final HiveConf hiveConf = new HiveConf(SessionState.class);
@@ -289,7 +292,5 @@ public class HiveTestFixture {
throw new RuntimeException("Exception was thrown while closing SessionState", e);
}
}
-
}
-
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
index 199d922..4fcd14d 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
@@ -53,7 +53,7 @@ public class JdbcGroupScan extends AbstractGroupScan {
super("");
this.sql = sql;
this.columns = columns;
- this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config);
+ this.plugin = plugins.resolve(config, JdbcStoragePlugin.class);
this.rows = rows;
}
@@ -92,6 +92,7 @@ public class JdbcGroupScan extends AbstractGroupScan {
return sql;
}
+ @Override
public List<SchemaPath> getColumns() {
return columns;
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
index 43bf909..9991f6f 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
@@ -48,7 +48,7 @@ public class JdbcSubScan extends AbstractSubScan {
super("");
this.sql = sql;
this.columns = columns;
- this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config);
+ this.plugin = plugins.resolve(config, JdbcStoragePlugin.class);
}
JdbcSubScan(String sql, List<SchemaPath> columns, JdbcStoragePlugin plugin) {
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index e4f255c..58c2f9f 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -64,7 +64,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@JsonTypeName("kafka-scan")
public class KafkaGroupScan extends AbstractGroupScan {
-
private static final Logger logger = LoggerFactory.getLogger(KafkaGroupScan.class);
// Assuming default average topic message size as 1KB, which will be used to
@@ -87,7 +86,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
@JsonProperty("kafkaScanSpec") KafkaScanSpec scanSpec,
@JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
this(userName,
- (KafkaStoragePlugin) pluginRegistry.getPlugin(kafkaStoragePluginConfig),
+ pluginRegistry.resolve(kafkaStoragePluginConfig, KafkaStoragePlugin.class),
columns,
scanSpec);
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
index 6ea9e1d..9341aa6 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -53,7 +53,7 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
@JsonProperty("partitionSubScanSpecList") LinkedList<KafkaPartitionScanSpec> partitionSubScanSpecList)
throws ExecutionSetupException {
this(userName,
- (KafkaStoragePlugin) registry.getPlugin(kafkaStoragePluginConfig),
+ registry.resolve(kafkaStoragePluginConfig, KafkaStoragePlugin.class),
columns,
partitionSubScanSpecList);
}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java
index de95c49..4ed2394 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java
@@ -30,7 +30,6 @@ import org.apache.kudu.Type;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
public class DrillKuduTable extends DynamicDrillTable {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillKuduTable.class);
private final Schema schema;
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
index 593ca12..b66d684 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
@@ -54,7 +54,6 @@ import org.apache.kudu.client.LocatedTablet.Replica;
@JsonTypeName("kudu-scan")
public class KuduGroupScan extends AbstractGroupScan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduGroupScan.class);
private static final long DEFAULT_TABLET_SIZE = 1000;
private KuduStoragePlugin kuduStoragePlugin;
@@ -72,7 +71,7 @@ public class KuduGroupScan extends AbstractGroupScan {
@JsonProperty("kuduStoragePluginConfig") KuduStoragePluginConfig kuduStoragePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
- this((KuduStoragePlugin) pluginRegistry.getPlugin(kuduStoragePluginConfig), kuduScanSpec, columns);
+ this(pluginRegistry.resolve(kuduStoragePluginConfig, KuduStoragePlugin.class), kuduScanSpec, columns);
}
public KuduGroupScan(KuduStoragePlugin kuduStoragePlugin,
@@ -112,9 +111,9 @@ public class KuduGroupScan extends AbstractGroupScan {
private static class KuduWork implements CompleteWork {
- private EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
- private byte[] partitionKeyStart;
- private byte[] partitionKeyEnd;
+ private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
+ private final byte[] partitionKeyStart;
+ private final byte[] partitionKeyEnd;
public KuduWork(byte[] partitionKeyStart, byte[] partitionKeyEnd) {
this.partitionKeyStart = partitionKeyStart;
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index 976b16d..abe6f0d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -61,12 +61,13 @@ import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.apache.kudu.client.shaded.com.google.common.collect.ImmutableMap;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
public class KuduRecordReader extends AbstractRecordReader {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduRecordReader.class);
+ private static final Logger logger = LoggerFactory.getLogger(KuduRecordReader.class);
private static final int TARGET_RECORD_COUNT = 4000;
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java
index 9b98ccd..27f5bd5 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java
@@ -34,6 +34,8 @@ import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.SessionConfiguration.FlushMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -41,7 +43,7 @@ import java.util.List;
import java.util.Map;
public class KuduRecordWriterImpl extends KuduRecordWriter {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduRecordWriterImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger(KuduRecordWriterImpl.class);
private static final int FLUSH_FREQUENCY = 100;
@@ -49,7 +51,7 @@ public class KuduRecordWriterImpl extends KuduRecordWriter {
private final String name;
private final OperatorContext context;
private KuduTable table;
- private KuduSession session;
+ private final KuduSession session;
private Insert insert;
private int recordsSinceFlush;
@@ -63,9 +65,7 @@ public class KuduRecordWriterImpl extends KuduRecordWriter {
}
@Override
- public void init(Map<String, String> writerOptions) throws IOException {
-
- }
+ public void init(Map<String, String> writerOptions) throws IOException { }
@Override
public void updateSchema(VectorAccessible batch) throws IOException {
@@ -149,8 +149,7 @@ public class KuduRecordWriterImpl extends KuduRecordWriter {
}
@Override
- public void abort() throws IOException {
- }
+ public void abort() throws IOException { }
private void flush() throws IOException {
try {
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
index a192a05..f2a95c6 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
@@ -32,7 +32,6 @@ import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduScanBatchCreator.class);
@Override
public ScanBatch getBatch(ExecutorFragmentContext context, KuduSubScan subScan, List<RecordBatch> children)
@@ -53,5 +52,4 @@ public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
}
return new ScanBatch(subScan, context, readers);
}
-
}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java
index 2e92369..371cf2b 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java
@@ -33,6 +33,4 @@ public class KuduScanSpec {
public String getTableName() {
return tableName;
}
-
-
}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
index 183849d..a4d7d3b 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
@@ -34,11 +34,12 @@ import org.apache.drill.exec.store.SchemaConfig;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.ListTablesResponse;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
public class KuduSchemaFactory extends AbstractSchemaFactory {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSchemaFactory.class);
+ private static final Logger logger = LoggerFactory.getLogger(KuduSchemaFactory.class);
private final KuduStoragePlugin plugin;
@@ -84,7 +85,6 @@ public class KuduSchemaFactory extends AbstractSchemaFactory {
logger.warn("Failure while retrieving kudu table {}", name, e);
return null;
}
-
}
@Override
@@ -111,7 +111,6 @@ public class KuduSchemaFactory extends AbstractSchemaFactory {
public List<String> getPartitionColumns() {
return Collections.emptyList();
}
-
};
}
@@ -136,7 +135,5 @@ public class KuduSchemaFactory extends AbstractSchemaFactory {
public String getTypeName() {
return KuduStoragePluginConfig.NAME;
}
-
}
-
}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
index b1ae708..35c974d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
public class KuduStoragePlugin extends AbstractStoragePlugin {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduStoragePlugin.class);
private final KuduStoragePluginConfig engineConfig;
private final KuduSchemaFactory schemaFactory;
@@ -45,11 +44,6 @@ public class KuduStoragePlugin extends AbstractStoragePlugin {
this.client = new KuduClient.KuduClientBuilder(configuration.getMasterAddresses()).build();
}
- @Override
- public void start() throws IOException {
-
- }
-
public KuduClient getClient() {
return client;
}
@@ -84,5 +78,4 @@ public class KuduStoragePlugin extends AbstractStoragePlugin {
public KuduStoragePluginConfig getConfig() {
return engineConfig;
}
-
}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
index e3bab87..586d38a 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName(KuduStoragePluginConfig.NAME)
public class KuduStoragePluginConfig extends StoragePluginConfigBase {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduStoragePluginConfig.class);
public static final String NAME = "kudu";
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
index 3d35371..22e13a6 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
@@ -38,10 +38,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-// Class containing information for reading a single Kudu tablet
+/**
+ * Information for reading a single Kudu tablet
+ */
@JsonTypeName("kudu-sub-scan")
public class KuduSubScan extends AbstractBase implements SubScan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSubScan.class);
private final KuduStoragePlugin kuduStoragePlugin;
private final List<KuduSubScanSpec> tabletScanSpecList;
@@ -53,7 +54,7 @@ public class KuduSubScan extends AbstractBase implements SubScan {
@JsonProperty("tabletScanSpecList") LinkedList<KuduSubScanSpec> tabletScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
super((String) null);
- kuduStoragePlugin = (KuduStoragePlugin) registry.getPlugin(kuduStoragePluginConfig);
+ kuduStoragePlugin = registry.resolve(kuduStoragePluginConfig, KuduStoragePlugin.class);
this.tabletScanSpecList = tabletScanSpecList;
this.columns = columns;
}
@@ -118,23 +119,15 @@ public class KuduSubScan extends AbstractBase implements SubScan {
this.endKey = endKey;
}
- public String getTableName() {
- return tableName;
- }
-
- public byte[] getStartKey() {
- return startKey;
- }
+ public String getTableName() { return tableName; }
- public byte[] getEndKey() {
- return endKey;
- }
+ public byte[] getStartKey() { return startKey; }
+ public byte[] getEndKey() { return endKey; }
}
@Override
public int getOperatorType() {
return CoreOperatorType.KUDU_SUB_SCAN_VALUE;
}
-
}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
index 7611576..95f286e 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
@@ -32,7 +32,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
public class KuduWriter extends AbstractWriter {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduWriter.class);
private final KuduStoragePlugin plugin;
private final String name;
@@ -44,7 +43,7 @@ public class KuduWriter extends AbstractWriter {
@JsonProperty("storage") StoragePluginConfig storageConfig,
@JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
super(child);
- this.plugin = (KuduStoragePlugin) engineRegistry.getPlugin(storageConfig);
+ this.plugin = engineRegistry.resolve(storageConfig, KuduStoragePlugin.class);
this.name = name;
}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
index 22e86ba..643a136 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
@@ -27,7 +27,6 @@ import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
public class KuduWriterBatchCreator implements BatchCreator<KuduWriter> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduWriterBatchCreator.class);
@Override
public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KuduWriter config, List<RecordBatch> children)
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
index 8bdf2e0..10720e4 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
public class MongoFilterBuilder extends
AbstractExprVisitor<MongoScanSpec, Void, RuntimeException> implements
DrillMongoConstants {
- static final Logger logger = LoggerFactory
+ private static final Logger logger = LoggerFactory
.getLogger(MongoFilterBuilder.class);
final MongoGroupScan groupScan;
final LogicalExpression le;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index 46ea567..aedbc82 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -81,7 +81,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
private static final Integer select = Integer.valueOf(1);
- static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
+ private static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class);
private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MongoSubScanSpec>>() {
@Override
@@ -110,7 +110,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
private Map<String, List<ChunkInfo>> chunksInverseMapping;
- private Stopwatch watch = Stopwatch.createUnstarted();
+ private final Stopwatch watch = Stopwatch.createUnstarted();
private boolean filterPushedDown = false;
@@ -122,7 +122,8 @@ public class MongoGroupScan extends AbstractGroupScan implements
@JsonProperty("columns") List<SchemaPath> columns,
@JacksonInject StoragePluginRegistry pluginRegistry) throws IOException,
ExecutionSetupException {
- this(userName, (MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
+ this(userName,
+ pluginRegistry.resolve(storagePluginConfig, MongoStoragePlugin.class),
scanSpec, columns);
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
index b065aa4..03acd05 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java
@@ -31,15 +31,13 @@ import com.mongodb.MongoCredential;
@JsonTypeName(MongoStoragePluginConfig.NAME)
public class MongoStoragePluginConfig extends StoragePluginConfig {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
- .getLogger(MongoStoragePluginConfig.class);
public static final String NAME = "mongo";
- private String connection;
+ private final String connection;
@JsonIgnore
- private MongoClientURI clientURI;
+ private final MongoClientURI clientURI;
@JsonCreator
public MongoStoragePluginConfig(@JsonProperty("connection") String connection) {
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index 32f5e75..c85dc61 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -66,8 +66,8 @@ public class MongoSubScan extends AbstractBase implements SubScan {
super(userName);
this.columns = columns;
this.mongoPluginConfig = (MongoStoragePluginConfig) mongoPluginConfig;
- this.mongoStoragePlugin = (MongoStoragePlugin) registry
- .getPlugin(mongoPluginConfig);
+ this.mongoStoragePlugin = registry.resolve(
+ mongoPluginConfig, MongoStoragePlugin.class);
this.chunkScanSpecList = chunkScanSpecList;
}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java
index c812ff5..ea113c6 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java
@@ -21,12 +21,12 @@ public interface Constants {
/**
* openTSDB required constants for API call
*/
- public static final String DEFAULT_TIME = "47y-ago";
- public static final String SUM_AGGREGATOR = "sum";
+ String DEFAULT_TIME = "47y-ago";
+ String SUM_AGGREGATOR = "sum";
- public static final String TIME_PARAM = "start";
- public static final String END_TIME_PARAM = "end";
- public static final String METRIC_PARAM = "metric";
- public static final String AGGREGATOR_PARAM = "aggregator";
- public static final String DOWNSAMPLE_PARAM = "downsample";
+ String TIME_PARAM = "start";
+ String END_TIME_PARAM = "end";
+ String METRIC_PARAM = "metric";
+ String AGGREGATOR_PARAM = "aggregator";
+ String DOWNSAMPLE_PARAM = "downsample";
}
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java
index 8a09637..6800903 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java
@@ -47,9 +47,9 @@ import static org.apache.drill.exec.store.openTSDB.Util.fromRowData;
@JsonTypeName("openTSDB-scan")
public class OpenTSDBGroupScan extends AbstractGroupScan {
- private OpenTSDBStoragePluginConfig storagePluginConfig;
- private OpenTSDBScanSpec openTSDBScanSpec;
- private OpenTSDBStoragePlugin storagePlugin;
+ private final OpenTSDBStoragePluginConfig storagePluginConfig;
+ private final OpenTSDBScanSpec openTSDBScanSpec;
+ private final OpenTSDBStoragePlugin storagePlugin;
private List<SchemaPath> columns;
@@ -58,7 +58,8 @@ public class OpenTSDBGroupScan extends AbstractGroupScan {
@JsonProperty("storage") OpenTSDBStoragePluginConfig openTSDBStoragePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
- this((OpenTSDBStoragePlugin) pluginRegistry.getPlugin(openTSDBStoragePluginConfig), openTSDBScanSpec, columns);
+ this(pluginRegistry.resolve(openTSDBStoragePluginConfig, OpenTSDBStoragePlugin.class),
+ openTSDBScanSpec, columns);
}
public OpenTSDBGroupScan(OpenTSDBStoragePlugin storagePlugin,
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java
index ec04790..01e418b 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java
@@ -31,8 +31,6 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Iterator;
@@ -42,9 +40,6 @@ import java.util.List;
@JsonTypeName("openTSDB-sub-scan")
public class OpenTSDBSubScan extends AbstractBase implements SubScan {
- private static final Logger log =
- LoggerFactory.getLogger(OpenTSDBSubScan.class);
-
public final OpenTSDBStoragePluginConfig storage;
private final List<SchemaPath> columns;
@@ -57,7 +52,7 @@ public class OpenTSDBSubScan extends AbstractBase implements SubScan {
@JsonProperty("tabletScanSpecList") LinkedList<OpenTSDBSubScanSpec> tabletScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
super((String) null);
- openTSDBStoragePlugin = (OpenTSDBStoragePlugin) registry.getPlugin(storage);
+ openTSDBStoragePlugin = registry.resolve(storage, OpenTSDBStoragePlugin.class);
this.tabletScanSpecList = tabletScanSpecList;
this.storage = storage;
this.columns = columns;
diff --git a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
index 98146f3..78421ab 100644
--- a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
@@ -24,13 +24,13 @@ import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.util.BuiltInMethod;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserExceptionUtils;
import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.SubSchemaWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,7 +117,7 @@ public class DynamicRootSchema extends DynamicSchema {
schemaPlus.add(wrapper.getName(), wrapper);
}
}
- } catch(ExecutionSetupException | IOException ex) {
+ } catch(PluginException | IOException ex) {
logger.warn("Failed to load schema for \"" + schemaName + "\"!", ex);
// We can't proceed further without a schema, throw a runtime exception.
UserException.Builder exceptBuilder =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 2551b6d..dd3c541 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.opt;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -55,6 +54,7 @@ import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
@@ -202,10 +202,10 @@ public class BasicOptimizer extends Optimizer {
scan.getStorageEngine()));
}
try {
- final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config);
+ final StoragePlugin storagePlugin = queryContext.getStorage().getPluginByConfig(config);
final String user = userSession.getSession().getCredentials().getUserName();
return storagePlugin.getPhysicalScan(user, scan.getSelection(), userSession.getSession().getOptions());
- } catch (IOException | ExecutionSetupException e) {
+ } catch (IOException | PluginException e) {
throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 9e82ffc..d355798 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -71,7 +71,8 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
@Override
public GroupScan clone(List<SchemaPath> columns) {
- throw new UnsupportedOperationException(String.format("%s does not implement clone(columns) method!", this.getClass().getCanonicalName()));
+ throw new UnsupportedOperationException(String.format(
+ "%s does not implement clone(columns) method!", this.getClass().getCanonicalName()));
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
index 23ea23f..70244bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
@@ -43,9 +43,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
public class FileSystemCreateTableEntry implements CreateTableEntry {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemCreateTableEntry.class);
- private FileSystemConfig storageConfig;
- private FormatPlugin formatPlugin;
- private String location;
+ private final FileSystemConfig storageConfig;
+ private final FormatPlugin formatPlugin;
+ private final String location;
private final List<String> partitionColumns;
private final StorageStrategy storageStrategy;
@@ -58,7 +58,7 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
@JacksonInject StoragePluginRegistry engineRegistry)
throws ExecutionSetupException {
this.storageConfig = storageConfig;
- this.formatPlugin = engineRegistry.getFormatPlugin(storageConfig, formatConfig);
+ this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, FormatPlugin.class);
this.location = location;
this.partitionColumns = partitionColumns;
this.storageStrategy = storageStrategy;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
index 92a07c5..24ebf561 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeSchemaHandler.java
@@ -23,18 +23,20 @@ import com.fasterxml.jackson.core.io.CharacterEscapes;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlDescribeSchema;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.WorkspaceConfig;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
@@ -45,13 +47,11 @@ import java.util.Map;
import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
public class DescribeSchemaHandler extends DefaultSqlHandler {
+ private static final Logger logger = LoggerFactory.getLogger(DescribeSchemaHandler.class);
- public DescribeSchemaHandler(SqlHandlerConfig config) {
- super(config);
- }
-
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DescribeSchemaHandler.class);
- private static final ObjectMapper mapper = new ObjectMapper(new ObjectMapper().getFactory().setCharacterEscapes(new CharacterEscapes() {
+ @SuppressWarnings("serial")
+ private static final ObjectMapper mapper = new ObjectMapper(
+ new ObjectMapper().getFactory().setCharacterEscapes(new CharacterEscapes() {
@Override
public int[] getEscapeCodesForAscii() {
// add standard set of escaping characters
@@ -69,6 +69,10 @@ public class DescribeSchemaHandler extends DefaultSqlHandler {
})).enable(INDENT_OUTPUT);
+ public DescribeSchemaHandler(SqlHandlerConfig config) {
+ super(config);
+ }
+
@Override
public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
SqlIdentifier schema = unwrap(sqlNode, SqlDescribeSchema.class).getSchema();
@@ -88,7 +92,7 @@ public class DescribeSchemaHandler extends DefaultSqlHandler {
throw new DrillRuntimeException(String.format("Unable to find storage plugin with the following name [%s].",
drillSchema.getSchemaPath().get(0)));
}
- } catch (ExecutionSetupException e) {
+ } catch (PluginException e) {
throw new DrillRuntimeException("Failure while retrieving storage plugin", e);
}
@@ -131,5 +135,4 @@ public class DescribeSchemaHandler extends DefaultSqlHandler {
this.properties = properties;
}
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
index 43d2f2a..9773f37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.planner.sql.handlers;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
@@ -45,7 +47,7 @@ import java.util.HashSet;
import java.util.List;
public class SqlHandlerUtil {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SqlHandlerUtil.class);
+ private static final Logger logger = LoggerFactory.getLogger(SqlHandlerUtil.class);
/**
* Resolve final RelNode of the new table (or view) for given table field list and new table definition.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 344db1d..141c027 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -57,6 +57,8 @@ import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature;
import org.glassfish.jersey.server.mvc.freemarker.FreemarkerMvcFeature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.servlet.ServletContext;
@@ -72,7 +74,7 @@ import java.util.ArrayList;
import java.util.List;
public class DrillRestServer extends ResourceConfig {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRestServer.class);
+ static final Logger logger = LoggerFactory.getLogger(DrillRestServer.class);
public DrillRestServer(final WorkManager workManager, final ServletContext servletContext, final Drillbit drillbit) {
register(DrillRoot.class);
@@ -121,12 +123,13 @@ public class DrillRestServer extends ResourceConfig {
register(new AbstractBinder() {
@Override
protected void configure() {
+ DrillbitContext context = workManager.getContext();
bind(drillbit).to(Drillbit.class);
bind(workManager).to(WorkManager.class);
bind(executor).to(EventExecutor.class);
- bind(workManager.getContext().getLpPersistence().getMapper()).to(ObjectMapper.class);
- bind(workManager.getContext().getStoreProvider()).to(PersistentStoreProvider.class);
- bind(workManager.getContext().getStorage()).to(StoragePluginRegistry.class);
+ bind(context.getLpPersistence().getMapper()).to(ObjectMapper.class);
+ bind(context.getStoreProvider()).to(PersistentStoreProvider.class);
+ bind(context.getStorage()).to(StoragePluginRegistry.class);
bind(new UserAuthEnabled(isAuthEnabled)).to(UserAuthEnabled.class);
if (isAuthEnabled) {
bindFactory(DrillUserPrincipalProvider.class).to(DrillUserPrincipal.class);
@@ -356,7 +359,7 @@ public class DrillRestServer extends ResourceConfig {
// Returns whether auth is enabled or not in config
public static class UserAuthEnabled {
- private boolean value;
+ private final boolean value;
public UserAuthEnabled(boolean value) {
this.value = value;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
index c19fcc9..30c46b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java
@@ -19,9 +19,9 @@ package org.apache.drill.exec.server.rest;
import javax.xml.bind.annotation.XmlRootElement;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -31,48 +31,23 @@ public class PluginConfigWrapper {
private final String name;
private final StoragePluginConfig config;
- private final boolean exists;
@JsonCreator
- public PluginConfigWrapper(@JsonProperty("name") String name, @JsonProperty("config") StoragePluginConfig config) {
+ public PluginConfigWrapper(@JsonProperty("name") String name,
+ @JsonProperty("config") StoragePluginConfig config) {
this.name = name;
this.config = config;
- this.exists = config != null;
}
- public String getName() {
- return name;
- }
+ public String getName() { return name; }
- public StoragePluginConfig getConfig() {
- return config;
- }
+ public StoragePluginConfig getConfig() { return config; }
public boolean enabled() {
- return exists && config.isEnabled();
- }
-
- public void createOrUpdateInStorage(StoragePluginRegistry storage) throws ExecutionSetupException {
- storage.put(name, config);
- }
-
- public boolean setEnabledInStorage(StoragePluginRegistry storage, boolean enabled) throws ExecutionSetupException {
- if (exists) {
- config.setEnabled(enabled);
- createOrUpdateInStorage(storage);
- }
- return exists;
- }
-
- public boolean exists() {
- return exists;
+ return config.isEnabled();
}
- public boolean deleteFromStorage(StoragePluginRegistry storage) {
- if (exists) {
- storage.remove(name);
- return true;
- }
- return false;
+ public void createOrUpdateInStorage(StoragePluginRegistry storage) throws PluginException {
+ storage.validatedPut(name, config);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index 35e164f..1ed9128 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -17,14 +17,11 @@
*/
package org.apache.drill.exec.server.rest;
-import java.io.IOException;
-import java.io.StringReader;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -45,22 +42,22 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.xml.bind.annotation.XmlRootElement;
-import com.fasterxml.jackson.core.JsonParser;
import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginEncodingException;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginFilter;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginNotFoundException;
import org.glassfish.jersey.server.mvc.Viewable;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.drill.exec.server.rest.auth.DrillUserPrincipal.ADMIN_ROLE;
+// Serialization of plugins to JSON is handled by the Jersey framework
+// as configured in DrillRestServer.
@Path("/")
@RolesAllowed(ADMIN_ROLE)
public class StorageResources {
@@ -73,9 +70,6 @@ public class StorageResources {
StoragePluginRegistry storage;
@Inject
- ObjectMapper mapper;
-
- @Inject
SecurityContext sc;
@Inject
@@ -91,11 +85,16 @@ public class StorageResources {
Comparator.comparing(PluginConfigWrapper::getName);
/**
- * Regex allows the following paths:
+ * Regex allows the following paths:<pre><code>
* /storage/{group}/plugins/export
- * /storage/{group}/plugins/export/{format}
- * Note: for the second case the format involves the leading slash, therefore it should be removed then
+ * /storage/{group}/plugins/export/{format}</code></pre>
+ * <p>
+ * Note: in the second case the captured format includes the leading slash,
+ * which must therefore be stripped before use.
*/
+ // This code has a flaw: the Jackson ObjectMapper cannot serialize to
+ // HOCON format, though it can read HOCON. Thus, the only valid format
+ // is json.
@GET
@Path("/storage/{group}/plugins/export{format: (/[^/]+?)*}")
@Produces(MediaType.APPLICATION_JSON)
@@ -139,7 +138,7 @@ public class StorageResources {
@Produces(MediaType.APPLICATION_JSON)
public PluginConfigWrapper getPluginConfig(@PathParam("name") String name) {
try {
- return new PluginConfigWrapper(name, storage.getConfig(name));
+ return new PluginConfigWrapper(name, storage.getStoredConfig(name));
} catch (Exception e) {
logger.error("Failure while trying to access storage config: {}", name, e);
}
@@ -159,14 +158,14 @@ public class StorageResources {
@Path("/storage/{name}/enable/{val}")
@Produces(MediaType.APPLICATION_JSON)
public JsonResult enablePlugin(@PathParam("name") String name, @PathParam("val") Boolean enable) {
- PluginConfigWrapper plugin = getPluginConfig(name);
try {
- return plugin.setEnabledInStorage(storage, enable)
- ? message("Success")
- : message("Error (plugin does not exist)");
- } catch (ExecutionSetupException e) {
+ storage.setEnabled(name, enable);
+ return message("Success");
+ } catch (PluginNotFoundException e) {
+ return message("No plugin exists with the given name: " + name);
+ } catch (PluginException e) {
logger.debug("Error in enabling storage name: {} flag: {}", name, enable);
- return message("Error (unable to enable / disable storage)");
+ return message("Unable to enable/disable plugin:" + e.getMessage());
}
}
@@ -194,9 +193,12 @@ public class StorageResources {
@Path("/storage/{name}.json")
@Produces(MediaType.APPLICATION_JSON)
public JsonResult deletePlugin(@PathParam("name") String name) {
- return getPluginConfig(name).deleteFromStorage(storage)
- ? message("Success")
- : message("Error (unable to delete %s storage plugin)", name);
+ try {
+ storage.remove(name);
+ return message("Success");
+ } catch (PluginException e) {
+ return message(e.getMessage());
+ }
}
@POST
@@ -207,13 +209,17 @@ public class StorageResources {
try {
plugin.createOrUpdateInStorage(storage);
return message("Success");
- } catch (ExecutionSetupException e) {
+ } catch (PluginException e) {
logger.error("Unable to create/ update plugin: " + plugin.getName(), e);
- return message("Error while creating / updating storage : %s", e.getCause() == null ? e.getMessage() :
- e.getCause().getMessage());
+ return message("Error while saving plugin: %s", e.getMessage());
}
}
+ // Allows JSON that includes comments. However, since the JSON is immediately
+ // serialized, the comments are lost. Would be better to validate the JSON,
+ // then write the original JSON directly to the persistent store, and read
+ // JSON from the store, rather than letting Jackson produce it. That way,
+ // comments are preserved.
@POST
@Path("/storage/create_update")
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@@ -224,18 +230,13 @@ public class StorageResources {
return message("Error (a storage name cannot be empty)");
}
try {
- mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
- StoragePluginConfig config = mapper.readValue(new StringReader(storagePluginConfig), StoragePluginConfig.class);
- return createOrUpdatePluginJSON(new PluginConfigWrapper(name, config));
- } catch (JsonMappingException e) {
+ storage.putJson(name, storagePluginConfig);
+ return message("Success");
+ } catch (PluginEncodingException e) {
logger.debug("Error in JSON mapping: {}", storagePluginConfig, e);
- return message("Error (invalid JSON mapping)");
- } catch (JsonParseException e) {
- logger.debug("Error parsing JSON: {}", storagePluginConfig, e);
- return message("Error (unable to parse JSON)");
- } catch (IOException e) {
- logger.debug("Failed to read: {}", storagePluginConfig, e);
- return message("Error (unable to read)");
+ return message("Invalid JSON");
+ } catch (PluginException e) {
+ return message(e.getMessage());
}
}
@@ -248,19 +249,41 @@ public class StorageResources {
}
/**
- * Regex allows the following paths:
+ * Regex allows the following paths:<pre><code>
* /storage.json
- * /storage/{group}-plugins.json
- * Note: for the second case the group involves the leading slash, therefore it should be removed then
+ * /storage/{group}-plugins.json</code></pre>
+ * Allowable groups:
+ * <ul>
+ * <li>"all" {@link #ALL_PLUGINS}</li>
+ * <li>"enabled" {@link #ENABLED_PLUGINS}</li>
+ * <li>"disabled" {@link #DISABLED_PLUGINS}</li>
+ * </ul>
+ * Any other group value results in an empty list.
+ * <p>
+ * Note: in the second case the captured group includes the leading slash,
+ * which must therefore be stripped before use.
*/
@GET
@Path("/storage{group: (/[^/]+?)*}-plugins.json")
@Produces(MediaType.APPLICATION_JSON)
public List<PluginConfigWrapper> getConfigsFor(@PathParam("group") String pluginGroup) {
+ PluginFilter filter;
+ switch (pluginGroup.trim()) {
+ case ALL_PLUGINS:
+ filter = PluginFilter.ALL;
+ break;
+ case ENABLED_PLUGINS:
+ filter = PluginFilter.ENABLED;
+ break;
+ case DISABLED_PLUGINS:
+ filter = PluginFilter.DISABLED;
+ break;
+ default:
+ return Collections.emptyList();
+ }
pluginGroup = StringUtils.isNotEmpty(pluginGroup) ? pluginGroup.replace("/", "") : ALL_PLUGINS;
return StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(storage.storedConfigs().entrySet().iterator(), Spliterator.ORDERED), false)
- .filter(byPluginGroup(pluginGroup))
+ Spliterators.spliteratorUnknownSize(storage.storedConfigs(filter).entrySet().iterator(), Spliterator.ORDERED), false)
.map(entry -> new PluginConfigWrapper(entry.getKey(), entry.getValue()))
.sorted(PLUGIN_COMPARATOR)
.collect(Collectors.toList());
@@ -286,21 +309,7 @@ public class StorageResources {
@Produces(MediaType.APPLICATION_JSON)
@Deprecated
public JsonResult deletePluginViaGet(@PathParam("name") String name) {
- return getPluginConfig(name).deleteFromStorage(storage)
- ? message("Success")
- : message("Error (unable to delete %s storage plugin)", name);
- }
-
- private Predicate<Map.Entry<String, StoragePluginConfig>> byPluginGroup(String pluginGroup) {
- if (ALL_PLUGINS.equalsIgnoreCase(pluginGroup)) {
- return entry -> true;
- } else if (ENABLED_PLUGINS.equalsIgnoreCase(pluginGroup)) {
- return entry -> entry.getValue().isEnabled();
- } else if (DISABLED_PLUGINS.equalsIgnoreCase(pluginGroup)) {
- return entry -> !entry.getValue().isEnabled();
- } else {
- return entry -> false;
- }
+ return deletePlugin(name);
}
@XmlRootElement
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
index 7f18983..0b2694b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassicConnectorLocator.java
@@ -280,7 +280,8 @@ public class ClassicConnectorLocator implements ConnectorLocator {
throw ((ExecutionSetupException) t);
}
throw new ExecutionSetupException(String.format(
- "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
+ "Failure setting up new storage plugin configuration for config %s",
+ pluginConfig.getClass().getSimpleName()), t);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/DrillbitPluginRegistryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/DrillbitPluginRegistryContext.java
index 5a5187a..fc75f82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/DrillbitPluginRegistryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/DrillbitPluginRegistryContext.java
@@ -33,16 +33,19 @@ public class DrillbitPluginRegistryContext implements PluginRegistryContext {
private final DrillbitContext drillbitContext;
private final ObjectMapper mapper;
+ private final ObjectMapper hoconMapper;
public DrillbitPluginRegistryContext(DrillbitContext drillbitContext) {
this.drillbitContext = drillbitContext;
+ mapper = drillbitContext.getLpPersistence().getMapper();
+
// Specialized form of the persistence mechanism
// to handle HOCON format in the override file
LogicalPlanPersistence persistence = new LogicalPlanPersistence(drillbitContext.getConfig(),
drillbitContext.getClasspathScan(),
new ObjectMapper(new HoconFactory()));
- mapper = persistence.getMapper();
+ hoconMapper = persistence.getMapper();
}
@Override
@@ -56,6 +59,11 @@ public class DrillbitPluginRegistryContext implements PluginRegistryContext {
}
@Override
+ public ObjectMapper hoconMapper() {
+ return hoconMapper;
+ }
+
+ @Override
public ScanResult classpathScan() {
return drillbitContext.getClasspathScan();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginBootstrapLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginBootstrapLoaderImpl.java
index aebe45e..8f27869 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginBootstrapLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginBootstrapLoaderImpl.java
@@ -227,7 +227,7 @@ public class PluginBootstrapLoaderImpl implements PluginBootstrapLoader {
}
pluginsOverrideFileUrl = urlSet.iterator().next();
try (InputStream is = pluginsOverrideFileUrl.openStream();) {
- return context.mapper().readValue(is, StoragePlugins.class);
+ return context.hoconMapper().readValue(is, StoragePlugins.class);
} catch (IOException e) {
logger.error("Failures are obtained while loading file: '{}'. Proceeding without update.",
upgradeFileName, e);
@@ -332,6 +332,6 @@ public class PluginBootstrapLoaderImpl implements PluginBootstrapLoader {
private StoragePlugins getPluginsFromResource(URL resource) throws IOException {
String pluginsData = Resources.toString(resource, Charsets.UTF_8);
- return context.mapper().readValue(pluginsData, StoragePlugins.class);
+ return context.hoconMapper().readValue(pluginsData, StoragePlugins.class);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginHandle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginHandle.java
index 4a80e66..9b0d0ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginHandle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginHandle.java
@@ -141,6 +141,8 @@ public class PluginHandle {
logger.info("Creating storage plugin for {}", name);
try {
plugin = connector.newInstance(name, config);
+ } catch (UserException e) {
+ throw e;
} catch (Exception e) {
throw UserException.internalError(e)
.addContext("Plugin name", name)
@@ -189,6 +191,9 @@ public class PluginHandle {
* from/to ephemeral storage and the plugin cache, since those two
* caches are not synchronized as a whole. Ensures that only one of the
* threads in a race condition will transfer the actual plugin instance.
+ * <p>
+ * By definition, a plugin becomes disabled if it moves to ephemeral,
+ * enabled if it moves from ephemeral into stored status.
*/
public synchronized PluginHandle transfer(PluginType type) {
if (plugin == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRegistryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRegistryContext.java
index 3c94bf3..b249b66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRegistryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/PluginRegistryContext.java
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public interface PluginRegistryContext {
DrillConfig config();
ObjectMapper mapper();
+ ObjectMapper hoconMapper();
ScanResult classpathScan();
// TODO: Remove this here and from StoragePlugin constructors.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
index 65b1e96..9968a0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java
@@ -23,12 +23,10 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Holds maps to storage plugins. Supports name => plugin and config => plugin
@@ -56,7 +54,6 @@ import org.slf4j.LoggerFactory;
* close. (The one exception is final close, which is done here.)
*/
class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
- private static final Logger logger = LoggerFactory.getLogger(StoragePluginMap.class);
private final Map<String, PluginHandle> nameMap = CaseInsensitiveMap.newHashMap();
private final Map<StoragePluginConfig, PluginHandle> configMap = new HashMap<>();
@@ -66,8 +63,9 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
* the same plugin twice. Also safe for putting a different
*
* @return the replaced entry, if any, which the caller should close
+ * @throws PluginException for an attempt to replace a system plugin
*/
- public synchronized PluginHandle put(PluginHandle plugin) {
+ public synchronized PluginHandle put(PluginHandle plugin) throws PluginException {
PluginHandle oldPlugin = nameMap.put(plugin.name(), plugin);
if (oldPlugin != null) {
if (oldPlugin == plugin || oldPlugin.config().equals(plugin.config())) {
@@ -76,12 +74,7 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
if (oldPlugin.isIntrinsic()) {
// Put the old one back
nameMap.put(oldPlugin.name(), oldPlugin);
- throw UserException.permissionError()
- .message("Attempt to replace a system plugin.")
- .addContext("Plugin name", oldPlugin.name())
- .addContext("Intrinsic plugin class", oldPlugin.config().getClass().getName())
- .addContext("Attempted replacement", plugin.config().getClass().getName())
- .build(logger);
+ throw PluginException.systemPluginException("replace", plugin.name());
}
configMap.remove(oldPlugin.config());
}
@@ -127,19 +120,16 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
* @param newPlugin the new plugin to insert
* @return true if the new plugin was inserted, false if not because
* the old plugin was not found in the map
+ * @throws PluginException for an attempt to replace a system plugin
*/
- public synchronized boolean replace(PluginHandle oldPlugin, PluginHandle newPlugin) {
+ public synchronized boolean replace(PluginHandle oldPlugin, PluginHandle newPlugin)
+ throws PluginException {
Preconditions.checkArgument(oldPlugin != null);
Preconditions.checkArgument(newPlugin != null);
Preconditions.checkArgument(oldPlugin.name().equalsIgnoreCase(newPlugin.name()));
Preconditions.checkArgument(oldPlugin != newPlugin);
if (oldPlugin.isIntrinsic()) {
- throw UserException.permissionError()
- .message("Attempt to replace a system plugin.")
- .addContext("Plugin name", oldPlugin.name())
- .addContext("Intrinsic plugin class", oldPlugin.config().getClass().getName())
- .addContext("Attempted replacement", newPlugin.config().getClass().getName())
- .build(logger);
+ throw PluginException.systemPluginException("replace", oldPlugin.name());
}
boolean ok = nameMap.replace(oldPlugin.name(), oldPlugin, newPlugin);
if (ok) {
@@ -157,19 +147,16 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
*
* @return the doomed plugin if the plugin was removed, null if there was
* no entry by the given name
+ * @throws PluginException for an attempt to remove a system plugin
* @see {@link #remove(PluginHandle)
*/
- public synchronized PluginHandle remove(String name) {
+ public synchronized PluginHandle remove(String name) throws PluginException {
PluginHandle plugin = get(name);
if (plugin == null) {
return null;
}
if (plugin.isIntrinsic()) {
- throw UserException.permissionError()
- .message("Attempt to remove a system plugin.")
- .addContext("Plugin name", plugin.name())
- .addContext("Intrinsic plugin class", plugin.config().getClass().getName())
- .build(logger);
+ throw PluginException.systemPluginException("remove", name);
}
nameMap.remove(name);
configMap.remove(plugin.config(), plugin);
@@ -177,29 +164,6 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
}
/**
- * Removes the plugin, but only if it is in the map. That is,
- * resolves the name and removes the plugin only if it resolves
- * to the given plugin.
- *
- * @return true if the plugin was removed and the caller
- * should close it, false otherwise
- */
- public synchronized boolean remove(PluginHandle oldPlugin) {
- if (oldPlugin.isIntrinsic()) {
- throw UserException.permissionError()
- .message("Attempt to remove a system plugin.")
- .addContext("Plugin name", oldPlugin.name())
- .addContext("Intrinsic plugin class", oldPlugin.config().getClass().getName())
- .build(logger);
- }
- boolean ok = nameMap.remove(oldPlugin.name(), oldPlugin);
- if (ok) {
- configMap.remove(oldPlugin.config(), oldPlugin);
- }
- return ok;
- }
-
- /**
* Given a name and a config (which is presumed to have become disabled),
* remove and return any existing plugin. Only matches if the name is found and the
* named plugin has the same config as the one to remove to enforce
@@ -208,21 +172,23 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
* @param name plugin name
* @param oldConfig expected config of the doomed plugin
* @return true if the plugin was removed and closed, false otherwise
+ * @throws PluginException for an attempt to remove a system plugin
*/
- public synchronized PluginHandle remove(String name, StoragePluginConfig oldConfig) {
+ public synchronized PluginHandle remove(String name, StoragePluginConfig oldConfig) throws PluginException {
PluginHandle oldEntry = nameMap.get(name);
if (oldEntry == null || !oldEntry.config().equals(oldConfig)) {
return null;
}
if (oldEntry.isIntrinsic()) {
- throw UserException.permissionError()
- .message("Attempt to remove a system plugin.")
- .addContext("Plugin name", oldEntry.name())
- .addContext("Intrinsic plugin class", oldEntry.config().getClass().getName())
- .build(logger);
+ throw PluginException.systemPluginException("remove", name);
}
nameMap.remove(oldEntry.name());
- configMap.remove(oldEntry.config());
+ if (configMap.remove(oldEntry.config()) != oldEntry) {
+ // This is a programming error.
+ throw new IllegalStateException(String.format(
+ "Config entry was modified while in the plugin cache: '%s', class %s",
+ name, oldConfig.getClass().getName()));
+ }
return oldEntry;
}
@@ -261,4 +227,8 @@ class StoragePluginMap implements Iterable<PluginHandle>, AutoCloseable {
configMap.clear();
nameMap.clear();
}
+
+ public Set<String> names() {
+ return nameMap.keySet();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 7ed1663..f1a75ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -17,71 +17,214 @@
*/
package org.apache.drill.exec.store;
+import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.store.dfs.FormatPlugin;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
public interface StoragePluginRegistry extends Iterable<Map.Entry<String, StoragePlugin>>, AutoCloseable {
String PSTORE_NAME = "sys.storage_plugins";
+ @SuppressWarnings("serial")
+ public static class PluginException extends Exception {
+ public PluginException(String msg) {
+ super(msg);
+ }
+
+ public PluginException(String msg, Throwable e) {
+ super(msg, e);
+ }
+
+ public static PluginException systemPluginException(String operation, String name) {
+ return new PluginException(String.format(
+ "Cannot %s a system plugin: `%s`", operation, name));
+ }
+ }
+
+ /**
+ * Indicates the requested plugin was not found.
+ */
+ @SuppressWarnings("serial")
+ public static class PluginNotFoundException extends PluginException {
+ public PluginNotFoundException(String name) {
+ super("No storage plugin exists with name: `" + name + "`");
+ }
+ }
+
+ /**
+ * Indicates an error when decoding a plugin from JSON.
+ */
+ @SuppressWarnings("serial")
+ public static class PluginEncodingException extends PluginException {
+ public PluginEncodingException(String msg, Exception e) {
+ super(msg, e);
+ }
+ }
+
/**
- * Initialize the storage plugin registry. Must be called before the registry is used.
+ * Initialize the storage plugin registry. Must be called before the registry
+ * is used.
*/
void init();
/**
- * Store a plugin by name and configuration. If the plugin already exists, update the plugin
+ * Store a plugin by name and configuration. If the plugin already exists,
+ * update the plugin. This form directly updates persistent storage. The
+ * in-memory cache is updated on the next refresh. This form will accept an
+ * invalid plugin, which will be disabled upon refresh. Since Drill is
+ * distributed, and plugins work with external systems, the external system
+ * can become invalid at any moment (not just when plugins are updated), so the
+ * model used for {@code put()} mimics normal runtime operation.
*
* @param name The name of the plugin
* @param config The plugin configuration
* @return The StoragePlugin instance.
- * @throws ExecutionSetupException if plugin cannot be created
+ * @throws PluginException if plugin cannot be created
+ */
+ void put(String name, StoragePluginConfig config) throws PluginException;
+
+ /**
+ * Like {@link #put(String, StoragePluginConfig)}, but forces instantiation of the
+ * plugin to verify that the configuration is valid at this moment in time.
+ */
+ void validatedPut(String name, StoragePluginConfig config) throws PluginException;
+
+ /**
+ * Set the plugin to the requested enabled state. Does nothing if the plugin
+ * is already in the requested state. If a formerly enabled plugin is
+ * disabled, moves the plugin from the in-memory cache to the ephemeral
+ * store. If a formerly disabled plugin is enabled, verifies that the plugin
+ * can be instantiated as for {@link #validatedPut(String, StoragePluginConfig)}.
+ * <p>
+ * Use this method when changing state. Do not obtain the config and change
+ * the state directly, doing so will make the plugin config inconsistent
+ * with the internal state.
+ *
+ * @param name name of the plugin
+ * @param enabled {@code true} to enable the plugin, {@code false} to disable
+ * @throws PluginNotFoundException if the plugin is not found
+ * @throws PluginException if the plugin name is not valid or
+ * if enabling a plugin and the plugin is not valid
*/
- void put(String name, StoragePluginConfig config) throws ExecutionSetupException;
+ void setEnabled(String name, boolean enabled) throws PluginException;
/**
* Get a plugin by name. Create it based on the PStore saved definition if it doesn't exist.
*
* @param name The name of the plugin
* @return The StoragePlugin instance.
- * @throws ExecutionSetupException if plugin cannot be obtained
+ * @throws PluginException if plugin cannot be obtained
*/
- StoragePlugin getPlugin(String name) throws ExecutionSetupException;
+ StoragePlugin getPlugin(String name) throws PluginException, UserException;
/**
* Get a plugin by configuration. If it doesn't exist, create it.
*
* @param config The configuration for the plugin.
* @return The StoragePlugin instance.
- * @throws ExecutionSetupException if plugin cannot be obtained
+ * @throws PluginException if plugin cannot be obtained
+ */
+ StoragePlugin getPluginByConfig(StoragePluginConfig config) throws PluginException;
+
+ /**
+ * @deprecated use {@link #resolve(StoragePluginConfig, Class)} which provides
+ * type safety. Retained for compatibility with older plugins
*/
+ @Deprecated
StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException;
/**
- * Retrieve a stored configuration by name. Returns only defined
- * plugins (not system plugins). Returns both enabled and disabled
- * plugins. Use this to obtain a plugin for editing (rather than
+ * Return a plugin from persistent storage. Returns both enabled and
+ * disabled stored plugins, but does not return system plugins.
+ * Use this to obtain a plugin for editing (rather than
* for planning or executing a query.)
*/
- StoragePluginConfig getConfig(String name);
+ StoragePluginConfig getStoredConfig(String name);
+
+ /**
+ * Return a config encoded as JSON.
+ * @throws PluginException if the plugin is undefined
+ */
+ String encode(String name) throws PluginException;
+ String encode(StoragePluginConfig config);
+
+ /**
+ * Return a config decoded from JSON.
+ * @throws PluginEncodingException if the JSON is invalid
+ */
+ StoragePluginConfig decode(String json) throws PluginEncodingException;
+
+ /**
+ * Put a storage plugin config from JSON. Validates the JSON and the
+ * resulting storage plugin. Use this form for JSON received from
+ * the UI or other external source.
+ *
+ * @throws PluginEncodingException if the JSON is invalid
+ * @throws PluginException if the underlying
+ * {@link #validatedPut(String, StoragePluginConfig)} fails
+ */
+ void putJson(String name, String json) throws PluginException;
+
+ /**
+ * Copy a stored config so that it can be modified.
+ * <p>
+ * <i><b>Never modify a config stored in the registry!</b></i>
+ * Configs are keyed by name and value; getting a config, then
+ * modifying it, will cause the value maps to become out of sync.
+ *
+ * @param name name of the storage plugin config to copy
+ * @return a copy of the config
+ * @throws PluginException if the name is undefined
+ */
+ StoragePluginConfig copyConfig(String name) throws PluginException;
+
+ /**
+ * Copy the given storage plugin config so it may be modified.
+ *
+ * @param config the storage plugin config to copy
+ * @return the copy
+ */
+ StoragePluginConfig copyConfig(StoragePluginConfig config);
+
+ /**
+ * Retrieve an available configuration. Returns enabled stored plugins
+ * and system plugins. These configs are those that can be used to
+ * plan a query.
+ */
+ StoragePluginConfig getDefinedConfig(String name);
/**
* Remove a plugin by name
*
* @param name The name of the storage plugin to remove
+ * @throws PluginException
*/
- void remove(String name);
+ void remove(String name) throws PluginException;
/**
- * Returns a copy of the set of all stored plugin configurations,
- * directly from the persistent store.
+ * Returns a set of all stored plugin configurations,
+ * directly from the persistent store. Note: the actual
+ * configs may reside in the cache; make a copy before
+ * making any changes.
* @return map of stored plugin configurations
*/
Map<String, StoragePluginConfig> storedConfigs();
+ enum PluginFilter { ALL, ENABLED, DISABLED };
+
+ /**
+ * Return a possibly-filtered set of plugins from the persistent
+ * store.
+ */
+ Map<String, StoragePluginConfig> storedConfigs(PluginFilter filter);
+
/**
* Returns a copy of the set of enabled stored plugin configurations.
* The registry is refreshed against the persistent store prior
@@ -91,14 +234,50 @@ public interface StoragePluginRegistry extends Iterable<Map.Entry<String, Storag
Map<String, StoragePluginConfig> enabledConfigs();
/**
- * Get the Format plugin for the FileSystemPlugin associated with the provided storage config and format config.
+ * Returns the set of available plugin names.
+ * Includes system plugins and enabled stored plugins.
+ */
+ Set<String> availablePlugins();
+
+ /**
+ * Safe way to add or remove a format plugin config from a stored file
+ * system configuration. Makes a copy of the config, adds/removes the
+ * format plugin, and updates the persistent store with the copy.
*
- * @param storageConfig The storage config for the associated FileSystemPlugin
- * @param formatConfig The format config for the associated FormatPlugin
+ * @param pluginName name of the file system storage plugin config to
+ * modify
+ * @param formatName name of the format plugin to modify
+ * @param formatConfig if null, removes the plugin, if non-null updates
+ * the format plugin config with this value
+ * @throws PluginException if the storage plugin is undefined or
+ * is not a file format plugin
+ */
+ void putFormatPlugin(String pluginName, String formatName,
+ FormatPluginConfig formatConfig) throws PluginException;
+
+ /**
+ * Get the Format plugin for the FileSystemPlugin associated with the provided
+ * storage config and format config.
+ *
+ * @param storageConfig
+ * The storage config for the associated FileSystemPlugin
+ * @param formatConfig
+ * The format config for the associated FormatPlugin
* @return A FormatPlugin instance
- * @throws ExecutionSetupException if plugin cannot be obtained
+ * @throws PluginException
+ * if plugin cannot be obtained
*/
- FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException;
+ FormatPlugin getFormatPluginByConfig(StoragePluginConfig storageConfig,
+ FormatPluginConfig formatConfig) throws PluginException;
+
+ /**
+ * @deprecated use
+ * {@link #resolveFormat(StoragePluginConfig, FormatPluginConfig, Class)}
+ * which provides type safety. Retained for compatibility with older plugins
+ */
+ @Deprecated
+ FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig,
+ FormatPluginConfig formatConfig) throws ExecutionSetupException;
/**
* Get the Schema factory associated with this storage plugin registry.
@@ -106,4 +285,29 @@ public interface StoragePluginRegistry extends Iterable<Map.Entry<String, Storag
* @return A SchemaFactory that can register the schemas associated with this plugin registry.
*/
SchemaFactory getSchemaFactory();
+
+ /**
+ * Object mapper to read/write the JSON form of a plugin
+ * config.
+ */
+ ObjectMapper mapper();
+
+ <T extends StoragePlugin> T resolve(
+ StoragePluginConfig storageConfig, Class<T> desired);
+
+ /**
+ * Resolve a format plugin given a storage plugin config and a format
+ * plugin config. Call from within a file storage plugin to resolve
+ * the format plugin.
+ *
+ * @param <T> the required type of the plugin
+ * @param storageConfig storage plugin config
+ * @param formatConfig format plugin config
+ * @param desired desired target class
+ * @return the format plugin
+ * @throws IllegalStateException if the plugin is unknown or of the wrong
+ * format - errors which should never occur in normal operation
+ */
+ <T extends FormatPlugin> T resolveFormat(
+ StoragePluginConfig storageConfig,
+ FormatPluginConfig formatConfig, Class<T> desired);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 8898b68..6ca2732 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -39,6 +39,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.PluginHandle.PluginType;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
@@ -47,6 +48,8 @@ import org.apache.drill.shaded.guava.com.google.common.cache.RemovalListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
/**
* Plugin registry. Caches plugin instances which correspond to configurations
* stored in persistent storage. Synchronizes the instances and storage.
@@ -213,18 +216,34 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
@Override
public void init() {
locators.stream().forEach(loc -> loc.init());
- loadIntrinsicPlugins();
+ try {
+ loadIntrinsicPlugins();
+ } catch (PluginException e) {
+ // Should only occur for a programming error
+ throw new IllegalStateException("Failed to load system plugins", e);
+ }
defineConnectors();
prepareStore();
}
- private void loadIntrinsicPlugins() {
+ private void loadIntrinsicPlugins() throws PluginException {
for (ConnectorLocator locator : locators) {
Collection<StoragePlugin> intrinsicPlugins = locator.intrinsicPlugins();
if (intrinsicPlugins == null) {
continue;
}
for (StoragePlugin sysPlugin : intrinsicPlugins) {
+ // Enforce lower case names. Since the name of a system plugin
+ // is "hard coded", we can't adjust it if it is not already
+ // lower case. All we can do is fail, to tell the developer that
+ // something is wrong.
+ String origName = sysPlugin.getName();
+ String lcName = sysPlugin.getName().toLowerCase();
+ if (!origName.equals(lcName)) {
+ throw new IllegalStateException(String.format(
+ "Plugin names must be in lower case but system plugin name `%s` is not",
+ origName));
+ }
ConnectorHandle connector = ConnectorHandle.intrinsicConnector(locator, sysPlugin);
defineConnector(connector);
pluginCache.put(new PluginHandle(sysPlugin, connector, PluginType.INTRINSIC));
@@ -298,7 +317,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
return;
}
for (Map.Entry<String, StoragePluginConfig> newPlugin : upgraded) {
- StoragePluginConfig oldPluginConfig = pluginStore.get(newPlugin.getKey());
+ StoragePluginConfig oldPluginConfig = getStoredConfig(newPlugin.getKey());
if (oldPluginConfig != null) {
copyPluginStatus(oldPluginConfig, newPlugin.getValue());
}
@@ -360,82 +379,176 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
}
@Override
- public void put(String name, StoragePluginConfig config) throws ExecutionSetupException {
+ public void put(String name, StoragePluginConfig config) throws PluginException {
+ name = validateName(name);
// Do not allow overwriting system plugins
// This same check is done later. However, we want to do this check
// before writing to the persistent store, which we must do before
// putting the plugin into the cache (where the second check is done.)
- PluginHandle oldEntry = pluginCache.get(name);
- if (oldEntry != null && oldEntry.isIntrinsic()) {
- throw UserException.permissionError()
- .message("Attempt to replace a system plugin.")
- .addContext("Plugin name", name)
- .addContext("Intrinsic plugin class", oldEntry.config().getClass().getName())
- .addContext("Attempted replacement", config.getClass().getName())
- .build(logger);
+ PluginHandle currentEntry = pluginCache.get(name);
+ if (currentEntry != null && currentEntry.isIntrinsic()) {
+ throw PluginException.systemPluginException(
+ "replace", name);
}
- // Write to the store first. We are now in a race to see which
- // thread will update the cache: might be us or might the another
- // thread.
+ // Write to the store. We don't bother to update the cache; we could
+ // only update our own cache, not those of other Drillbits. We rely
+ // on the cache refresh mechanism to kick in when the Drillbit asks
+ // for the plugin instance.
pluginStore.put(name, config);
+ }
+
+ private String validateName(String name) throws PluginException {
+ if (name == null) {
+ throw new PluginException("Plugin name cannot be null");
+ }
+ name = name.trim().toLowerCase();
+ if (name.isEmpty()) {
+ throw new PluginException("Plugin name cannot be null");
+ }
+ return name;
+ }
- // Will fail on an attempt to update a system plugin. Update
- // will be rejected if another thread beats us to it.
+ @Override
+ public void validatedPut(String name, StoragePluginConfig config)
+ throws PluginException {
+
+ name = validateName(name);
+ PluginHandle oldEntry;
if (config.isEnabled()) {
- PluginHandle newHandle = restoreFromEphemeral(name, config);
- oldEntry = pluginCache.put(newHandle);
+ PluginHandle entry = restoreFromEphemeral(name, config);
+ try {
+ entry.plugin();
+ } catch (Exception e) {
+ throw new PluginException(String.format(
+ "Invalid plugin config for '%s'", name), e);
+ }
+ oldEntry = pluginCache.put(entry);
} else {
- oldEntry = pluginCache.remove(name, config);
+ oldEntry = pluginCache.remove(name);
}
-
- // Let's optimistically assume that running queries may still use
- // the old config, so transfer the possibly-created instance to the
- // ephemeral store.
moveToEphemeral(oldEntry);
+ pluginStore.put(name, config);
+ }
+
+ @Override
+ public void setEnabled(String name, boolean enable) throws PluginException {
+
+ // Works only with the stored config. (Some odd persistent stores do not
+ // actually serialize the config; they just cache a copy.) If we change
+ // anything, the next request will do a resync to pick up the change.
+ name = validateName(name);
+ StoragePluginConfig config = requireStoredConfig(name);
+ if (config.isEnabled() == enable) {
+ return;
+ }
+ StoragePluginConfig copy = copyConfig(config);
+ copy.setEnabled(enable);
+ validatedPut(name, copy);
}
/**
- * If there is an ephemeral plugin of this (name, config), pair,
- * transfer that plugin out of ephemeral storage for reuse. Else
- * create a new handle.
- *
- * @param name plugin name
- * @param config plugin config
- * @return a handle for the plugin which may have been retrieved from
- * ephemeral storage
+ * Configs are obtained from the persistent store. This method is
+ * called only by the UI to edit a stored plugin; so no benefit to
+ * using the cache. We also want a plugin even if it is disabled,
+ * and disabled plugins do not reside in the cache.
+ * <p>
+ * Note that each call (depending on the store implementation)
+ * may return a distinct instance of the config. The instance will
+ * be equal (unless the stored version changes.) However, other
+ * versions of the store may return the same instance as is in
+ * the cache. So, <b>do not</b> modify the returned config.
+ * To modify the config, call {@link #copyConfig(String)} instead.
*/
- private PluginHandle restoreFromEphemeral(String name,
- StoragePluginConfig config) {
+ @Override
+ public StoragePluginConfig getStoredConfig(String name) {
+ return pluginStore.get(name);
+ }
- // Benign race condition between check and invalidate.
- PluginHandle ephemeralEntry = ephemeralPlugins.getIfPresent(config);
- if (ephemeralEntry == null || !name.equalsIgnoreCase(ephemeralEntry.name())) {
- return createPluginEntry(name, config, PluginType.STORED);
- } else {
+ @Override
+ public StoragePluginConfig copyConfig(String name) throws PluginException {
+ return copyConfig(requireStoredConfig(name));
+ }
- // Transfer the instance to a new handle, then invalidate the
- // cache entry. The transfer ensures that the invalidate will
- // not close the plugin instance
- PluginHandle newHandle = ephemeralEntry.transfer(PluginType.STORED);
- ephemeralPlugins.invalidate(config);
- return newHandle;
+ private StoragePluginConfig requireStoredConfig(String name) throws PluginException {
+ StoragePluginConfig config = getStoredConfig(name);
+ if (config == null) {
+ throw new PluginNotFoundException(name);
}
+ return config;
}
@Override
- public StoragePluginConfig getConfig(String name) {
- PluginHandle plugin = pluginCache.get(name);
- if (plugin != null) {
- return plugin.isIntrinsic() ? null : plugin.config();
+ public String encode(StoragePluginConfig config) {
+ ObjectMapper mapper = context.mapper();
+ try {
+ return mapper.writer()
+ .forType(config.getClass())
+ .writeValueAsString(config);
+ } catch (IOException e) {
+ // We control serialization, so no errors should occur.
+ throw new IllegalStateException("Serialize failed", e);
}
- return pluginStore.get(name);
+ }
+
+ @Override
+ public String encode(String name) throws PluginException {
+ return encode(requireStoredConfig(validateName(name)));
+ }
+
+ @Override
+ public StoragePluginConfig decode(String json) throws PluginEncodingException {
+
+ // We don't control the format of the input JSON, so an
+ // error could occur.
+ try {
+ return context.mapper().reader()
+ .forType(StoragePluginConfig.class)
+ .readValue(json);
+ } catch (IOException e) {
+ throw new PluginEncodingException("Failure when decoding plugin JSON", e);
+ }
+ }
+
+ @Override
+ public void putJson(String name, String json) throws PluginException {
+ validatedPut(name, decode(json));
+ }
+
+ @Override
+ public StoragePluginConfig copyConfig(StoragePluginConfig orig) {
+ try {
+
+ // TODO: Storage plugin configs don't define a "clone" or "copy"
+ // method, so use a round-trip to JSON to accomplish the same task.
+ return decode(encode(orig));
+ } catch (PluginEncodingException e) {
+ throw new IllegalStateException("De/serialize failed", e);
+ }
+ }
+
+ @Override
+ public StoragePluginConfig getDefinedConfig(String name) {
+ try {
+ name = validateName(name);
+ } catch (PluginException e) {
+ // Name is not valid, so no plugin matches the name.
+ return null;
+ }
+ PluginHandle entry = getEntry(name);
+ return entry == null ? null : entry.config();
}
// Gets a plugin with the named configuration
@Override
- public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
+ public StoragePlugin getPlugin(String name) throws PluginException {
+ try {
+ name = validateName(name);
+ } catch (PluginException e) {
+ // Name is not valid, so no plugin matches the name.
+ return null;
+ }
PluginHandle entry = getEntry(name);
// Lazy instantiation: the first call to plugin() creates the
@@ -448,7 +561,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
if (plugin != null && plugin.isIntrinsic()) {
return plugin;
}
- StoragePluginConfig config = pluginStore.get(name);
+ StoragePluginConfig config = getStoredConfig(name);
if (plugin == null) {
return refresh(name, config);
} else {
@@ -477,10 +590,16 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
if (config == null || !config.isEnabled()) {
// Move the old config to the ephemeral store.
- if (pluginCache.remove(entry)) {
- moveToEphemeral(entry);
+ try {
+ if (pluginCache.remove(entry.name()) == entry) {
+ moveToEphemeral(entry);
+ }
+ return null;
+ } catch (PluginException e) {
+ // Should never occur, only if the persistent store were to
+ // somehow contain an entry with the same name as a system plugin.
+ throw new IllegalStateException("Plugin refresh failed", e);
}
- return null;
}
// Unchanged?
if (entry.config().equals(config)) {
@@ -489,11 +608,17 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
// Plugin changed. Handle race condition on replacement.
PluginHandle newEntry = restoreFromEphemeral(entry.name(), config);
- if (pluginCache.replace(entry, newEntry)) {
- moveToEphemeral(entry);
- return newEntry;
- } else {
- return pluginCache.get(entry.name());
+ try {
+ if (pluginCache.replace(entry, newEntry)) {
+ moveToEphemeral(entry);
+ return newEntry;
+ } else {
+ return pluginCache.get(entry.name());
+ }
+ } catch (PluginException e) {
+ // Should never occur, only if the persistent store were to
+ // somehow contain an entry with the same name as a system plugin.
+ throw new IllegalStateException("Plugin refresh failed", e);
}
}
@@ -509,6 +634,23 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
@Override
public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
+ try {
+ return getPluginByConfig(config);
+ } catch (PluginException e) {
+ throw translateException(e);
+ }
+ }
+
+ private ExecutionSetupException translateException(PluginException e) {
+ Throwable cause = e.getCause();
+ if (cause != null && cause instanceof ExecutionSetupException) {
+ return (ExecutionSetupException) cause;
+ }
+ return new ExecutionSetupException(e);
+ }
+
+ @Override
+ public StoragePlugin getPluginByConfig(StoragePluginConfig config) throws PluginException {
// Try to lookup plugin by configuration
PluginHandle plugin = pluginCache.get(config);
if (plugin != null) {
@@ -521,11 +663,11 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
return ephemeralPlugins.get(config).plugin();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
- if (cause instanceof ExecutionSetupException) {
- throw (ExecutionSetupException) cause;
+ if (cause instanceof PluginException) {
+ throw (PluginException) cause;
} else {
// this shouldn't happen. here for completeness.
- throw new ExecutionSetupException(
+ throw new PluginException(
"Failure while trying to create ephemeral plugin.", cause);
}
}
@@ -536,22 +678,55 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
// the old one and added a new one of the same name.
// TODO: Fix this
@Override
- public void remove(String name) {
- PluginHandle entry = pluginCache.remove(name);
- if (entry != null) {
- moveToEphemeral(entry);
- }
+ public void remove(String name) throws PluginException {
+ name = validateName(name);
+
+ // Removing here allows us to check for system plugins
+ moveToEphemeral(pluginCache.remove(name));
// Must tell store to delete even if not known locally because
// the store might hold a disabled version
pluginStore.delete(name);
}
+ /**
+ * If there is an ephemeral plugin of this (name, config) pair,
+ * transfer that plugin out of ephemeral storage for reuse. Else
+ * create a new handle.
+ *
+ * @param name plugin name
+ * @param config plugin config
+ * @return a handle for the plugin which may have been retrieved from
+ * ephemeral storage
+ */
+ private PluginHandle restoreFromEphemeral(String name,
+ StoragePluginConfig config) {
+
+ // Benign race condition between check and invalidate.
+ PluginHandle ephemeralEntry = ephemeralPlugins.getIfPresent(config);
+ if (ephemeralEntry == null || !name.equalsIgnoreCase(ephemeralEntry.name())) {
+ return createPluginEntry(name, config, PluginType.STORED);
+ } else {
+
+ // Transfer the instance to a new handle, then invalidate the
+ // cache entry. The transfer ensures that the invalidate will
+ // not close the plugin instance
+ PluginHandle newHandle = ephemeralEntry.transfer(PluginType.STORED);
+ ephemeralPlugins.invalidate(config);
+ return newHandle;
+ }
+ }
+
private void moveToEphemeral(PluginHandle handle) {
if (handle == null) {
return;
}
+ // No need to move if no instance.
+ if (!handle.hasInstance()) {
+ return;
+ }
+
// If already in the ephemeral store, don't replace.
// Race condition is benign: two threads both doing the put
// will cause the first handle to be closed when the second hits.
@@ -564,11 +739,29 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
@Override
public Map<String, StoragePluginConfig> storedConfigs() {
+ return storedConfigs(PluginFilter.ALL);
+ }
+
+ @Override
+ public Map<String, StoragePluginConfig> storedConfigs(PluginFilter filter) {
Map<String, StoragePluginConfig> result = new HashMap<>();
Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginStore.load();
while (allPlugins.hasNext()) {
Entry<String, StoragePluginConfig> plugin = allPlugins.next();
- result.put(plugin.getKey(), plugin.getValue());
+ boolean include;
+ switch (filter) {
+ case ENABLED:
+ include = plugin.getValue().isEnabled();
+ break;
+ case DISABLED:
+ include = !plugin.getValue().isEnabled();
+ break;
+ default:
+ include = true;
+ }
+ if (include) {
+ result.put(plugin.getKey(), plugin.getValue());
+ }
}
return result;
}
@@ -586,20 +779,60 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
}
@Override
- public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException {
- StoragePlugin storagePlugin = getPlugin(storageConfig);
+ public void putFormatPlugin(String pluginName, String formatName,
+ FormatPluginConfig formatConfig) throws PluginException {
+ pluginName = validateName(pluginName);
+ formatName = validateName(formatName);
+ StoragePluginConfig orig = requireStoredConfig(pluginName);
+ if (!(orig instanceof FileSystemConfig)) {
+ throw new PluginException(
+ "Format plugins can be added only to the file system plugin: " + pluginName);
+ }
+ FileSystemConfig copy = (FileSystemConfig) copyConfig(orig);
+ if (formatConfig == null) {
+ copy.getFormats().remove(formatName);
+ } else {
+ copy.getFormats().put(formatName, formatConfig);
+ }
+ put(pluginName, copy);
+ }
+
+ @Override
+ public FormatPlugin getFormatPluginByConfig(StoragePluginConfig storageConfig,
+ FormatPluginConfig formatConfig) throws PluginException {
+ StoragePlugin storagePlugin = getPluginByConfig(storageConfig);
return storagePlugin.getFormatPlugin(formatConfig);
}
@Override
+ public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig,
+ FormatPluginConfig formatConfig) throws ExecutionSetupException {
+ try {
+ return getFormatPluginByConfig(storageConfig, formatConfig);
+ } catch (PluginException e) {
+ throw translateException(e);
+ }
+ }
+
+ @Override
public SchemaFactory getSchemaFactory() {
return schemaFactory;
}
// TODO: Remove this: it will force plugins to be instantiated
// unnecessarily
+ // This is a bit of a hack. The planner calls this to get rules
+ // for queries. If even one plugin has issues, then all queries
+ // will fail, even those that don't use the invalid plugin.
+ //
+ // This hack may result in a delay (such as a timeout) again and
+ // again as each query tries to create the plugin. The solution is
+ // to disable the plugin, or fix the external system. This solution
+ // is more stable than, say, marking the plugin failed since we have
+ // no way to show or reset failed plugins.
private static class PluginIterator implements Iterator<Entry<String, StoragePlugin>> {
private final Iterator<PluginHandle> base;
+ private PluginHandle entry;
public PluginIterator(Iterator<PluginHandle> base) {
this.base = base;
@@ -607,18 +840,27 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
@Override
public boolean hasNext() {
- return base.hasNext();
+ while (base.hasNext()) {
+ entry = base.next();
+ try {
+ entry.plugin();
+ return true;
+ } catch (Exception e) {
+ // Skip this one to avoid failing the query
+ }
+ }
+ return false;
}
@Override
public Entry<String, StoragePlugin> next() {
- PluginHandle entry = base.next();
return new ImmutableEntry<>(entry.name(), entry.plugin());
}
}
@Override
public Iterator<Entry<String, StoragePlugin>> iterator() {
+ refresh();
return new PluginIterator(pluginCache.iterator());
}
@@ -672,8 +914,51 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
try {
plugin.plugin().registerSchemas(schemaConfig, parent);
} catch (Exception e) {
- logger.warn("Error during `{}` schema initialization: {}", plugin.name(), e.getMessage(), e.getCause());
+ logger.warn("Error during `{}` schema initialization: {}",
+ plugin.name(), e.getMessage(), e.getCause());
}
}
}
+
+ @Override
+ public ObjectMapper mapper() {
+ return context.mapper();
+ }
+
+ @Override
+ public <T extends StoragePlugin> T resolve(
+ StoragePluginConfig storageConfig, Class<T> desired) {
+ try {
+ return desired.cast(getPluginByConfig(storageConfig));
+ } catch (PluginException|ClassCastException e) {
+ // Should never occur
+ throw new IllegalStateException(String.format(
+ "Unable to load stroage plugin %s for provided config " +
+ "class %s", desired.getName(),
+ storageConfig.getClass().getName()), e);
+ }
+ }
+
+ @Override
+ public <T extends FormatPlugin> T resolveFormat(
+ StoragePluginConfig storageConfig,
+ FormatPluginConfig formatConfig, Class<T> desired) {
+ try {
+ return desired.cast(getFormatPluginByConfig(storageConfig, formatConfig));
+ } catch (PluginException|ClassCastException e) {
+ // Should never occur
+ throw new IllegalStateException(String.format(
+ "Unable to load format plugin %s for provided plugin " +
+ "config class %s and format config class %s",
+ desired.getName(),
+ storageConfig.getClass().getName(),
+ formatConfig.getClass().getName()), e);
+ }
+ }
+
+ @Override
+ public Set<String> availablePlugins() {
+ refresh();
+ return pluginCache.names();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 8f011ce..bc78d5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -31,7 +31,10 @@ import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import com.fasterxml.jackson.annotation.JsonTypeName;
+
import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap.Builder;
@JsonTypeName(FileSystemConfig.NAME)
public class FileSystemConfig extends StoragePluginConfig {
@@ -49,7 +52,13 @@ public class FileSystemConfig extends StoragePluginConfig {
@JsonProperty("workspaces") Map<String, WorkspaceConfig> workspaces,
@JsonProperty("formats") Map<String, FormatPluginConfig> formats) {
this.connection = connection;
- this.config = config;
+
+ // Force creation of an empty map so that configs compare equal
+ Builder<String, String> builder = ImmutableMap.builder();
+ if (config != null) {
+ builder.putAll(config);
+ }
+ this.config = builder.build();
Map<String, WorkspaceConfig> caseInsensitiveWorkspaces = CaseInsensitiveMap.newHashMap();
Optional.ofNullable(workspaces).ifPresent(caseInsensitiveWorkspaces::putAll);
this.workspaces = caseInsensitiveWorkspaces;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index e7c4128..88ebe4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory;
* references to the FileSystem configuration and path management.
*/
public class FileSystemPlugin extends AbstractStoragePlugin {
-
private static final Logger logger = LoggerFactory.getLogger(FileSystemPlugin.class);
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 3c032c3..84d161d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.store.dfs.easy;
import java.io.IOException;
import java.util.List;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -90,12 +89,10 @@ public class EasyGroupScan extends AbstractFileGroupScan {
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("selectionRoot") Path selectionRoot,
@JsonProperty("schema") TupleMetadata schema
- ) throws IOException, ExecutionSetupException {
+ ) throws IOException {
super(ImpersonationUtil.resolveUserName(userName));
this.selection = FileSelection.create(null, files, selectionRoot);
- this.formatPlugin = Preconditions.checkNotNull((EasyFormatPlugin<?>)
- engineRegistry.getFormatPlugin(storageConfig, formatConfig),
- "Unable to load format plugin for provided format config.");
+ this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, EasyFormatPlugin.class);
this.columns = columns == null ? ALL_COLUMNS : columns;
this.selectionRoot = selectionRoot;
SimpleFileTableMetadataProviderBuilder builder =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index 82e5cac..86a1292 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -33,7 +33,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Path;
@JsonTypeName("fs-sub-scan")
@@ -59,8 +58,7 @@ public class EasySubScan extends AbstractSubScan {
@JsonProperty("schema") TupleMetadata schema
) throws ExecutionSetupException {
super(userName);
- this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
- Preconditions.checkNotNull(this.formatPlugin);
+ this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, EasyFormatPlugin.class);
this.files = files;
this.columns = columns;
this.selectionRoot = selectionRoot;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index 379e2c9..2dec8c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -17,10 +17,8 @@
*/
package org.apache.drill.exec.store.dfs.easy;
-import java.io.IOException;
import java.util.List;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractWriter;
@@ -33,11 +31,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@JsonTypeName("fs-writer")
public class EasyWriter extends AbstractWriter {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriter.class);
+ static final Logger logger = LoggerFactory.getLogger(EasyWriter.class);
private final String location;
private final List<String> partitionColumns;
@@ -51,11 +50,10 @@ public class EasyWriter extends AbstractWriter {
@JsonProperty("storageStrategy") StorageStrategy storageStrategy,
@JsonProperty("storage") StoragePluginConfig storageConfig,
@JsonProperty("format") FormatPluginConfig formatConfig,
- @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
+ @JacksonInject StoragePluginRegistry engineRegistry) {
super(child);
- this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
- Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
+ this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, EasyFormatPlugin.class);
this.location = location;
this.partitionColumns = partitionColumns;
setStorageStrategy(storageStrategy);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
index 1e4d586..e9f84ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
@@ -20,35 +20,42 @@ package org.apache.drill.exec.store.easy.sequencefile;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-
+import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.logical.FormatPluginConfig;
import java.util.List;
+import java.util.Objects;
@JsonTypeName("sequencefile") @JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class SequenceFileFormatConfig implements FormatPluginConfig {
public List<String> extensions = ImmutableList.of();
- @Override
- public int hashCode() {
- return (extensions == null)? 0 : extensions.hashCode();
- }
-
public List<String> getExtensions() {
return extensions;
}
@Override
+ public int hashCode() {
+ return Objects.hash(extensions);
+ }
+
+ @Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
- } else if (obj == null) {
+ }
+ if (obj == null || getClass() != obj.getClass()) {
return false;
- } else if (getClass() == obj.getClass()) {
- SequenceFileFormatConfig other = (SequenceFileFormatConfig) obj;
- return (extensions == null)? (other.extensions == null) : extensions.equals(other.extensions);
}
- return false;
+ SequenceFileFormatConfig other = (SequenceFileFormatConfig) obj;
+ return Objects.equals(extensions, other.extensions);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("extensions", extensions)
+ .toString();
}
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index 63eeb7a..7bcb0a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -61,7 +61,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig> {
-
private static final Logger logger = LoggerFactory.getLogger(HttpdLogFormatPlugin.class);
private static final String PLUGIN_EXTENSION = "httpd";
@@ -90,8 +89,7 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
}
/**
- * This class performs the work for the plugin. This is where all logic goes to read records. In this case httpd logs
- * are lines terminated with a new line character.
+ * Reads httpd log lines, each terminated with a newline character.
*/
private class HttpdLogRecordReader extends AbstractRecordReader {
@@ -111,10 +109,11 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
}
/**
- * The query fields passed in are formatted in a way that Drill requires. Those must be cleaned up to work with the
- * parser.
+ * The query fields passed in are formatted in a way that Drill requires.
+ * Those must be cleaned up to work with the parser.
*
- * @return Map with Drill field names as a key and Parser Field names as a value
+ * @return Map with Drill field names as a key and Parser Field names as a
+ * value
*/
private Map<String, String> makeParserFields() {
Map<String, String> fieldMapping = new HashMap<>();
@@ -219,8 +218,9 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
}
/**
- * This plugin supports pushing down into the parser. Only fields specifically asked for within the configuration will
- * be parsed. If no fields are asked for then all possible fields will be returned.
+ * This plugin supports pushing project down into the parser. Only fields
+ * specifically asked for within the configuration will be parsed. If no
+ * fields are asked for then all possible fields will be returned.
*
* @return true
*/
@@ -230,7 +230,8 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
}
@Override
- public RecordReader getRecordReader(final FragmentContext context, final DrillFileSystem dfs, final FileWork fileWork, final List<SchemaPath> columns, final String userName) {
+ public RecordReader getRecordReader(final FragmentContext context, final DrillFileSystem dfs,
+ final FileWork fileWork, final List<SchemaPath> columns, final String userName) {
return new HttpdLogRecordReader(context, dfs, fileWork, columns);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatConfig.java
index b9e310c..aa12580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatConfig.java
@@ -19,7 +19,9 @@
package org.apache.drill.exec.store.image;
import java.util.List;
+import java.util.Objects;
+import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.logical.FormatPluginConfig;
import com.fasterxml.jackson.annotation.JsonInclude;
@@ -53,45 +55,31 @@ public class ImageFormatConfig implements FormatPluginConfig {
@Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((extensions == null) ? 0 : extensions.hashCode());
- result = prime * result + (fileSystemMetadata ? 1231 : 1237);
- result = prime * result + (descriptive ? 1231 : 1237);
- result = prime * result + ((timeZone == null) ? 0 : timeZone.hashCode());
- return result;
+ return Objects.hash(extensions, fileSystemMetadata, descriptive, timeZone);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
- } else if (obj == null) {
- return false;
- } else if (getClass() != obj.getClass()) {
- return false;
- }
- ImageFormatConfig other = (ImageFormatConfig) obj;
- if (extensions == null) {
- if (other.extensions != null) {
- return false;
- }
- } else if (!extensions.equals(other.extensions)) {
- return false;
- }
- if (fileSystemMetadata != other.fileSystemMetadata) {
- return false;
}
- if (descriptive != other.descriptive) {
+ if (obj == null || getClass() != obj.getClass()) {
return false;
}
- if (timeZone == null) {
- if (other.timeZone != null) {
- return false;
- }
- } else if (!timeZone.equals(other.timeZone)) {
- return false;
- }
- return true;
+ ImageFormatConfig other = (ImageFormatConfig) obj;
+ return Objects.equals(extensions, other.extensions) &&
+ Objects.equals(fileSystemMetadata, other.fileSystemMetadata) &&
+ Objects.equals(descriptive, other.descriptive) &&
+ Objects.equals(timeZone, other.timeZone);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("extensions", extensions)
+ .field("fileSystemMetadata", fileSystemMetadata)
+ .field("descriptive", descriptive)
+ .field("timeZone", timeZone)
+ .toString();
}
}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
index 2ff8415..c18d7bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -19,6 +19,10 @@ package org.apache.drill.exec.store.parquet;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
+
+import java.util.Objects;
+
+import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.logical.FormatPluginConfig;
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -27,7 +31,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
public class ParquetFormatConfig implements FormatPluginConfig {
public boolean autoCorrectCorruptDates = true;
- public boolean enableStringsSignedMinMax = false;
+ public boolean enableStringsSignedMinMax;
/**
* @return true if auto correction of corrupt dates is enabled, false otherwise
@@ -62,26 +66,20 @@ public class ParquetFormatConfig implements FormatPluginConfig {
}
ParquetFormatConfig that = (ParquetFormatConfig) o;
-
- if (autoCorrectCorruptDates != that.autoCorrectCorruptDates) {
- return false;
- }
-
- return enableStringsSignedMinMax == that.enableStringsSignedMinMax;
+ return Objects.equals(autoCorrectCorruptDates, that.autoCorrectCorruptDates) &&
+ Objects.equals(enableStringsSignedMinMax, that.enableStringsSignedMinMax);
}
@Override
public int hashCode() {
- int result = (autoCorrectCorruptDates ? 1231 : 1237);
- result = 31 * result + (enableStringsSignedMinMax ? 1231 : 1237);
- return result;
+ return Objects.hash(autoCorrectCorruptDates, enableStringsSignedMinMax);
}
@Override
public String toString() {
- return "ParquetFormatConfig{"
- + "autoCorrectCorruptDates=" + autoCorrectCorruptDates
- + ", enableStringsSignedMinMax=" + enableStringsSignedMinMax
- + '}';
+ return new PlanStringBuilder(this)
+ .field("autoCorrectCorruptDates", autoCorrectCorruptDates)
+ .field("enableStringsSignedMinMax", enableStringsSignedMinMax)
+ .toString();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 5c0e2e4..7d3cecf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -88,8 +88,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
Preconditions.checkNotNull(formatConfig);
this.cacheFileRoot = cacheFileRoot;
- this.formatPlugin =
- Preconditions.checkNotNull((ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig));
+ this.formatPlugin = engineRegistry.resolveFormat(storageConfig, formatConfig, ParquetFormatPlugin.class);
this.formatConfig = this.formatPlugin.getConfig();
DrillFileSystem fs =
ImpersonationUtil.createFileSystem(ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index 0bdc266..75cb302 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -59,7 +59,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
@JsonProperty("filter") LogicalExpression filter,
@JsonProperty("schema") TupleMetadata schema) throws ExecutionSetupException {
this(userName,
- (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)),
+ registry.resolveFormat(storageConfig, formatConfig, ParquetFormatPlugin.class),
rowGroupReadEntries,
columns,
readerConfig,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 2bad852..61bd44f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -26,6 +26,8 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.store.StorageStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -34,11 +36,10 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@JsonTypeName("parquet-writer")
public class ParquetWriter extends AbstractWriter {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriter.class);
+ static final Logger logger = LoggerFactory.getLogger(ParquetWriter.class);
/** Version of Drill's Parquet writer. Increment this version (by 1) any time we make any format change to the file.
* Format changes include:
@@ -66,8 +67,7 @@ public class ParquetWriter extends AbstractWriter {
@JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
super(child);
- this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, new ParquetFormatConfig());
- Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
+ this.formatPlugin = engineRegistry.resolveFormat(storageConfig, new ParquetFormatConfig(), ParquetFormatPlugin.class);
this.location = location;
this.partitionColumns = partitionColumns;
setStorageStrategy(storageStrategy);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
index 133b5d7..adf6379 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
@@ -19,31 +19,30 @@ package org.apache.drill.exec.store.pcap;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.shaded.guava.com.google.common.base.Objects;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
@JsonTypeName(PcapFormatPlugin.PLUGIN_NAME)
public class PcapFormatConfig implements FormatPluginConfig {
- private static final List<String> DEFAULT_EXTS = ImmutableList.of(PcapFormatPlugin.PLUGIN_NAME);
-
- public List<String> extensions;
+ public List<String> extensions = ImmutableList.of(PcapFormatPlugin.PLUGIN_NAME);
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
- public boolean sessionizeTCPStreams = false;
+ public boolean sessionizeTCPStreams;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public List<String> getExtensions() {
- return extensions == null ? DEFAULT_EXTS : extensions;
+ return extensions;
}
@Override
public int hashCode() {
- return Arrays.hashCode(new Object[]{extensions, sessionizeTCPStreams});
+ return Objects.hash(extensions, sessionizeTCPStreams);
}
@Override
@@ -55,7 +54,15 @@ public class PcapFormatConfig implements FormatPluginConfig {
return false;
}
PcapFormatConfig other = (PcapFormatConfig) obj;
- return Objects.equal(extensions, other.extensions)
- && Objects.equal(sessionizeTCPStreams, other.sessionizeTCPStreams);
+ return Objects.equals(extensions, other.extensions) &&
+ Objects.equals(sessionizeTCPStreams, other.sessionizeTCPStreams);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("extensions", extensions)
+ .field("sessionizeTCPStreams", sessionizeTCPStreams)
+ .toString();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
index 7ff875a..8ded7ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.store.pcapng;
import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.logical.FormatPluginConfig;
import java.util.Collections;
@@ -49,4 +51,11 @@ public class PcapngFormatConfig implements FormatPluginConfig {
public int hashCode() {
return Objects.hash(extensions);
}
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("extensions", extensions)
+ .toString();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index 50d65a2..b2e37fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -45,13 +45,14 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
private final SystemTable table;
private final SystemTablePlugin plugin;
- private int maxRecordsToRead;
+ private final int maxRecordsToRead;
@JsonCreator
public SystemTableScan(@JsonProperty("table") SystemTable table,
@JsonProperty("maxRecordsToRead") int maxRecordsToRead,
@JacksonInject StoragePluginRegistry engineRegistry) throws ExecutionSetupException {
- this(table, maxRecordsToRead, (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE));
+ this(table, maxRecordsToRead,
+ engineRegistry.resolve(SystemTablePluginConfig.INSTANCE, SystemTablePlugin.class));
}
public SystemTableScan(SystemTable table, SystemTablePlugin plugin) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
index f8bd2c4..ebbd87d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
@@ -24,9 +24,9 @@ import java.util.Optional;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.WorkspaceConfig;
@@ -64,8 +64,8 @@ public class StoragePluginTestUtils {
public static void updateSchemaLocation(final String pluginName,
final StoragePluginRegistry pluginRegistry,
final File tmpDirPath,
- String... schemas) throws ExecutionSetupException {
- final FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getConfig(pluginName);
+ String... schemas) throws PluginException {
+ final FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getStoredConfig(pluginName);
Map<String, WorkspaceConfig> newWorkspaces = new HashMap<>();
Optional.ofNullable(pluginConfig.getWorkspaces())
@@ -91,13 +91,13 @@ public class StoragePluginTestUtils {
pluginRegistry.put(pluginName, newPluginConfig);
}
- public static void configureFormatPlugins(StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+ public static void configureFormatPlugins(StoragePluginRegistry pluginRegistry) throws PluginException {
configureFormatPlugins(pluginRegistry, CP_PLUGIN_NAME);
configureFormatPlugins(pluginRegistry, DFS_PLUGIN_NAME);
}
- public static void configureFormatPlugins(StoragePluginRegistry pluginRegistry, String storagePlugin) throws ExecutionSetupException {
- FileSystemConfig fileSystemConfig = (FileSystemConfig) pluginRegistry.getConfig(storagePlugin);
+ public static void configureFormatPlugins(StoragePluginRegistry pluginRegistry, String storagePlugin) throws PluginException {
+ FileSystemConfig fileSystemConfig = (FileSystemConfig) pluginRegistry.getStoredConfig(storagePlugin);
Map<String, FormatPluginConfig> newFormats = new HashMap<>();
Optional.ofNullable(fileSystemConfig.getFormats())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
index ce879cb..b9b1919 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
@@ -24,6 +24,7 @@ import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.dotdrill.DotDrillType;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.dfs.WorkspaceConfig;
import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -406,7 +407,7 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
}
@AfterClass
- public static void removeMiniDfsBasedStorage() {
+ public static void removeMiniDfsBasedStorage() throws PluginException {
getDrillbitContext().getStorage().remove(MINI_DFS_STORAGE_PLUGIN_NAME);
stopMiniDfsCluster();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
index b6af23f..03f52c1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.impersonation;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.avro.AvroDataGenerator;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.categories.SecurityTest;
@@ -304,7 +305,7 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
}
@AfterClass
- public static void removeMiniDfsBasedStorage() {
+ public static void removeMiniDfsBasedStorage() throws PluginException {
getDrillbitContext().getStorage().remove(MINI_DFS_STORAGE_PLUGIN_NAME);
stopMiniDfsCluster();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java
index d41bdf7..f4d6981 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/BasePluginRegistryTest.java
@@ -77,6 +77,9 @@ public class BasePluginRegistryTest extends BaseTest {
// for these tests.
@Override
public DrillbitContext drillbitContext() { return null; }
+
+ @Override
+ public ObjectMapper hoconMapper() { return mapper; }
}
public static class StoragePluginFixtureConfig extends StoragePluginConfig {
@@ -107,8 +110,8 @@ public class BasePluginRegistryTest extends BaseTest {
public int hashCode() {
return Objects.hash(mode);
}
-
}
+
@PrivatePlugin
public static class StoragePluginFixture extends AbstractStoragePlugin {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
index 8da2016..d3c42e1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.CP_PLUGIN_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -26,19 +27,28 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.commons.io.FileUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.BasePluginRegistryTest.StoragePluginFixture;
+import org.apache.drill.exec.store.BasePluginRegistryTest.StoragePluginFixtureConfig;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginFilter;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.util.StoragePluginTestUtils;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.BaseTest;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.junit.After;
+import org.junit.ClassRule;
import org.junit.Test;
/**
@@ -47,142 +57,361 @@ import org.junit.Test;
* a Drillbit per tests to ensure each test works from a clean,
* known registry.
* <p>
+ * Tests are somewhat large because each needs a running
+ * Drillbit.
* This is several big tests because of the setup cost of
* starting the Drillbits in the needed config.
*/
-public class TestPluginRegistry extends BasePluginRegistryTest {
+public class TestPluginRegistry extends BaseTest {
+
+ @ClassRule
+ public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+ private static final String SYS_PLUGIN_NAME = "sys";
+ private static final String S3_PLUGIN_NAME = "s3";
+
+ // Mixed-case name used to verify that names are forced to lower case.
+ private static final String MY_PLUGIN_NAME = "myPlugin";
+
+ // Lower-case form after insertion into the registry.
+ private static final String MY_PLUGIN_KEY = MY_PLUGIN_NAME.toLowerCase();
@After
public void cleanup() throws Exception {
FileUtils.cleanDirectory(dirTestWatcher.getStoreDir());
}
+ private FileSystemConfig myConfig1() {
+ FileSystemConfig config = new FileSystemConfig("myConn",
+ new HashMap<>(), new HashMap<>(), new HashMap<>());
+ config.setEnabled(true);
+ return config;
+ }
+
+ private FileSystemConfig myConfig2() {
+ Map<String, String> props = new HashMap<>();
+ props.put("foo", "bar");
+ FileSystemConfig config = new FileSystemConfig("myConn",
+ props, new HashMap<>(), new HashMap<>());
+ config.setEnabled(true);
+ return config;
+ }
+
@Test
- public void testBasicLifecycle() throws Exception {
+ public void testLifecycle() throws Exception {
ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
- try (ClusterFixture cluster = builder.build();) {
+ try (ClusterFixture cluster = builder.build()) {
StoragePluginRegistry registry = cluster.storageRegistry();
// Bootstrap file loaded.
- assertNotNull(registry.getPlugin(StoragePluginTestUtils.CP_PLUGIN_NAME)); // Normal
- assertNotNull(registry.getPlugin("sys")); // System
- assertNull(registry.getConfig("sys")); // Not editable
+ assertNotNull(registry.getPlugin(CP_PLUGIN_NAME)); // Normal
+ assertNotNull(registry.getPlugin(SYS_PLUGIN_NAME)); // System
+ assertNull(registry.getStoredConfig(SYS_PLUGIN_NAME)); // Not editable
assertNull(registry.getPlugin("bogus"));
// Enabled plugins
Map<String, StoragePluginConfig> configMap = registry.enabledConfigs();
- assertTrue(configMap.containsKey(StoragePluginTestUtils.CP_PLUGIN_NAME));
- assertFalse(configMap.containsKey("s3")); // Disabled, but still appears
- assertFalse(configMap.containsKey("sys"));
+ assertTrue(configMap.containsKey(CP_PLUGIN_NAME));
+ assertFalse(configMap.containsKey(S3_PLUGIN_NAME)); // Disabled, so not among enabled configs
+ assertFalse(configMap.containsKey(SYS_PLUGIN_NAME));
+
+ assertNotNull(registry.getDefinedConfig(CP_PLUGIN_NAME));
+ assertNull(registry.getDefinedConfig(S3_PLUGIN_NAME));
+ assertNotNull(registry.getDefinedConfig(SYS_PLUGIN_NAME));
// All stored plugins, including disabled
configMap = registry.storedConfigs();
- assertTrue(configMap.containsKey(StoragePluginTestUtils.CP_PLUGIN_NAME));
- assertTrue(configMap.containsKey("s3")); // Disabled, but still appears
- assertNotNull(configMap.get("s3"));
- assertSame(registry.getConfig("s3"), configMap.get("s3"));
- assertFalse(configMap.containsKey("sys"));
+ assertTrue(configMap.containsKey(CP_PLUGIN_NAME));
+ assertTrue(configMap.containsKey(S3_PLUGIN_NAME)); // Disabled, but still appears
+ assertNotNull(configMap.get(S3_PLUGIN_NAME));
+ assertSame(registry.getStoredConfig(S3_PLUGIN_NAME), configMap.get(S3_PLUGIN_NAME));
+ assertFalse(configMap.containsKey(SYS_PLUGIN_NAME));
int bootstrapCount = configMap.size();
+ // Enabled only
+ configMap = registry.storedConfigs(PluginFilter.ENABLED);
+ assertTrue(configMap.containsKey(CP_PLUGIN_NAME));
+ assertFalse(configMap.containsKey(S3_PLUGIN_NAME));
+
+ // Disabled only
+ configMap = registry.storedConfigs(PluginFilter.DISABLED);
+ assertFalse(configMap.containsKey(CP_PLUGIN_NAME));
+ assertTrue(configMap.containsKey(S3_PLUGIN_NAME));
+
// Create a new plugin
- FileSystemConfig pConfig1 = new FileSystemConfig("myConn",
- new HashMap<>(), new HashMap<>(), new HashMap<>());
- pConfig1.setEnabled(true);
- registry.put("myPlugin", pConfig1);
- StoragePlugin plugin1 = registry.getPlugin("myPlugin");
+ FileSystemConfig pConfig1 = myConfig1();
+ registry.put(MY_PLUGIN_NAME, pConfig1);
+ StoragePlugin plugin1 = registry.getPlugin(MY_PLUGIN_NAME);
assertNotNull(plugin1);
- assertSame(plugin1, registry.getPlugin(pConfig1));
+ assertSame(plugin1, registry.getPluginByConfig(pConfig1));
configMap = registry.storedConfigs();
// Names converted to lowercase in persistent storage
- assertTrue(configMap.containsKey("myplugin"));
+ assertTrue(configMap.containsKey(MY_PLUGIN_KEY));
assertEquals(bootstrapCount + 1, configMap.size());
// Names are case-insensitive
- assertSame(plugin1, registry.getPlugin("myplugin"));
- assertSame(plugin1, registry.getPlugin("MYPLUGIN"));
+ assertSame(plugin1, registry.getPlugin(MY_PLUGIN_KEY));
+ assertSame(plugin1, registry.getPlugin(MY_PLUGIN_NAME.toUpperCase()));
// Update the plugin
- Map<String, String> props = new HashMap<>();
- props.put("foo", "bar");
- FileSystemConfig pConfig2 = new FileSystemConfig("myConn",
- props, new HashMap<>(), new HashMap<>());
+ FileSystemConfig pConfig2 = myConfig2();
+ registry.put(MY_PLUGIN_NAME, pConfig2);
+ StoragePlugin plugin2 = registry.getPlugin(MY_PLUGIN_NAME);
+ assertNotSame(plugin1, plugin2);
+ assertTrue(plugin2 instanceof FileSystemPlugin);
+ FileSystemPlugin fsStorage = (FileSystemPlugin) plugin2;
+ assertSame(pConfig2, fsStorage.getConfig());
+ assertSame(plugin2, registry.getPluginByConfig(pConfig2));
+
+ // Cannot create/update a plugin with null or blank name
+
+ FileSystemConfig pConfig3 = myConfig1();
+ try {
+ registry.put(null, pConfig3);
+ fail();
+ } catch (PluginException e) {
+ // Expected
+ }
+ try {
+ registry.put(" ", pConfig3);
+ fail();
+ } catch (PluginException e) {
+ // Expected
+ }
+ }
+ }
+
+ /**
+ * Tests the dedicated setEnabled() method to cleanly update the
+ * enabled status of a plugin by name.
+ */
+ @Test
+ public void testEnabledState() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ try (ClusterFixture cluster = builder.build()) {
+ StoragePluginRegistry registry = cluster.storageRegistry();
+
+ // Disable an enabled plugin
+ StoragePluginConfig config = registry.getStoredConfig(CP_PLUGIN_NAME);
+ assertTrue(config.isEnabled());
+
+ // Enable/disable has traditionally been done by changing
+ // the plugin outside of the registry, which leads to obvious
+ // race conditions.
+ // Tests synchronized version.
+ registry.setEnabled(CP_PLUGIN_NAME, true);
+ StoragePluginConfig savedConfig = registry.getStoredConfig(CP_PLUGIN_NAME);
+ assertEquals(config, savedConfig);
+ assertTrue(savedConfig.isEnabled());
+
+ registry.setEnabled(CP_PLUGIN_NAME, false);
+ savedConfig = registry.getStoredConfig(CP_PLUGIN_NAME);
+ assertEquals(config, savedConfig);
+ assertFalse(savedConfig.isEnabled());
+
+ // OK to disable twice
+ registry.setEnabled(CP_PLUGIN_NAME, false);
+ savedConfig = registry.getStoredConfig(CP_PLUGIN_NAME);
+ assertEquals(config, savedConfig);
+ assertFalse(savedConfig.isEnabled());
+
+ // Disabled plugins appear in the stored config map
+ Map<String, StoragePluginConfig> configMap = registry.storedConfigs();
+ assertTrue(configMap.containsKey(CP_PLUGIN_NAME));
+ assertEquals(config, configMap.get(CP_PLUGIN_NAME));
+
+ // Re-enable
+ registry.setEnabled(CP_PLUGIN_NAME, true);
+ savedConfig = registry.getStoredConfig(CP_PLUGIN_NAME);
+ assertEquals(config, savedConfig);
+ assertTrue(savedConfig.isEnabled());
+
+ // Error if plugin does not exist
+ try {
+ registry.setEnabled("foo", true);
+ fail();
+ } catch (PluginException e) {
+ // expected
+ }
+ try {
+ registry.setEnabled("foo", false);
+ fail();
+ } catch (PluginException e) {
+ // expected
+ }
+
+ // Error to mess with a system plugin
+ try {
+ registry.setEnabled(SYS_PLUGIN_NAME, true);
+ fail();
+ } catch (PluginException e) {
+ // expected
+ }
+ try {
+ registry.setEnabled(SYS_PLUGIN_NAME, false);
+ fail();
+ } catch (PluginException e) {
+ // expected
+ }
+ }
+ }
+
+ /**
+ * Tests the other way to enable/disable plugins: make a **COPY** of the
+ * config and set the enable/disable status. Note: race conditions happen
+ * if a client modifies a stored config. Old code would do that, but the
+ * results are undefined. Either use setEnabled(), or make a copy of the
+ * config. This case also models where the user edits the config by hand
+ * in the Web Console to disable the plugin: the deserialize/serialize
+ * steps make the copy.
+ */
+ @Test
+ public void testEnableWithPut() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ try (ClusterFixture cluster = builder.build()) {
+ StoragePluginRegistry registry = cluster.storageRegistry();
+
+ FileSystemConfig pConfig1 = myConfig1();
+ registry.put(MY_PLUGIN_NAME, pConfig1);
+ assertTrue(registry.getStoredConfig(MY_PLUGIN_NAME).isEnabled());
+
+ // Enable the same plugin using a different, but equal, config.
+ // The original config remains in place.
+ StoragePluginConfig pConfig2 = registry.copyConfig(MY_PLUGIN_NAME);
pConfig2.setEnabled(true);
- registry.put("myPlugin", pConfig2);
- StoragePlugin plugin2 = registry.getPlugin("myPlugin");
+ assertEquals(pConfig1, pConfig2);
+ registry.put(MY_PLUGIN_NAME, pConfig2);
+ StoragePluginConfig savedConfig = registry.getStoredConfig(MY_PLUGIN_NAME);
+ assertEquals(pConfig1, savedConfig);
+ assertTrue(savedConfig.isEnabled());
+
+ // Force resolution of the plugin so there is something to cache
+ StoragePlugin plugin = registry.getPlugin(MY_PLUGIN_NAME);
+ assertNotNull(plugin);
+
+ // Disable an enabled plugin. The old plugin lives in ephemeral
+ // store, but is not visible by name. If requested, the
+ // registry obtains a new copy from persistent storage.
+ StoragePluginConfig pConfig3 = registry.copyConfig(MY_PLUGIN_NAME);
+ pConfig3.setEnabled(false);
+ registry.put(MY_PLUGIN_NAME, pConfig3);
+ savedConfig = registry.getStoredConfig(MY_PLUGIN_NAME);
+ assertEquals(pConfig1, savedConfig);
+ assertFalse(savedConfig.isEnabled());
+
+ // OK to disable twice
+ StoragePluginConfig pConfig4 = registry.copyConfig(MY_PLUGIN_NAME);
+ pConfig4.setEnabled(false);
+ registry.put(MY_PLUGIN_NAME, pConfig4);
+ savedConfig = registry.getStoredConfig(MY_PLUGIN_NAME);
+ assertEquals(pConfig1, savedConfig);
+ assertFalse(savedConfig.isEnabled());
+
+ // Disabled plugins appear in the stored config map
+ Map<String, StoragePluginConfig> configMap = registry.storedConfigs();
+ assertTrue(configMap.containsKey(MY_PLUGIN_KEY));
+ assertEquals(pConfig3, configMap.get(MY_PLUGIN_KEY));
+
+ // Re-enable, the original plugin instance reappears.
+ StoragePluginConfig pConfig5 = registry.copyConfig(MY_PLUGIN_NAME);
+ pConfig5.setEnabled(true);
+ registry.put(MY_PLUGIN_NAME, pConfig5);
+ assertSame(plugin, registry.getPlugin(MY_PLUGIN_NAME));
+ assertTrue(plugin.getConfig().isEnabled());
+ }
+ }
+
+ /**
+ * Test the ephemeral cache where plugin instances live after their
+ * configs are changed or disabled so that any running queries see
+ * the prior plugin config and instance.
+ * <p>
+ * Note that each call to update a plugin must work with a copy of
+ * a config. Results are undefined if a client changes a stored
+ * config.
+ */
+ @Test
+ public void testEphemeralLifecycle() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ try (ClusterFixture cluster = builder.build()) {
+ StoragePluginRegistry registry = cluster.storageRegistry();
+
+ // Create a plugin
+ FileSystemConfig pConfig1 = myConfig1();
+ registry.put(MY_PLUGIN_NAME, pConfig1);
+ StoragePlugin plugin1 = registry.getPlugin(MY_PLUGIN_NAME);
+
+ // Update the plugin
+ FileSystemConfig pConfig2 = myConfig2();
+ registry.put(MY_PLUGIN_NAME, pConfig2);
+ StoragePlugin plugin2 = registry.getPlugin(MY_PLUGIN_NAME);
assertNotSame(plugin1, plugin2);
assertTrue(plugin2 instanceof FileSystemPlugin);
FileSystemPlugin fsStorage = (FileSystemPlugin) plugin2;
assertSame(pConfig2, fsStorage.getConfig());
- assertSame(plugin2, registry.getPlugin(pConfig2));
+ assertSame(plugin2, registry.getPluginByConfig(pConfig2));
// Suppose a query was planned with plugin1 and now starts
// to execute. Plugin1 has been replaced with plugin2. However
// the registry moved the old plugin to ephemeral storage where
// it can still be found by configuration.
- StoragePlugin ePlugin1 = registry.getPlugin(pConfig1);
+ FileSystemConfig pConfig1a = myConfig1();
+ StoragePlugin ePlugin1 = registry.getPluginByConfig(pConfig1a);
assertSame(plugin1, ePlugin1);
assertNotSame(plugin2, ePlugin1);
- // Now, another thread does the same. It gets the same
- // ephemeral plugin.
- assertSame(plugin1, registry.getPlugin(pConfig1));
-
// Change the stored plugin back to the first config.
- registry.put("myPlugin", pConfig1);
+ FileSystemConfig pConfig1b = myConfig1();
+ registry.put(MY_PLUGIN_NAME, pConfig1b);
// Now, lets suppose thread 3 starts to execute. It sees the original plugin
- assertSame(plugin1, registry.getPlugin("myPlugin"));
+ assertSame(plugin1, registry.getPlugin(MY_PLUGIN_NAME));
// But, the ephemeral plugin lives on. Go back to the second
// config.
- registry.put("myPlugin", pConfig2);
- assertSame(plugin2, registry.getPlugin("myPlugin"));
+ FileSystemConfig pConfig2b = myConfig2();
+ registry.put(MY_PLUGIN_NAME, pConfig2b);
+ assertSame(plugin2, registry.getPlugin(MY_PLUGIN_NAME));
// Thread 4, using the first config from planning in thread 3,
// still sees the first plugin.
- assertSame(plugin1, registry.getPlugin(pConfig1));
+ assertSame(plugin1, registry.getPluginByConfig(pConfig1b));
// Disable
- pConfig2.setEnabled(false);
- assertNull(registry.getPlugin("myPlugin"));
+ registry.setEnabled(MY_PLUGIN_NAME, false);
+ assertNull(registry.getPlugin(MY_PLUGIN_NAME));
// Though disabled, a running query will create an ephemeral
// plugin for the config.
- assertSame(plugin2, registry.getPlugin(pConfig2));
-
- // Disabling an ephemeral plugin neither makes sense
- // nor will have any effect.
- ePlugin1.getConfig().setEnabled(false);
- assertSame(ePlugin1, registry.getPlugin(pConfig1));
- assertTrue(registry.storedConfigs().containsKey("myplugin"));
- assertFalse(registry.enabledConfigs().containsKey("myplugin"));
+ assertSame(plugin1, registry.getPluginByConfig(pConfig1b));
+ assertSame(plugin2, registry.getPluginByConfig(pConfig2b));
- // Enable. The config is retrieved from the persistent store.
- // We notice the config is in the ephemeral store and
+ // Enable. We notice the config is in the ephemeral store and
// so we restore it.
- pConfig2.setEnabled(true);
- assertSame(plugin2, registry.getPlugin("myPlugin"));
- assertSame(plugin2, registry.getPlugin(pConfig2));
- assertTrue(registry.storedConfigs().containsKey("myplugin"));
- assertTrue(registry.enabledConfigs().containsKey("myplugin"));
+ registry.setEnabled(MY_PLUGIN_NAME, true);
+ assertSame(plugin2, registry.getPlugin(MY_PLUGIN_NAME));
+ assertSame(plugin2, registry.getPluginByConfig(pConfig2b));
+ assertTrue(registry.storedConfigs().containsKey(MY_PLUGIN_KEY));
+ assertTrue(registry.enabledConfigs().containsKey(MY_PLUGIN_KEY));
// Delete the plugin
- registry.remove("myPlugin");
- assertNull(registry.getPlugin("myPlugin"));
+ registry.remove(MY_PLUGIN_NAME);
+ assertNull(registry.getPlugin(MY_PLUGIN_NAME));
// Again a running query will retrieve the plugin from ephemeral storage
- assertSame(plugin1, registry.getPlugin(pConfig1));
- assertSame(plugin2, registry.getPlugin(pConfig2));
+ assertSame(plugin1, registry.getPluginByConfig(pConfig1));
+ assertSame(plugin2, registry.getPluginByConfig(pConfig2));
// Delete again, no-op
- registry.remove("myPlugin");
+ registry.remove(MY_PLUGIN_NAME);
// The retrieve-from-ephemeral does not kick in if we create
// a new plugin with the same config but a different name.
- pConfig1.setEnabled(true);
- registry.put("alias", pConfig1);
+ FileSystemConfig pConfig1c = myConfig1();
+ pConfig1c.setEnabled(true);
+ registry.put("alias", pConfig1c);
StoragePlugin plugin4 = registry.getPlugin("alias");
assertNotNull(plugin4);
assertNotSame(plugin1, plugin4);
@@ -192,39 +421,96 @@ public class TestPluginRegistry extends BasePluginRegistryTest {
// be returned on subsequent queries.
registry.remove("alias");
assertNull(registry.getPlugin("alias"));
- assertSame(plugin1, registry.getPlugin(pConfig1));
+ assertSame(plugin1, registry.getPluginByConfig(pConfig1));
// Try to change a system plugin
- StoragePlugin sysPlugin = registry.getPlugin("sys");
+ StoragePlugin sysPlugin = registry.getPlugin(SYS_PLUGIN_NAME);
assertNotNull(sysPlugin);
- FileSystemConfig pConfig3 = new FileSystemConfig("myConn",
- props, new HashMap<>(), new HashMap<>());
- pConfig3.setEnabled(true);
+ FileSystemConfig pConfig3 = myConfig2();
try {
- registry.put("sys", pConfig3);
+ registry.put(SYS_PLUGIN_NAME, pConfig3);
fail();
- } catch (UserException e) {
+ } catch (PluginException e) {
// Expected
}
pConfig3.setEnabled(false);
try {
- registry.put("sys", pConfig3);
+ registry.put(SYS_PLUGIN_NAME, pConfig3);
fail();
- } catch (UserException e) {
+ } catch (PluginException e) {
// Expected
}
- assertSame(sysPlugin, registry.getPlugin("sys"));
+ assertSame(sysPlugin, registry.getPlugin(SYS_PLUGIN_NAME));
// Try to delete a system plugin
try {
- registry.remove("sys");
+ registry.remove(SYS_PLUGIN_NAME);
fail();
- } catch (UserException e) {
+ } catch (PluginException e) {
// Expected
}
+ }
+ }
+
+ @Test
+ public void testEphemeralWithoutInstance() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ try (ClusterFixture cluster = builder.build()) {
+ StoragePluginRegistry registry = cluster.storageRegistry();
- // There is no protection for disabling a system plugin because
- // there is no code that will allow that at present.
+ // Create a plugin
+ // Since we've created no plugin instance, the configs come from
+ // the persistent store, there is no guarantee we get the same
+ // instance on each retrieval.
+ FileSystemConfig pConfig1 = myConfig1();
+ registry.put(MY_PLUGIN_NAME, pConfig1);
+ StoragePluginConfig savedConfig = registry.getStoredConfig(MY_PLUGIN_NAME);
+ assertEquals(pConfig1, savedConfig);
+ assertTrue(savedConfig.isEnabled());
+ assertEquals(pConfig1, registry.getDefinedConfig(MY_PLUGIN_NAME));
+
+ // Do not refer to the instance. As a result, no reason to
+ // cache the plugin in ephemeral cache.
+
+ // Change config
+ FileSystemConfig pConfig1b = myConfig2();
+ registry.put(MY_PLUGIN_NAME, pConfig1b);
+ assertEquals(pConfig1b, registry.getDefinedConfig(MY_PLUGIN_NAME));
+
+ // Put it back
+ FileSystemConfig pConfig1c = myConfig1();
+ registry.put(MY_PLUGIN_NAME, pConfig1c);
+ assertEquals(pConfig1c, registry.getDefinedConfig(MY_PLUGIN_NAME));
+ assertEquals(pConfig1c, registry.getPlugin(MY_PLUGIN_NAME).getConfig());
+
+ // Odd case. Some thread refers to the config while not in
+ // the store which forces an instance which later reappears.
+ FileSystemConfig pConfig2a = myConfig1();
+ registry.put("myplugin2", pConfig2a);
+
+ // Didn't instantiate. Change
+ FileSystemConfig pConfig2b = myConfig2();
+ registry.put("myplugin2", pConfig2b);
+
+ // Force a resync
+ assertEquals(pConfig2b, registry.getDefinedConfig("myplugin2"));
+
+ // Refer by original config. We didn't cache the original config
+ // because there was no plugin instance. Must make up a new plugin
+ // which goes into the ephemeral cache.
+ FileSystemConfig pConfig2c = myConfig1();
+ StoragePlugin plugin2 = registry.getPluginByConfig(pConfig2c);
+ assertEquals(pConfig2c, plugin2.getConfig());
+
+ // Put the original config into the persistent store and local cache.
+ // Should not dredge up the ephemeral version to reuse since that
+ // version had an unknown name.
+ // It is unfortunate that we have two instances with the same config,
+ // but different names. But, since the name is immutable, and not
+ // known above, the two-instance situation is the least bad option.
+ FileSystemConfig pConfig2d = myConfig1();
+ registry.put("myplugin2", pConfig2d);
+ assertEquals(pConfig2d, registry.getPlugin("myplugin2").getConfig());
}
}
@@ -240,43 +526,191 @@ public class TestPluginRegistry extends BasePluginRegistryTest {
.put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
.put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH,
dirTestWatcher.getStoreDir().getAbsolutePath());
- try (ClusterFixture cluster = builder.build();) {
+ try (ClusterFixture cluster = builder.build()) {
StoragePluginRegistry registry1 = cluster.storageRegistry("bit1");
StoragePluginRegistry registry2 = cluster.storageRegistry("bit2");
// Define a plugin in Drillbit 1
- FileSystemConfig pConfig1 = new FileSystemConfig("myConn",
- new HashMap<>(), new HashMap<>(), new HashMap<>());
- pConfig1.setEnabled(true);
- registry1.put("myPlugin", pConfig1);
- StoragePlugin plugin1 = registry1.getPlugin("myPlugin");
+ FileSystemConfig pConfig1 = myConfig1();
+ registry1.put(MY_PLUGIN_NAME, pConfig1);
+ StoragePlugin plugin1 = registry1.getPlugin(MY_PLUGIN_NAME);
assertNotNull(plugin1);
// Should appear in Drillbit 2
- StoragePlugin plugin2 = registry2.getPlugin("myPlugin");
+ assertTrue(registry2.storedConfigs().containsKey(MY_PLUGIN_KEY));
+ StoragePlugin plugin2 = registry2.getPlugin(MY_PLUGIN_NAME);
assertNotNull(plugin2);
- assertEquals(pConfig1, plugin1.getConfig());
+ assertEquals(pConfig1, plugin2.getConfig());
// Change in Drillbit 1
- Map<String, String> props = new HashMap<>();
- props.put("foo", "bar");
- FileSystemConfig pConfig3 = new FileSystemConfig("myConn",
- props, new HashMap<>(), new HashMap<>());
- pConfig3.setEnabled(true);
- registry1.put("myPlugin", pConfig3);
- plugin1 = registry1.getPlugin("myPlugin");
- assertSame(pConfig3, plugin1.getConfig());
+ FileSystemConfig pConfig3 = myConfig2();
+ registry1.put(MY_PLUGIN_NAME, pConfig3);
+ plugin1 = registry1.getPlugin(MY_PLUGIN_NAME);
+ assertEquals(pConfig3, plugin1.getConfig());
// Change should appear in Drillbit 2
- plugin2 = registry2.getPlugin("myPlugin");
+ assertTrue(registry2.storedConfigs().containsValue(pConfig3));
+ plugin2 = registry2.getPlugin(MY_PLUGIN_NAME);
assertNotNull(plugin2);
assertEquals(pConfig3, plugin1.getConfig());
// Delete in Drillbit 2
- registry2.remove("myPlugin");
+ registry2.remove(MY_PLUGIN_NAME);
// Should not be available in Drillbit 1
- assertNull(registry1.getPlugin("myPlugin"));
+ assertFalse(registry1.storedConfigs().containsKey(MY_PLUGIN_KEY));
+ assertNull(registry1.getPlugin(MY_PLUGIN_NAME));
+ }
+ }
+
+ @Test
+ public void testFormatPlugin() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ try (ClusterFixture cluster = builder.build()) {
+ StoragePluginRegistry registry = cluster.storageRegistry();
+
+ StoragePluginConfig config = registry.getStoredConfig(CP_PLUGIN_NAME);
+ FileSystemConfig fsConfig = (FileSystemConfig) config;
+ assertFalse(fsConfig.getFormats().containsKey("bsv"));
+
+ // Add a new format
+ TextFormatConfig bsv = new TextFormatConfig();
+ bsv.fieldDelimiter = '!';
+ registry.putFormatPlugin(CP_PLUGIN_NAME, "bsv", bsv);
+
+ config = registry.getStoredConfig(CP_PLUGIN_NAME);
+ fsConfig = (FileSystemConfig) config;
+ assertTrue(fsConfig.getFormats().containsKey("bsv"));
+ assertSame(bsv, fsConfig.getFormats().get("bsv"));
+
+ // Remove the format
+ registry.putFormatPlugin(CP_PLUGIN_NAME, "bsv", null);
+ config = registry.getStoredConfig(CP_PLUGIN_NAME);
+ fsConfig = (FileSystemConfig) config;
+ assertFalse(fsConfig.getFormats().containsKey("bsv"));
+
+ // Undefined plugin
+ try {
+ registry.putFormatPlugin("bogus", "bsv", bsv);
+ fail();
+ } catch (PluginException e) {
+ // Expected
+ }
+ // Try to set a non-FS plugin
+ try {
+ registry.putFormatPlugin(SYS_PLUGIN_NAME, "bsv", bsv);
+ fail();
+ } catch (PluginException e) {
+ // Expected
+ }
+ }
+ }
+
+ /**
+ * Test to illustrate problems discussed in DRILL-7624
+ */
+ @Test
+ public void testBadPlugin() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ builder.configBuilder().put(ExecConstants.PRIVATE_CONNECTORS,
+ Collections.singletonList(StoragePluginFixture.class.getName()));
+ try (ClusterFixture cluster = builder.build()) {
+ StoragePluginRegistry registry = cluster.storageRegistry();
+
+ // Create a config that causes a crash because the plugin
+ // is not created on update.
+ StoragePluginFixtureConfig badConfig = new StoragePluginFixtureConfig("crash-ctor");
+ badConfig.setEnabled(true);
+
+ // Use the validated put to catch and reject errors at the cost of
+ // instantiating the plugin.
+ try {
+ registry.validatedPut("bad", badConfig);
+ fail();
+ } catch (PluginException e) {
+ // Expected
+ }
+ assertNull(registry.getStoredConfig("bad"));
+ assertFalse(registry.availablePlugins().contains("bad"));
+
+ // Try the same with JSON
+ String json = registry.encode(badConfig);
+ try {
+ registry.putJson("bad", json);
+ fail();
+ } catch (PluginException e) {
+ // Expected
+ }
+ assertFalse(registry.availablePlugins().contains("bad"));
+
+ // Now, let's pretend the plugin was valid when we did the above,
+ // but later the external system failed.
+ registry.put("bad", badConfig);
+ assertEquals(badConfig, registry.getStoredConfig("bad"));
+ assertTrue(registry.availablePlugins().contains("bad"));
+
+ // Ask for the actual plugin. Now will fail.
+ try {
+ registry.getPlugin("bad");
+ fail();
+ } catch (UserException e) {
+ assertTrue(e.getMessage().contains("bad"));
+ }
+ assertTrue(registry.availablePlugins().contains("bad"));
+
+ // No plugin created. Will fail the next time also.
+ try {
+ registry.getPlugin("bad");
+ fail();
+ } catch (UserException e) {
+ // Expected
+ }
+ assertTrue(registry.availablePlugins().contains("bad"));
+
+ // The iterator used to find planning rules will skip the failed
+ // plugin. (That the planner uses all rules is, itself, a bug.)
+ int n = registry.availablePlugins().size();
+ int count = 0;
+ for (@SuppressWarnings("unused") Entry<String, StoragePlugin> entry : registry) {
+ count++;
+ }
+ assertEquals(n - 1, count);
+
+ // Reset to known good state
+ registry.remove("bad");
+
+ // Get tricky. Create a good plugin, then replace with
+ // a disabled bad one.
+ StoragePluginFixtureConfig goodConfig = new StoragePluginFixtureConfig("ok");
+ goodConfig.setEnabled(true);
+ json = registry.encode(goodConfig);
+ registry.putJson("test", json);
+ assertTrue(registry.availablePlugins().contains("test"));
+ assertEquals(goodConfig, registry.getPlugin("test").getConfig());
+
+ // Replace with a disabled bad plugin
+ badConfig = new StoragePluginFixtureConfig("crash-ctor");
+ badConfig.setEnabled(false);
+ json = registry.encode(badConfig);
+ registry.putJson("test", json);
+ assertFalse(registry.availablePlugins().contains("test"));
+ assertNull(registry.getPlugin("test"));
+ assertNotNull(registry.getStoredConfig("test"));
+ assertEquals(badConfig, registry.getStoredConfig("test"));
+
+ // Attempt to disable a disabled plugin. Should be OK.
+ registry.setEnabled("test", false);
+
+ // But, can't use this as a back door to enable an
+ // invalid plugin. (If the problem is due to an external
+ // system, fix that system first.)
+ try {
+ registry.setEnabled("test", true);
+ fail();
+ } catch (PluginException e) {
+ // Expected
+ }
+ assertFalse(registry.availablePlugins().contains("test"));
}
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginsMap.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginsMap.java
index 5413b7c..3ac0eb5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginsMap.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginsMap.java
@@ -31,6 +31,7 @@ import java.util.Set;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.store.PluginHandle.PluginType;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.test.OperatorFixture;
import org.junit.Test;
@@ -119,7 +120,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
}
@Test
- public void testBasics() {
+ public void testBasics() throws PluginException {
ConnectorHandle connector = fixtureConnector();
StoragePluginFixtureConfig config1a = new StoragePluginFixtureConfig("ok1");
@@ -168,12 +169,11 @@ public class TestPluginsMap extends BasePluginRegistryTest {
assertTrue(configs.contains(entry1.config()));
assertTrue(configs.contains(entry2.config()));
- // Convenience (but not optimistically locked) remove
- map.remove(entry1);
+ map.remove(entry1.name());
assertNull(map.get(entry1.name()));
assertNull(map.get(entry1.config()));
- map.remove(entry2);
+ map.remove(entry2.name());
assertTrue(map.getNames().isEmpty());
assertTrue(map.plugins().isEmpty());
@@ -183,7 +183,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
}
@Test
- public void testRemoveByName() {
+ public void testRemoveByName() throws PluginException {
ConnectorHandle connector = fixtureConnector();
StoragePluginMap map = new StoragePluginMap();
@@ -208,7 +208,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
}
@Test
- public void testSafePutRemove() {
+ public void testSafePutRemove() throws PluginException {
ConnectorHandle connector = fixtureConnector();
StoragePluginMap map = new StoragePluginMap();
@@ -232,10 +232,9 @@ public class TestPluginsMap extends BasePluginRegistryTest {
assertSame(entry3, map.putIfAbsent(entry4));
// Remove
- assertFalse(map.remove(entry1)); // Already replaced
- assertTrue(map.remove(entry2)); // currently in map
- assertTrue(map.remove(entry3));
- assertFalse(map.remove(entry4));
+ assertSame(entry2, map.remove(entry2.name())); // currently in map
+ assertSame(entry3, map.remove(entry3.name()));
+ assertNull(map.remove(entry4.name()));
assertTrue(map.getNames().isEmpty());
assertTrue(map.plugins().isEmpty());
@@ -245,7 +244,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
}
@Test
- public void testReplace() {
+ public void testReplace() throws PluginException {
ConnectorHandle connector = fixtureConnector();
StoragePluginMap map = new StoragePluginMap();
@@ -274,7 +273,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
}
@Test
- public void testSafeRemove() {
+ public void testSafeRemove() throws PluginException {
ConnectorHandle connector = fixtureConnector();
StoragePluginMap map = new StoragePluginMap();
@@ -349,9 +348,9 @@ public class TestPluginsMap extends BasePluginRegistryTest {
// Remove by entry fails for the same reasons as above.
try {
- map.remove(sysEntry);
+ map.remove(sysEntry.name());
fail();
- } catch (IllegalArgumentException e) {
+ } catch (PluginException e) {
// Expected
}
assertSame(sysEntry, map.get(sysPlugin.getName()));
@@ -375,7 +374,7 @@ public class TestPluginsMap extends BasePluginRegistryTest {
}
}
- public void testClose() {
+ public void testClose() throws PluginException {
ConnectorHandle connector = fixtureConnector();
StoragePluginMap map = new StoragePluginMap();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java
index 8b95fbf..d6eb06b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java
@@ -17,15 +17,10 @@
*/
package org.apache.drill.exec.store.httpd;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
@@ -48,25 +43,11 @@ public class TestHTTPDLogReader extends ClusterTest {
@BeforeClass
public static void setup() throws Exception {
ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
- defineHTTPDPlugin();
- }
-
- private static void defineHTTPDPlugin() throws ExecutionSetupException {
- // Create an instance of the regex config.
- // Note: we can"t use the ".log" extension; the Drill .gitignore
- // file ignores such files, so they"ll never get committed. Instead,
- // make up a fake suffix.
+ // Define a temporary format plugin for the "cp" storage plugin.
HttpdLogFormatConfig sampleConfig = new HttpdLogFormatConfig();
sampleConfig.setLogFormat("%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-agent}i\"");
-
- // Define a temporary format plugin for the "cp" storage plugin.
- Drillbit drillbit = cluster.drillbit();
- final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
- final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin("cp");
- final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
- pluginConfig.getFormats().put("sample", sampleConfig);
- pluginRegistry.put("cp", pluginConfig);
+ cluster.defineFormat("cp", "sample", sampleConfig);
}
@Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
index 3ea374c..0e47cf0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
@@ -20,19 +20,18 @@ package org.apache.drill.exec.store.log;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
@@ -79,6 +78,7 @@ public class TestLogReader extends ClusterTest {
// Note: we can't use the ".log" extension; the Drill .gitignore
// file ignores such files, so they'll never get committed. Instead,
// make up a fake suffix.
+ Map<String, FormatPluginConfig> formats = new HashMap<>();
LogFormatConfig sampleConfig = new LogFormatConfig();
sampleConfig.setExtension("log1");
sampleConfig.setRegex(DATE_ONLY_PATTERN);
@@ -87,6 +87,7 @@ public class TestLogReader extends ClusterTest {
sampleConfig.getSchema().add(new LogFormatField("year", "INT"));
sampleConfig.getSchema().add(new LogFormatField("month", "INT"));
sampleConfig.getSchema().add(new LogFormatField("day", "INT"));
+ formats.put("sample", sampleConfig);
// Full Drill log parser definition.
LogFormatConfig logConfig = new LogFormatConfig();
@@ -106,6 +107,7 @@ public class TestLogReader extends ClusterTest {
logConfig.getSchema().add(new LogFormatField("level"));
logConfig.getSchema().add(new LogFormatField("module"));
logConfig.getSchema().add(new LogFormatField("message"));
+ formats.put("drill-log", logConfig);
//Set up additional configs to check the time/date formats
LogFormatConfig logDateConfig = new LogFormatConfig();
@@ -120,10 +122,12 @@ public class TestLogReader extends ClusterTest {
logDateConfig.getSchema().add(new LogFormatField("message"));
logDateConfig.setMaxErrors(3);
+ formats.put("date-log",logDateConfig);
LogFormatConfig mysqlLogConfig = new LogFormatConfig();
mysqlLogConfig.setExtension("sqllog");
mysqlLogConfig.setRegex("(\\d{6})\\s(\\d{2}:\\d{2}:\\d{2})\\s+(\\d+)\\s(\\w+)\\s+(.+)");
+ formats.put("mysql-log", mysqlLogConfig);
// Firewall log file that requires date parsing
LogFormatConfig firewallConfig = new LogFormatConfig();
@@ -136,18 +140,10 @@ public class TestLogReader extends ClusterTest {
firewallConfig.getSchema().add(new LogFormatField("pid", "INT"));
firewallConfig.getSchema().add(new LogFormatField("message"));
firewallConfig.getSchema().add(new LogFormatField("src_ip"));
+ formats.put("ssdlog", firewallConfig);
// Define a temporary format plugin for the "cp" storage plugin.
- Drillbit drillbit = cluster.drillbit();
- final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
- final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin("cp");
- final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
- pluginConfig.getFormats().put("sample", sampleConfig);
- pluginConfig.getFormats().put("drill-log", logConfig);
- pluginConfig.getFormats().put("date-log",logDateConfig);
- pluginConfig.getFormats().put("mysql-log", mysqlLogConfig);
- pluginConfig.getFormats().put("ssdlog", firewallConfig);
- pluginRegistry.put("cp", pluginConfig);
+ cluster.defineFormats("cp", formats);
// Config similar to the above, but with no type info. Types
// will be provided via the provided schema mechanism. Column names
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
index 5441670..bc6eff9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
@@ -19,11 +19,6 @@
package org.apache.drill.exec.store.pcap;
import org.apache.drill.categories.RowSetTests;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.junit.BeforeClass;
@@ -39,19 +34,7 @@ public class TestPcapEVFReader extends ClusterTest {
@BeforeClass
public static void setup() throws Exception {
ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
- definePlugin();
- }
-
- private static void definePlugin() throws ExecutionSetupException {
- PcapFormatConfig sampleConfig = new PcapFormatConfig();
-
- // Define a temporary plugin for the "cp" storage plugin.
- Drillbit drillbit = cluster.drillbit();
- final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
- final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin("cp");
- final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
- pluginConfig.getFormats().put("sample", sampleConfig);
- pluginRegistry.put("cp", pluginConfig);
+ cluster.defineFormat("cp", "sample", new PcapFormatConfig());
}
@Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index b1f8aed..80e2a26 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -38,7 +38,6 @@ import java.util.Optional;
import java.util.Properties;
import org.apache.drill.common.config.DrillProperties;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ExecConstants;
@@ -53,6 +52,7 @@ import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.StoragePluginRegistryImpl;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.WorkspaceConfig;
@@ -467,7 +467,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
StoragePluginRegistryImpl registry = (StoragePluginRegistryImpl) drillbit.getContext().getStorage();
registry.put(name, config);
}
- } catch (ExecutionSetupException e) {
+ } catch (PluginException e) {
throw new IllegalStateException("Plugin definition failed", e);
}
}
@@ -491,7 +491,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
for (Drillbit bit : drillbits()) {
try {
defineWorkspace(bit, pluginName, schemaName, path, defaultFormat, format);
- } catch (ExecutionSetupException e) {
+ } catch (PluginException e) {
// This functionality is supposed to work in tests. Change
// exception to unchecked to make test code simpler.
@@ -504,9 +504,9 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
private void defineWorkspace(Drillbit drillbit, String pluginName,
String schemaName, String path, String defaultFormat, FormatPluginConfig format)
- throws ExecutionSetupException {
+ throws PluginException {
final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
- final FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getConfig(pluginName);
+ final FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getStoredConfig(pluginName);
final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(path, true, defaultFormat, false);
Map<String, WorkspaceConfig> newWorkspaces = new HashMap<>();
@@ -531,7 +531,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
for (Drillbit bit : drillbits()) {
try {
defineFormats(bit, pluginName, formats);
- } catch (ExecutionSetupException e) {
+ } catch (PluginException e) {
throw new IllegalStateException(e);
}
}
@@ -539,10 +539,10 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
private void defineFormats(Drillbit drillbit,
String pluginName,
- Map<String, FormatPluginConfig> formats) throws ExecutionSetupException {
+ Map<String, FormatPluginConfig> formats) throws PluginException {
StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
- FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getConfig(pluginName);
- pluginConfig = pluginConfig.copyWithFormats(formats);
+ FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.copyConfig(pluginName);
+ pluginConfig.getFormats().putAll(formats);
pluginRegistry.put(pluginName, pluginConfig);
}
@@ -550,7 +550,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
String pluginName,
FileSystemConfig pluginConfig,
Map<String, WorkspaceConfig> newWorkspaces,
- Map<String, FormatPluginConfig> newFormats) throws ExecutionSetupException {
+ Map<String, FormatPluginConfig> newFormats) throws PluginException {
FileSystemConfig newPluginConfig = new FileSystemConfig(
pluginConfig.getConnection(),
pluginConfig.getConfig(),
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
index 9588ea9..f2d6ade 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java
@@ -17,9 +17,9 @@
*/
package org.apache.drill.test;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.mock.MockBreakageStorage;
import org.apache.drill.exec.store.mock.MockBreakageStorage.MockBreakageStorageEngineConfig;
@@ -44,7 +44,7 @@ public class ClusterMockStorageFixture extends ClusterFixture {
config.setEnabled(true);
pluginRegistry.put(name, config);
plugin = (MockBreakageStorage) pluginRegistry.getPlugin(name);
- } catch (ExecutionSetupException e) {
+ } catch (PluginException e) {
throw new IllegalStateException(e);
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
index 029d259..9c83657 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
@@ -64,7 +64,7 @@ public class TestBuilder {
private Boolean ordered;
private boolean approximateEquality;
private double tolerance;
- private TestServices services;
+ private final TestServices services;
// Used to pass the type information associated with particular column names rather than relying on the
// ordering of the columns in the CSV file, or the default type inferences when reading JSON, this is used for the
// case where results of the test query are adding type casts to the baseline queries, this saves a little bit of
@@ -408,10 +408,6 @@ public class TestBuilder {
return this;
}
- private boolean singleExplicitBaselineRecord() {
- return baselineRecords != null;
- }
-
/**
* Provide a SQL query to validate against.
* @param baselineQuery
@@ -558,7 +554,7 @@ public class TestBuilder {
}
public class SchemaTestBuilder extends TestBuilder {
- private List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema;
+ private final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema;
SchemaTestBuilder(TestServices services, Object query, UserBitShared.QueryType queryType,
String baselineOptionSettingQueries, String testOptionSettingQueries, List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema) {
super(services, query, queryType, false, false, null, baselineOptionSettingQueries, testOptionSettingQueries, false, -1);
@@ -598,7 +594,7 @@ public class TestBuilder {
public class JSONTestBuilder extends TestBuilder {
// path to the baseline file that will be inserted into the validation query
- private String baselineFilePath;
+ private final String baselineFilePath;
JSONTestBuilder(String baselineFile, TestServices services, Object query, UserBitShared.QueryType queryType, Boolean ordered,
boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
@@ -627,8 +623,8 @@ public class TestBuilder {
/**
* Baseline query. Type of object depends on {@link #baselineQueryType}
*/
- private Object baselineQuery;
- private UserBitShared.QueryType baselineQueryType;
+ private final Object baselineQuery;
+ private final UserBitShared.QueryType baselineQueryType;
BaselineQueryTestBuilder(Object baselineQuery, UserBitShared.QueryType baselineQueryType, TestServices services,
Object query, UserBitShared.QueryType queryType, Boolean ordered,
diff --git a/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java b/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
index 2ac5b8a..91afaed 100644
--- a/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
+++ b/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
@@ -46,7 +47,10 @@ public class LogicalPlanPersistence {
}
public LogicalPlanPersistence(DrillConfig conf, ScanResult scanResult, ObjectMapper mapper) {
- this.mapper = mapper;
+ // The UI allows comments in JSON. Since we use the mapper here to
+ // serialize plugin configs to/from the persistent store, allow them too.
+ this.mapper = mapper
+ .configure(JsonParser.Feature.ALLOW_COMMENTS, true);
SimpleModule deserModule = new SimpleModule("LogicalExpressionDeserializationModule")
.addDeserializer(LogicalExpression.class, new LogicalExpression.De(conf))
diff --git a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
index 970b58e..2548c1c 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java
@@ -23,6 +23,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public abstract class StoragePluginConfig {
+
+ // DO NOT include enabled status in equality and hash
+ // comparisons; doing so will break the plugin registry.
private Boolean enabled;
/**