You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/04/22 00:21:36 UTC
[1/4] drill git commit: DRILL-2514: Add support for impersonation in
FileSystem storage plugin.
Repository: drill
Updated Branches:
refs/heads/master fbb405bdb -> 2a484251b
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index b032fce..58c8622 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -49,12 +49,12 @@ public abstract class AbstractStoragePlugin implements StoragePlugin{
}
@Override
- public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
- return getPhysicalScan(selection, AbstractGroupScan.ALL_COLUMNS);
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS);
}
@Override
- public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
throw new UnsupportedOperationException();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
new file mode 100644
index 0000000..0297945
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.server.options.OptionValue;
+
+/**
+ * Contains information needed by {@link org.apache.drill.exec.store.AbstractSchema} implementations.
+ */
+public class SchemaConfig {
+ private final String userName;
+ private final QueryContext queryContext;
+ private final boolean ignoreAuthErrors;
+
+ private SchemaConfig(final String userName, final QueryContext queryContext, final boolean ignoreAuthErrors) {
+ this.userName = userName;
+ this.queryContext = queryContext;
+ this.ignoreAuthErrors = ignoreAuthErrors;
+ }
+
+ public static Builder newBuilder(final String userName, final QueryContext queryContext) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(userName), "A valid userName is expected");
+ Preconditions.checkNotNull(queryContext, "Non-null QueryContext is expected");
+ return new Builder(userName, queryContext);
+ }
+
+ public static class Builder {
+ final String userName;
+ final QueryContext queryContext;
+ boolean ignoreAuthErrors;
+
+ private Builder(final String userName, final QueryContext queryContext) {
+ this.userName = userName;
+ this.queryContext = queryContext;
+ }
+
+ public Builder setIgnoreAuthErrors(boolean ignoreAuthErrors) {
+ this.ignoreAuthErrors = ignoreAuthErrors;
+ return this;
+ }
+
+ public SchemaConfig build() {
+ return new SchemaConfig(userName, queryContext, ignoreAuthErrors);
+ }
+ }
+
+ public QueryContext getQueryContext() {
+ return queryContext;
+ }
+
+ /**
+ * @return User whom to impersonate as while {@link net.hydromatic.optiq.SchemaPlus} instances
+ * interact with the underlying storage.
+ */
+ public String getUserName() {
+ return userName;
+ }
+
+ /**
+ * @return Should ignore if authorization errors are reported while {@link net.hydromatic.optiq.SchemaPlus}
+ * instances interact with the underlying storage.
+ */
+ public boolean getIgnoreAuthErrors() {
+ return ignoreAuthErrors;
+ }
+
+ public OptionValue getOption(String optionKey) {
+ return queryContext.getOptions().getOption(optionKey);
+ }
+
+ public ViewExpansionContext getViewExpansionContext() {
+ return queryContext.getViewExpansionContext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
index 14d2fab..e2dc613 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
@@ -19,12 +19,22 @@ package org.apache.drill.exec.store;
import net.hydromatic.optiq.SchemaPlus;
-import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.ops.QueryContext;
import java.io.IOException;
+/**
+ * StoragePlugins implements this interface to register the schemas they provide.
+ */
public interface SchemaFactory {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class);
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException;
+ /**
+ * Register the schemas provided by this SchemaFactory implementation under the given parent schema.
+ *
+ * @param schemaConfig Configuration for schema objects.
+ * @param parent Reference to parent schema.
+ * @throws IOException
+ */
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index ef5978c..b60c16f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -36,26 +36,24 @@ public interface StoragePlugin extends SchemaFactory {
/**
* Get the physical scan operator for the particular GroupScan (read) node.
*
- * @param selection
- * The configured storage engine specific selection.
+ * @param userName User whom to impersonate when when reading the contents as part of Scan.
+ * @param selection The configured storage engine specific selection.
* @return
* @throws IOException
*/
- public AbstractGroupScan getPhysicalScan(JSONOptions selection)
- throws IOException;
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException;
/**
* Get the physical scan operator for the particular GroupScan (read) node.
*
- * @param selection
- * The configured storage engine specific selection.
- * @param columns
- * (optional) The list of column names to scan from the data source.
+ * @param userName User whom to impersonate when when reading the contents as part of Scan.
+ * @param selection The configured storage engine specific selection.
+ * @param columns (optional) The list of column names to scan from the data source.
* @return
* @throws IOException
*/
- public AbstractGroupScan getPhysicalScan(JSONOptions selection,
- List<SchemaPath> columns) throws IOException;
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+ throws IOException;
public StoragePluginConfig getConfig();
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index cb9ee0f..bda4cc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -42,6 +42,8 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.util.PathScanner;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.ViewExpansionContext;
import org.apache.drill.exec.planner.logical.DrillRuleSets;
import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.rpc.user.UserSession;
@@ -301,7 +303,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
public class DrillSchemaFactory implements SchemaFactory {
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
Stopwatch watch = new Stopwatch();
watch.start();
@@ -325,7 +327,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
// finally register schemas with the refreshed plugins
for (StoragePlugin plugin : plugins.values()) {
- plugin.registerSchemas(session, parent);
+ plugin.registerSchemas(schemaConfig, parent);
}
} catch (ExecutionSetupException e) {
throw new DrillRuntimeException("Failure while updating storage plugins", e);
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 775b402..93fb0a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -29,12 +29,13 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.ClassPathFileSystem;
import org.apache.drill.exec.store.LocalSyncableFileSystem;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -110,7 +111,8 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
}
@Override
- public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+ throws IOException {
FormatSelection formatSelection = selection.getWith(context.getConfig(), FormatSelection.class);
FormatPlugin plugin;
if (formatSelection.getFormat() instanceof NamedFormatPluginConfig) {
@@ -121,12 +123,12 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
if (plugin == null) {
throw new IOException(String.format("Failure getting requested format plugin named '%s'. It was not one of the format plugins registered.", formatSelection.getFormat()));
}
- return plugin.getGroupScan(formatSelection.getSelection(), columns);
+ return plugin.getGroupScan(userName, formatSelection.getSelection(), columns);
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
- schemaFactory.registerSchemas(session, parent);
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ schemaFactory.registerSchemas(schemaConfig, parent);
}
public FormatPlugin getFormatPlugin(String name) {
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index e11712e..30d8d25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -27,10 +27,11 @@ import net.hydromatic.optiq.Function;
import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.Table;
+import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.PartitionNotFoundException;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
@@ -58,8 +59,8 @@ public class FileSystemSchemaFactory implements SchemaFactory{
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
- FileSystemSchema schema = new FileSystemSchema(schemaName, session);
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ FileSystemSchema schema = new FileSystemSchema(schemaName, schemaConfig);
SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
schema.setPlus(plusOfThis);
}
@@ -69,10 +70,10 @@ public class FileSystemSchemaFactory implements SchemaFactory{
private final WorkspaceSchema defaultSchema;
private final Map<String, WorkspaceSchema> schemaMap = Maps.newHashMap();
- public FileSystemSchema(String name, UserSession session) throws IOException {
+ public FileSystemSchema(String name, SchemaConfig schemaConfig) throws IOException {
super(ImmutableList.<String>of(), name);
for(WorkspaceSchemaFactory f : factories){
- WorkspaceSchema s = f.createSchema(getSchemaPath(), session);
+ WorkspaceSchema s = f.createSchema(getSchemaPath(), schemaConfig);
schemaMap.put(s.getName(), s);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 955dfeb..5668c54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -44,11 +44,9 @@ public interface FormatPlugin {
public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException;
- public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException;
-
public Set<StoragePluginOptimizerRule> getOptimizerRules();
- public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException;
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException;
public FormatPluginConfig getConfig();
public StoragePluginConfig getStorageConfig();
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index a536350..7cd50b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.dfs;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
@@ -30,6 +31,7 @@ import net.hydromatic.optiq.Table;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.dotdrill.DotDrillFile;
import org.apache.drill.exec.dotdrill.DotDrillType;
@@ -41,9 +43,10 @@ import org.apache.drill.exec.planner.logical.DrillViewTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.PartitionNotFoundException;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -52,6 +55,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
public class WorkspaceSchemaFactory {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
@@ -106,25 +111,27 @@ public class WorkspaceSchemaFactory {
return DotDrillType.VIEW.getPath(config.getLocation(), name);
}
- public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) throws IOException {
- return new WorkspaceSchema(parentSchemaPath, schemaName, session);
+ public WorkspaceSchema createSchema(List<String> parentSchemaPath, SchemaConfig schemaConfig) throws IOException {
+ return new WorkspaceSchema(parentSchemaPath, schemaName, schemaConfig);
}
public class WorkspaceSchema extends AbstractSchema implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
private final ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(this);
- private final UserSession session;
+ private final SchemaConfig schemaConfig;
private final DrillFileSystem fs;
- public WorkspaceSchema(List<String> parentSchemaPath, String wsName, UserSession session) throws IOException {
+ public WorkspaceSchema(List<String> parentSchemaPath, String wsName, SchemaConfig schemaConfig) throws IOException {
super(parentSchemaPath, wsName);
- this.session = session;
- this.fs = new DrillFileSystem(fsConf);
+ this.schemaConfig = schemaConfig;
+ this.fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), fsConf);
}
public boolean createView(View view) throws Exception {
Path viewPath = getViewPath(view.getName());
boolean replaced = fs.exists(viewPath);
- try (OutputStream stream = fs.create(viewPath)) {
+ final FsPermission viewPerms =
+ new FsPermission(schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+ try (OutputStream stream = DrillFileSystem.create(fs, viewPath, viewPerms)) {
mapper.writeValue(stream, view);
}
return replaced;
@@ -145,11 +152,6 @@ public class WorkspaceSchemaFactory {
return new SubDirectoryList(fileStatuses);
}
- public boolean viewExists(String viewName) throws Exception {
- Path viewPath = getViewPath(viewName);
- return fs.exists(viewPath);
- }
-
public void dropView(String viewName) throws IOException {
fs.delete(getViewPath(viewName), false);
}
@@ -165,6 +167,14 @@ public class WorkspaceSchemaFactory {
}
} catch (UnsupportedOperationException e) {
logger.debug("The filesystem for this workspace does not support this operation.", e);
+ } catch (AccessControlException e) {
+ if (!schemaConfig.getIgnoreAuthErrors()) {
+ logger.debug(e.getMessage());
+ throw UserException
+ .permissionError(e)
+ .message("Not authorized to list view tables in schema [%s]", getFullSchemaName())
+ .build();
+ }
} catch (Exception e) {
logger.warn("Failure while trying to list .view.drill files in workspace [{}]", getFullSchemaName(), e);
}
@@ -177,7 +187,7 @@ public class WorkspaceSchemaFactory {
return Sets.union(tables.keySet(), getViews());
}
- private View getView(DotDrillFile f) throws Exception{
+ private View getView(DotDrillFile f) throws IOException{
assert f.getType() == DotDrillType.VIEW;
return f.getView(drillConfig);
}
@@ -190,19 +200,42 @@ public class WorkspaceSchemaFactory {
}
// then look for files that start with this name and end in .drill.
- List<DotDrillFile> files;
+ List<DotDrillFile> files = Collections.EMPTY_LIST;
try {
- files = DotDrillUtil.getDotDrills(fs, new Path(config.getLocation()), name, DotDrillType.VIEW);
+ try {
+ files = DotDrillUtil.getDotDrills(fs, new Path(config.getLocation()), name, DotDrillType.VIEW);
+ } catch(AccessControlException e) {
+ if (!schemaConfig.getIgnoreAuthErrors()) {
+ logger.debug(e.getMessage());
+ throw UserException
+ .permissionError(e)
+ .message("Not authorized to list or query tables in schema [%s]", getFullSchemaName())
+ .build();
+ }
+ } catch(IOException e) {
+ logger.warn("Failure while trying to list view tables in workspace [{}]", name, getFullSchemaName(), e);
+ }
+
for(DotDrillFile f : files) {
switch(f.getType()) {
case VIEW:
- return new DrillViewTable(schemaPath, getView(f));
+ try {
+ return new DrillViewTable(getView(f), f.getOwner(), schemaConfig.getViewExpansionContext());
+ } catch (AccessControlException e) {
+ if (!schemaConfig.getIgnoreAuthErrors()) {
+ logger.debug(e.getMessage());
+ throw UserException
+ .permissionError(e)
+ .message("Not authorized to read view [%s] in schema [%s]", name, getFullSchemaName())
+ .build();
+ }
+ } catch (IOException e) {
+ logger.warn("Failure while trying to load {}.view.drill file in workspace [{}]", name, getFullSchemaName(), e);
+ }
}
}
} catch (UnsupportedOperationException e) {
logger.debug("The filesystem for this workspace does not support this operation.", e);
- } catch (Exception e) {
- logger.warn("Failure while trying to load {}.view.drill file in workspace [{}]", name, getFullSchemaName(), e);
}
return tables.get(name);
@@ -223,7 +256,7 @@ public class WorkspaceSchemaFactory {
@Override
public CreateTableEntry createNewTable(String tableName) {
- String storage = session.getOptions().getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
+ String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
FormatPlugin formatPlugin = plugin.getFormatPlugin(storage);
if (formatPlugin == null) {
throw new UnsupportedOperationException(
@@ -256,7 +289,7 @@ public class WorkspaceSchemaFactory {
try {
Object selection = m.isReadable(fs, fileSelection);
if (selection != null) {
- return new DynamicDrillTable(plugin, storageEngineName, selection);
+ return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
}
} catch (IOException e) {
logger.debug("File read failed.", e);
@@ -268,11 +301,19 @@ public class WorkspaceSchemaFactory {
for (FormatMatcher m : fileMatchers) {
Object selection = m.isReadable(fs, fileSelection);
if (selection != null) {
- return new DynamicDrillTable(plugin, storageEngineName, selection);
+ return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
}
}
return null;
+ } catch (AccessControlException e) {
+ if (!schemaConfig.getIgnoreAuthErrors()) {
+ logger.debug(e.getMessage());
+ throw UserException
+ .permissionError(e)
+ .message("Not authorized to read table [%s] in schema [%s]", key, getFullSchemaName())
+ .build();
+ }
} catch (IOException e) {
logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 5c7152a..f1271b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -147,7 +148,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
newColumns.add(AbstractRecordReader.STAR_COLUMN);
}
// Create a new sub scan object with the new set of columns;
- scan = new EasySubScan(scan.getWorkUnits(), scan.getFormatPlugin(), newColumns, scan.getSelectionRoot());
+ scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), newColumns,
+ scan.getSelectionRoot());
}
int numParts = 0;
@@ -203,13 +205,9 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
}
@Override
- public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException {
- return new EasyGroupScan(selection, this, selection.selectionRoot);
- }
-
- @Override
- public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
- return new EasyGroupScan(selection, this, columns, selection.selectionRoot);
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+ throws IOException {
+ return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 7c70df3..1b333ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -49,6 +49,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.util.ImpersonationUtil;
@JsonTypeName("fs-scan")
public class EasyGroupScan extends AbstractFileGroupScan{
@@ -66,6 +67,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
@JsonCreator
public EasyGroupScan(
+ @JsonProperty("userName") String userName,
@JsonProperty("files") List<String> files, //
@JsonProperty("storage") StoragePluginConfig storageConfig, //
@JsonProperty("format") FormatPluginConfig formatConfig, //
@@ -73,23 +75,26 @@ public class EasyGroupScan extends AbstractFileGroupScan{
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("selectionRoot") String selectionRoot
) throws IOException, ExecutionSetupException {
- this(new FileSelection(files, true),
+ this(ImpersonationUtil.resolveUserName(userName),
+ new FileSelection(files, true),
(EasyFormatPlugin<?>)engineRegistry.getFormatPlugin(storageConfig, formatConfig),
columns,
selectionRoot);
}
- public EasyGroupScan(FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot)
+ public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot)
throws IOException {
- this(selection, formatPlugin, ALL_COLUMNS, selectionRoot);
+ this(userName, selection, formatPlugin, ALL_COLUMNS, selectionRoot);
}
public EasyGroupScan(
+ String userName,
FileSelection selection, //
EasyFormatPlugin<?> formatPlugin, //
List<SchemaPath> columns,
String selectionRoot
) throws IOException{
+ super(userName);
this.selection = Preconditions.checkNotNull(selection);
this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
@@ -98,7 +103,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
}
private EasyGroupScan(EasyGroupScan that) {
- Preconditions.checkNotNull(that, "Unable to clone: source is null.");
+ super(that.getUserName());
selection = that.selection;
formatPlugin = that.formatPlugin;
columns = that.columns;
@@ -110,7 +115,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
}
private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
- final DrillFileSystem dfs = new DrillFileSystem(formatPlugin.getFsConf());
+ final DrillFileSystem dfs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf());
this.selection = selection;
BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
this.chunks = b.generateFileWork(selection.getFileStatusList(dfs), formatPlugin.isBlockSplittable());
@@ -203,7 +208,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
Preconditions.checkArgument(!filesForMinor.isEmpty(),
String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
- return new EasySubScan(convert(filesForMinor), formatPlugin, columns, selectionRoot);
+ return new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot);
}
private List<FileWorkImpl> convert(List<CompleteFileWork> list) {
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index e78ba0b..5fd5039 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -47,6 +47,7 @@ public class EasySubScan extends AbstractSubScan{
@JsonCreator
public EasySubScan(
+ @JsonProperty("userName") String userName,
@JsonProperty("files") List<FileWorkImpl> files, //
@JsonProperty("storage") StoragePluginConfig storageConfig, //
@JsonProperty("format") FormatPluginConfig formatConfig, //
@@ -54,7 +55,7 @@ public class EasySubScan extends AbstractSubScan{
@JsonProperty("columns") List<SchemaPath> columns, //
@JsonProperty("selectionRoot") String selectionRoot
) throws IOException, ExecutionSetupException {
-
+ super(userName);
this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
Preconditions.checkNotNull(this.formatPlugin);
this.files = files;
@@ -62,7 +63,9 @@ public class EasySubScan extends AbstractSubScan{
this.selectionRoot = selectionRoot;
}
- public EasySubScan(List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns, String selectionRoot){
+ public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns,
+ String selectionRoot){
+ super(userName);
this.formatPlugin = plugin;
this.files = files;
this.columns = columns;
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index 262c6be..e08fe71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -38,7 +38,7 @@ public class DirectGroupScan extends AbstractGroupScan{
private final RecordReader reader;
public DirectGroupScan(RecordReader reader) {
- super();
+ super((String)null);
this.reader = reader;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
index 89694f8..763ecba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
@@ -27,7 +27,7 @@ public class DirectSubScan extends AbstractSubScan{
private final RecordReader reader;
public DirectSubScan(RecordReader reader) {
- super();
+ super(null);
this.reader = reader;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 237589c..722650d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -77,8 +77,9 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
}
@Override
- public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
- return new EasyGroupScan(selection, this, columns, selection.selectionRoot); //TODO : textformat supports project?
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+ throws IOException {
+ return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
index 22cc483..bd0d582 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
@@ -54,11 +54,13 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{
@JsonCreator
public InfoSchemaGroupScan(@JsonProperty("table") SelectedTable table,
@JsonProperty("filter") InfoSchemaFilter filter) {
+ super((String)null);
this.table = table;
this.filter = filter;
}
private InfoSchemaGroupScan(InfoSchemaGroupScan that) {
+ super(that);
this.table = that.table;
this.filter = that.filter;
this.isFilterPushedDown = that.isFilterPushedDown;
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index a1249e6..4dfde7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -29,7 +29,6 @@ import net.hydromatic.optiq.Table;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -37,6 +36,7 @@ import org.apache.drill.exec.store.AbstractStoragePlugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements InfoSchemaConstants {
@@ -58,7 +58,8 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements In
}
@Override
- public InfoSchemaGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+ public InfoSchemaGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+ throws IOException {
SelectedTable table = selection.getWith(context.getConfig(), SelectedTable.class);
return new InfoSchemaGroupScan(table);
}
@@ -69,7 +70,7 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements In
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
ISchema s = new ISchema(parent, this);
parent.add(s.getName(), s);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
index b9349b0..7a479d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
@@ -32,6 +32,7 @@ public class InfoSchemaSubScan extends AbstractSubScan{
@JsonCreator
public InfoSchemaSubScan(@JsonProperty("table") SelectedTable table,
@JsonProperty("filter") InfoSchemaFilter filter) {
+ super(null);
this.table = table;
this.filter = filter;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
index 947998d..5b132c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
@@ -116,8 +116,14 @@ public abstract class RecordGenerator implements InfoSchemaConstants {
// ... do for each of the schema's tables.
for (String tableName: schema.getTableNames()) {
Table table = schema.getTable(tableName);
- // Visit the table, and if requested ...
+ if (table == null) {
+ // Schema may return NULL for table if the query user doesn't have permissions to load the table. Ignore such
+ // tables as INFO SCHEMA is about showing tables which the use has access to query.
+ continue;
+ }
+
+ // Visit the table, and if requested ...
if (shouldVisitTable(schemaPath, tableName) && visitTable(schemaPath, tableName, table)) {
// ... do for each of the table's fields.
RelDataType tableRow = table.getRowType(new JavaTypeFactoryImpl());
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
index dc90a33..bb71c31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -53,8 +53,8 @@ public class MockGroupScanPOP extends AbstractGroupScan {
@JsonCreator
public MockGroupScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
+ super((String)null);
this.readEntries = readEntries;
-
this.url = url;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 96226a1..1689300 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -27,9 +27,9 @@ import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -45,7 +45,8 @@ public class MockStorageEngine extends AbstractStoragePlugin {
}
@Override
- public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+ throws IOException {
ArrayList<MockScanEntry> readEntries = selection.getListWith(new ObjectMapper(),
new TypeReference<ArrayList<MockScanEntry>>() {
@@ -55,7 +56,7 @@ public class MockStorageEngine extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 9c83ea0..7298f53 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -161,14 +161,9 @@ public class ParquetFormatPlugin implements FormatPlugin{
}
@Override
- public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
- return getGroupScan(selection, null);
- }
-
- @Override
- public ParquetGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
- final DrillFileSystem dfs = new DrillFileSystem(fsConf);
- return new ParquetGroupScan(selection.getFileStatusList(dfs), this, selection.selectionRoot, columns);
+ public ParquetGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+ throws IOException {
+ return new ParquetGroupScan(userName, selection, this, selection.selectionRoot, columns);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index a59f2c9..21b9b48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,12 +18,14 @@
package org.apache.drill.exec.store.parquet;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
@@ -50,17 +52,17 @@ import org.apache.drill.exec.store.schedule.AssignmentCreator;
import org.apache.drill.exec.store.schedule.BlockMapBuilder;
import org.apache.drill.exec.store.schedule.CompleteWork;
import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import parquet.hadoop.Footer;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.org.codehaus.jackson.annotate.JsonCreator;
-import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.annotation.JacksonInject;
@@ -78,17 +80,17 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
static final MetricRegistry metrics = DrillMetrics.getInstance();
static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
- private ListMultimap<Integer, RowGroupInfo> mappings;
- private List<RowGroupInfo> rowGroupInfos;
private final List<ReadEntryWithPath> entries;
private final Stopwatch watch = new Stopwatch();
private final ParquetFormatPlugin formatPlugin;
private final ParquetFormatConfig formatConfig;
- private final FileSystem fs;
- private List<EndpointAffinity> endpointAffinities;
- private String selectionRoot;
+ private final DrillFileSystem fs;
+ private final String selectionRoot;
+ private List<EndpointAffinity> endpointAffinities;
private List<SchemaPath> columns;
+ private ListMultimap<Integer, RowGroupInfo> mappings;
+ private List<RowGroupInfo> rowGroupInfos;
/*
* total number of rows (obtained from parquet footer)
@@ -100,22 +102,9 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
*/
private Map<SchemaPath, Long> columnValueCounts;
- public List<ReadEntryWithPath> getEntries() {
- return entries;
- }
-
- @JsonProperty("format")
- public ParquetFormatConfig getFormatConfig() {
- return this.formatConfig;
- }
-
- @JsonProperty("storage")
- public StoragePluginConfig getEngineConfig() {
- return this.formatPlugin.getStorageConfig();
- }
-
@JsonCreator
public ParquetGroupScan( //
+ @JsonProperty("userName") String userName,
@JsonProperty("entries") List<ReadEntryWithPath> entries, //
@JsonProperty("storage") StoragePluginConfig storageConfig, //
@JsonProperty("format") FormatPluginConfig formatConfig, //
@@ -123,6 +112,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
@JsonProperty("columns") List<SchemaPath> columns, //
@JsonProperty("selectionRoot") String selectionRoot //
) throws IOException, ExecutionSetupException {
+ super(ImpersonationUtil.resolveUserName(userName));
this.columns = columns;
if (formatConfig == null) {
formatConfig = new ParquetFormatConfig();
@@ -131,29 +121,28 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
Preconditions.checkNotNull(formatConfig);
this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
Preconditions.checkNotNull(formatPlugin);
- this.fs = new DrillFileSystem(formatPlugin.getFsConf());
+ this.fs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf());
this.formatConfig = formatPlugin.getConfig();
this.entries = entries;
this.selectionRoot = selectionRoot;
this.readFooterFromEntries();
-
- }
-
- public String getSelectionRoot() {
- return selectionRoot;
}
- public ParquetGroupScan(List<FileStatus> files, //
+ public ParquetGroupScan( //
+ String userName,
+ FileSelection selection, //
ParquetFormatPlugin formatPlugin, //
String selectionRoot,
List<SchemaPath> columns) //
throws IOException {
+ super(userName);
this.formatPlugin = formatPlugin;
this.columns = columns;
this.formatConfig = formatPlugin.getConfig();
- this.fs = new DrillFileSystem(formatPlugin.getFsConf());
+ this.fs = ImpersonationUtil.createFileSystem(userName, formatPlugin.getFsConf());
this.entries = Lists.newArrayList();
+ List<FileStatus> files = selection.getFileStatusList(fs);
for (FileStatus file : files) {
entries.add(new ReadEntryWithPath(file.getPath().toString()));
}
@@ -167,6 +156,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
* This is used to clone another copy of the group scan.
*/
private ParquetGroupScan(ParquetGroupScan that) {
+ super(that);
this.columns = that.columns;
this.endpointAffinities = that.endpointAffinities;
this.entries = that.entries;
@@ -180,6 +170,25 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
this.columnValueCounts = that.columnValueCounts;
}
+
+ public List<ReadEntryWithPath> getEntries() {
+ return entries;
+ }
+
+ @JsonProperty("format")
+ public ParquetFormatConfig getFormatConfig() {
+ return this.formatConfig;
+ }
+
+ @JsonProperty("storage")
+ public StoragePluginConfig getEngineConfig() {
+ return this.formatPlugin.getStorageConfig();
+ }
+
+ public String getSelectionRoot() {
+ return selectionRoot;
+ }
+
private void readFooterFromEntries() throws IOException {
List<FileStatus> files = Lists.newArrayList();
for (ReadEntryWithPath e : entries) {
@@ -188,12 +197,27 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
readFooter(files);
}
- private void readFooter(List<FileStatus> statuses) throws IOException {
+ private void readFooter(final List<FileStatus> statuses) {
+ final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(getUserName());
+ try {
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws Exception {
+ readFooterHelper(statuses);
+ return null;
+ }
+ });
+ } catch (InterruptedException | IOException e) {
+ final String errMsg = String.format("Failed to read footer entries from parquet input files: %s", e.getMessage());
+ logger.error(errMsg, e);
+ throw new DrillRuntimeException(errMsg, e);
+ }
+ }
+
+ private void readFooterHelper(List<FileStatus> statuses) throws IOException {
watch.reset();
watch.start();
Timer.Context tContext = metrics.timer(READ_FOOTER_TIMER).time();
-
rowGroupInfos = Lists.newArrayList();
long start = 0, length = 0;
rowCount = 0;
@@ -373,7 +397,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
- return new ParquetRowGroupScan(formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot);
+ return new ParquetRowGroupScan(
+ getUserName(), formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot);
}
private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) {
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index fd40f41..987f792 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -55,22 +55,26 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
@JsonCreator
public ParquetRowGroupScan( //
@JacksonInject StoragePluginRegistry registry, //
+ @JsonProperty("userName") String userName, //
@JsonProperty("storage") StoragePluginConfig storageConfig, //
@JsonProperty("format") FormatPluginConfig formatConfig, //
@JsonProperty("entries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, //
@JsonProperty("columns") List<SchemaPath> columns, //
@JsonProperty("selectionRoot") String selectionRoot //
) throws ExecutionSetupException {
- this((ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
+ this(userName, (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
formatConfig == null ? new ParquetFormatConfig() : formatConfig),
rowGroupReadEntries, columns, selectionRoot);
}
public ParquetRowGroupScan( //
+ String userName, //
ParquetFormatPlugin formatPlugin, //
List<RowGroupReadEntry> rowGroupReadEntries, //
- List<SchemaPath> columns,
- String selectionRoot) {
+ List<SchemaPath> columns, //
+ String selectionRoot //
+ ) {
+ super(userName);
this.formatPlugin = Preconditions.checkNotNull(formatPlugin);
this.formatConfig = formatPlugin.getConfig();
this.rowGroupReadEntries = rowGroupReadEntries;
@@ -110,7 +114,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
- return new ParquetRowGroupScan(formatPlugin, rowGroupReadEntries, columns, selectionRoot);
+ return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index b1c725c..52dccd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -90,7 +91,8 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
}
final int id = rowGroupScan.getOperatorId();
// Create the new row group scan with the new columns
- rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getStorageEngine(), rowGroupScan.getRowGroupReadEntries(), newColumns, rowGroupScan.getSelectionRoot());
+ rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(),
+ rowGroupScan.getRowGroupReadEntries(), newColumns, rowGroupScan.getSelectionRoot());
rowGroupScan.setOperatorId(id);
}
@@ -100,7 +102,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
} catch(IOException e) {
throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
}
- Configuration conf = fs.getConf();
+ Configuration conf = new Configuration(fs.getConf());
conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
@@ -119,7 +121,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
try {
if ( ! footers.containsKey(e.getPath())){
footers.put(e.getPath(),
- ParquetFileReader.readFooter( fs.getConf(), new Path(e.getPath())));
+ ParquetFileReader.readFooter(conf, new Path(e.getPath())));
}
if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
readers.add(
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index 4a3b97b..a13c945 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -31,11 +31,11 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.pojo.PojoDataType;
+import org.apache.drill.exec.store.SchemaConfig;
/**
* A "storage" plugin for system tables.
@@ -67,12 +67,13 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
parent.add(schema.getName(), schema);
}
@Override
- public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+ throws IOException {
SystemTable table = selection.getWith(context.getConfig(), SystemTable.class);
return new SystemTableScan(table, this);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index f8baf97..22bd7df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -54,11 +54,13 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
@JsonProperty("table") SystemTable table, //
@JacksonInject StoragePluginRegistry engineRegistry //
) throws IOException, ExecutionSetupException {
+ super((String)null);
this.table = table;
this.plugin = (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE);
}
public SystemTableScan(SystemTable table, SystemTablePlugin plugin) {
+ super((String)null);
this.table = table;
this.plugin = plugin;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
new file mode 100644
index 0000000..9997178
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import com.google.common.base.Strings;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Utilities for impersonation purpose.
+ */
+public class ImpersonationUtil {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImpersonationUtil.class);
+
+ /**
+ * Create and return proxy user {@link org.apache.hadoop.security.UserGroupInformation} of operator owner if operator
+ * owner is valid. Otherwise create and return proxy user {@link org.apache.hadoop.security.UserGroupInformation} for
+ * query user.
+ *
+ * @param opUserName Name of the user whom to impersonate while setting up the operator.
+ * @param queryUserName Name of the user who issues the query. If <i>opUserName</i> is invalid,
+ * then this parameter must be valid user name.
+ * @return
+ */
+ public static UserGroupInformation createProxyUgi(String opUserName, String queryUserName) {
+ if (!Strings.isNullOrEmpty(opUserName)) {
+ return createProxyUgi(opUserName);
+ }
+
+ if (Strings.isNullOrEmpty(queryUserName)) {
+ // TODO(DRILL-2097): Tests that use SimpleRootExec have don't assign any query user name in FragmentContext.
+ // Disable throwing exception to modifying the long list of test files.
+ // throw new DrillRuntimeException("Invalid value for query user name");
+ return getProcessUserUGI();
+ }
+
+ return createProxyUgi(queryUserName);
+ }
+
+ /**
+ * Create and return proxy user {@link org.apache.hadoop.security.UserGroupInformation} for give user name.
+ *
+ * TODO: we may want to cache the {@link org.apache.hadoop.security.UserGroupInformation} instances as we try to
+ * create different instances for the same user which is an unnecessary overhead.
+ *
+ * @param proxyUserName Proxy user name (must be valid)
+ * @return
+ */
+ public static UserGroupInformation createProxyUgi(String proxyUserName) {
+ try {
+ if (Strings.isNullOrEmpty(proxyUserName)) {
+ throw new DrillRuntimeException("Invalid value for proxy user name");
+ }
+
+ return UserGroupInformation.createProxyUser(proxyUserName, UserGroupInformation.getLoginUser());
+ } catch(IOException e) {
+ final String errMsg = "Failed to create proxy user UserGroupInformation object: " + e.getMessage();
+ logger.error(errMsg, e);
+ throw new DrillRuntimeException(errMsg, e);
+ }
+ }
+
+ /**
+ * If the given user name is empty, return the current process user name. This is a temporary change to avoid
+ * modifying long list of tests files which have GroupScan operator with no user name property.
+ * @param userName User name found in GroupScan POP definition.
+ */
+ public static String resolveUserName(String userName) {
+ if (!Strings.isNullOrEmpty(userName)) {
+ return userName;
+ }
+ return getProcessUserName();
+ }
+
+ /**
+ * Return the name of the user who is running the Drillbit.
+ *
+ * @return Drillbit process user.
+ */
+ public static String getProcessUserName() {
+ return getProcessUserUGI().getUserName();
+ }
+
+ /**
+ * Return the {@link org.apache.hadoop.security.UserGroupInformation} of user who is running the Drillbit.
+ *
+ * @return Drillbit process user {@link org.apache.hadoop.security.UserGroupInformation}.
+ */
+ public static UserGroupInformation getProcessUserUGI() {
+ try {
+ return UserGroupInformation.getLoginUser();
+ } catch (IOException e) {
+ final String errMsg = "Failed to get process user UserGroupInformation object.";
+ logger.error(errMsg, e);
+ throw new DrillRuntimeException(errMsg, e);
+ }
+ }
+
+ /**
+ * Create DrillFileSystem for given <i>proxyUserName</i> and configuration.
+ *
+ * @param proxyUserName Name of the user whom to impersonate while accessing the FileSystem contents.
+ * @param fsConf FileSystem configuration.
+ * @return
+ */
+ public static DrillFileSystem createFileSystem(String proxyUserName, Configuration fsConf) {
+ return createFileSystem(proxyUserName, fsConf, null);
+ }
+
+ /**
+ * Create DrillFileSystem for given <i>proxyUserName</i>, configuration and stats.
+ *
+ * @param proxyUserName Name of the user whom to impersonate while accessing the FileSystem contents.
+ * @param fsConf FileSystem configuration.
+ * @param stats OperatorStats for DrillFileSystem (optional)
+ * @return
+ */
+ public static DrillFileSystem createFileSystem(String proxyUserName, Configuration fsConf, OperatorStats stats) {
+ return createFileSystem(createProxyUgi(proxyUserName), fsConf, stats);
+ }
+
+ /** Helper method to create DrillFileSystem */
+ private static DrillFileSystem createFileSystem(UserGroupInformation proxyUserUgi, final Configuration fsConf,
+ final OperatorStats stats) {
+ DrillFileSystem fs;
+ try {
+ fs = proxyUserUgi.doAs(new PrivilegedExceptionAction<DrillFileSystem>() {
+ public DrillFileSystem run() throws Exception {
+ logger.debug("Creating DrillFileSystem for proxy user: " + UserGroupInformation.getCurrentUser());
+ return new DrillFileSystem(fsConf, stats);
+ }
+ });
+ } catch (InterruptedException | IOException e) {
+ final String errMsg = "Failed to create DrillFileSystem for proxy user: " + e.getMessage();
+ logger.error(errMsg, e);
+ throw new DrillRuntimeException(errMsg, e);
+ }
+
+ return fs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index cb2753c..edbcfde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -779,7 +779,7 @@ public class Foreman implements Runnable {
if (logger.isDebugEnabled()) {
logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getConfig()));
}
- return new BasicOptimizer(queryContext).optimize(
+ return new BasicOptimizer(queryContext, initiatingClient).optimize(
new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index be798ec..0701252 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.work.fragment;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@@ -38,7 +39,9 @@ import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.testing.ExecutionControlsInjector;
+import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
+import org.apache.hadoop.security.UserGroupInformation;
/**
* Responsible for running a single fragment on a single Drillbit. Listens/responds to status request
@@ -182,13 +185,23 @@ public class FragmentExecutor implements Runnable {
}
}
- injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
- /*
- * Run the query until root.next returns false OR we no longer need to continue.
- */
- while (shouldContinue() && root.next()) {
- // loop
- }
+ final UserGroupInformation queryUserUgi = fragmentContext.isImpersonationEnabled() ?
+ ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName()) :
+ ImpersonationUtil.getProcessUserUGI();
+
+ queryUserUgi.doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws Exception {
+ injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
+ /*
+ * Run the query until root.next returns false OR we no longer need to continue.
+ */
+ while (shouldContinue() && root.next()) {
+ // loop
+ }
+
+ return null;
+ }
+ });
updateState(FragmentState.FINISHED);
} catch (AssertionError | Exception e) {
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 6bd8db0..8006533 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -101,6 +101,10 @@ drill.exec: {
write: true
}
},
+ impersonation: {
+ enabled: false,
+ max_chained_user_hops: 3
+ },
security.user.auth {
enabled: false,
packages += "org.apache.drill.exec.rpc.user.security",
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index c3223b8..e7f6896 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.QueryOptionManager;
import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
import org.apache.drill.exec.testing.ExecutionControls;
@@ -102,7 +103,7 @@ public class PlanningBase extends ExecTest{
final FunctionImplementationRegistry functionRegistry = new FunctionImplementationRegistry(config);
final DrillOperatorTable table = new DrillOperatorTable(functionRegistry);
final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
- registry.getSchemaFactory().registerSchemas(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), root);
+ registry.getSchemaFactory().registerSchemas(SchemaConfig.newBuilder("foo", context).build(), root);
new NonStrictExpectations() {
{
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index ba905c4..dc37071 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest {
bitContext);
PhysicalPlanReader reader = bitContext.getPlanReader();
LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
- PhysicalPlan pp = new BasicOptimizer(qc).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
+ PhysicalPlan pp = new BasicOptimizer(qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
index 2cba992..604f375 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
@@ -38,8 +39,9 @@ public class TestExceptionInjection extends BaseTestQuery {
private static final String NO_THROW_FAIL = "Didn't throw expected exception";
private static final UserSession session = UserSession.Builder.newBuilder()
- .withOptionManager(bits[0].getContext().getOptionManager())
- .build();
+ .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+ .withOptionManager(bits[0].getContext().getOptionManager())
+ .build();
/**
* Class whose methods we want to simulate runtime at run-time for testing
@@ -248,8 +250,9 @@ public class TestExceptionInjection extends BaseTestQuery {
final DrillbitContext drillbitContext2 = drillbit2.getContext();
final UserSession session = UserSession.Builder.newBuilder()
- .withOptionManager(drillbitContext1.getOptionManager())
- .build();
+ .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+ .withOptionManager(drillbitContext1.getOptionManager())
+ .build();
final String passthroughDesc = "<<injected from descPassthrough>>";
final int nSkip = 7;
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
index 1c219f0..508b10c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.testing;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.user.UserSession;
import org.junit.Test;
import org.slf4j.Logger;
@@ -29,8 +30,9 @@ import static org.junit.Assert.fail;
public class TestPauseInjection extends BaseTestQuery {
private static final UserSession session = UserSession.Builder.newBuilder()
- .withOptionManager(bits[0].getContext().getOptionManager())
- .build();
+ .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+ .withOptionManager(bits[0].getContext().getOptionManager())
+ .build();
/**
* Class whose methods we want to simulate pauses at run-time for testing
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3b246d9..91707fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -945,7 +945,7 @@
<dependency>
<groupId>net.hydromatic</groupId>
<artifactId>optiq-core</artifactId>
- <version>0.9-drill-r21</version>
+ <version>0.9-drill-r21.1</version>
<exclusions>
<exclusion>
<groupId>org.jgrapht</groupId>
[2/4] drill git commit: DRILL-2514: Add support for impersonation in
FileSystem storage plugin.
Posted by ve...@apache.org.
DRILL-2514: Add support for impersonation in FileSystem storage plugin.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/40c90403
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/40c90403
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/40c90403
Branch: refs/heads/master
Commit: 40c90403255d55b412efe2c3a78289bd75325119
Parents: 117b749
Author: vkorukanti <ve...@gmail.com>
Authored: Wed Mar 18 15:51:44 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Apr 21 13:16:00 2015 -0700
----------------------------------------------------------------------
.../drill/exec/store/hbase/HBaseGroupScan.java | 17 +-
.../store/hbase/HBasePushFilterIntoScan.java | 3 +-
.../exec/store/hbase/HBaseSchemaFactory.java | 4 +-
.../exec/store/hbase/HBaseStoragePlugin.java | 10 +-
.../drill/exec/store/hbase/HBaseSubScan.java | 7 +-
.../HivePushPartitionFilterIntoScan.java | 2 +-
.../apache/drill/exec/store/hive/HiveScan.java | 11 +-
.../exec/store/hive/HiveStoragePlugin.java | 10 +-
.../drill/exec/store/hive/HiveSubScan.java | 6 +-
.../store/hive/schema/HiveSchemaFactory.java | 4 +-
.../drill/exec/store/mongo/MongoGroupScan.java | 13 +-
.../store/mongo/MongoPushDownFilterForScan.java | 2 +-
.../exec/store/mongo/MongoStoragePlugin.java | 15 +-
.../drill/exec/store/mongo/MongoSubScan.java | 7 +-
.../store/mongo/schema/MongoSchemaFactory.java | 6 +-
.../src/resources/drill-override-example.conf | 4 +
.../org/apache/drill/exec/ExecConstants.java | 6 +
.../drill/exec/dotdrill/DotDrillFile.java | 10 +-
.../apache/drill/exec/ops/FragmentContext.java | 30 +++-
.../org/apache/drill/exec/ops/QueryContext.java | 61 ++++++-
.../drill/exec/ops/ViewExpansionContext.java | 175 +++++++++++++++++++
.../apache/drill/exec/opt/BasicOptimizer.java | 8 +-
.../drill/exec/physical/base/AbstractBase.java | 20 +++
.../physical/base/AbstractFileGroupScan.java | 7 +
.../exec/physical/base/AbstractGroupScan.java | 8 +
.../exec/physical/base/AbstractSubScan.java | 4 +
.../exec/physical/base/PhysicalOperator.java | 8 +
.../drill/exec/physical/impl/ImplCreator.java | 128 +++++++++-----
.../drill/exec/planner/logical/DrillTable.java | 39 ++++-
.../exec/planner/logical/DrillViewTable.java | 37 ++--
.../exec/planner/logical/DynamicDrillTable.java | 9 +
.../drill/exec/planner/sql/DrillSqlWorker.java | 3 +
.../sql/handlers/CreateTableHandler.java | 4 +-
.../exec/planner/torel/ConversionContext.java | 6 +
.../server/options/SystemOptionManager.java | 1 +
.../drill/exec/store/AbstractStoragePlugin.java | 6 +-
.../apache/drill/exec/store/SchemaConfig.java | 93 ++++++++++
.../apache/drill/exec/store/SchemaFactory.java | 14 +-
.../apache/drill/exec/store/StoragePlugin.java | 18 +-
.../drill/exec/store/StoragePluginRegistry.java | 6 +-
.../drill/exec/store/dfs/FileSystemPlugin.java | 12 +-
.../exec/store/dfs/FileSystemSchemaFactory.java | 11 +-
.../drill/exec/store/dfs/FormatPlugin.java | 4 +-
.../exec/store/dfs/WorkspaceSchemaFactory.java | 85 ++++++---
.../exec/store/dfs/easy/EasyFormatPlugin.java | 14 +-
.../exec/store/dfs/easy/EasyGroupScan.java | 17 +-
.../drill/exec/store/dfs/easy/EasySubScan.java | 7 +-
.../exec/store/direct/DirectGroupScan.java | 2 +-
.../drill/exec/store/direct/DirectSubScan.java | 2 +-
.../exec/store/easy/text/TextFormatPlugin.java | 5 +-
.../exec/store/ischema/InfoSchemaGroupScan.java | 2 +
.../store/ischema/InfoSchemaStoragePlugin.java | 7 +-
.../exec/store/ischema/InfoSchemaSubScan.java | 1 +
.../exec/store/ischema/RecordGenerator.java | 8 +-
.../drill/exec/store/mock/MockGroupScanPOP.java | 2 +-
.../exec/store/mock/MockStorageEngine.java | 7 +-
.../exec/store/parquet/ParquetFormatPlugin.java | 11 +-
.../exec/store/parquet/ParquetGroupScan.java | 89 ++++++----
.../exec/store/parquet/ParquetRowGroupScan.java | 12 +-
.../store/parquet/ParquetScanBatchCreator.java | 8 +-
.../drill/exec/store/sys/SystemTablePlugin.java | 7 +-
.../drill/exec/store/sys/SystemTableScan.java | 2 +
.../drill/exec/util/ImpersonationUtil.java | 162 +++++++++++++++++
.../apache/drill/exec/work/foreman/Foreman.java | 2 +-
.../exec/work/fragment/FragmentExecutor.java | 27 ++-
.../src/main/resources/drill-module.conf | 4 +
.../java/org/apache/drill/PlanningBase.java | 3 +-
.../exec/physical/impl/TestOptiqPlans.java | 2 +-
.../exec/testing/TestExceptionInjection.java | 11 +-
.../drill/exec/testing/TestPauseInjection.java | 6 +-
pom.xml | 2 +-
71 files changed, 1097 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 6d18d12..e52e2e4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -101,14 +101,17 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
private long scanSizeInBytes = 0;
@JsonCreator
- public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
+ public HBaseGroupScan(@JsonProperty("userName") String userName,
+ @JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
@JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
- this ((HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
+ this (userName, (HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
}
- public HBaseGroupScan(HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
+ public HBaseGroupScan(String userName, HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec,
+ List<SchemaPath> columns) {
+ super(userName);
this.storagePlugin = storagePlugin;
this.storagePluginConfig = storagePlugin.getConfig();
this.hbaseScanSpec = scanSpec;
@@ -121,6 +124,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
* @param that The HBaseGroupScan to clone
*/
private HBaseGroupScan(HBaseGroupScan that) {
+ super(that);
this.columns = that.columns;
this.hbaseScanSpec = that.hbaseScanSpec;
this.endpointFragmentMapping = that.endpointFragmentMapping;
@@ -342,7 +346,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
- return new HBaseSubScan(storagePlugin, storagePluginConfig, endpointFragmentMapping.get(minorFragmentId), columns);
+ return new HBaseSubScan(getUserName(), storagePlugin, storagePluginConfig,
+ endpointFragmentMapping.get(minorFragmentId), columns);
}
@Override
@@ -427,7 +432,9 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
* Empty constructor, do not use, only for testing.
*/
@VisibleForTesting
- public HBaseGroupScan() { }
+ public HBaseGroupScan() {
+ super((String)null);
+ }
/**
* Do not use, only for testing.
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
index 2b419d4..c395b43 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -62,7 +62,8 @@ public class HBasePushFilterIntoScan extends StoragePluginOptimizerRule {
return; //no filter pushdown ==> No transformation.
}
- final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns());
+ final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
+ newScanSpec, groupScan.getColumns());
newGroupsScan.setFilterPushedDown(true);
final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 1c407e1..47d08b4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -25,8 +25,8 @@ import net.hydromatic.optiq.Schema;
import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.Table;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -46,7 +46,7 @@ public class HBaseSchemaFactory implements SchemaFactory {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
HBaseSchema schema = new HBaseSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index 948d462..2214c50 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -23,9 +23,9 @@ import java.util.Set;
import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.JSONOptions;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -60,14 +60,14 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public HBaseGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
+ public HBaseGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
HBaseScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<HBaseScanSpec>() {});
- return new HBaseGroupScan(this, scanSpec, null);
+ return new HBaseGroupScan(userName, this, scanSpec, null);
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
- schemaFactory.registerSchemas(session, parent);
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ schemaFactory.registerSchemas(schemaConfig, parent);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 23d8c5a..96ae257 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -55,17 +55,20 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
@JsonCreator
public HBaseSubScan(@JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("userName") String userName,
@JsonProperty("storage") StoragePluginConfig storage,
@JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+ super(userName);
hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
this.regionScanSpecList = regionScanSpecList;
this.storage = (HBaseStoragePluginConfig) storage;
this.columns = columns;
}
- public HBaseSubScan(HBaseStoragePlugin plugin, HBaseStoragePluginConfig config,
+ public HBaseSubScan(String userName, HBaseStoragePlugin plugin, HBaseStoragePluginConfig config,
List<HBaseSubScanSpec> regionInfoList, List<SchemaPath> columns) {
+ super(userName);
hbaseStoragePlugin = plugin;
storage = config;
this.regionScanSpecList = regionInfoList;
@@ -103,7 +106,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- return new HBaseSubScan(hbaseStoragePlugin, storage, regionScanSpecList, columns);
+ return new HBaseSubScan(getUserName(), hbaseStoragePlugin, storage, regionScanSpecList, columns);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
index 374c486..9b93ac0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
@@ -132,7 +132,7 @@ public abstract class HivePushPartitionFilterIntoScan extends StoragePluginOptim
try {
HiveScan oldScan = (HiveScan) scanRel.getGroupScan();
- HiveScan hiveScan = new HiveScan(newReadEntry, oldScan.storagePlugin, oldScan.columns);
+ HiveScan hiveScan = new HiveScan(oldScan.getUserName(), newReadEntry, oldScan.storagePlugin, oldScan.columns);
PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, hiveScan, builder);
} catch (ExecutionSetupException e) {
throw new DrillRuntimeException(e);
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 92635a8..8a2e498 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -94,10 +94,12 @@ public class HiveScan extends AbstractGroupScan {
private long rowCount = 0;
@JsonCreator
- public HiveScan(@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
+ public HiveScan(@JsonProperty("userName") final String userName,
+ @JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
@JsonProperty("storage-plugin") final String storagePluginName,
@JsonProperty("columns") final List<SchemaPath> columns,
@JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+ super(userName);
this.hiveReadEntry = hiveReadEntry;
this.storagePluginName = storagePluginName;
this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
@@ -106,7 +108,8 @@ public class HiveScan extends AbstractGroupScan {
endpoints = storagePlugin.getContext().getBits();
}
- public HiveScan(final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
+ public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
+ super(userName);
this.hiveReadEntry = hiveReadEntry;
this.columns = columns;
this.storagePlugin = storagePlugin;
@@ -116,6 +119,7 @@ public class HiveScan extends AbstractGroupScan {
}
private HiveScan(final HiveScan that) {
+ super(that);
this.columns = that.columns;
this.endpoints = that.endpoints;
this.hiveReadEntry = that.hiveReadEntry;
@@ -226,8 +230,9 @@ public class HiveScan extends AbstractGroupScan {
if (parts.contains(null)) {
parts = null;
}
+
final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
- return new HiveSubScan(encodedInputSplits, subEntry, splitTypes, columns);
+ return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns);
} catch (IOException | ReflectiveOperationException e) {
throw new ExecutionSetupException(e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 91e7a92..a19ebb8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -29,9 +29,9 @@ import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;
@@ -67,7 +67,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public HiveScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+ public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
HiveReadEntry hiveReadEntry = selection.getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
try {
if (hiveReadEntry.getJdbcTableType() == TableType.VIEW) {
@@ -75,15 +75,15 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
"Querying views created in Hive from Drill is not supported in current version.");
}
- return new HiveScan(hiveReadEntry, this, columns);
+ return new HiveScan(userName, hiveReadEntry, this, columns);
} catch (ExecutionSetupException e) {
throw new IOException(e);
}
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
- schemaFactory.registerSchemas(session, parent);
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ schemaFactory.registerSchemas(schemaConfig, parent);
}
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
return ImmutableSet.of(HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_PROJECT, HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_SCAN);
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 1233202..2181c2a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -62,10 +62,12 @@ public class HiveSubScan extends AbstractBase implements SubScan {
private List<Partition> partitions;
@JsonCreator
- public HiveSubScan(@JsonProperty("splits") List<String> splits,
+ public HiveSubScan(@JsonProperty("userName") String userName,
+ @JsonProperty("splits") List<String> splits,
@JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
@JsonProperty("splitClasses") List<String> splitClasses,
@JsonProperty("columns") List<SchemaPath> columns) throws IOException, ReflectiveOperationException {
+ super(userName);
this.hiveReadEntry = hiveReadEntry;
this.table = hiveReadEntry.getTable();
this.partitions = hiveReadEntry.getPartitions();
@@ -126,7 +128,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
try {
- return new HiveSubScan(splits, hiveReadEntry, splitClasses, columns);
+ return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns);
} catch (IOException | ReflectiveOperationException e) {
throw new ExecutionSetupException(e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 587e90d..ec30f01 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -30,8 +30,8 @@ import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.store.hive.HiveReadEntry;
import org.apache.drill.exec.store.hive.HiveStoragePlugin;
@@ -187,7 +187,7 @@ public class HiveSchemaFactory implements SchemaFactory {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
HiveSchema schema = new HiveSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index b086786..54d34f9 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -115,17 +115,20 @@ public class MongoGroupScan extends AbstractGroupScan implements
private boolean filterPushedDown = false;
@JsonCreator
- public MongoGroupScan(@JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
+ public MongoGroupScan(
+ @JsonProperty("userName") String userName,
+ @JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
@JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JacksonInject StoragePluginRegistry pluginRegistry) throws IOException,
ExecutionSetupException {
- this((MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
+ this(userName, (MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
scanSpec, columns);
}
- public MongoGroupScan(MongoStoragePlugin storagePlugin,
+ public MongoGroupScan(String userName, MongoStoragePlugin storagePlugin,
MongoScanSpec scanSpec, List<SchemaPath> columns) throws IOException {
+ super(userName);
this.storagePlugin = storagePlugin;
this.storagePluginConfig = storagePlugin.getConfig();
this.scanSpec = scanSpec;
@@ -140,6 +143,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
* The MongoGroupScan to clone
*/
private MongoGroupScan(MongoGroupScan that) {
+ super(that);
this.scanSpec = that.scanSpec;
this.columns = that.columns;
this.storagePlugin = that.storagePlugin;
@@ -446,7 +450,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
@Override
public MongoSubScan getSpecificScan(int minorFragmentId)
throws ExecutionSetupException {
- return new MongoSubScan(storagePlugin, storagePluginConfig,
+ return new MongoSubScan(getUserName(), storagePlugin, storagePluginConfig,
endpointFragmentMapping.get(minorFragmentId), columns);
}
@@ -554,6 +558,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
@VisibleForTesting
MongoGroupScan() {
+ super((String)null);
}
@JsonIgnore
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
index 9af49b1..1d3b292 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
@@ -68,7 +68,7 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
MongoGroupScan newGroupsScan = null;
try {
- newGroupsScan = new MongoGroupScan(groupScan.getStoragePlugin(),
+ newGroupsScan = new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
newScanSpec, groupScan.getColumns());
} catch (IOException e) {
logger.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index dfad5ef..d291325 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -25,9 +25,9 @@ import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
import org.slf4j.Logger;
@@ -63,8 +63,8 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
- schemaFactory.registerSchemas(session, parent);
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ schemaFactory.registerSchemas(schemaConfig, parent);
}
@Override
@@ -73,12 +73,9 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public AbstractGroupScan getPhysicalScan(JSONOptions selection)
- throws IOException {
- MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(),
- new TypeReference<MongoScanSpec>() {
- });
- return new MongoGroupScan(this, mongoScanSpec, null);
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+ MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<MongoScanSpec>() {});
+ return new MongoGroupScan(userName, this, mongoScanSpec, null);
}
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index 36008cf..fb6e095 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -57,10 +57,12 @@ public class MongoSubScan extends AbstractBase implements SubScan {
@JsonCreator
public MongoSubScan(
@JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("userName") String userName,
@JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig,
@JsonProperty("chunkScanSpecList") LinkedList<MongoSubScanSpec> chunkScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns)
throws ExecutionSetupException {
+ super(userName);
this.columns = columns;
this.mongoPluginConfig = (MongoStoragePluginConfig) mongoPluginConfig;
this.mongoStoragePlugin = (MongoStoragePlugin) registry
@@ -68,9 +70,10 @@ public class MongoSubScan extends AbstractBase implements SubScan {
this.chunkScanSpecList = chunkScanSpecList;
}
- public MongoSubScan(MongoStoragePlugin storagePlugin,
+ public MongoSubScan(String userName, MongoStoragePlugin storagePlugin,
MongoStoragePluginConfig storagePluginConfig,
List<MongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
+ super(userName);
this.mongoStoragePlugin = storagePlugin;
this.mongoPluginConfig = storagePluginConfig;
this.columns = columns;
@@ -105,7 +108,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
- return new MongoSubScan(mongoStoragePlugin, mongoPluginConfig,
+ return new MongoSubScan(getUserName(), mongoStoragePlugin, mongoPluginConfig,
chunkScanSpecList, columns);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index f650ccc..c941176 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -34,8 +34,8 @@ import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
-import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.store.mongo.MongoCnxnManager;
import org.apache.drill.exec.store.mongo.MongoScanSpec;
@@ -120,7 +120,7 @@ public class MongoSchemaFactory implements SchemaFactory {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
MongoSchema schema = new MongoSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
@@ -188,7 +188,7 @@ public class MongoSchemaFactory implements SchemaFactory {
DrillTable getDrillTable(String dbName, String collectionName) {
MongoScanSpec mongoScanSpec = new MongoScanSpec(dbName, collectionName);
- return new DynamicDrillTable(plugin, schemaName, mongoScanSpec);
+ return new DynamicDrillTable(plugin, schemaName, null, mongoScanSpec);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index 943d644..805d6e9 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -106,6 +106,10 @@ drill.exec: {
write: true
}
},
+ impersonation: {
+ enabled: false,
+ max_chained_user_hops: 3
+ },
security.user.auth {
enabled: false,
packages += "org.apache.drill.exec.rpc.user.security",
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index f7648b1..bafbbc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -75,6 +75,8 @@ public interface ExecConstants {
public static final String SYS_STORE_PROVIDER_CLASS = "drill.exec.sys.store.provider.class";
public static final String SYS_STORE_PROVIDER_LOCAL_PATH = "drill.exec.sys.store.provider.local.path";
public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
+ public static final String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled";
+ public static final String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops";
public static final String USER_AUTHENTICATOR_IMPL_PACKAGES = "drill.exec.security.user.auth.packages";
public static final String USER_AUTHENTICATION_ENABLED = "drill.exec.security.user.auth.enabled";
public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
@@ -220,4 +222,8 @@ public interface ExecConstants {
public static final String DRILLBIT_CONTROL_INJECTIONS = "drill.exec.testing.controls";
public static final OptionValidator DRILLBIT_CONTROLS_VALIDATOR =
new ExecutionControls.ControlsOptionValidator(DRILLBIT_CONTROL_INJECTIONS, ExecutionControls.DEFAULT_CONTROLS, 1);
+
+ public static final String NEW_VIEW_DEFAULT_PERMS_KEY = "new_view_default_permissions";
+ public static final OptionValidator NEW_VIEW_DEFAULT_PERMS_VALIDATOR =
+ new StringValidator(NEW_VIEW_DEFAULT_PERMS_KEY, "700");
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
index f9a8ff5..efa8cc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileStatus;
import com.google.common.base.Preconditions;
+import java.io.IOException;
import java.io.InputStream;
public class DotDrillFile {
@@ -51,6 +52,13 @@ public class DotDrillFile {
}
/**
+ * @return Return owner of the file in underlying file system.
+ */
+ public String getOwner() {
+ return status.getOwner();
+ }
+
+ /**
* Return base file name without the parent directory and extensions.
* @return Base file name.
*/
@@ -59,7 +67,7 @@ public class DotDrillFile {
return fileName.substring(0, fileName.lastIndexOf(type.getEnding()));
}
- public View getView(DrillConfig config) throws Exception{
+ public View getView(DrillConfig config) throws IOException {
Preconditions.checkArgument(type == DotDrillType.VIEW);
try(InputStream is = fs.open(status.getPath())){
return config.getMapper().readValue(is, View.class);
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 7dfd0e6..c566a10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -28,6 +28,7 @@ import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
@@ -48,7 +49,9 @@ import org.apache.drill.exec.server.options.FragmentOptionManager;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.PartitionExplorer;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.work.batch.IncomingBuffers;
import com.google.common.annotations.VisibleForTesting;
@@ -201,7 +204,18 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return null;
}
- return queryContext.getRootSchema();
+ final boolean isImpersonationEnabled = isImpersonationEnabled();
+ // If impersonation is enabled, we want to view the schema as query user and suppress authorization errors. As for
+ // InfoSchema purpose we want to show tables the user has permissions to list or query. If impersonation is
+ // disabled view the schema as Drillbit process user and throw authorization errors to client.
+ SchemaConfig schemaConfig = SchemaConfig
+ .newBuilder(
+ isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(),
+ queryContext)
+ .setIgnoreAuthErrors(isImpersonationEnabled)
+ .build();
+
+ return queryContext.getRootSchema(schemaConfig);
}
/**
@@ -327,6 +341,20 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return executionControls;
}
+ public String getQueryUserName() {
+ return fragment.getCredentials().getUserName();
+ }
+
+ public boolean isImpersonationEnabled() {
+ // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to consider impersonation disabled if there is
+ // no config
+ if (getConfig() == null) {
+ return false;
+ }
+
+ return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+ }
+
@Override
public void close() {
waitForSendComplete();
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index cd5c054..cc02658 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -25,6 +25,7 @@ import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -39,8 +40,10 @@ import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.QueryOptionManager;
import org.apache.drill.exec.store.PartitionExplorer;
import org.apache.drill.exec.store.PartitionExplorerImpl;
+import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.util.ImpersonationUtil;
// TODO except for a couple of tests, this is only created by Foreman
// TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext()
@@ -49,6 +52,9 @@ import org.apache.drill.exec.testing.ExecutionControls;
public class QueryContext implements AutoCloseable, UdfUtilities {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
+ private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
+ private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024;
+
private final DrillbitContext drillbitContext;
private final UserSession session;
private final OptionManager queryOptions;
@@ -59,8 +65,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
private final BufferAllocator allocator;
private final BufferManager bufferManager;
private final QueryDateTimeInfo queryDateTimeInfo;
- private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
- private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024;
+ private final ViewExpansionContext viewExpansionContext;
/*
* Flag to indicate if close has been called, after calling close the first
@@ -89,6 +94,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
}
// TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available
bufferManager = new BufferManager(this.allocator, null);
+ viewExpansionContext = new ViewExpansionContext(this);
}
public PlannerSettings getPlannerSettings() {
@@ -103,6 +109,13 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
return allocator;
}
+ /**
+ * Return reference to default schema instance in a schema tree. Each {@link net.hydromatic.optiq.SchemaPlus}
+ * instance can refer to its parent and its children. From the returned reference to default schema instance,
+ * clients can traverse the entire schema tree and know the default schema where to look up the tables first.
+ *
+ * @return Reference to default schema instance in a schema tree.
+ */
public SchemaPlus getNewDefaultSchema() {
final SchemaPlus rootSchema = getRootSchema();
final SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
@@ -113,18 +126,52 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
return defaultSchema;
}
+ /**
+ * Get root schema with schema owner as the user who issued the query that is managed by this QueryContext.
+ * @return Root of the schema tree.
+ */
public SchemaPlus getRootSchema() {
+ return getRootSchema(getQueryUserName());
+ }
+
+ /**
+ * Return root schema with schema owner as the given user.
+ *
+ * @param userName User who owns the schema tree.
+ * @return Root of the schema tree.
+ */
+ public SchemaPlus getRootSchema(String userName) {
+ final String schemaUser = isImpersonationEnabled() ? userName : ImpersonationUtil.getProcessUserName();
+ final SchemaConfig schemaConfig = SchemaConfig.newBuilder(schemaUser, this).build();
+ return getRootSchema(schemaConfig);
+ }
+
+ /**
+ * Create and return a SchemaTree with given <i>schemaConfig</i>.
+ * @param schemaConfig
+ * @return
+ */
+ public SchemaPlus getRootSchema(SchemaConfig schemaConfig) {
try {
final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
- drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
+ drillbitContext.getSchemaFactory().registerSchemas(schemaConfig, rootSchema);
return rootSchema;
} catch(IOException e) {
+ // We can't proceed further without a schema, throw a runtime exception.
final String errMsg = String.format("Failed to create schema tree: %s", e.getMessage());
logger.error(errMsg, e);
throw new DrillRuntimeException(errMsg, e);
}
}
+ /**
+ * Get the user name of the user who issued the query that is managed by this QueryContext.
+ * @return
+ */
+ public String getQueryUserName() {
+ return session.getCredentials().getUserName();
+ }
+
public OptionManager getOptions() {
return queryOptions;
}
@@ -153,6 +200,14 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
return drillbitContext.getFunctionImplementationRegistry();
}
+ public ViewExpansionContext getViewExpansionContext() {
+ return viewExpansionContext;
+ }
+
+ public boolean isImpersonationEnabled() {
+ return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+ }
+
public DrillOperatorTable getDrillOperatorTable() {
return table;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
new file mode 100644
index 0000000..9d04ab9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import com.carrotsearch.hppc.ObjectIntOpenHashMap;
+import com.google.common.base.Preconditions;
+import net.hydromatic.optiq.SchemaPlus;
+import org.apache.drill.common.exceptions.UserException;
+import org.eigenbase.relopt.RelOptTable;
+import org.eigenbase.relopt.RelOptTable.ToRelContext;
+
+import static org.apache.drill.exec.ExecConstants.IMPERSONATION_MAX_CHAINED_USER_HOPS;
+
+/**
+ * Contains context information about view expansion(s) in a query. Part of {@link org.apache.drill.exec.ops
+ * .QueryContext}. Before expanding a view into its definition, as part of the
+ * {@link org.apache.drill.exec.planner.logical.DrillViewTable#toRel(ToRelContext, RelOptTable)}, first a
+ * {@link ViewExpansionToken} is requested from ViewExpansionContext through {@link #reserveViewExpansionToken(String)}.
+ * Once view expansion is complete, a token is released through {@link ViewExpansionToken#release()}. A view definition
+ * itself may contain zero or more views for expanding those nested views also a token is obtained.
+ *
+ * Ex:
+ * Following are the available view tables: { "view_1", "view_2", "view_3", "view_4" }. Corresponding owners are
+ * {"view1Owner", "view2Owner", "view3Owner", "view4Owner"}.
+ * Definition of "view4" : "SELECT field4 FROM view3"
+ * Definition of "view3" : "SELECT field4, field3 FROM view2"
+ * Definition of "view2" : "SELECT field4, field3, field2 FROM view1"
+ * Definition of "view1" : "SELECT field4, field3, field2, field1 FROM someTable"
+ *
+ * Query is: "SELECT * FROM view4".
+ * Steps:
+ * 1. "view4" comes for expanding it into its definition
+ * 2. A token "view4Token" is requested through {@link #reserveViewExpansionToken(String view4Owner)}
+ * 3. "view4" is called for expansion. As part of it
+ * 3.1 "view3" comes for expansion
+ * 3.2 A token "view3Token" is requested through {@link #reserveViewExpansionToken(String view3Owner)}
+ * 3.3 "view3" is called for expansion. As part of it
+ * 3.3.1 "view2" comes for expansion
+ * 3.3.2 A token "view2Token" is requested through {@link #reserveViewExpansionToken(String view2Owner)}
+ * 3.3.3 "view2" is called for expansion. As part of it
+ * 3.3.3.1 "view1" comes for expansion
+ * 3.3.3.2 A token "view1Token" is requested through {@link #reserveViewExpansionToken(String view1Owner)}
+ * 3.3.3.3 "view1" is called for expansion
+ * 3.3.3.4 "view1" expansion is complete
+ * 3.3.3.5 Token "view1Token" is released
+ * 3.3.4 "view2" expansion is complete
+ * 3.3.5 Token "view2Token" is released
+ * 3.4 "view3" expansion is complete
+ * 3.5 Token "view3Token" is released
+ * 4. "view4" expansion is complete
+ * 5. Token "view4Token" is released.
+ *
+ */
+public class ViewExpansionContext {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ViewExpansionContext.class);
+
+ private final QueryContext queryContext;
+ private final int maxChainedUserHops;
+ private final String queryUser;
+ private final ObjectIntOpenHashMap<String> userTokens = new ObjectIntOpenHashMap<>();
+
+ public ViewExpansionContext(QueryContext queryContext) {
+ this.queryContext = queryContext;
+ this.maxChainedUserHops =
+ queryContext.getConfig().getInt(IMPERSONATION_MAX_CHAINED_USER_HOPS);
+ this.queryUser = queryContext.getQueryUserName();
+ }
+
+ public boolean isImpersonationEnabled() {
+ return queryContext.isImpersonationEnabled();
+ }
+
+ /**
+ * Reserve a token for expansion of view owned by given user name. If it can't issue any more tokens,
+ * throws {@link UserException}.
+ *
+ * @param viewOwner Name of the user who owns the view.
+ * @return An instance of {@link org.apache.drill.exec.ops.ViewExpansionContext.ViewExpansionToken} which must be
+ * released when done using the token.
+ */
+ public ViewExpansionToken reserveViewExpansionToken(String viewOwner) {
+ int totalTokens = 1;
+ if (!viewOwner.equals(queryUser)) {
+ // We want to track the tokens only if the "viewOwner" is not same as the "queryUser".
+ if (userTokens.containsKey(viewOwner)) {
+ // If the user already exists, we don't need to validate the limit on maximum user hops in chained impersonation
+ // as the limit is for number of unique users.
+ totalTokens += userTokens.get(viewOwner);
+ } else {
+ // Make sure we are not exceeding the limit of maximum number impersonation user hops in chained impersonation.
+ if (userTokens.size() == maxChainedUserHops) {
+ final String errMsg =
+ String.format("Cannot issue token for view expansion as issuing the token exceeds the " +
+ "maximum allowed number of user hops (%d) in chained impersonation.", maxChainedUserHops);
+ logger.error(errMsg);
+ throw UserException.permissionError().message(errMsg).build();
+ }
+ }
+
+ userTokens.put(viewOwner, totalTokens);
+
+ logger.debug("Issued view expansion token for user '{}'", viewOwner);
+ }
+
+ return new ViewExpansionToken(viewOwner);
+ }
+
+ private void releaseViewExpansionToken(ViewExpansionToken token) {
+ final String viewOwner = token.viewOwner;
+
+ if (viewOwner.equals(queryUser)) {
+ // If the token owner and queryUser are same, no need to track the token release.
+ return;
+ }
+
+ Preconditions.checkState(userTokens.containsKey(token.viewOwner),
+ "Given user doesn't exist in User Token store. Make sure token for this user is obtained first.");
+
+ final int userTokenCount = userTokens.get(viewOwner);
+ if (userTokenCount == 1) {
+ // Remove the user from collection, when there are no more tokens issued to the user.
+ userTokens.remove(viewOwner);
+ } else {
+ userTokens.put(viewOwner, userTokenCount - 1);
+ }
+ logger.debug("Released view expansion token issued for user '{}'", viewOwner);
+ }
+
+ /**
+ * Represents token issued to a view owner for expanding the view.
+ */
+ public class ViewExpansionToken {
+ private final String viewOwner;
+
+ private boolean released;
+
+ ViewExpansionToken(String viewOwner) {
+ this.viewOwner = viewOwner;
+ }
+
+ /**
+ * Get schema tree for view owner who owns this token.
+ * @return Root of schema tree.
+ */
+ public SchemaPlus getSchemaTree() {
+ Preconditions.checkState(!released, "Trying to use released token.");
+ return queryContext.getRootSchema(viewOwner);
+ }
+
+ /**
+ * Release the token. Once released all method calls (except release) cause {@link java.lang.IllegalStateException}.
+ */
+ public void release() {
+ if (!released) {
+ released = true;
+ releaseViewExpansionToken(this);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index b1a71a5..9e60f21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -53,6 +53,7 @@ import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.StoragePlugin;
import org.eigenbase.rel.RelFieldCollation.Direction;
@@ -69,9 +70,11 @@ public class BasicOptimizer extends Optimizer {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
private final QueryContext queryContext;
+ private final UserClientConnection userSession;
- public BasicOptimizer(final QueryContext queryContext) {
+ public BasicOptimizer(final QueryContext queryContext, final UserClientConnection userSession) {
this.queryContext = queryContext;
+ this.userSession = userSession;
}
@Override
@@ -208,7 +211,8 @@ public class BasicOptimizer extends Optimizer {
}
try {
final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config);
- return storagePlugin.getPhysicalScan(scan.getSelection());
+ final String user = userSession.getSession().getCredentials().getUserName();
+ return storagePlugin.getPhysicalScan(user, scan.getSelection());
} catch (IOException | ExecutionSetupException e) {
throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index defb4e4..c7b0e7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -25,11 +25,26 @@ import com.google.common.base.Preconditions;
public abstract class AbstractBase implements PhysicalOperator{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);
+ private final String userName;
+
protected long initialAllocation = 1000000L;
protected long maxAllocation = 10000000000L;
private int id;
private double cost;
+ public AbstractBase() {
+ userName = null;
+ }
+
+ public AbstractBase(String userName) {
+ this.userName = userName;
+ }
+
+ public AbstractBase(AbstractBase that) {
+ Preconditions.checkNotNull(that, "Unable to clone: source is null.");
+ this.userName = that.userName;
+ }
+
@Override
public void accept(GraphVisitor<PhysicalOperator> visitor) {
visitor.enter(this);
@@ -48,6 +63,7 @@ public abstract class AbstractBase implements PhysicalOperator{
return true;
}
+ @Override
public final void setOperatorId(int id) {
this.id = id;
}
@@ -80,4 +96,8 @@ public abstract class AbstractBase implements PhysicalOperator{
return maxAllocation;
}
+ @Override
+ public String getUserName() {
+ return userName;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
index ee809fc..606aa4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
@@ -24,6 +24,13 @@ import org.apache.drill.exec.store.dfs.FileSelection;
public abstract class AbstractFileGroupScan extends AbstractGroupScan implements FileGroupScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFileGroupScan.class);
+ public AbstractFileGroupScan(String userName) {
+ super(userName);
+ }
+
+ public AbstractFileGroupScan(AbstractFileGroupScan that) {
+ super(that);
+ }
@Override
public void modifyFileSelection(FileSelection selection) {
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 8fe21e6..242bd5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -30,6 +30,14 @@ import org.apache.drill.exec.physical.EndpointAffinity;
public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
+ public AbstractGroupScan(String userName) {
+ super(userName);
+ }
+
+ public AbstractGroupScan(AbstractGroupScan that) {
+ super(that);
+ }
+
@Override
public Iterator<PhysicalOperator> iterator() {
return Iterators.emptyIterator();
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
index a36a46e..5ec5698 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
@@ -29,6 +29,10 @@ import com.google.common.collect.Iterators;
public abstract class AbstractSubScan extends AbstractBase implements SubScan{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSubScan.class);
+ public AbstractSubScan(String userName) {
+ super(userName);
+ }
+
@Override
public boolean isExecutable() {
return true;
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index a5518ca..b1954ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -95,6 +95,14 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
@JsonProperty("cost")
public double getCost();
+ /**
+ * Name of the user whom to impersonate while setting up the implementation (RecordBatch) of this
+ * PhysicalOperator. Default value is "null" in which case we impersonate as user who launched the query.
+ * @return
+ */
+ @JsonProperty("userName")
+ public String getUserName();
+
@JsonIgnore
public int getOperatorType();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index e25f1c0..912dfd7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -17,12 +17,13 @@
*/
package org.apache.drill.exec.physical.impl;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector;
@@ -32,65 +33,112 @@ import org.apache.drill.exec.util.AssertionUtil;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
/**
- * Implementation of the physical operator visitor
+ * Create RecordBatch tree (PhysicalOperator implementations) for a given PhysicalOperator tree.
*/
-public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException> {
+public class ImplCreator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
- private RootExec root = null;
+ private static final ImplCreator INSTANCE = new ImplCreator();
private ImplCreator() {}
- private RootExec getRoot() {
- return root;
+ /**
+ * Create and return fragment RootExec for given FragmentRoot. RootExec has one or more RecordBatches as children
+ * (which may contain child RecordBatches and so on).
+ * @param context FragmentContext.
+ * @param root FragmentRoot.
+ * @return RootExec of fragment.
+ * @throws ExecutionSetupException
+ */
+ public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
+ Preconditions.checkNotNull(root);
+ Preconditions.checkNotNull(context);
+
+ if (AssertionUtil.isAssertionsEnabled()) {
+ root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
+ }
+
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ final RootExec rootExec = INSTANCE.getRootExec(root, context);
+ logger.debug("Took {} ms to create RecordBatch tree", watch.elapsed(TimeUnit.MILLISECONDS));
+ if (rootExec == null) {
+ throw new ExecutionSetupException(
+ "The provided fragment did not have a root node that correctly created a RootExec value.");
+ }
+
+ return rootExec;
}
- @Override
- @SuppressWarnings("unchecked")
- public RecordBatch visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
- Preconditions.checkNotNull(op);
- Preconditions.checkNotNull(context);
+ /** Create RootExec and its children (RecordBatches) for given FragmentRoot */
+ private RootExec getRootExec(final FragmentRoot root, final FragmentContext context) throws ExecutionSetupException {
+ final List<RecordBatch> childRecordBatches = getChildren(root, context);
- Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(op.getClass());
- if (opCreator != null) {
- if (op instanceof FragmentRoot ) {
- root = ((RootCreator<PhysicalOperator>)opCreator).getRoot(context, op, getChildren(op, context));
- return null;
- } else {
- return ((BatchCreator<PhysicalOperator>)opCreator).getBatch(context, op, getChildren(op, context));
+ if (context.isImpersonationEnabled()) {
+ final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(root.getUserName(), context.getQueryUserName());
+ try {
+ return proxyUgi.doAs(new PrivilegedExceptionAction<RootExec>() {
+ public RootExec run() throws Exception {
+ return ((RootCreator<PhysicalOperator>) getOpCreator(root, context)).getRoot(context, root, childRecordBatches);
+ }
+ });
+ } catch (InterruptedException | IOException e) {
+ final String errMsg = String.format("Failed to create RootExec for operator with id '%d'", root.getOperatorId());
+ logger.error(errMsg, e);
+ throw new ExecutionSetupException(errMsg, e);
}
} else {
- throw new UnsupportedOperationException(String.format(
- "The PhysicalVisitor of type %s does not currently support visiting the PhysicalOperator type %s.",
- this.getClass().getCanonicalName(), op.getClass().getCanonicalName()));
+ return ((RootCreator<PhysicalOperator>) getOpCreator(root, context)).getRoot(context, root, childRecordBatches);
}
}
- private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
- List<RecordBatch> children = Lists.newArrayList();
- for (PhysicalOperator child : op) {
- children.add(child.accept(this, context));
+ /** Create a RecordBatch and its children for given PhysicalOperator */
+ private RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+ Preconditions.checkNotNull(op);
+
+ final List<RecordBatch> childRecordBatches = getChildren(op, context);
+
+ if (context.isImpersonationEnabled()) {
+ final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(op.getUserName(), context.getQueryUserName());
+ try {
+ return proxyUgi.doAs(new PrivilegedExceptionAction<RecordBatch>() {
+ public RecordBatch run() throws Exception {
+ return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches);
+ }
+ });
+ } catch (InterruptedException | IOException e) {
+ final String errMsg = String.format("Failed to create RecordBatch for operator with id '%d'", op.getOperatorId());
+ logger.error(errMsg, e);
+ throw new ExecutionSetupException(errMsg, e);
+ }
+ } else {
+ return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches);
}
- return children;
}
- public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
- ImplCreator i = new ImplCreator();
- if (AssertionUtil.isAssertionsEnabled()) {
- root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
+ /** Helper method to get OperatorCreator (RootCreator or BatchCreator) for given PhysicalOperator (root or non-root) */
+ private Object getOpCreator(PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+ final Class opClass = op.getClass();
+ Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(opClass);
+ if (opCreator == null) {
+ throw new UnsupportedOperationException(
+ String.format("BatchCreator for PhysicalOperator type '%s' not found.", opClass.getCanonicalName()));
}
- Stopwatch watch = new Stopwatch();
- watch.start();
- root.accept(i, context);
- logger.debug("Took {} ms to accept", watch.elapsed(TimeUnit.MILLISECONDS));
- if (i.root == null) {
- throw new ExecutionSetupException(
- "The provided fragment did not have a root node that correctly created a RootExec value.");
- }
- return i.getRoot();
+ return opCreator;
}
-}
+ /** Helper method to traverse the children of given PhysicalOperator and create RecordBatches for children recursively */
+ private List<RecordBatch> getChildren(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+ List<RecordBatch> children = Lists.newArrayList();
+ for (PhysicalOperator child : op) {
+ children.add(getRecordBatch(child, context));
+ }
+
+ return children;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index c8f872e..5451ca0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -28,29 +28,48 @@ import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.util.ImpersonationUtil;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptTable;
public abstract class DrillTable implements Table {
private final String storageEngineName;
- public final StoragePluginConfig storageEngineConfig;
- private Object selection;
- private StoragePlugin plugin;
+ private final StoragePluginConfig storageEngineConfig;
+ private final Object selection;
+ private final StoragePlugin plugin;
+ private final String userName;
+
private GroupScan scan;
- /** Creates a DrillTable. */
- public DrillTable(String storageEngineName, StoragePlugin plugin, Object selection) {
+ /**
+ * Creates a DrillTable instance.
+ * @param storageEngineName StorageEngine name.
+ * @param plugin Reference to StoragePlugin.
+ * @param userName Whom to impersonate while reading the contents of the table.
+ * @param selection Table contents (type and contents depend on type of StoragePlugin).
+ */
+ public DrillTable(String storageEngineName, StoragePlugin plugin, String userName, Object selection) {
this.selection = selection;
this.plugin = plugin;
this.storageEngineConfig = plugin.getConfig();
this.storageEngineName = storageEngineName;
+ this.userName = userName;
+ }
+
+ /**
+ * TODO: Same purpose as other constructor except the impersonation user is the user who is running the Drillbit
+ * process. Once we add impersonation to non-FileSystem storage plugins such as Hive, HBase etc,
+ * we can remove this constructor.
+ */
+ public DrillTable(String storageEngineName, StoragePlugin plugin, Object selection) {
+ this(storageEngineName, plugin, ImpersonationUtil.getProcessUserName(), selection);
}
public GroupScan getGroupScan() throws IOException{
if (scan == null) {
- this.scan = plugin.getPhysicalScan(new JSONOptions(selection));
+ this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection));
}
return scan;
}
@@ -94,6 +113,7 @@ public abstract class DrillTable implements Table {
result = prime * result + ((selection == null) ? 0 : selection.hashCode());
result = prime * result + ((storageEngineConfig == null) ? 0 : storageEngineConfig.hashCode());
result = prime * result + ((storageEngineName == null) ? 0 : storageEngineName.hashCode());
+ result = prime * result + ((userName == null) ? 0 : userName.hashCode());
return result;
}
@@ -130,6 +150,13 @@ public abstract class DrillTable implements Table {
} else if (!storageEngineName.equals(other.storageEngineName)) {
return false;
}
+ if (userName == null) {
+ if (other.userName != null) {
+ return false;
+ }
+ } else if (!userName.equals(other.userName)) {
+ return false;
+ }
return true;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
index 68e666a..9c5a94f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
@@ -17,14 +17,14 @@
*/
package org.apache.drill.exec.planner.logical;
-import java.util.List;
-
import net.hydromatic.optiq.Schema.TableType;
import net.hydromatic.optiq.Statistic;
import net.hydromatic.optiq.Statistics;
import net.hydromatic.optiq.TranslatableTable;
import org.apache.drill.exec.dotdrill.View;
+import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.ops.ViewExpansionContext.ViewExpansionToken;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptTable;
import org.eigenbase.relopt.RelOptTable.ToRelContext;
@@ -36,9 +36,13 @@ public class DrillViewTable implements TranslatableTable, DrillViewInfoProvider
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillViewTable.class);
private final View view;
+ private final String viewOwner;
+ private final ViewExpansionContext viewExpansionContext;
- public DrillViewTable(List<String> path, View view){
+ public DrillViewTable(View view, String viewOwner, ViewExpansionContext viewExpansionContext){
this.view = view;
+ this.viewOwner = viewOwner;
+ this.viewExpansionContext = viewExpansionContext;
}
@Override
@@ -53,15 +57,28 @@ public class DrillViewTable implements TranslatableTable, DrillViewInfoProvider
@Override
public RelNode toRel(ToRelContext context, RelOptTable relOptTable) {
- RelDataType rowType = relOptTable.getRowType();
- RelNode rel = context.expandView(rowType, view.getSql(), view.getWorkspaceSchemaPath());
+ ViewExpansionToken token = null;
+ try {
+ RelDataType rowType = relOptTable.getRowType();
+ RelNode rel;
+
+ if (viewExpansionContext.isImpersonationEnabled()) {
+ token = viewExpansionContext.reserveViewExpansionToken(viewOwner);
+ rel = context.expandView(rowType, view.getSql(), token.getSchemaTree(), view.getWorkspaceSchemaPath());
+ } else {
+ rel = context.expandView(rowType, view.getSql(), view.getWorkspaceSchemaPath());
+ }
+
+ // If the View's field list is not "*", create a cast.
+ if (!view.isDynamic() && !view.hasStar()) {
+ rel = RelOptUtil.createCastRel(rel, rowType, true);
+ }
- if (view.isDynamic() || view.hasStar()){
- // if View's field has "*", return rel directly.
return rel;
- }else{
- // if the View's field list is not "*", try to create a cast.
- return RelOptUtil.createCastRel(rel, rowType, true);
+ } finally {
+ if (token != null) {
+ token.release();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
index 843db58..24917f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
@@ -28,6 +28,15 @@ public class DynamicDrillTable extends DrillTable{
private RelDataTypeHolder holder = new RelDataTypeHolder();
+ public DynamicDrillTable(StoragePlugin plugin, String storageEngineName, String userName, Object selection) {
+ super(storageEngineName, plugin, userName, selection);
+ }
+
+ /**
+ * TODO: Same purpose as other constructor except the impersonation user is the user who is running the Drillbit
+ * process. Once we add impersonation to non-FileSystem storage plugins such as Hive, HBase etc,
+ * we can remove this constructor.
+ */
public DynamicDrillTable(StoragePlugin plugin, String storageEngineName, Object selection) {
super(storageEngineName, plugin, selection);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 097b7bb..d56f1db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.foreman.ForemanException;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.hadoop.security.AccessControlException;
import org.eigenbase.rel.RelCollationTraitDef;
import org.eigenbase.rel.rules.ReduceExpressionsRule;
import org.eigenbase.rel.rules.WindowedAggSplitterRule;
@@ -159,6 +160,8 @@ public class DrillSqlWorker {
} catch(ValidationException e) {
String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
throw UserException.parseError(e).message(errorMessage).build();
+ } catch (AccessControlException e) {
+ throw UserException.permissionError(e).build();
} catch (IOException | RelConversionException e) {
throw new QueryInputException("Failure handling SQL.", e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index a17a604..3e990c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -58,8 +58,8 @@ public class CreateTableHandler extends DefaultSqlHandler {
AbstractSchema drillSchema = getDrillSchema(schema);
if (!drillSchema.isMutable()) {
- return DirectPlan.createDirectPlan(context, false, String.format("Current schema '%s' is not a mutable schema. " +
- "Can't create tables in this schema.", drillSchema.getFullSchemaName()));
+ return DirectPlan.createDirectPlan(context, false, String.format("Unable to create table. " +
+ "Schema [%s] is immutable. ", drillSchema.getFullSchemaName()));
}
final String newTblName = sqlCreateTable.getName();
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
index a486369..5f9061a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.torel;
import java.util.List;
import java.util.Map;
+import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.prepare.Prepare;
import org.apache.drill.common.expression.LogicalExpression;
@@ -114,6 +115,11 @@ public class ConversionContext implements ToRelContext {
throw new UnsupportedOperationException();
}
+ @Override
+ public RelNode expandView(RelDataType rowType, String queryString, SchemaPlus rootSchema, List<String> schemaPath) {
+ throw new UnsupportedOperationException();
+ }
+
private static class ConverterVisitor extends AbstractLogicalVisitor<RelNode, ConversionContext, InvalidRelException>{
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a745479..3dc7c14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -96,6 +96,7 @@ public class SystemOptionManager extends BaseOptionManager {
ExecConstants.HASH_JOIN_TABLE_FACTOR,
ExecConstants.HASH_AGG_TABLE_FACTOR,
ExecConstants.AVERAGE_FIELD_WIDTH,
+ ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR,
QueryClassLoader.JAVA_COMPILER_VALIDATOR,
QueryClassLoader.JAVA_COMPILER_JANINO_MAXSIZE,
QueryClassLoader.JAVA_COMPILER_DEBUG,
[4/4] drill git commit: DRILL-2514: Part2 - Add impersonation tests
using Hadoop MiniDFSCluster.
Posted by ve...@apache.org.
DRILL-2514: Part2 - Add impersonation tests using Hadoop MiniDFSCluster.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2a484251
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2a484251
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2a484251
Branch: refs/heads/master
Commit: 2a484251be48b0443318626b1364044db5473124
Parents: 40c9040
Author: vkorukanti <ve...@gmail.com>
Authored: Tue Mar 24 15:12:01 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Apr 21 13:16:01 2015 -0700
----------------------------------------------------------------------
exec/java-exec/pom.xml | 12 +
.../java/org/apache/drill/BaseTestQuery.java | 12 +
.../impersonation/BaseTestImpersonation.java | 89 ++++++
.../TestImpersonationMetadata.java | 300 ++++++++++++++++++
.../impersonation/TestImpersonationQueries.java | 304 +++++++++++++++++++
pom.xml | 121 +++++++-
6 files changed, 837 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index f5313ca..82426ef 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -351,6 +351,12 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
@@ -363,6 +369,12 @@
<artifactId>avro-mapred</artifactId>
<version>1.7.7</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 2ff4de7..b02051b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -186,6 +186,7 @@ public class BaseTestQuery extends ExecTest {
* @param properties
*/
public static void updateClient(Properties properties) throws Exception {
+ Preconditions.checkState(bits != null && bits[0] != null, "Drillbits are not setup.");
if (client != null) {
client.close();
client = null;
@@ -194,6 +195,17 @@ public class BaseTestQuery extends ExecTest {
client = QueryTestUtil.createClient(config, serviceSet, MAX_WIDTH_PER_NODE, properties);
}
+ /*
+ * Close the current <i>client</i> and open a new client for the given user. All tests executed
+ * after this method call use the new <i>client</i>.
+ * @param user
+ */
+ public static void updateClient(String user) throws Exception {
+ final Properties props = new Properties();
+ props.setProperty("user", user);
+ updateClient(props);
+ }
+
protected static BufferAllocator getAllocator() {
return allocator;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
new file mode 100644
index 0000000..274f5f7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.impersonation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Properties;
+
+public class BaseTestImpersonation extends PlanTestBase {
+ protected static final String processUser = System.getProperty("user.name");
+
+ protected static MiniDFSCluster dfsCluster;
+ protected static Configuration conf;
+ protected static String miniDfsStoragePath;
+
+ protected static void startMiniDfsCluster(String testClass) throws Exception {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(testClass), "Expected a non-null and non-empty test class name");
+ conf = new Configuration();
+
+ // Set the MiniDfs base dir to be the temp directory of the test, so that all files created within the MiniDfs
+ // are properly cleanup when test exits.
+ miniDfsStoragePath = System.getProperty("java.io.tmpdir") + Path.SEPARATOR + testClass;
+ conf.set("hdfs.minidfs.basedir", miniDfsStoragePath);
+
+ // Set the proxyuser settings so that the user who is running the Drillbits/MiniDfs can impersonate other users.
+ conf.set(String.format("hadoop.proxyuser.%s.hosts", processUser), "*");
+ conf.set(String.format("hadoop.proxyuser.%s.groups", processUser), "*");
+
+ // Start the MiniDfs cluster
+ dfsCluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(3)
+ .format(true)
+ .build();
+
+ final Properties props = cloneDefaultTestConfigProperties();
+ props.setProperty(ExecConstants.IMPERSONATION_ENABLED, "true");
+
+ updateTestCluster(1, DrillConfig.create(props));
+ }
+
+ protected static void createAndAddWorkspace(FileSystem fs, String name, String path, short permissions, String owner,
+ String group,
+ Map<String, WorkspaceConfig> workspaces) throws Exception {
+ final Path dirPath = new Path(path);
+ FileSystem.mkdirs(fs, dirPath, new FsPermission(permissions));
+ fs.setOwner(dirPath, owner, group);
+ final WorkspaceConfig ws = new WorkspaceConfig(path, true, "parquet");
+ workspaces.put(name, ws);
+ }
+
+ protected static void stopMiniDfsCluster() throws Exception {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+
+ if (miniDfsStoragePath != null) {
+ FileUtils.deleteQuietly(new File(miniDfsStoragePath));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
new file mode 100644
index 0000000..411660f
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.impersonation;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests impersonation on metadata related queries as SHOW FILES, SHOW TABLES, CREATE VIEW and CREATE TABLE
+ */
+public class TestImpersonationMetadata extends BaseTestImpersonation {
+ private static final String MINIDFS_STORAGE_PLUGIN_NAME = "minidfs" + TestImpersonationMetadata.class.getSimpleName();
+
+ private static final String user1 = "drillTestUser1";
+ private static final String user2 = "drillTestUser2";
+
+ private static final String group0 = "drillTestGrp0";
+ private static final String group1 = "drillTestGrp1";
+
+ static {
+ UserGroupInformation.createUserForTesting(user1, new String[]{ group1, group0 });
+ UserGroupInformation.createUserForTesting(user2, new String[]{ group1 });
+ }
+
+ @BeforeClass
+ public static void addMiniDfsBasedStorage() throws Exception {
+ startMiniDfsCluster(TestImpersonationMetadata.class.getSimpleName());
+
+ final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+ final FileSystemConfig lfsPluginConfig = (FileSystemConfig) pluginRegistry.getPlugin("dfs").getConfig();
+
+ final FileSystemConfig miniDfsPluginConfig = new FileSystemConfig();
+ miniDfsPluginConfig.connection = conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
+
+ Map<String, WorkspaceConfig> workspaces = Maps.newHashMap(lfsPluginConfig.workspaces);
+
+ createTestWorkspaces(workspaces);
+
+ miniDfsPluginConfig.workspaces = workspaces;
+ miniDfsPluginConfig.formats = ImmutableMap.copyOf(lfsPluginConfig.formats);
+ miniDfsPluginConfig.setEnabled(true);
+
+ pluginRegistry.createOrUpdate(MINIDFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true);
+ }
+
+ private static void createTestWorkspaces(Map<String, WorkspaceConfig> workspaces) throws Exception {
+ // Create "/tmp" folder and set permissions to "777"
+ final FileSystem fs = dfsCluster.getFileSystem();
+ final Path tmpPath = new Path("/tmp");
+ fs.delete(tmpPath, true);
+ FileSystem.mkdirs(fs, tmpPath, new FsPermission((short)0777));
+
+ // Create /drillTestGrp0_700 directory with permissions 700 (owned by user running the tests)
+ createAndAddWorkspace(fs, "drillTestGrp0_700", "/drillTestGrp0_700", (short)0700, processUser, group0, workspaces);
+
+ // Create /drillTestGrp0_750 directory with permissions 750 (owned by user running the tests)
+ createAndAddWorkspace(fs, "drillTestGrp0_750", "/drillTestGrp0_750", (short)0750, processUser, group0, workspaces);
+
+ // Create /drillTestGrp0_755 directory with permissions 755 (owned by user running the tests)
+ createAndAddWorkspace(fs, "drillTestGrp0_755", "/drillTestGrp0_755", (short)0755, processUser, group0, workspaces);
+
+ // Create /drillTestGrp0_770 directory with permissions 770 (owned by user running the tests)
+ createAndAddWorkspace(fs, "drillTestGrp0_770", "/drillTestGrp0_770", (short)0770, processUser, group0, workspaces);
+
+ // Create /drillTestGrp0_777 directory with permissions 777 (owned by user running the tests)
+ createAndAddWorkspace(fs, "drillTestGrp0_777", "/drillTestGrp0_777", (short)0777, processUser, group0, workspaces);
+
+ // Create /drillTestGrp1_700 directory with permissions 700 (owned by user1)
+ createAndAddWorkspace(fs, "drillTestGrp1_700", "/drillTestGrp1_700", (short)0700, user1, group1, workspaces);
+ }
+
+ @Test
+ public void testShowFilesInWSWithUserAndGroupPermissionsForQueryUser() throws Exception {
+ updateClient(user1);
+
+ // Try show tables in schema "drillTestGrp1_700" which is owned by "user1"
+ test(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+
+ // Try show tables in schema "drillTestGrp0_750" which is owned by "processUser" and has group permissions for
+ // "user1"
+ test(String.format("SHOW FILES IN %s.drillTestGrp0_750", MINIDFS_STORAGE_PLUGIN_NAME));
+ }
+
+ @Test
+ public void testShowFilesInWSWithOtherPermissionsForQueryUser() throws Exception {
+ updateClient(user2);
+ // Try show tables in schema "drillTestGrp0_755" which is owned by "processUser" and group0. "user2" is not part
+ // of the "group0"
+ test(String.format("SHOW FILES IN %s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME));
+ }
+
+ @Test
+ public void testShowFilesInWSWithNoPermissionsForQueryUser() throws Exception {
+ UserRemoteException ex = null;
+
+ updateClient(user2);
+ try {
+ // Try show tables in schema "drillTestGrp1_700" which is owned by "user1"
+ test(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+ } catch(UserRemoteException e) {
+ ex = e;
+ }
+
+ assertNotNull("UserRemoteException is expected", ex);
+ assertThat(ex.getMessage(),
+ containsString("Permission denied: user=drillTestUser2, " +
+ "access=READ_EXECUTE, inode=\"/drillTestGrp1_700\":drillTestUser1:drillTestGrp1:drwx------"));
+ }
+
+ @Test
+ public void testShowSchemasSanityCheck() throws Exception {
+ test("SHOW SCHEMAS");
+ }
+
+ @Test
+ public void testCreateViewInDirWithUserPermissionsForQueryUser() throws Exception {
+ final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp1_700"; // Workspace dir owned by "user1"
+ testCreateViewTestHelper(user1, viewSchema, "view1");
+ }
+
+ @Test
+ public void testCreateViewInDirWithGroupPermissionsForQueryUser() throws Exception {
+ // Workspace dir owned by "processUser", workspace group is "group0" and "user1" is part of "group0"
+ final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_770";
+ testCreateViewTestHelper(user1, viewSchema, "view1");
+ }
+
+ @Test
+ public void testCreateViewInDirWithOtherPermissionsForQueryUser() throws Exception {
+ // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
+ final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_777";
+ testCreateViewTestHelper(user2, viewSchema, "view1");
+ }
+
+ private static void testCreateViewTestHelper(String user, String viewSchema,
+ String viewName) throws Exception {
+ try {
+ updateClient(user);
+
+ test("USE " + viewSchema);
+
+ test("CREATE VIEW " + viewName + " AS SELECT " +
+ "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
+
+ testBuilder()
+ .sqlQuery("SHOW TABLES")
+ .unOrdered()
+ .baselineColumns("TABLE_SCHEMA", "TABLE_NAME")
+ .baselineValues(viewSchema, viewName)
+ .go();
+
+ test("SHOW FILES");
+
+ testBuilder()
+ .sqlQuery("SELECT * FROM " + viewName + " LIMIT 1")
+ .ordered()
+ .baselineColumns("c_custkey", "c_nationkey")
+ .baselineValues(1, 15)
+ .go();
+
+ } finally {
+ test("DROP VIEW " + viewSchema + "." + viewName);
+ }
+ }
+
+ @Test
+ public void testCreateViewInWSWithNoPermissionsForQueryUser() throws Exception {
+ // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
+ final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_755";
+ final String viewName = "view1";
+
+ updateClient(user2);
+
+ test("USE " + viewSchema);
+
+ test("CREATE VIEW " + viewName + " AS SELECT " +
+ "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
+
+ // SHOW TABLES is expected to return no records as view creation fails above.
+ testBuilder()
+ .sqlQuery("SHOW TABLES")
+ .expectsEmptyResultSet()
+ .go();
+
+ test("SHOW FILES");
+ }
+
+ @Test
+ public void testCreateTableInDirWithUserPermissionsForQueryUser() throws Exception {
+ final String tableWS = "drillTestGrp1_700"; // Workspace dir owned by "user1"
+ testCreateTableTestHelper(user1, tableWS, "table1");
+ }
+
+ @Test
+ public void testCreateTableInDirWithGroupPermissionsForQueryUser() throws Exception {
+ // Workspace dir owned by "processUser", workspace group is "group0" and "user1" is part of "group0"
+ final String tableWS = "drillTestGrp0_770";
+ testCreateTableTestHelper(user1, tableWS, "table1");
+ }
+
+ @Test
+ public void testCreateTableInDirWithOtherPermissionsForQueryUser() throws Exception {
+ // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
+ final String tableWS = "drillTestGrp0_777";
+ testCreateTableTestHelper(user2, tableWS, "table1");
+ }
+
+ private static void testCreateTableTestHelper(String user, String tableWS,
+ String tableName) throws Exception {
+ try {
+ updateClient(user);
+
+ test("USE " + Joiner.on(".").join(MINIDFS_STORAGE_PLUGIN_NAME, tableWS));
+
+ test("CREATE TABLE " + tableName + " AS SELECT " +
+ "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
+
+ test("SHOW FILES");
+
+ testBuilder()
+ .sqlQuery("SELECT * FROM " + tableName + " LIMIT 1")
+ .ordered()
+ .baselineColumns("c_custkey", "c_nationkey")
+ .baselineValues(1, 15)
+ .go();
+
+ } finally {
+ // There is no drop table, we need to delete the table directory through FileSystem object
+ final FileSystem fs = dfsCluster.getFileSystem();
+ final Path tablePath = new Path(Path.SEPARATOR + tableWS + Path.SEPARATOR + tableName);
+ if (fs.exists(tablePath)) {
+ fs.delete(tablePath, true);
+ }
+ }
+ }
+
+ @Test
+ public void testCreateTableInWSWithNoPermissionsForQueryUser() throws Exception {
+ // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
+ final String tableWS = "drillTestGrp0_755";
+ final String tableName = "table1";
+
+ UserRemoteException ex = null;
+
+ try {
+ updateClient(user2);
+
+ test("USE " + Joiner.on(".").join(MINIDFS_STORAGE_PLUGIN_NAME, tableWS));
+
+ test("CREATE TABLE " + tableName + " AS SELECT " +
+ "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
+ } catch(UserRemoteException e) {
+ ex = e;
+ }
+
+ assertNotNull("UserRemoteException is expected", ex);
+ assertThat(ex.getMessage(),
+ containsString("Permission denied: user=drillTestUser2, access=WRITE, inode=\"/drillTestGrp0_755\""));
+ }
+
+ @AfterClass
+ public static void removeMiniDfsBasedStorage() throws Exception {
+ getDrillbitContext().getStorage().deletePlugin(MINIDFS_STORAGE_PLUGIN_NAME);
+ stopMiniDfsCluster();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
new file mode 100644
index 0000000..14392c9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.impersonation;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.dotdrill.DotDrillType;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test queries involving direct impersonation and multilevel impersonation including join queries where each side is
+ * a nested view.
+ */
+public class TestImpersonationQueries extends BaseTestImpersonation {
+ private static final String MINIDFS_STORAGE_PLUGIN_NAME = "minidfs" + TestImpersonationQueries.class.getSimpleName();
+
+ private static final String[] org1Users = { "user0_1", "user1_1", "user2_1", "user3_1", "user4_1", "user5_1" };
+ private static final String[] org1Groups = { "group0_1", "group1_1", "group2_1", "group3_1", "group4_1", "group5_1" };
+ private static final String[] org2Users = { "user0_2", "user1_2", "user2_2", "user3_2", "user4_2", "user5_2" };
+ private static final String[] org2Groups = { "group0_2", "group1_2", "group2_2", "group3_2", "group4_2", "group5_2" };
+
+ static {
+ // "user0_1" belongs to "groups0_1". From "user1_1" onwards each user belongs to corresponding group and the group
+ // before it, i.e "user1_1" belongs to "group1_1" and "group0_1" and so on.
+ UserGroupInformation.createUserForTesting(org1Users[0], new String[] { org1Groups[0] });
+ for(int i=1; i<org1Users.length; i++) {
+ UserGroupInformation.createUserForTesting(org1Users[i], new String[] { org1Groups[i], org1Groups[i-1] });
+ }
+
+ UserGroupInformation.createUserForTesting(org2Users[0], new String[] { org2Groups[0] });
+ for(int i=1; i<org2Users.length; i++) {
+ UserGroupInformation.createUserForTesting(org2Users[i], new String[] { org2Groups[i], org2Groups[i-1] });
+ }
+ }
+
+ @BeforeClass
+ public static void addMiniDfsBasedStorageAndGenerateTestData() throws Exception {
+ startMiniDfsCluster(TestImpersonationQueries.class.getSimpleName());
+
+ final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+ final FileSystemConfig lfsPluginConfig = (FileSystemConfig) pluginRegistry.getPlugin("dfs").getConfig();
+
+ final FileSystemConfig miniDfsPluginConfig = new FileSystemConfig();
+ miniDfsPluginConfig.connection = conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
+
+ Map<String, WorkspaceConfig> workspaces = Maps.newHashMap(lfsPluginConfig.workspaces);
+
+ createTestWorkspaces(workspaces);
+
+ miniDfsPluginConfig.workspaces = workspaces;
+ miniDfsPluginConfig.formats = ImmutableMap.copyOf(lfsPluginConfig.formats);
+ miniDfsPluginConfig.setEnabled(true);
+
+ pluginRegistry.createOrUpdate(MINIDFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true);
+
+ // Create test tables/views
+
+ // Create copy of "lineitem" table in /user/user0_1 owned by user0_1:group0_1 with permissions 750. Only user0_1
+ // has access to data in created "lineitem" table.
+ createTestTable(org1Users[0], org1Groups[0], "lineitem");
+
+ // Create copy of "orders" table in /user/user0_2 owned by user0_2:group0_2 with permissions 750. Only user0_2
+ // has access to data in created "orders" table.
+ createTestTable(org2Users[0], org2Groups[0], "orders");
+
+ createNestedTestViewsOnLineItem();
+ createNestedTestViewsOnOrders();
+ }
+
+ private static String getUserHome(String user) {
+ return "/user/" + user;
+ }
+
+ // Return the user workspace for given user.
+ private static String getWSSchema(String user) {
+ return MINIDFS_STORAGE_PLUGIN_NAME + "." + user;
+ }
+
+ private static void createTestWorkspaces(Map<String, WorkspaceConfig> workspaces) throws Exception {
+ // Create "/tmp" folder and set permissions to "777"
+ final FileSystem fs = dfsCluster.getFileSystem();
+ final Path tmpPath = new Path("/tmp");
+ fs.delete(tmpPath, true);
+ FileSystem.mkdirs(fs, tmpPath, new FsPermission((short)0777));
+
+ // create user directory (ex. "/user/user0_1", with ownership "user0_1:group0_1" and perms 755) for every user.
+ for(int i=0; i<org1Users.length; i++) {
+ final String user = org1Users[i];
+ final String group = org1Groups[i];
+ createAndAddWorkspace(fs, user, getUserHome(user), (short)0755, user, group, workspaces);
+ }
+
+ // create user directory (ex. "/user/user0_2", with ownership "user0_2:group0_2" and perms 755) for every user.
+ for(int i=0; i<org2Users.length; i++) {
+ final String user = org2Users[i];
+ final String group = org2Groups[i];
+ createAndAddWorkspace(fs, user, getUserHome(user), (short)0755, user, group, workspaces);
+ }
+ }
+
+ private static void createTestTable(String user, String group, String tableName) throws Exception {
+ updateClient(user);
+ test("USE " + getWSSchema(user));
+ test(String.format("CREATE TABLE %s as SELECT * FROM cp.`tpch/%s.parquet`;", tableName, tableName));
+
+ // Change the ownership and permissions manually. Currently there is no option to specify the default permissions
+ // and ownership for new tables.
+ final Path tablePath = new Path(getUserHome(user), tableName);
+ final FileSystem fs = dfsCluster.getFileSystem();
+
+ fs.setOwner(tablePath, user, group);
+ fs.setPermission(tablePath, new FsPermission((short)0750));
+ }
+
+ private static void createNestedTestViewsOnLineItem() throws Exception {
+ // Input table "lineitem"
+ // /user/user0_1 lineitem 750 user0_1:group0_1
+
+ // Create a view on top of lineitem table
+ // /user/user1_1 u1_lineitem 750 user1_1:group1_1
+ createView(org1Users[1], org1Groups[1], (short)0750, "u1_lineitem", getWSSchema(org1Users[0]), "lineitem");
+
+ // Create a view on top of u1_lineitem view
+ // /user/user2_1 u2_lineitem 750 user2_1:group2_1
+ createView(org1Users[2], org1Groups[2], (short)0750, "u2_lineitem", getWSSchema(org1Users[1]), "u1_lineitem");
+
+ // Create a view on top of u2_lineitem view
+ // /user/user2_1 u22_lineitem 750 user2_1:group2_1
+ createView(org1Users[2], org1Groups[2], (short)0750, "u22_lineitem", getWSSchema(org1Users[2]), "u2_lineitem");
+
+ // Create a view on top of u22_lineitem view
+ // /user/user3_1 u3_lineitem 750 user3_1:group3_1
+ createView(org1Users[3], org1Groups[3], (short)0750, "u3_lineitem", getWSSchema(org1Users[2]), "u22_lineitem");
+
+ // Create a view on top of u3_lineitem view
+ // /user/user4_1 u4_lineitem 755 user4_1:group4_1
+ createView(org1Users[4], org1Groups[4], (short)0755, "u4_lineitem", getWSSchema(org1Users[3]), "u3_lineitem");
+ }
+
+ private static void createNestedTestViewsOnOrders() throws Exception {
+ // Input table "orders"
+ // /user/user0_2 orders 750 user0_2:group0_2
+
+ // Create a view on top of orders table
+ // /user/user1_2 u1_orders 750 user1_2:group1_2
+ createView(org2Users[1], org2Groups[1], (short)0750, "u1_orders", getWSSchema(org2Users[0]), "orders");
+
+ // Create a view on top of u1_orders view
+ // /user/user2_2 u2_orders 750 user2_2:group2_2
+ createView(org2Users[2], org2Groups[2], (short)0750, "u2_orders", getWSSchema(org2Users[1]), "u1_orders");
+
+ // Create a view on top of u2_orders view
+ // /user/user2_2 u22_orders 750 user2_2:group2_2
+ createView(org2Users[2], org2Groups[2], (short)0750, "u22_orders", getWSSchema(org2Users[2]), "u2_orders");
+
+ // Create a view on top of u22_orders view (permissions of this view (755) are different from permissions of the
+ // corresponding view in "lineitem" nested views to have a join query of "lineitem" and "orders" nested views
+ // without exceeding the maximum number of allowed user hops in chained impersonation.
+ // /user/user3_2 u3_orders 750 user3_2:group3_2
+ createView(org2Users[3], org2Groups[3], (short)0755, "u3_orders", getWSSchema(org2Users[2]), "u22_orders");
+
+ // Create a view on top of u3_orders view
+ // /user/user4_2 u4_orders 755 user4_2:group4_2
+ createView(org2Users[4], org2Groups[4], (short)0755, "u4_orders", getWSSchema(org2Users[3]), "u3_orders");
+ }
+
+ private static void createView(final String viewOwner, final String viewGroup, final short viewPerms,
+ final String newViewName, final String fromSourceSchema, final String fromSourceTableName) throws Exception {
+ updateClient(viewOwner);
+ test(String.format("ALTER SESSION SET `%s`='%o';", ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY, viewPerms));
+ test(String.format("CREATE VIEW %s.%s AS SELECT * FROM %s.%s;",
+ getWSSchema(viewOwner), newViewName, fromSourceSchema, fromSourceTableName));
+
+ // Verify the view file created has the expected permissions and ownership
+ Path viewFilePath = new Path(getUserHome(viewOwner), newViewName + DotDrillType.VIEW.getEnding());
+ FileStatus status = dfsCluster.getFileSystem().getFileStatus(viewFilePath);
+ assertEquals(viewGroup, status.getGroup());
+ assertEquals(viewOwner, status.getOwner());
+ assertEquals(viewPerms, status.getPermission().toShort());
+ }
+
+ @Test
+ public void testDirectImpersonation_HasUserReadPermissions() throws Exception {
+ // Table lineitem is owned by "user0_1:group0_1" with permissions 750. Try to read the table as "user0_1". We
+ // shouldn't expect any errors.
+ updateClient(org1Users[0]);
+ test(String.format("SELECT * FROM %s.lineitem ORDER BY l_orderkey LIMIT 1", getWSSchema(org1Users[0])));
+ }
+
+ @Test
+ public void testDirectImpersonation_HasGroupReadPermissions() throws Exception {
+ // Table lineitem is owned by "user0_1:group0_1" with permissions 750. Try to read the table as "user1_1". We
+ // shouldn't expect any errors as "user1_1" is part of the "group0_1"
+ updateClient(org1Users[1]);
+ test(String.format("SELECT * FROM %s.lineitem ORDER BY l_orderkey LIMIT 1", getWSSchema(org1Users[0])));
+ }
+
+ @Test
+ public void testDirectImpersonation_NoReadPermissions() throws Exception {
+ UserRemoteException ex = null;
+ try {
+ // Table lineitem is owned by "user0_1:group0_1" with permissions 750. Now try to read the table as "user2_1". We
+ // should expect a permission denied error as "user2_1" is not part of the "group0_1"
+ updateClient(org1Users[2]);
+ test(String.format("SELECT * FROM %s.lineitem ORDER BY l_orderkey LIMIT 1", getWSSchema(org1Users[0])));
+ } catch(UserRemoteException e) {
+ ex = e;
+ }
+
+ assertNotNull("UserRemoteException is expected", ex);
+ assertThat(ex.getMessage(), containsString("PERMISSION ERROR: " +
+ "Not authorized to read table [lineitem] in schema [minidfsTestImpersonationQueries.user0_1]"));
+ }
+
+
+ @Test
+ public void testMultiLevelImpersonationEqualToMaxUserHops() throws Exception {
+ updateClient(org1Users[4]);
+ test(String.format("SELECT * from %s.u4_lineitem LIMIT 1;", getWSSchema(org1Users[4])));
+ }
+
+ @Test
+ public void testMultiLevelImpersonationExceedsMaxUserHops() throws Exception {
+ UserRemoteException ex = null;
+
+ try {
+ updateClient(org1Users[5]);
+ test(String.format("SELECT * from %s.u4_lineitem LIMIT 1;", getWSSchema(org1Users[4])));
+ } catch(UserRemoteException e) {
+ ex = e;
+ }
+
+ assertNotNull("UserRemoteException is expected", ex);
+ assertThat(ex.getMessage(),
+ containsString("Cannot issue token for view expansion as issuing the token exceeds the maximum allowed number " +
+ "of user hops (3) in chained impersonation"));
+ }
+
+ @Test
+ public void testMultiLevelImpersonationJoinEachSideReachesMaxUserHops() throws Exception {
+ updateClient(org1Users[4]);
+ test(String.format("SELECT * from %s.u4_lineitem l JOIN %s.u3_orders o ON l.l_orderkey = o.o_orderkey LIMIT 1;",
+ getWSSchema(org1Users[4]), getWSSchema(org2Users[3])));
+ }
+
+ @Test
+ public void testMultiLevelImpersonationJoinOneSideExceedsMaxUserHops() throws Exception {
+ UserRemoteException ex = null;
+
+ try {
+ updateClient(org1Users[4]);
+ test(String.format("SELECT * from %s.u4_lineitem l JOIN %s.u4_orders o ON l.l_orderkey = o.o_orderkey LIMIT 1;",
+ getWSSchema(org1Users[4]), getWSSchema(org2Users[4])));
+ } catch(UserRemoteException e) {
+ ex = e;
+ }
+
+ assertNotNull("UserRemoteException is expected", ex);
+ assertThat(ex.getMessage(),
+ containsString("Cannot issue token for view expansion as issuing the token exceeds the maximum allowed number " +
+ "of user hops (3) in chained impersonation"));
+ }
+
+ @AfterClass
+ public static void removeMiniDfsBasedStorage() throws Exception {
+ getDrillbitContext().getStorage().deletePlugin(MINIDFS_STORAGE_PLUGIN_NAME);
+ stopMiniDfsCluster();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 91707fa..558646e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,7 +32,7 @@
<target.gen.source.path>${project.basedir}/target/generated-sources</target.gen.source.path>
<proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
<dep.junit.version>4.11</dep.junit.version>
- <dep.slf4j.version>1.7.5</dep.slf4j.version>
+ <dep.slf4j.version>1.7.6</dep.slf4j.version>
<parquet.version>1.6.0rc3-drill-r0.1</parquet.version>
</properties>
@@ -705,6 +705,99 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.4.1</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api-2.5</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api-2.5</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jets3t</artifactId>
+ <groupId>net.java.dev.jets3t</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>mockito-all</artifactId>
+ <groupId>org.mockito</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-logging-api</artifactId>
+ <groupId>commons-logging</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-logging</artifactId>
+ <groupId>commons-logging</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>core</artifactId>
+ <groupId>org.eclipse.jdt</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-xc</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-jaxrs</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.4.1</version>
<exclusions>
@@ -999,6 +1092,32 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- Test Dependencies -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>2.4.1</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</dependencyManagement>
</profile>
[3/4] drill git commit: DRILL-2413: FileSystemPlugin refactoring:
avoid sharing DrillFileSystem across schemas
Posted by ve...@apache.org.
DRILL-2413: FileSystemPlugin refactoring: avoid sharing DrillFileSystem across schemas
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/117b7497
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/117b7497
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/117b7497
Branch: refs/heads/master
Commit: 117b749744d1775816fc5f9591dced3aae551352
Parents: fbb405b
Author: vkorukanti <ve...@gmail.com>
Authored: Tue Mar 10 10:19:35 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Apr 21 13:16:00 2015 -0700
----------------------------------------------------------------------
.../exec/store/hbase/HBaseSchemaFactory.java | 2 +-
.../exec/store/hbase/HBaseStoragePlugin.java | 2 +-
.../exec/store/hive/HiveStoragePlugin.java | 2 +-
.../store/hive/schema/HiveSchemaFactory.java | 3 +-
.../exec/store/mongo/MongoStoragePlugin.java | 2 +-
.../store/mongo/schema/MongoSchemaFactory.java | 4 +-
.../apache/drill/exec/ops/FragmentContext.java | 43 ++++++-
.../org/apache/drill/exec/ops/QueryContext.java | 15 ++-
.../apache/drill/exec/store/SchemaFactory.java | 4 +-
.../drill/exec/store/StoragePluginRegistry.java | 2 +-
.../drill/exec/store/avro/AvroFormatPlugin.java | 9 +-
.../exec/store/dfs/BasicFormatMatcher.java | 25 ++--
.../drill/exec/store/dfs/DrillFileSystem.java | 6 +-
.../drill/exec/store/dfs/FileSystemPlugin.java | 34 +++---
.../exec/store/dfs/FileSystemSchemaFactory.java | 12 +-
.../drill/exec/store/dfs/FormatCreator.java | 17 +--
.../drill/exec/store/dfs/FormatMatcher.java | 2 +-
.../drill/exec/store/dfs/FormatPlugin.java | 3 +-
.../exec/store/dfs/WorkspaceSchemaFactory.java | 115 ++++++++++---------
.../exec/store/dfs/easy/EasyFormatPlugin.java | 27 +++--
.../exec/store/dfs/easy/EasyGroupScan.java | 8 +-
.../exec/store/easy/json/JSONFormatPlugin.java | 15 ++-
.../exec/store/easy/text/TextFormatPlugin.java | 18 ++-
.../store/ischema/InfoSchemaStoragePlugin.java | 2 +-
.../exec/store/mock/MockStorageEngine.java | 2 +-
.../exec/store/parquet/ParquetFormatPlugin.java | 76 ++++++------
.../exec/store/parquet/ParquetGroupScan.java | 17 +--
.../store/parquet/ParquetScanBatchCreator.java | 8 +-
.../drill/exec/store/sys/SystemTablePlugin.java | 2 +-
.../exec/store/text/DrillTextRecordReader.java | 6 +-
.../exec/work/batch/ControlHandlerImpl.java | 2 +-
.../apache/drill/exec/work/foreman/Foreman.java | 11 +-
.../work/fragment/NonRootFragmentManager.java | 2 +-
.../exec/store/dfs/TestDrillFileSystem.java | 2 +-
34 files changed, 279 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 7a0a64b..1c407e1 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -46,7 +46,7 @@ public class HBaseSchemaFactory implements SchemaFactory {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
HBaseSchema schema = new HBaseSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index c10b0ab..948d462 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -66,7 +66,7 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index f4baf3b..91e7a92 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -82,7 +82,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
}
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 0e16e6f..587e90d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.hive.schema;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -186,7 +187,7 @@ public class HiveSchemaFactory implements SchemaFactory {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
HiveSchema schema = new HiveSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index e46d8ec..dfad5ef 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -63,7 +63,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index 3c70638..f650ccc 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.mongo.schema;
+import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
@@ -36,7 +37,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
import org.apache.drill.exec.store.mongo.MongoCnxnManager;
import org.apache.drill.exec.store.mongo.MongoScanSpec;
import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
@@ -120,7 +120,7 @@ public class MongoSchemaFactory implements SchemaFactory {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
MongoSchema schema = new MongoSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 9400355..7dfd0e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import net.hydromatic.optiq.SchemaPlus;
-import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
@@ -64,7 +63,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
private final DrillbitContext context;
- private final UserClientConnection connection; // is null if attached to non-root fragment
+ private final UserClientConnection connection; // is null if this context is for non-root fragment
+ private final QueryContext queryContext; // is null if this context is for non-root fragment
private final FragmentStats stats;
private final FunctionImplementationRegistry funcRegistry;
private final BufferAllocator allocator;
@@ -87,10 +87,34 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
private final AccountingUserConnection accountingUserConnection;
+ /**
+ * Create a FragmentContext instance for non-root fragment.
+ *
+ * @param dbContext DrillbitContext.
+ * @param fragment Fragment implementation.
+ * @param funcRegistry FunctionImplementationRegistry.
+ * @throws ExecutionSetupException
+ */
public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
+ final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+ this(dbContext, fragment, null, null, funcRegistry);
+ }
+
+ /**
+ * Create a FragmentContext instance for root fragment.
+ *
+ * @param dbContext DrillbitContext.
+ * @param fragment Fragment implementation.
+ * @param queryContext QueryContext.
+ * @param connection UserClientConnection.
+ * @param funcRegistry FunctionImplementationRegistry.
+ * @throws ExecutionSetupException
+ */
+ public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
throws ExecutionSetupException {
this.context = dbContext;
+ this.queryContext = queryContext;
this.connection = connection;
this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
this.fragment = fragment;
@@ -128,6 +152,15 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
bufferManager = new BufferManager(this.allocator, this);
}
+ /**
+ * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
+ * the long list of test files.
+ */
+ public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
+ FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+ this(dbContext, fragment, null, connection, funcRegistry);
+ }
+
public OptionManager getOptions() {
return fragmentOptions;
}
@@ -162,15 +195,13 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
}
public SchemaPlus getRootSchema() {
- if (connection == null) {
+ if (queryContext == null) {
fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
"This is a non-root fragment."));
return null;
}
- final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
- context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root);
- return root;
+ return queryContext.getRootSchema();
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 2dcac25..cd5c054 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.ops;
+import java.io.IOException;
import java.util.Collection;
import io.netty.buffer.DrillBuf;
@@ -46,7 +47,7 @@ import org.apache.drill.exec.testing.ExecutionControls;
// TODO - consider re-name to PlanningContext, as the query execution context actually appears
// in fragment contexts
public class QueryContext implements AutoCloseable, UdfUtilities {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
private final DrillbitContext drillbitContext;
private final UserSession session;
@@ -113,9 +114,15 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
}
public SchemaPlus getRootSchema() {
- final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
- drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
- return rootSchema;
+ try {
+ final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
+ drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
+ return rootSchema;
+ } catch(IOException e) {
+ final String errMsg = String.format("Failed to create schema tree: %s", e.getMessage());
+ logger.error(errMsg, e);
+ throw new DrillRuntimeException(errMsg, e);
+ }
}
public OptionManager getOptions() {
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
index feadabd..14d2fab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
@@ -21,8 +21,10 @@ import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.exec.rpc.user.UserSession;
+import java.io.IOException;
+
public interface SchemaFactory {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class);
- public void registerSchemas(UserSession session, SchemaPlus parent);
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 5d0eed6..cb9ee0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -301,7 +301,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
public class DrillSchemaFactory implements SchemaFactory {
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
Stopwatch watch = new Stopwatch();
watch.start();
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 2f487d6..30c45fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.util.List;
@@ -40,13 +41,13 @@ import java.util.List;
*/
public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
- public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
+ public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
StoragePluginConfig storagePluginConfig) {
- this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
+ this(name, context, fsConf, storagePluginConfig, new AvroFormatConfig());
}
- public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
- super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
+ public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
+ super(name, context, fsConf, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
index 9756f3c..3768aea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -36,34 +37,32 @@ import com.google.common.collect.Range;
public class BasicFormatMatcher extends FormatMatcher{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class);
- private final List<Pattern> patterns;
- private final MagicStringMatcher matcher;
- protected final DrillFileSystem fs;
protected final FormatPlugin plugin;
protected final boolean compressible;
protected final CompressionCodecFactory codecFactory;
- public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<Pattern> patterns, List<MagicString> magicStrings) {
+ private final List<Pattern> patterns;
+ private final MagicStringMatcher matcher;
+
+ public BasicFormatMatcher(FormatPlugin plugin, List<Pattern> patterns, List<MagicString> magicStrings) {
super();
this.patterns = ImmutableList.copyOf(patterns);
this.matcher = new MagicStringMatcher(magicStrings);
- this.fs = fs;
this.plugin = plugin;
this.compressible = false;
this.codecFactory = null;
}
- public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<String> extensions, boolean compressible) {
+ public BasicFormatMatcher(FormatPlugin plugin, Configuration fsConf, List<String> extensions, boolean compressible) {
List<Pattern> patterns = Lists.newArrayList();
for (String extension : extensions) {
patterns.add(Pattern.compile(".*\\." + extension));
}
this.patterns = patterns;
this.matcher = new MagicStringMatcher(new ArrayList<MagicString>());
- this.fs = fs;
this.plugin = plugin;
this.compressible = compressible;
- this.codecFactory = new CompressionCodecFactory(fs.getConf());
+ this.codecFactory = new CompressionCodecFactory(fsConf);
}
@Override
@@ -72,8 +71,8 @@ public class BasicFormatMatcher extends FormatMatcher{
}
@Override
- public FormatSelection isReadable(FileSelection selection) throws IOException {
- if (isReadable(selection.getFirstPath(fs))) {
+ public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
+ if (isReadable(fs, selection.getFirstPath(fs))) {
if (plugin.getName() != null) {
NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
namedConfig.name = plugin.getName();
@@ -85,7 +84,7 @@ public class BasicFormatMatcher extends FormatMatcher{
return null;
}
- protected final boolean isReadable(FileStatus status) throws IOException {
+ protected final boolean isReadable(DrillFileSystem fs, FileStatus status) throws IOException {
CompressionCodec codec = null;
if (compressible) {
codec = codecFactory.getCodec(status.getPath());
@@ -103,7 +102,7 @@ public class BasicFormatMatcher extends FormatMatcher{
}
}
- if (matcher.matches(status)) {
+ if (matcher.matches(fs, status)) {
return true;
}
return false;
@@ -128,7 +127,7 @@ public class BasicFormatMatcher extends FormatMatcher{
}
}
- public boolean matches(FileStatus status) throws IOException{
+ public boolean matches(DrillFileSystem fs, FileStatus status) throws IOException{
if (ranges.isEmpty()) {
return false;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index 2683cca..f8afe3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -96,11 +96,11 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
private final OperatorStats operatorStats;
public DrillFileSystem(Configuration fsConf) throws IOException {
- this(FileSystem.get(fsConf), null);
+ this(fsConf, null);
}
- public DrillFileSystem(FileSystem fs, OperatorStats operatorStats) {
- this.underlyingFs = fs;
+ public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException {
+ this.underlyingFs = FileSystem.get(fsConf);
this.operatorStats = operatorStats;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index c5ca41b..775b402 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -44,6 +44,8 @@ import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import static org.apache.drill.exec.store.dfs.FileSystemSchemaFactory.DEFAULT_WS_NAME;
+
/**
* A Storage engine associated with a Hadoop FileSystem Implementation. Examples include HDFS, MapRFS, QuantacastFileSystem,
* LocalFileSystem, as well Apache Drill specific CachedFileSystem, ClassPathFileSystem and LocalSyncableFileSystem.
@@ -51,26 +53,26 @@ import com.google.common.collect.Maps;
* references to the FileSystem configuration and path management.
*/
public class FileSystemPlugin extends AbstractStoragePlugin{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
private final FileSystemSchemaFactory schemaFactory;
- private Map<String, FormatPlugin> formatPluginsByName;
- private Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
- private FileSystemConfig config;
- private DrillbitContext context;
- private final DrillFileSystem fs;
+ private final Map<String, FormatPlugin> formatPluginsByName;
+ private final Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
+ private final FileSystemConfig config;
+ private final DrillbitContext context;
+ private final Configuration fsConf;
public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException{
try {
this.config = config;
this.context = context;
- Configuration fsConf = new Configuration();
+ fsConf = new Configuration();
fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.connection);
fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
- fs = new DrillFileSystem(fsConf);
- formatPluginsByName = FormatCreator.getFormatPlugins(context, fs, config);
+
+ formatPluginsByName = FormatCreator.getFormatPlugins(context, fsConf, config);
List<FormatMatcher> matchers = Lists.newArrayList();
formatPluginsByConfig = Maps.newHashMap();
for (FormatPlugin p : formatPluginsByName.values()) {
@@ -78,17 +80,17 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
formatPluginsByConfig.put(p.getConfig(), p);
}
- boolean noWorkspace = config.workspaces == null || config.workspaces.isEmpty();
+ final boolean noWorkspace = config.workspaces == null || config.workspaces.isEmpty();
List<WorkspaceSchemaFactory> factories = Lists.newArrayList();
if (!noWorkspace) {
for (Map.Entry<String, WorkspaceConfig> space : config.workspaces.entrySet()) {
- factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, space.getKey(), name, fs, space.getValue(), matchers));
+ factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, space.getKey(), name, space.getValue(), matchers));
}
}
// if the "default" workspace is not given add one.
- if (noWorkspace || !config.workspaces.containsKey("default")) {
- factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, "default", name, fs, WorkspaceConfig.DEFAULT, matchers));
+ if (noWorkspace || !config.workspaces.containsKey(DEFAULT_WS_NAME)) {
+ factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, DEFAULT_WS_NAME, name, WorkspaceConfig.DEFAULT, matchers));
}
this.schemaFactory = new FileSystemSchemaFactory(name, factories);
@@ -123,7 +125,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
}
@@ -151,5 +153,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
return setBuilder.build();
}
-
+ public Configuration getFsConf() {
+ return fsConf;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 44132d0..e11712e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -44,12 +44,12 @@ import org.apache.hadoop.fs.Path;
* This is the top level schema that responds to root level path requests. Also supports
*/
public class FileSystemSchemaFactory implements SchemaFactory{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class);
+
+ public static final String DEFAULT_WS_NAME = "default";
private List<WorkspaceSchemaFactory> factories;
private String schemaName;
- private final String defaultSchemaName = "default";
-
public FileSystemSchemaFactory(String schemaName, List<WorkspaceSchemaFactory> factories) {
super();
@@ -58,7 +58,7 @@ public class FileSystemSchemaFactory implements SchemaFactory{
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
FileSystemSchema schema = new FileSystemSchema(schemaName, session);
SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
schema.setPlus(plusOfThis);
@@ -69,14 +69,14 @@ public class FileSystemSchemaFactory implements SchemaFactory{
private final WorkspaceSchema defaultSchema;
private final Map<String, WorkspaceSchema> schemaMap = Maps.newHashMap();
- public FileSystemSchema(String name, UserSession session) {
+ public FileSystemSchema(String name, UserSession session) throws IOException {
super(ImmutableList.<String>of(), name);
for(WorkspaceSchemaFactory f : factories){
WorkspaceSchema s = f.createSchema(getSchemaPath(), session);
schemaMap.put(s.getName(), s);
}
- defaultSchema = schemaMap.get(defaultSchemaName);
+ defaultSchema = schemaMap.get(DEFAULT_WS_NAME);
}
void setPlus(SchemaPlus plusOfThis){
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
index c164ed5..d2a903b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
@@ -31,20 +31,23 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.server.DrillbitContext;
import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
public class FormatCreator {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
- static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class, FormatPluginConfig.class);
- static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class);
+ private static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class,
+ Configuration.class, StoragePluginConfig.class, FormatPluginConfig.class);
+ private static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class,
+ Configuration.class, StoragePluginConfig.class);
- static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, DrillFileSystem fileSystem, FileSystemConfig storageConfig) {
+ static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, Configuration fsConf,
+ FileSystemConfig storageConfig) {
final DrillConfig config = context.getConfig();
Map<String, FormatPlugin> plugins = Maps.newHashMap();
Collection<Class<? extends FormatPlugin>> pluginClasses = PathScanner.scanForImplementations(FormatPlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
-
if (storageConfig.formats == null || storageConfig.formats.isEmpty()) {
for (Class<? extends FormatPlugin> pluginClass: pluginClasses) {
@@ -53,7 +56,7 @@ public class FormatCreator {
if (!DEFAULT_BASED.check(c)) {
continue;
}
- FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fileSystem, storageConfig);
+ FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fsConf, storageConfig);
plugins.put(plugin.getName(), plugin);
} catch (Exception e) {
logger.warn(String.format("Failure while trying instantiate FormatPlugin %s.", pluginClass.getName()), e);
@@ -84,7 +87,7 @@ public class FormatCreator {
continue;
}
try {
- plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fileSystem, storageConfig, e.getValue()));
+ plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fsConf, storageConfig, e.getValue()));
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
logger.warn("Failure initializing storage config named '{}' of type '{}'.", e.getKey(), e.getValue().getClass().getName(), e1);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
index 92e3d0a..0b8c7a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
@@ -23,6 +23,6 @@ public abstract class FormatMatcher {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatMatcher.class);
public abstract boolean supportDirectoryReads();
- public abstract FormatSelection isReadable(FileSelection selection) throws IOException;
+ public abstract FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException;
public abstract FormatPlugin getFormatPlugin();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 58d5b42..955dfeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.hadoop.conf.Configuration;
/**
* Similar to a storage engine but built specifically to work within a FileSystem context.
@@ -51,7 +52,7 @@ public interface FormatPlugin {
public FormatPluginConfig getConfig();
public StoragePluginConfig getStorageConfig();
- public DrillFileSystem getFileSystem();
+ public Configuration getFsConf();
public DrillbitContext getContext();
public String getName();
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 45e9129..a536350 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.PartitionNotFoundException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -52,14 +53,14 @@ import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
+public class WorkspaceSchemaFactory {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
private final List<FormatMatcher> fileMatchers;
private final List<FormatMatcher> dirMatchers;
private final WorkspaceConfig config;
- private final DrillFileSystem fs;
+ private final Configuration fsConf;
private final DrillConfig drillConfig;
private final String storageEngineName;
private final String schemaName;
@@ -67,9 +68,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
private final ObjectMapper mapper;
public WorkspaceSchemaFactory(DrillConfig drillConfig, FileSystemPlugin plugin, String schemaName,
- String storageEngineName, DrillFileSystem fileSystem, WorkspaceConfig config,
- List<FormatMatcher> formatMatchers) throws ExecutionSetupException, IOException {
- this.fs = fileSystem;
+ String storageEngineName, WorkspaceConfig config, List<FormatMatcher> formatMatchers)
+ throws ExecutionSetupException, IOException {
+ this.fsConf = plugin.getFsConf();
this.plugin = plugin;
this.drillConfig = drillConfig;
this.config = config;
@@ -95,7 +96,7 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
defaultInputFormat, storageEngineName, schemaName);
throw new ExecutionSetupException(message);
}
- final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin, fs,
+ final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin,
ImmutableList.of(Pattern.compile(".*")), ImmutableList.<MagicString>of());
fileMatchers.add(fallbackMatcher);
}
@@ -105,54 +106,21 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
return DotDrillType.VIEW.getPath(config.getLocation(), name);
}
- public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) {
+ public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) throws IOException {
return new WorkspaceSchema(parentSchemaPath, schemaName, session);
}
- @Override
- public DrillTable create(String key) {
- try {
+ public class WorkspaceSchema extends AbstractSchema implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
+ private final ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(this);
+ private final UserSession session;
+ private final DrillFileSystem fs;
- FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key);
- if (fileSelection == null) {
- return null;
- }
-
- if (fileSelection.containsDirectories(fs)) {
- for (FormatMatcher m : dirMatchers) {
- try {
- Object selection = m.isReadable(fileSelection);
- if (selection != null) {
- return new DynamicDrillTable(plugin, storageEngineName, selection);
- }
- } catch (IOException e) {
- logger.debug("File read failed.", e);
- }
- }
- fileSelection = fileSelection.minusDirectories(fs);
- }
-
- for (FormatMatcher m : fileMatchers) {
- Object selection = m.isReadable(fileSelection);
- if (selection != null) {
- return new DynamicDrillTable(plugin, storageEngineName, selection);
- }
- }
- return null;
-
- } catch (IOException e) {
- logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
+ public WorkspaceSchema(List<String> parentSchemaPath, String wsName, UserSession session) throws IOException {
+ super(parentSchemaPath, wsName);
+ this.session = session;
+ this.fs = new DrillFileSystem(fsConf);
}
- return null;
- }
-
- @Override
- public void destroy(DrillTable value) {
- }
-
- public class WorkspaceSchema extends AbstractSchema {
-
public boolean createView(View view) throws Exception {
Path viewPath = getViewPath(view.getName());
boolean replaced = fs.exists(viewPath);
@@ -186,15 +154,6 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
fs.delete(getViewPath(viewName), false);
}
- private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(WorkspaceSchemaFactory.this);
-
- private UserSession session;
-
- public WorkspaceSchema(List<String> parentSchemaPath, String name, UserSession session) {
- super(parentSchemaPath, name);
- this.session = session;
- }
-
private Set<String> getViews() {
Set<String> viewSet = Sets.newHashSet();
// Look for files with ".view.drill" extension.
@@ -282,5 +241,47 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
public String getTypeName() {
return FileSystemConfig.NAME;
}
+
+ @Override
+ public DrillTable create(String key) {
+ try {
+
+ FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key);
+ if (fileSelection == null) {
+ return null;
+ }
+
+ if (fileSelection.containsDirectories(fs)) {
+ for (FormatMatcher m : dirMatchers) {
+ try {
+ Object selection = m.isReadable(fs, fileSelection);
+ if (selection != null) {
+ return new DynamicDrillTable(plugin, storageEngineName, selection);
+ }
+ } catch (IOException e) {
+ logger.debug("File read failed.", e);
+ }
+ }
+ fileSelection = fileSelection.minusDirectories(fs);
+ }
+
+ for (FormatMatcher m : fileMatchers) {
+ Object selection = m.isReadable(fs, fileSelection);
+ if (selection != null) {
+ return new DynamicDrillTable(plugin, storageEngineName, selection);
+ }
+ }
+ return null;
+
+ } catch (IOException e) {
+ logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
+ }
+
+ return null;
+ }
+
+ @Override
+ public void destroy(DrillTable value) {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 6e1e0cc..5c7152a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -54,38 +54,39 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
private final BasicFormatMatcher matcher;
private final DrillbitContext context;
private final boolean readable;
private final boolean writable;
private final boolean blockSplittable;
- private final DrillFileSystem fs;
+ private final Configuration fsConf;
private final StoragePluginConfig storageConfig;
protected final FormatPluginConfig formatConfig;
private final String name;
protected final CompressionCodecFactory codecFactory;
private final boolean compressible;
- protected EasyFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig,
- T formatConfig, boolean readable, boolean writable, boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName){
- this.matcher = new BasicFormatMatcher(this, fs, extensions, compressible);
+ protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, boolean blockSplittable,
+ boolean compressible, List<String> extensions, String defaultName){
+ this.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
this.readable = readable;
this.writable = writable;
this.context = context;
this.blockSplittable = blockSplittable;
this.compressible = compressible;
- this.fs = fs;
+ this.fsConf = fsConf;
this.storageConfig = storageConfig;
this.formatConfig = formatConfig;
this.name = name == null ? defaultName : name;
- this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getConf()));
+ this.codecFactory = new CompressionCodecFactory(new Configuration(fsConf));
}
@Override
- public DrillFileSystem getFileSystem() {
- return fs;
+ public Configuration getFsConf() {
+ return fsConf;
}
@Override
@@ -152,7 +153,13 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
int numParts = 0;
OperatorContext oContext = new OperatorContext(scan, context,
false /* ScanBatch is not subject to fragment memory limit */);
- DrillFileSystem dfs = new DrillFileSystem(fs, oContext.getStats());
+ DrillFileSystem dfs;
+ try {
+ dfs = new DrillFileSystem(fsConf, oContext.getStats());
+ } catch (IOException e) {
+ throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
+ }
+
for(FileWork work : scan.getWorkUnits()){
readers.add(getRecordReader(context, dfs, work, scan.getColumns()));
if (scan.getSelectionRoot() != null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 54cad56..7c70df3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
@@ -51,7 +52,7 @@ import com.google.common.collect.Lists;
@JsonTypeName("fs-scan")
public class EasyGroupScan extends AbstractFileGroupScan{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
private FileSelection selection;
private final EasyFormatPlugin<?> formatPlugin;
@@ -109,9 +110,10 @@ public class EasyGroupScan extends AbstractFileGroupScan{
}
private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
+ final DrillFileSystem dfs = new DrillFileSystem(formatPlugin.getFsConf());
this.selection = selection;
- BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem(), formatPlugin.getContext().getBits());
- this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
+ BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
+ this.chunks = b.generateFileWork(selection.getFileStatusList(dfs), formatPlugin.isBlockSplittable());
this.maxWidth = chunks.size();
this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 2e65466..32e34b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.ImmutableList;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
@@ -38,22 +39,26 @@ import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
private static final boolean IS_COMPRESSIBLE = true;
+ private static final String DEFAULT_NAME = "json";
+ private static final List<String> DEFAULT_EXTS = ImmutableList.of("json");
- public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
- this(name, context, fs, storageConfig, new JSONFormatConfig());
+ public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+ this(name, context, fsConf, storageConfig, new JSONFormatConfig());
}
- public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
- super(name, context, fs, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, Lists.newArrayList("json"), "json");
+ public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+ JSONFormatConfig formatPluginConfig) {
+ super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE,
+ DEFAULT_EXTS, DEFAULT_NAME);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index bf46395..237589c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -18,7 +18,7 @@
package org.apache.drill.exec.store.easy.text;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -42,6 +42,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.text.DrillTextRecordReader;
import org.apache.drill.exec.store.text.DrillTextRecordWriter;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
@@ -51,13 +52,17 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
+ private final static String DEFAULT_NAME = "text";
- public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
- super(name, context, fs, storageConfig, new TextFormatConfig(), true, false, true, true, new ArrayList<String>(), "text");
+ public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+ super(name, context, fsConf, storageConfig, new TextFormatConfig(), true, false, true, true,
+ Collections.<String>emptyList(), DEFAULT_NAME);
}
- public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, TextFormatConfig formatPluginConfig) {
- super(name, context, fs, config, formatPluginConfig, true, false, true, true, formatPluginConfig.getExtensions(), "text");
+ public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+ TextFormatConfig formatPluginConfig) {
+ super(name, context, fsConf, config, formatPluginConfig, true, false, true, true,
+ formatPluginConfig.getExtensions(), DEFAULT_NAME);
}
@@ -67,7 +72,8 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
Path path = dfs.makeQualified(new Path(fileWork.getPath()));
FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
Preconditions.checkArgument(((TextFormatConfig)formatConfig).getDelimiter().length() == 1, "Only single character delimiter supported");
- return new DrillTextRecordReader(split, context, ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
+ return new DrillTextRecordReader(split, getFsConf(), context,
+ ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index 77c6b9a..a1249e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -69,7 +69,7 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements In
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
ISchema s = new ISchema(parent, this);
parent.add(s.getName(), s);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 51b2208..96226a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -55,7 +55,7 @@ public class MockStorageEngine extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index e204a2c..9c83ea0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -62,37 +62,44 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class ParquetFormatPlugin implements FormatPlugin{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
- private final DrillbitContext context;
public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
- private CodecFactoryExposer codecFactoryExposer;
- private final DrillFileSystem fs;
+
+ private static final String DEFAULT_NAME = "parquet";
+
+ private static final List<Pattern> PATTERNS = Lists.newArrayList(
+ Pattern.compile(".*\\.parquet$"),
+ Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE));
+ private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC));
+
+ private final DrillbitContext context;
+ private final CodecFactoryExposer codecFactoryExposer;
+ private final Configuration fsConf;
private final ParquetFormatMatcher formatMatcher;
private final ParquetFormatConfig config;
private final StoragePluginConfig storageConfig;
private final String name;
- public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig){
- this(name, context, fs, storageConfig, new ParquetFormatConfig());
+ public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig){
+ this(name, context, fsConf, storageConfig, new ParquetFormatConfig());
}
- public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
+ public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
this.context = context;
- this.codecFactoryExposer = new CodecFactoryExposer(fs.getConf());
+ this.codecFactoryExposer = new CodecFactoryExposer(fsConf);
this.config = formatConfig;
- this.formatMatcher = new ParquetFormatMatcher(this, fs);
+ this.formatMatcher = new ParquetFormatMatcher(this);
this.storageConfig = storageConfig;
- this.fs = fs;
- this.name = name == null ? "parquet" : name;
- }
-
- Configuration getHadoopConfig() {
- return fs.getConf();
+ this.fsConf = fsConf;
+ this.name = name == null ? DEFAULT_NAME : name;
}
- public DrillFileSystem getFileSystem() {
- return fs;
+ @Override
+ public Configuration getFsConf() {
+ return fsConf;
}
@Override
@@ -155,12 +162,13 @@ public class ParquetFormatPlugin implements FormatPlugin{
@Override
public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
- return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
+ return getGroupScan(selection, null);
}
@Override
public ParquetGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
- return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
+ final DrillFileSystem dfs = new DrillFileSystem(fsConf);
+ return new ParquetGroupScan(selection.getFileStatusList(dfs), this, selection.selectionRoot, columns);
}
@Override
@@ -190,20 +198,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
private static class ParquetFormatMatcher extends BasicFormatMatcher{
- private final DrillFileSystem fs;
-
- public ParquetFormatMatcher(ParquetFormatPlugin plugin, DrillFileSystem fs) {
- super(plugin, fs, //
- Lists.newArrayList( //
- Pattern.compile(".*\\.parquet$"), //
- Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE) //
- //
- ),
- Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC))
-
- );
- this.fs = fs;
-
+ public ParquetFormatMatcher(ParquetFormatPlugin plugin) {
+ super(plugin, PATTERNS, MAGIC_STRINGS);
}
@Override
@@ -212,17 +208,17 @@ public class ParquetFormatPlugin implements FormatPlugin{
}
@Override
- public FormatSelection isReadable(FileSelection selection) throws IOException {
+ public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
// TODO: we only check the first file for directory reading. This is because
if(selection.containsDirectories(fs)){
- if(isDirReadable(selection.getFirstPath(fs))){
+ if(isDirReadable(fs, selection.getFirstPath(fs))){
return new FormatSelection(plugin.getConfig(), selection);
}
}
- return super.isReadable(selection);
+ return super.isReadable(fs, selection);
}
- boolean isDirReadable(FileStatus dir) {
+ boolean isDirReadable(DrillFileSystem fs, FileStatus dir) {
Path p = new Path(dir.getPath(), ParquetFileWriter.PARQUET_METADATA_FILE);
try {
if (fs.exists(p)) {
@@ -235,16 +231,12 @@ public class ParquetFormatPlugin implements FormatPlugin{
if (files.length == 0) {
return false;
}
- return super.isReadable(files[0]);
+ return super.isReadable(fs, files[0]);
}
} catch (IOException e) {
logger.info("Failure while attempting to check for Parquet metadata file.", e);
return false;
}
}
-
-
-
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index acac61f..a59f2c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
@@ -76,11 +77,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
static final MetricRegistry metrics = DrillMetrics.getInstance();
static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
- static final String ENDPOINT_BYTES_TIMER = MetricRegistry.name(ParquetGroupScan.class, "endpointBytes");
- static final String ASSIGNMENT_TIMER = MetricRegistry.name(ParquetGroupScan.class, "applyAssignments");
- static final String ASSIGNMENT_AFFINITY_HIST = MetricRegistry.name(ParquetGroupScan.class, "assignmentAffinity");
-
- final Histogram assignmentAffinityStats = metrics.histogram(ASSIGNMENT_AFFINITY_HIST);
private ListMultimap<Integer, RowGroupInfo> mappings;
private List<RowGroupInfo> rowGroupInfos;
@@ -135,7 +131,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
Preconditions.checkNotNull(formatConfig);
this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
Preconditions.checkNotNull(formatPlugin);
- this.fs = formatPlugin.getFileSystem();
+ this.fs = new DrillFileSystem(formatPlugin.getFsConf());
this.formatConfig = formatPlugin.getConfig();
this.entries = entries;
this.selectionRoot = selectionRoot;
@@ -155,7 +151,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
this.formatPlugin = formatPlugin;
this.columns = columns;
this.formatConfig = formatPlugin.getConfig();
- this.fs = formatPlugin.getFileSystem();
+ this.fs = new DrillFileSystem(formatPlugin.getFsConf());
this.entries = Lists.newArrayList();
for (FileStatus file : files) {
@@ -205,7 +201,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
ColumnChunkMetaData columnChunkMetaData;
- List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getHadoopConfig(), statuses, 16);
+ List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getFsConf(), statuses, 16);
for (Footer footer : footers) {
int index = 0;
ParquetMetadata metadata = footer.getParquetMetadata();
@@ -260,11 +256,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
}
- @JsonIgnore
- public FileSystem getFileSystem() {
- return this.fs;
- }
-
@Override
public void modifyFileSelection(FileSelection selection) {
entries.clear();
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index c1f815e..b1c725c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.column.ColumnDescriptor;
@@ -95,7 +94,12 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
rowGroupScan.setOperatorId(id);
}
- DrillFileSystem fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFileSystem(), oContext.getStats());
+ DrillFileSystem fs;
+ try {
+ fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFsConf(), oContext.getStats());
+ } catch(IOException e) {
+ throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
+ }
Configuration conf = fs.getConf();
conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index b92f98c..4a3b97b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -67,7 +67,7 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
}
@Override
- public void registerSchemas(UserSession session, SchemaPlus parent) {
+ public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
parent.add(schema.getName(), schema);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 1ad053d..3368412 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.vector.RepeatedVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
@@ -69,7 +70,8 @@ public class DrillTextRecordReader extends AbstractRecordReader {
private FileSplit split;
private long totalRecordsRead;
- public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) {
+ public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext context, char delimiter,
+ List<SchemaPath> columns) {
this.fragmentContext = context;
this.delimiter = (byte) delimiter;
this.split = split;
@@ -95,7 +97,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE);
TextInputFormat inputFormat = new TextInputFormat();
- JobConf job = new JobConf();
+ JobConf job = new JobConf(fsConf);
job.setInt("io.file.buffer.size", context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BUFFER_SIZE));
job.setInputFormat(inputFormat.getClass());
try {
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 3fb0775..b6c6852 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -124,7 +124,7 @@ public class ControlHandlerImpl implements ControlMessageHandler {
try {
// we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
if (fragment.getLeafFragment()) {
- final FragmentContext context = new FragmentContext(drillbitContext, fragment, null,
+ final FragmentContext context = new FragmentContext(drillbitContext, fragment,
drillbitContext.getFunctionImplementationRegistry());
final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index d94ffba..cb2753c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -348,7 +348,7 @@ public class Foreman implements Runnable {
injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan", logger);
// set up the root fragment first so we'll have incoming buffers available.
- setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator());
+ setupRootFragment(rootPlanFragment, work.getRootOperator());
setupNonRootFragments(planFragments);
drillbitContext.getAllocator().resetFragmentLimits(); // TODO a global effect for this query?!?
@@ -791,15 +791,14 @@ public class Foreman implements Runnable {
* Set up the root fragment (which will run locally), and submit it for execution.
*
* @param rootFragment
- * @param rootClient
* @param rootOperator
* @throws ExecutionSetupException
*/
- private void setupRootFragment(final PlanFragment rootFragment, final UserClientConnection rootClient,
- final FragmentRoot rootOperator) throws ExecutionSetupException {
+ private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator)
+ throws ExecutionSetupException {
@SuppressWarnings("resource")
- final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, rootClient,
- drillbitContext.getFunctionImplementationRegistry());
+ final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, queryContext,
+ initiatingClient, drillbitContext.getFunctionImplementationRegistry());
@SuppressWarnings("resource")
final IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
rootContext.setBuffers(buffers);
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index a5b928b..f526fbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -56,7 +56,7 @@ public class NonRootFragmentManager implements FragmentManager {
try {
this.fragment = fragment;
this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
- this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry());
+ this.context = new FragmentContext(context, fragment, context.getFunctionImplementationRegistry());
this.buffers = new IncomingBuffers(root, this.context);
final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel(
fragment.getForeman()));
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
index ab6639e..550f56f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
@@ -74,7 +74,7 @@ public class TestDrillFileSystem {
stats.startProcessing();
try {
- dfs = new DrillFileSystem(FileSystem.get(conf), stats);
+ dfs = new DrillFileSystem(conf, stats);
is = dfs.open(new Path(tempFilePath));
byte[] buf = new byte[8000];