You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/04/22 00:21:38 UTC
[3/4] drill git commit: DRILL-2413: FileSystemPlugin refactoring: avoid sharing DrillFileSystem across schemas
DRILL-2413: FileSystemPlugin refactoring: avoid sharing DrillFileSystem across schemas
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/117b7497
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/117b7497
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/117b7497
Branch: refs/heads/master
Commit: 117b749744d1775816fc5f9591dced3aae551352
Parents: fbb405b
Author: vkorukanti <ve...@gmail.com>
Authored: Tue Mar 10 10:19:35 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Apr 21 13:16:00 2015 -0700
----------------------------------------------------------------------
.../exec/store/hbase/HBaseSchemaFactory.java | 2 +-
.../exec/store/hbase/HBaseStoragePlugin.java | 2 +-
.../exec/store/hive/HiveStoragePlugin.java | 2 +-
.../store/hive/schema/HiveSchemaFactory.java | 3 +-
.../exec/store/mongo/MongoStoragePlugin.java | 2 +-
.../store/mongo/schema/MongoSchemaFactory.java | 4 +-
.../apache/drill/exec/ops/FragmentContext.java | 43 ++++++-
.../org/apache/drill/exec/ops/QueryContext.java | 15 ++-
.../apache/drill/exec/store/SchemaFactory.java | 4 +-
.../drill/exec/store/StoragePluginRegistry.java | 2 +-
.../drill/exec/store/avro/AvroFormatPlugin.java | 9 +-
.../exec/store/dfs/BasicFormatMatcher.java | 25 ++--
.../drill/exec/store/dfs/DrillFileSystem.java | 6 +-
.../drill/exec/store/dfs/FileSystemPlugin.java | 34 +++---
.../exec/store/dfs/FileSystemSchemaFactory.java | 12 +-
.../drill/exec/store/dfs/FormatCreator.java | 17 +--
.../drill/exec/store/dfs/FormatMatcher.java | 2 +-
.../drill/exec/store/dfs/FormatPlugin.java | 3 +-
.../exec/store/dfs/WorkspaceSchemaFactory.java | 115 ++++++++++---------
.../exec/store/dfs/easy/EasyFormatPlugin.java | 27 +++--
.../exec/store/dfs/easy/EasyGroupScan.java | 8 +-
.../exec/store/easy/json/JSONFormatPlugin.java | 15 ++-
.../exec/store/easy/text/TextFormatPlugin.java | 18 ++-
.../store/ischema/InfoSchemaStoragePlugin.java | 2 +-
.../exec/store/mock/MockStorageEngine.java | 2 +-
.../exec/store/parquet/ParquetFormatPlugin.java | 76 ++++++------
.../exec/store/parquet/ParquetGroupScan.java | 17 +--
.../store/parquet/ParquetScanBatchCreator.java | 8 +-
.../drill/exec/store/sys/SystemTablePlugin.java | 2 +-
.../exec/store/text/DrillTextRecordReader.java | 6 +-
.../exec/work/batch/ControlHandlerImpl.java | 2 +-
.../apache/drill/exec/work/foreman/Foreman.java | 11 +-
.../work/fragment/NonRootFragmentManager.java | 2 +-
.../exec/store/dfs/TestDrillFileSystem.java | 2 +-
34 files changed, 279 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 7a0a64b..1c407e1 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -46,7 +46,7 @@ public class HBaseSchemaFactory implements SchemaFactory {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
HBaseSchema schema = new HBaseSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index c10b0ab..948d462 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -66,7 +66,7 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index f4baf3b..91e7a92 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -82,7 +82,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
}
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 0e16e6f..587e90d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.hive.schema;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -186,7 +187,7 @@ public class HiveSchemaFactory implements SchemaFactory {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
HiveSchema schema = new HiveSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index e46d8ec..dfad5ef 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -63,7 +63,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index 3c70638..f650ccc 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.mongo.schema;
+import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
@@ -36,7 +37,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
import org.apache.drill.exec.store.mongo.MongoCnxnManager;
import org.apache.drill.exec.store.mongo.MongoScanSpec;
import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
@@ -120,7 +120,7 @@ public class MongoSchemaFactory implements SchemaFactory {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
MongoSchema schema = new MongoSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 9400355..7dfd0e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import net.hydromatic.optiq.SchemaPlus;
-import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
@@ -64,7 +63,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
private final DrillbitContext context;
- private final UserClientConnection connection; // is null if attached to non-root fragment
+ private final UserClientConnection connection; // is null if this context is for non-root fragment
+ private final QueryContext queryContext; // is null if this context is for non-root fragment
private final FragmentStats stats;
private final FunctionImplementationRegistry funcRegistry;
private final BufferAllocator allocator;
@@ -87,10 +87,34 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
private final AccountingUserConnection accountingUserConnection;
+ /**
+ * Create a FragmentContext instance for non-root fragment.
+ *
+ * @param dbContext DrillbitContext.
+ * @param fragment Fragment implementation.
+ * @param funcRegistry FunctionImplementationRegistry.
+ * @throws ExecutionSetupException
+ */
public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
+ final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+ this(dbContext, fragment, null, null, funcRegistry);
+ }
+
+ /**
+ * Create a FragmentContext instance for root fragment.
+ *
+ * @param dbContext DrillbitContext.
+ * @param fragment Fragment implementation.
+ * @param queryContext QueryContext.
+ * @param connection UserClientConnection.
+ * @param funcRegistry FunctionImplementationRegistry.
+ * @throws ExecutionSetupException
+ */
+ public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
throws ExecutionSetupException {
this.context = dbContext;
+ this.queryContext = queryContext;
this.connection = connection;
this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
this.fragment = fragment;
@@ -128,6 +152,15 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
bufferManager = new BufferManager(this.allocator, this);
}
+ /**
+ * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
+ * the long list of test files.
+ */
+ public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
+ FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+ this(dbContext, fragment, null, connection, funcRegistry);
+ }
+
public OptionManager getOptions() {
return fragmentOptions;
}
@@ -162,15 +195,13 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
}
public SchemaPlus getRootSchema() {
- if (connection == null) {
+ if (queryContext == null) {
fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
"This is a non-root fragment."));
return null;
}
- final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
- context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root);
- return root;
+ return queryContext.getRootSchema();
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 2dcac25..cd5c054 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.ops;
+import java.io.IOException;
import java.util.Collection;
import io.netty.buffer.DrillBuf;
@@ -46,7 +47,7 @@ import org.apache.drill.exec.testing.ExecutionControls;
// TODO - consider re-name to PlanningContext, as the query execution context actually appears
// in fragment contexts
public class QueryContext implements AutoCloseable, UdfUtilities {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
private final DrillbitContext drillbitContext;
private final UserSession session;
@@ -113,9 +114,15 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
}
public SchemaPlus getRootSchema() {
- final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
- drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
- return rootSchema;
+ try {
+ final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
+ drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
+ return rootSchema;
+ } catch(IOException e) {
+ final String errMsg = String.format("Failed to create schema tree: %s", e.getMessage());
+ logger.error(errMsg, e);
+ throw new DrillRuntimeException(errMsg, e);
+ }
}
public OptionManager getOptions() {
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
index feadabd..14d2fab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
@@ -21,8 +21,10 @@ import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.exec.rpc.user.UserSession;
+import java.io.IOException;
+
public interface SchemaFactory {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class);
- public void registerSchemas(UserSession session, SchemaPlus parent);
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 5d0eed6..cb9ee0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -301,7 +301,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
public class DrillSchemaFactory implements SchemaFactory {
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
Stopwatch watch = new Stopwatch();
watch.start();
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 2f487d6..30c45fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.util.List;
@@ -40,13 +41,13 @@ import java.util.List;
*/
public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
- public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
+ public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
StoragePluginConfig storagePluginConfig) {
- this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
+ this(name, context, fsConf, storagePluginConfig, new AvroFormatConfig());
}
- public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
- super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
+ public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
+ super(name, context, fsConf, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
index 9756f3c..3768aea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -36,34 +37,32 @@ import com.google.common.collect.Range;
public class BasicFormatMatcher extends FormatMatcher{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class);
- private final List<Pattern> patterns;
- private final MagicStringMatcher matcher;
- protected final DrillFileSystem fs;
protected final FormatPlugin plugin;
protected final boolean compressible;
protected final CompressionCodecFactory codecFactory;
- public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<Pattern> patterns, List<MagicString> magicStrings) {
+ private final List<Pattern> patterns;
+ private final MagicStringMatcher matcher;
+
+ public BasicFormatMatcher(FormatPlugin plugin, List<Pattern> patterns, List<MagicString> magicStrings) {
super();
this.patterns = ImmutableList.copyOf(patterns);
this.matcher = new MagicStringMatcher(magicStrings);
- this.fs = fs;
this.plugin = plugin;
this.compressible = false;
this.codecFactory = null;
}
- public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<String> extensions, boolean compressible) {
+ public BasicFormatMatcher(FormatPlugin plugin, Configuration fsConf, List<String> extensions, boolean compressible) {
List<Pattern> patterns = Lists.newArrayList();
for (String extension : extensions) {
patterns.add(Pattern.compile(".*\\." + extension));
}
this.patterns = patterns;
this.matcher = new MagicStringMatcher(new ArrayList<MagicString>());
- this.fs = fs;
this.plugin = plugin;
this.compressible = compressible;
- this.codecFactory = new CompressionCodecFactory(fs.getConf());
+ this.codecFactory = new CompressionCodecFactory(fsConf);
}
@Override
@@ -72,8 +71,8 @@ public class BasicFormatMatcher extends FormatMatcher{
}
@Override
- public FormatSelection isReadable(FileSelection selection) throws IOException {
- if (isReadable(selection.getFirstPath(fs))) {
+ public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
+ if (isReadable(fs, selection.getFirstPath(fs))) {
if (plugin.getName() != null) {
NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
namedConfig.name = plugin.getName();
@@ -85,7 +84,7 @@ public class BasicFormatMatcher extends FormatMatcher{
return null;
}
- protected final boolean isReadable(FileStatus status) throws IOException {
+ protected final boolean isReadable(DrillFileSystem fs, FileStatus status) throws IOException {
CompressionCodec codec = null;
if (compressible) {
codec = codecFactory.getCodec(status.getPath());
@@ -103,7 +102,7 @@ public class BasicFormatMatcher extends FormatMatcher{
}
}
- if (matcher.matches(status)) {
+ if (matcher.matches(fs, status)) {
return true;
}
return false;
@@ -128,7 +127,7 @@ public class BasicFormatMatcher extends FormatMatcher{
}
}
- public boolean matches(FileStatus status) throws IOException{
+ public boolean matches(DrillFileSystem fs, FileStatus status) throws IOException{
if (ranges.isEmpty()) {
return false;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index 2683cca..f8afe3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -96,11 +96,11 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
private final OperatorStats operatorStats;
public DrillFileSystem(Configuration fsConf) throws IOException {
- this(FileSystem.get(fsConf), null);
+ this(fsConf, null);
}
- public DrillFileSystem(FileSystem fs, OperatorStats operatorStats) {
- this.underlyingFs = fs;
+ public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException {
+ this.underlyingFs = FileSystem.get(fsConf);
this.operatorStats = operatorStats;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index c5ca41b..775b402 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -44,6 +44,8 @@ import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import static org.apache.drill.exec.store.dfs.FileSystemSchemaFactory.DEFAULT_WS_NAME;
+
/**
* A Storage engine associated with a Hadoop FileSystem Implementation. Examples include HDFS, MapRFS, QuantacastFileSystem,
* LocalFileSystem, as well Apache Drill specific CachedFileSystem, ClassPathFileSystem and LocalSyncableFileSystem.
@@ -51,26 +53,26 @@ import com.google.common.collect.Maps;
* references to the FileSystem configuration and path management.
*/
public class FileSystemPlugin extends AbstractStoragePlugin{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
private final FileSystemSchemaFactory schemaFactory;
- private Map<String, FormatPlugin> formatPluginsByName;
- private Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
- private FileSystemConfig config;
- private DrillbitContext context;
- private final DrillFileSystem fs;
+ private final Map<String, FormatPlugin> formatPluginsByName;
+ private final Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
+ private final FileSystemConfig config;
+ private final DrillbitContext context;
+ private final Configuration fsConf;
public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException{
try {
this.config = config;
this.context = context;
- Configuration fsConf = new Configuration();
+ fsConf = new Configuration();
fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.connection);
fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
- fs = new DrillFileSystem(fsConf);
- formatPluginsByName = FormatCreator.getFormatPlugins(context, fs, config);
+
+ formatPluginsByName = FormatCreator.getFormatPlugins(context, fsConf, config);
List<FormatMatcher> matchers = Lists.newArrayList();
formatPluginsByConfig = Maps.newHashMap();
for (FormatPlugin p : formatPluginsByName.values()) {
@@ -78,17 +80,17 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
formatPluginsByConfig.put(p.getConfig(), p);
}
- boolean noWorkspace = config.workspaces == null || config.workspaces.isEmpty();
+ final boolean noWorkspace = config.workspaces == null || config.workspaces.isEmpty();
List<WorkspaceSchemaFactory> factories = Lists.newArrayList();
if (!noWorkspace) {
for (Map.Entry<String, WorkspaceConfig> space : config.workspaces.entrySet()) {
- factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, space.getKey(), name, fs, space.getValue(), matchers));
+ factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, space.getKey(), name, space.getValue(), matchers));
}
}
// if the "default" workspace is not given add one.
- if (noWorkspace || !config.workspaces.containsKey("default")) {
- factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, "default", name, fs, WorkspaceConfig.DEFAULT, matchers));
+ if (noWorkspace || !config.workspaces.containsKey(DEFAULT_WS_NAME)) {
+ factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, DEFAULT_WS_NAME, name, WorkspaceConfig.DEFAULT, matchers));
}
this.schemaFactory = new FileSystemSchemaFactory(name, factories);
@@ -123,7 +125,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
}
@@ -151,5 +153,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
return setBuilder.build();
}
-
+ public Configuration getFsConf() {
+ return fsConf;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 44132d0..e11712e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -44,12 +44,12 @@ import org.apache.hadoop.fs.Path;
* This is the top level schema that responds to root level path requests. Also supports
*/
public class FileSystemSchemaFactory implements SchemaFactory{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class);
+
+ public static final String DEFAULT_WS_NAME = "default";
private List<WorkspaceSchemaFactory> factories;
private String schemaName;
- private final String defaultSchemaName = "default";
-
public FileSystemSchemaFactory(String schemaName, List<WorkspaceSchemaFactory> factories) {
super();
@@ -58,7 +58,7 @@ public class FileSystemSchemaFactory implements SchemaFactory{
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
FileSystemSchema schema = new FileSystemSchema(schemaName, session);
SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
schema.setPlus(plusOfThis);
@@ -69,14 +69,14 @@ public class FileSystemSchemaFactory implements SchemaFactory{
private final WorkspaceSchema defaultSchema;
private final Map<String, WorkspaceSchema> schemaMap = Maps.newHashMap();
- public FileSystemSchema(String name, UserSession session) {
+ public FileSystemSchema(String name, UserSession session) throws IOException {
super(ImmutableList.<String>of(), name);
for(WorkspaceSchemaFactory f : factories){
WorkspaceSchema s = f.createSchema(getSchemaPath(), session);
schemaMap.put(s.getName(), s);
}
- defaultSchema = schemaMap.get(defaultSchemaName);
+ defaultSchema = schemaMap.get(DEFAULT_WS_NAME);
}
void setPlus(SchemaPlus plusOfThis){
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
index c164ed5..d2a903b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
@@ -31,20 +31,23 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.server.DrillbitContext;
import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
public class FormatCreator {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
- static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class, FormatPluginConfig.class);
- static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class);
+ private static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class,
+ Configuration.class, StoragePluginConfig.class, FormatPluginConfig.class);
+ private static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class,
+ Configuration.class, StoragePluginConfig.class);
- static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, DrillFileSystem fileSystem, FileSystemConfig storageConfig) {
+ static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, Configuration fsConf,
+ FileSystemConfig storageConfig) {
final DrillConfig config = context.getConfig();
Map<String, FormatPlugin> plugins = Maps.newHashMap();
Collection<Class<? extends FormatPlugin>> pluginClasses = PathScanner.scanForImplementations(FormatPlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
-
if (storageConfig.formats == null || storageConfig.formats.isEmpty()) {
for (Class<? extends FormatPlugin> pluginClass: pluginClasses) {
@@ -53,7 +56,7 @@ public class FormatCreator {
if (!DEFAULT_BASED.check(c)) {
continue;
}
- FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fileSystem, storageConfig);
+ FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fsConf, storageConfig);
plugins.put(plugin.getName(), plugin);
} catch (Exception e) {
logger.warn(String.format("Failure while trying instantiate FormatPlugin %s.", pluginClass.getName()), e);
@@ -84,7 +87,7 @@ public class FormatCreator {
continue;
}
try {
- plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fileSystem, storageConfig, e.getValue()));
+ plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fsConf, storageConfig, e.getValue()));
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
logger.warn("Failure initializing storage config named '{}' of type '{}'.", e.getKey(), e.getValue().getClass().getName(), e1);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
index 92e3d0a..0b8c7a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
@@ -23,6 +23,6 @@ public abstract class FormatMatcher {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatMatcher.class);
public abstract boolean supportDirectoryReads();
- public abstract FormatSelection isReadable(FileSelection selection) throws IOException;
+ public abstract FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException;
public abstract FormatPlugin getFormatPlugin();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 58d5b42..955dfeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.hadoop.conf.Configuration;
/**
* Similar to a storage engine but built specifically to work within a FileSystem context.
@@ -51,7 +52,7 @@ public interface FormatPlugin {
public FormatPluginConfig getConfig();
public StoragePluginConfig getStorageConfig();
- public DrillFileSystem getFileSystem();
+ public Configuration getFsConf();
public DrillbitContext getContext();
public String getName();
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 45e9129..a536350 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.PartitionNotFoundException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -52,14 +53,14 @@ import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
+public class WorkspaceSchemaFactory {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
private final List<FormatMatcher> fileMatchers;
private final List<FormatMatcher> dirMatchers;
private final WorkspaceConfig config;
- private final DrillFileSystem fs;
+ private final Configuration fsConf;
private final DrillConfig drillConfig;
private final String storageEngineName;
private final String schemaName;
@@ -67,9 +68,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
private final ObjectMapper mapper;
public WorkspaceSchemaFactory(DrillConfig drillConfig, FileSystemPlugin plugin, String schemaName,
- String storageEngineName, DrillFileSystem fileSystem, WorkspaceConfig config,
- List<FormatMatcher> formatMatchers) throws ExecutionSetupException, IOException {
- this.fs = fileSystem;
+ String storageEngineName, WorkspaceConfig config, List<FormatMatcher> formatMatchers)
+ throws ExecutionSetupException, IOException {
+ this.fsConf = plugin.getFsConf();
this.plugin = plugin;
this.drillConfig = drillConfig;
this.config = config;
@@ -95,7 +96,7 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
defaultInputFormat, storageEngineName, schemaName);
throw new ExecutionSetupException(message);
}
- final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin, fs,
+ final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin,
ImmutableList.of(Pattern.compile(".*")), ImmutableList.<MagicString>of());
fileMatchers.add(fallbackMatcher);
}
@@ -105,54 +106,21 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
return DotDrillType.VIEW.getPath(config.getLocation(), name);
}
- public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) {
+ public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) throws IOException {
return new WorkspaceSchema(parentSchemaPath, schemaName, session);
}
- @Override
- public DrillTable create(String key) {
- try {
+ public class WorkspaceSchema extends AbstractSchema implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
+ private final ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(this);
+ private final UserSession session;
+ private final DrillFileSystem fs;
- FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key);
- if (fileSelection == null) {
- return null;
- }
-
- if (fileSelection.containsDirectories(fs)) {
- for (FormatMatcher m : dirMatchers) {
- try {
- Object selection = m.isReadable(fileSelection);
- if (selection != null) {
- return new DynamicDrillTable(plugin, storageEngineName, selection);
- }
- } catch (IOException e) {
- logger.debug("File read failed.", e);
- }
- }
- fileSelection = fileSelection.minusDirectories(fs);
- }
-
- for (FormatMatcher m : fileMatchers) {
- Object selection = m.isReadable(fileSelection);
- if (selection != null) {
- return new DynamicDrillTable(plugin, storageEngineName, selection);
- }
- }
- return null;
-
- } catch (IOException e) {
- logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
+ public WorkspaceSchema(List<String> parentSchemaPath, String wsName, UserSession session) throws IOException {
+ super(parentSchemaPath, wsName);
+ this.session = session;
+ this.fs = new DrillFileSystem(fsConf);
}
- return null;
- }
-
- @Override
- public void destroy(DrillTable value) {
- }
-
- public class WorkspaceSchema extends AbstractSchema {
-
public boolean createView(View view) throws Exception {
Path viewPath = getViewPath(view.getName());
boolean replaced = fs.exists(viewPath);
@@ -186,15 +154,6 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
fs.delete(getViewPath(viewName), false);
}
- private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(WorkspaceSchemaFactory.this);
-
- private UserSession session;
-
- public WorkspaceSchema(List<String> parentSchemaPath, String name, UserSession session) {
- super(parentSchemaPath, name);
- this.session = session;
- }
-
private Set<String> getViews() {
Set<String> viewSet = Sets.newHashSet();
// Look for files with ".view.drill" extension.
@@ -282,5 +241,47 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
public String getTypeName() {
return FileSystemConfig.NAME;
}
+
+ @Override
+ public DrillTable create(String key) {
+ try {
+
+ FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key);
+ if (fileSelection == null) {
+ return null;
+ }
+
+ if (fileSelection.containsDirectories(fs)) {
+ for (FormatMatcher m : dirMatchers) {
+ try {
+ Object selection = m.isReadable(fs, fileSelection);
+ if (selection != null) {
+ return new DynamicDrillTable(plugin, storageEngineName, selection);
+ }
+ } catch (IOException e) {
+ logger.debug("File read failed.", e);
+ }
+ }
+ fileSelection = fileSelection.minusDirectories(fs);
+ }
+
+ for (FormatMatcher m : fileMatchers) {
+ Object selection = m.isReadable(fs, fileSelection);
+ if (selection != null) {
+ return new DynamicDrillTable(plugin, storageEngineName, selection);
+ }
+ }
+ return null;
+
+ } catch (IOException e) {
+ logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
+ }
+
+ return null;
+ }
+
+ @Override
+ public void destroy(DrillTable value) {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 6e1e0cc..5c7152a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -54,38 +54,39 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
private final BasicFormatMatcher matcher;
private final DrillbitContext context;
private final boolean readable;
private final boolean writable;
private final boolean blockSplittable;
- private final DrillFileSystem fs;
+ private final Configuration fsConf;
private final StoragePluginConfig storageConfig;
protected final FormatPluginConfig formatConfig;
private final String name;
protected final CompressionCodecFactory codecFactory;
private final boolean compressible;
- protected EasyFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig,
- T formatConfig, boolean readable, boolean writable, boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName){
- this.matcher = new BasicFormatMatcher(this, fs, extensions, compressible);
+ protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, boolean blockSplittable,
+ boolean compressible, List<String> extensions, String defaultName){
+ this.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
this.readable = readable;
this.writable = writable;
this.context = context;
this.blockSplittable = blockSplittable;
this.compressible = compressible;
- this.fs = fs;
+ this.fsConf = fsConf;
this.storageConfig = storageConfig;
this.formatConfig = formatConfig;
this.name = name == null ? defaultName : name;
- this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getConf()));
+ this.codecFactory = new CompressionCodecFactory(new Configuration(fsConf));
}
@Override
- public DrillFileSystem getFileSystem() {
- return fs;
+ public Configuration getFsConf() {
+ return fsConf;
}
@Override
@@ -152,7 +153,13 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
int numParts = 0;
OperatorContext oContext = new OperatorContext(scan, context,
false /* ScanBatch is not subject to fragment memory limit */);
- DrillFileSystem dfs = new DrillFileSystem(fs, oContext.getStats());
+ DrillFileSystem dfs;
+ try {
+ dfs = new DrillFileSystem(fsConf, oContext.getStats());
+ } catch (IOException e) {
+ throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
+ }
+
for(FileWork work : scan.getWorkUnits()){
readers.add(getRecordReader(context, dfs, work, scan.getColumns()));
if (scan.getSelectionRoot() != null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 54cad56..7c70df3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
@@ -51,7 +52,7 @@ import com.google.common.collect.Lists;
@JsonTypeName("fs-scan")
public class EasyGroupScan extends AbstractFileGroupScan{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
private FileSelection selection;
private final EasyFormatPlugin<?> formatPlugin;
@@ -109,9 +110,10 @@ public class EasyGroupScan extends AbstractFileGroupScan{
}
private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
+ final DrillFileSystem dfs = new DrillFileSystem(formatPlugin.getFsConf());
this.selection = selection;
- BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem(), formatPlugin.getContext().getBits());
- this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
+ BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
+ this.chunks = b.generateFileWork(selection.getFileStatusList(dfs), formatPlugin.isBlockSplittable());
this.maxWidth = chunks.size();
this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 2e65466..32e34b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.ImmutableList;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
@@ -38,22 +39,26 @@ import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
private static final boolean IS_COMPRESSIBLE = true;
+ private static final String DEFAULT_NAME = "json";
+ private static final List<String> DEFAULT_EXTS = ImmutableList.of("json");
- public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
- this(name, context, fs, storageConfig, new JSONFormatConfig());
+ public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+ this(name, context, fsConf, storageConfig, new JSONFormatConfig());
}
- public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
- super(name, context, fs, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, Lists.newArrayList("json"), "json");
+ public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+ JSONFormatConfig formatPluginConfig) {
+ super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE,
+ DEFAULT_EXTS, DEFAULT_NAME);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index bf46395..237589c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -18,7 +18,7 @@
package org.apache.drill.exec.store.easy.text;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -42,6 +42,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.text.DrillTextRecordReader;
import org.apache.drill.exec.store.text.DrillTextRecordWriter;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
@@ -51,13 +52,17 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
+ private final static String DEFAULT_NAME = "text";
- public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
- super(name, context, fs, storageConfig, new TextFormatConfig(), true, false, true, true, new ArrayList<String>(), "text");
+ public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+ super(name, context, fsConf, storageConfig, new TextFormatConfig(), true, false, true, true,
+ Collections.<String>emptyList(), DEFAULT_NAME);
}
- public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, TextFormatConfig formatPluginConfig) {
- super(name, context, fs, config, formatPluginConfig, true, false, true, true, formatPluginConfig.getExtensions(), "text");
+ public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+ TextFormatConfig formatPluginConfig) {
+ super(name, context, fsConf, config, formatPluginConfig, true, false, true, true,
+ formatPluginConfig.getExtensions(), DEFAULT_NAME);
}
@@ -67,7 +72,8 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
Path path = dfs.makeQualified(new Path(fileWork.getPath()));
FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
Preconditions.checkArgument(((TextFormatConfig)formatConfig).getDelimiter().length() == 1, "Only single character delimiter supported");
- return new DrillTextRecordReader(split, context, ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
+ return new DrillTextRecordReader(split, getFsConf(), context,
+ ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index 77c6b9a..a1249e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -69,7 +69,7 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements In
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
ISchema s = new ISchema(parent, this);
parent.add(s.getName(), s);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 51b2208..96226a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -55,7 +55,7 @@ public class MockStorageEngine extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index e204a2c..9c83ea0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -62,37 +62,44 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class ParquetFormatPlugin implements FormatPlugin{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
- private final DrillbitContext context;
public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
- private CodecFactoryExposer codecFactoryExposer;
- private final DrillFileSystem fs;
+
+ private static final String DEFAULT_NAME = "parquet";
+
+ private static final List<Pattern> PATTERNS = Lists.newArrayList(
+ Pattern.compile(".*\\.parquet$"),
+ Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE));
+ private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC));
+
+ private final DrillbitContext context;
+ private final CodecFactoryExposer codecFactoryExposer;
+ private final Configuration fsConf;
private final ParquetFormatMatcher formatMatcher;
private final ParquetFormatConfig config;
private final StoragePluginConfig storageConfig;
private final String name;
- public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig){
- this(name, context, fs, storageConfig, new ParquetFormatConfig());
+ public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig){
+ this(name, context, fsConf, storageConfig, new ParquetFormatConfig());
}
- public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
+ public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
this.context = context;
- this.codecFactoryExposer = new CodecFactoryExposer(fs.getConf());
+ this.codecFactoryExposer = new CodecFactoryExposer(fsConf);
this.config = formatConfig;
- this.formatMatcher = new ParquetFormatMatcher(this, fs);
+ this.formatMatcher = new ParquetFormatMatcher(this);
this.storageConfig = storageConfig;
- this.fs = fs;
- this.name = name == null ? "parquet" : name;
- }
-
- Configuration getHadoopConfig() {
- return fs.getConf();
+ this.fsConf = fsConf;
+ this.name = name == null ? DEFAULT_NAME : name;
}
- public DrillFileSystem getFileSystem() {
- return fs;
+ @Override
+ public Configuration getFsConf() {
+ return fsConf;
}
@Override
@@ -155,12 +162,13 @@ public class ParquetFormatPlugin implements FormatPlugin{
@Override
public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
- return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
+ return getGroupScan(selection, null);
}
@Override
public ParquetGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
- return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
+ final DrillFileSystem dfs = new DrillFileSystem(fsConf);
+ return new ParquetGroupScan(selection.getFileStatusList(dfs), this, selection.selectionRoot, columns);
}
@Override
@@ -190,20 +198,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
private static class ParquetFormatMatcher extends BasicFormatMatcher{
- private final DrillFileSystem fs;
-
- public ParquetFormatMatcher(ParquetFormatPlugin plugin, DrillFileSystem fs) {
- super(plugin, fs, //
- Lists.newArrayList( //
- Pattern.compile(".*\\.parquet$"), //
- Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE) //
- //
- ),
- Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC))
-
- );
- this.fs = fs;
-
+ public ParquetFormatMatcher(ParquetFormatPlugin plugin) {
+ super(plugin, PATTERNS, MAGIC_STRINGS);
}
@Override
@@ -212,17 +208,17 @@ public class ParquetFormatPlugin implements FormatPlugin{
}
@Override
- public FormatSelection isReadable(FileSelection selection) throws IOException {
+ public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
// TODO: we only check the first file for directory reading. This is because
if(selection.containsDirectories(fs)){
- if(isDirReadable(selection.getFirstPath(fs))){
+ if(isDirReadable(fs, selection.getFirstPath(fs))){
return new FormatSelection(plugin.getConfig(), selection);
}
}
- return super.isReadable(selection);
+ return super.isReadable(fs, selection);
}
- boolean isDirReadable(FileStatus dir) {
+ boolean isDirReadable(DrillFileSystem fs, FileStatus dir) {
Path p = new Path(dir.getPath(), ParquetFileWriter.PARQUET_METADATA_FILE);
try {
if (fs.exists(p)) {
@@ -235,16 +231,12 @@ public class ParquetFormatPlugin implements FormatPlugin{
if (files.length == 0) {
return false;
}
- return super.isReadable(files[0]);
+ return super.isReadable(fs, files[0]);
}
} catch (IOException e) {
logger.info("Failure while attempting to check for Parquet metadata file.", e);
return false;
}
}
-
-
-
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index acac61f..a59f2c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
@@ -76,11 +77,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
static final MetricRegistry metrics = DrillMetrics.getInstance();
static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
- static final String ENDPOINT_BYTES_TIMER = MetricRegistry.name(ParquetGroupScan.class, "endpointBytes");
- static final String ASSIGNMENT_TIMER = MetricRegistry.name(ParquetGroupScan.class, "applyAssignments");
- static final String ASSIGNMENT_AFFINITY_HIST = MetricRegistry.name(ParquetGroupScan.class, "assignmentAffinity");
-
- final Histogram assignmentAffinityStats = metrics.histogram(ASSIGNMENT_AFFINITY_HIST);
private ListMultimap<Integer, RowGroupInfo> mappings;
private List<RowGroupInfo> rowGroupInfos;
@@ -135,7 +131,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
Preconditions.checkNotNull(formatConfig);
this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
Preconditions.checkNotNull(formatPlugin);
- this.fs = formatPlugin.getFileSystem();
+ this.fs = new DrillFileSystem(formatPlugin.getFsConf());
this.formatConfig = formatPlugin.getConfig();
this.entries = entries;
this.selectionRoot = selectionRoot;
@@ -155,7 +151,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
this.formatPlugin = formatPlugin;
this.columns = columns;
this.formatConfig = formatPlugin.getConfig();
- this.fs = formatPlugin.getFileSystem();
+ this.fs = new DrillFileSystem(formatPlugin.getFsConf());
this.entries = Lists.newArrayList();
for (FileStatus file : files) {
@@ -205,7 +201,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
ColumnChunkMetaData columnChunkMetaData;
- List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getHadoopConfig(), statuses, 16);
+ List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getFsConf(), statuses, 16);
for (Footer footer : footers) {
int index = 0;
ParquetMetadata metadata = footer.getParquetMetadata();
@@ -260,11 +256,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
}
- @JsonIgnore
- public FileSystem getFileSystem() {
- return this.fs;
- }
-
@Override
public void modifyFileSelection(FileSelection selection) {
entries.clear();
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index c1f815e..b1c725c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.column.ColumnDescriptor;
@@ -95,7 +94,12 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
rowGroupScan.setOperatorId(id);
}
- DrillFileSystem fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFileSystem(), oContext.getStats());
+ DrillFileSystem fs;
+ try {
+ fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFsConf(), oContext.getStats());
+ } catch(IOException e) {
+ throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
+ }
Configuration conf = fs.getConf();
conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index b92f98c..4a3b97b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -67,7 +67,7 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
parent.add(schema.getName(), schema);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 1ad053d..3368412 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.vector.RepeatedVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
@@ -69,7 +70,8 @@ public class DrillTextRecordReader extends AbstractRecordReader {
private FileSplit split;
private long totalRecordsRead;
- public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) {
+ public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext context, char delimiter,
+ List<SchemaPath> columns) {
this.fragmentContext = context;
this.delimiter = (byte) delimiter;
this.split = split;
@@ -95,7 +97,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE);
TextInputFormat inputFormat = new TextInputFormat();
- JobConf job = new JobConf();
+ JobConf job = new JobConf(fsConf);
job.setInt("io.file.buffer.size", context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BUFFER_SIZE));
job.setInputFormat(inputFormat.getClass());
try {
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 3fb0775..b6c6852 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -124,7 +124,7 @@ public class ControlHandlerImpl implements ControlMessageHandler {
try {
// we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
if (fragment.getLeafFragment()) {
- final FragmentContext context = new FragmentContext(drillbitContext, fragment, null,
+ final FragmentContext context = new FragmentContext(drillbitContext, fragment,
drillbitContext.getFunctionImplementationRegistry());
final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index d94ffba..cb2753c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -348,7 +348,7 @@ public class Foreman implements Runnable {
injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan", logger);
// set up the root fragment first so we'll have incoming buffers available.
- setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator());
+ setupRootFragment(rootPlanFragment, work.getRootOperator());
setupNonRootFragments(planFragments);
drillbitContext.getAllocator().resetFragmentLimits(); // TODO a global effect for this query?!?
@@ -791,15 +791,14 @@ public class Foreman implements Runnable {
* Set up the root fragment (which will run locally), and submit it for execution.
*
* @param rootFragment
- * @param rootClient
* @param rootOperator
* @throws ExecutionSetupException
*/
- private void setupRootFragment(final PlanFragment rootFragment, final UserClientConnection rootClient,
- final FragmentRoot rootOperator) throws ExecutionSetupException {
+ private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator)
+ throws ExecutionSetupException {
@SuppressWarnings("resource")
- final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, rootClient,
- drillbitContext.getFunctionImplementationRegistry());
+ final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, queryContext,
+ initiatingClient, drillbitContext.getFunctionImplementationRegistry());
@SuppressWarnings("resource")
final IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
rootContext.setBuffers(buffers);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index a5b928b..f526fbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -56,7 +56,7 @@ public class NonRootFragmentManager implements FragmentManager {
try {
this.fragment = fragment;
this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
- this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry());
+ this.context = new FragmentContext(context, fragment, context.getFunctionImplementationRegistry());
this.buffers = new IncomingBuffers(root, this.context);
final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel(
fragment.getForeman()));
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
index ab6639e..550f56f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
@@ -74,7 +74,7 @@ public class TestDrillFileSystem {
stats.startProcessing();
try {
- dfs = new DrillFileSystem(FileSystem.get(conf), stats);
+ dfs = new DrillFileSystem(conf, stats);
is = dfs.open(new Path(tempFilePath));
byte[] buf = new byte[8000];