Posted to commits@drill.apache.org by ve...@apache.org on 2015/04/22 00:21:36 UTC

[1/4] drill git commit: DRILL-2514: Add support for impersonation in FileSystem storage plugin.

Repository: drill
Updated Branches:
  refs/heads/master fbb405bdb -> 2a484251b

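The patch threads a userName from the query through schema registration and scan creation, and opens filesystem handles as that user. Below is a minimal sketch of the Hadoop proxy-user mechanism that helpers such as ImpersonationUtil.createFileSystem presumably build on; the wrapper class is illustrative, not Drill's actual implementation, and only the org.apache.hadoop calls are real API:

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ProxyUserFileSystemSketch {
      // Open a FileSystem handle authorized as proxyUser. The Drillbit's service user
      // must be configured as a Hadoop proxy user (hadoop.proxyuser.*.hosts/groups).
      public static FileSystem createFileSystemAs(String proxyUser, final Configuration conf)
          throws IOException, InterruptedException {
        final UserGroupInformation proxyUgi =
            UserGroupInformation.createProxyUser(proxyUser, UserGroupInformation.getLoginUser());
        // Every filesystem call made inside doAs() runs with the proxy user's identity,
        // so HDFS permission checks apply to the query user, not the service user.
        return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
          @Override
          public FileSystem run() throws IOException {
            return FileSystem.get(conf);
          }
        });
      }
    }

The same doAs wrapping appears further down in ParquetGroupScan.readFooter, where footer reads execute as the query user.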

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index b032fce..58c8622 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -49,12 +49,12 @@ public abstract class AbstractStoragePlugin implements StoragePlugin{
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
-    return getPhysicalScan(selection, AbstractGroupScan.ALL_COLUMNS);
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS);
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
     throw new UnsupportedOperationException();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
new file mode 100644
index 0000000..0297945
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.server.options.OptionValue;
+
+/**
+ * Contains information needed by {@link org.apache.drill.exec.store.AbstractSchema} implementations.
+ */
+public class SchemaConfig {
+  private final String userName;
+  private final QueryContext queryContext;
+  private final boolean ignoreAuthErrors;
+
+  private SchemaConfig(final String userName, final QueryContext queryContext, final boolean ignoreAuthErrors) {
+    this.userName = userName;
+    this.queryContext = queryContext;
+    this.ignoreAuthErrors = ignoreAuthErrors;
+  }
+
+  public static Builder newBuilder(final String userName, final QueryContext queryContext) {
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(userName), "A valid userName is expected");
+    Preconditions.checkNotNull(queryContext, "Non-null QueryContext is expected");
+    return new Builder(userName, queryContext);
+  }
+
+  public static class Builder {
+    final String userName;
+    final QueryContext queryContext;
+    boolean ignoreAuthErrors;
+
+    private Builder(final String userName, final QueryContext queryContext) {
+      this.userName = userName;
+      this.queryContext = queryContext;
+    }
+
+    public Builder setIgnoreAuthErrors(boolean ignoreAuthErrors) {
+      this.ignoreAuthErrors = ignoreAuthErrors;
+      return this;
+    }
+
+    public SchemaConfig build() {
+      return new SchemaConfig(userName, queryContext, ignoreAuthErrors);
+    }
+  }
+
+  public QueryContext getQueryContext() {
+    return queryContext;
+  }
+
+  /**
+   * @return User to impersonate while {@link net.hydromatic.optiq.SchemaPlus} instances
+   * interact with the underlying storage.
+   */
+  public String getUserName() {
+    return userName;
+  }
+
+  /**
+   * @return Whether to ignore authorization errors reported while {@link net.hydromatic.optiq.SchemaPlus}
+   * instances interact with the underlying storage.
+   */
+  public boolean getIgnoreAuthErrors() {
+    return ignoreAuthErrors;
+  }
+
+  public OptionValue getOption(String optionKey) {
+    return queryContext.getOptions().getOption(optionKey);
+  }
+
+  public ViewExpansionContext getViewExpansionContext() {
+    return queryContext.getViewExpansionContext();
+  }
+}

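A SchemaConfig is built through the builder above; an illustrative usage (the user name, context, and plugin variables are placeholders):

    SchemaConfig schemaConfig = SchemaConfig
        .newBuilder("alice", queryContext)  // queryContext: the current QueryContext
        .setIgnoreAuthErrors(true)          // tolerate auth errors instead of failing the query
        .build();
    plugin.registerSchemas(schemaConfig, rootSchema);
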
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
index 14d2fab..e2dc613 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
@@ -19,12 +19,22 @@ package org.apache.drill.exec.store;
 
 import net.hydromatic.optiq.SchemaPlus;
 
-import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.ops.QueryContext;
 
 import java.io.IOException;
 
+/**
+ * StoragePlugins implement this interface to register the schemas they provide.
+ */
 public interface SchemaFactory {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class);
 
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException;
+  /**
+   * Register the schemas provided by this SchemaFactory implementation under the given parent schema.
+   *
+   * @param schemaConfig Configuration for schema objects.
+   * @param parent Reference to parent schema.
+   * @throws IOException
+   */
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException;
 }

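A minimal sketch of a plugin-side implementation of the revised interface (the schema class is illustrative; the add() call matches the usage in the plugins below):

    public class ExampleSchemaFactory implements SchemaFactory {
      @Override
      public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
        // Build the schema on behalf of the query user rather than the Drillbit process user.
        ExampleSchema schema = new ExampleSchema(schemaConfig.getUserName());
        parent.add(schema.getName(), schema);
      }
    }
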
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index ef5978c..b60c16f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -36,26 +36,24 @@ public interface StoragePlugin extends SchemaFactory {
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
    *
-   * @param selection
-   *          The configured storage engine specific selection.
+   * @param userName User to impersonate when reading the contents as part of the scan.
+   * @param selection The configured storage engine specific selection.
    * @return
    * @throws IOException
    */
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection)
-      throws IOException;
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException;
 
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
    *
-   * @param selection
-   *          The configured storage engine specific selection.
-   * @param columns
-   *          (optional) The list of column names to scan from the data source.
+   * @param userName User to impersonate when reading the contents as part of the scan.
+   * @param selection The configured storage engine specific selection.
+   * @param columns (optional) The list of column names to scan from the data source.
    * @return
    * @throws IOException
    */
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection,
-      List<SchemaPath> columns) throws IOException;
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+      throws IOException;
 
   public StoragePluginConfig getConfig();
 

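A read-capable plugin would now override the three-argument form; a minimal sketch (the selection and group scan classes are illustrative, and `context` is assumed to be the plugin's DrillbitContext):

    public class ExampleStoragePlugin extends AbstractStoragePlugin {
      @Override
      public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
          List<SchemaPath> columns) throws IOException {
        ExampleSelection sel = selection.getWith(context.getConfig(), ExampleSelection.class);
        // Hand the query user down so the group scan opens files as that user.
        return new ExampleGroupScan(userName, sel, columns);
      }
    }
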
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index cb9ee0f..bda4cc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -42,6 +42,8 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.util.PathScanner;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.ViewExpansionContext;
 import org.apache.drill.exec.planner.logical.DrillRuleSets;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.rpc.user.UserSession;
@@ -301,7 +303,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
   public class DrillSchemaFactory implements SchemaFactory {
 
     @Override
-    public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+    public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
       Stopwatch watch = new Stopwatch();
       watch.start();
 
@@ -325,7 +327,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
 
         // finally register schemas with the refreshed plugins
         for (StoragePlugin plugin : plugins.values()) {
-          plugin.registerSchemas(session, parent);
+          plugin.registerSchemas(schemaConfig, parent);
         }
       } catch (ExecutionSetupException e) {
         throw new DrillRuntimeException("Failure while updating storage plugins", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 775b402..93fb0a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -29,12 +29,13 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.ClassPathFileSystem;
 import org.apache.drill.exec.store.LocalSyncableFileSystem;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -110,7 +111,8 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+      throws IOException {
     FormatSelection formatSelection = selection.getWith(context.getConfig(), FormatSelection.class);
     FormatPlugin plugin;
     if (formatSelection.getFormat() instanceof NamedFormatPluginConfig) {
@@ -121,12 +123,12 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
     if (plugin == null) {
       throw new IOException(String.format("Failure getting requested format plugin named '%s'.  It was not one of the format plugins registered.", formatSelection.getFormat()));
     }
-    return plugin.getGroupScan(formatSelection.getSelection(), columns);
+    return plugin.getGroupScan(userName, formatSelection.getSelection(), columns);
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
-    schemaFactory.registerSchemas(session, parent);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
   public FormatPlugin getFormatPlugin(String name) {

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index e11712e..30d8d25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -27,10 +27,11 @@ import net.hydromatic.optiq.Function;
 import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.Table;
 
+import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
 
@@ -58,8 +59,8 @@ public class FileSystemSchemaFactory implements SchemaFactory{
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
-    FileSystemSchema schema = new FileSystemSchema(schemaName, session);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    FileSystemSchema schema = new FileSystemSchema(schemaName, schemaConfig);
     SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
     schema.setPlus(plusOfThis);
   }
@@ -69,10 +70,10 @@ public class FileSystemSchemaFactory implements SchemaFactory{
     private final WorkspaceSchema defaultSchema;
     private final Map<String, WorkspaceSchema> schemaMap = Maps.newHashMap();
 
-    public FileSystemSchema(String name, UserSession session) throws IOException {
+    public FileSystemSchema(String name, SchemaConfig schemaConfig) throws IOException {
       super(ImmutableList.<String>of(), name);
       for(WorkspaceSchemaFactory f :  factories){
-        WorkspaceSchema s = f.createSchema(getSchemaPath(), session);
+        WorkspaceSchema s = f.createSchema(getSchemaPath(), schemaConfig);
         schemaMap.put(s.getName(), s);
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 955dfeb..5668c54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -44,11 +44,9 @@ public interface FormatPlugin {
 
   public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException;
 
-  public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException;
-
   public Set<StoragePluginOptimizerRule> getOptimizerRules();
 
-  public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException;
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException;
 
   public FormatPluginConfig getConfig();
   public StoragePluginConfig getStorageConfig();

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index a536350..7cd50b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.dfs;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -30,6 +31,7 @@ import net.hydromatic.optiq.Table;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.dotdrill.DotDrillFile;
 import org.apache.drill.exec.dotdrill.DotDrillType;
@@ -41,9 +43,10 @@ import org.apache.drill.exec.planner.logical.DrillViewTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
 import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -52,6 +55,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
 
 public class WorkspaceSchemaFactory {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
@@ -106,25 +111,27 @@ public class WorkspaceSchemaFactory {
     return DotDrillType.VIEW.getPath(config.getLocation(), name);
   }
 
-  public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) throws IOException {
-    return new WorkspaceSchema(parentSchemaPath, schemaName, session);
+  public WorkspaceSchema createSchema(List<String> parentSchemaPath, SchemaConfig schemaConfig) throws IOException {
+    return new WorkspaceSchema(parentSchemaPath, schemaName, schemaConfig);
   }
 
   public class WorkspaceSchema extends AbstractSchema implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
     private final ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(this);
-    private final UserSession session;
+    private final SchemaConfig schemaConfig;
     private final DrillFileSystem fs;
 
-    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, UserSession session) throws IOException {
+    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, SchemaConfig schemaConfig) throws IOException {
       super(parentSchemaPath, wsName);
-      this.session = session;
-      this.fs = new DrillFileSystem(fsConf);
+      this.schemaConfig = schemaConfig;
+      this.fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), fsConf);
     }
 
     public boolean createView(View view) throws Exception {
       Path viewPath = getViewPath(view.getName());
       boolean replaced = fs.exists(viewPath);
-      try (OutputStream stream = fs.create(viewPath)) {
+      final FsPermission viewPerms =
+          new FsPermission(schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      try (OutputStream stream = DrillFileSystem.create(fs, viewPath, viewPerms)) {
         mapper.writeValue(stream, view);
       }
       return replaced;
@@ -145,11 +152,6 @@ public class WorkspaceSchemaFactory {
       return new SubDirectoryList(fileStatuses);
     }
 
-    public boolean viewExists(String viewName) throws Exception {
-      Path viewPath = getViewPath(viewName);
-      return fs.exists(viewPath);
-    }
-
     public void dropView(String viewName) throws IOException {
       fs.delete(getViewPath(viewName), false);
     }
@@ -165,6 +167,14 @@ public class WorkspaceSchemaFactory {
         }
       } catch (UnsupportedOperationException e) {
         logger.debug("The filesystem for this workspace does not support this operation.", e);
+      } catch (AccessControlException e) {
+        if (!schemaConfig.getIgnoreAuthErrors()) {
+          logger.debug(e.getMessage());
+          throw UserException
+              .permissionError(e)
+              .message("Not authorized to list view tables in schema [%s]", getFullSchemaName())
+              .build();
+        }
       } catch (Exception e) {
         logger.warn("Failure while trying to list .view.drill files in workspace [{}]", getFullSchemaName(), e);
       }
@@ -177,7 +187,7 @@ public class WorkspaceSchemaFactory {
       return Sets.union(tables.keySet(), getViews());
     }
 
-    private View getView(DotDrillFile f) throws Exception{
+    private View getView(DotDrillFile f) throws IOException{
       assert f.getType() == DotDrillType.VIEW;
       return f.getView(drillConfig);
     }
@@ -190,19 +200,42 @@ public class WorkspaceSchemaFactory {
       }
 
       // then look for files that start with this name and end in .drill.
-      List<DotDrillFile> files;
+      List<DotDrillFile> files = Collections.emptyList();
       try {
-        files = DotDrillUtil.getDotDrills(fs, new Path(config.getLocation()), name, DotDrillType.VIEW);
+        try {
+          files = DotDrillUtil.getDotDrills(fs, new Path(config.getLocation()), name, DotDrillType.VIEW);
+        } catch(AccessControlException e) {
+          if (!schemaConfig.getIgnoreAuthErrors()) {
+            logger.debug(e.getMessage());
+            throw UserException
+                .permissionError(e)
+                .message("Not authorized to list or query tables in schema [%s]", getFullSchemaName())
+                .build();
+          }
+        } catch(IOException e) {
+          logger.warn("Failure while trying to list view tables in workspace [{}]", name, getFullSchemaName(), e);
+        }
+
         for(DotDrillFile f : files) {
           switch(f.getType()) {
           case VIEW:
-            return new DrillViewTable(schemaPath, getView(f));
+            try {
+              return new DrillViewTable(getView(f), f.getOwner(), schemaConfig.getViewExpansionContext());
+            } catch (AccessControlException e) {
+              if (!schemaConfig.getIgnoreAuthErrors()) {
+                logger.debug(e.getMessage());
+                throw UserException
+                    .permissionError(e)
+                    .message("Not authorized to read view [%s] in schema [%s]", name, getFullSchemaName())
+                    .build();
+              }
+            } catch (IOException e) {
+              logger.warn("Failure while trying to load {}.view.drill file in workspace [{}]", name, getFullSchemaName(), e);
+            }
           }
         }
       } catch (UnsupportedOperationException e) {
         logger.debug("The filesystem for this workspace does not support this operation.", e);
-      } catch (Exception e) {
-        logger.warn("Failure while trying to load {}.view.drill file in workspace [{}]", name, getFullSchemaName(), e);
       }
 
       return tables.get(name);
@@ -223,7 +256,7 @@ public class WorkspaceSchemaFactory {
 
     @Override
     public CreateTableEntry createNewTable(String tableName) {
-      String storage = session.getOptions().getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
+      String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
       FormatPlugin formatPlugin = plugin.getFormatPlugin(storage);
       if (formatPlugin == null) {
         throw new UnsupportedOperationException(
@@ -256,7 +289,7 @@ public class WorkspaceSchemaFactory {
             try {
               Object selection = m.isReadable(fs, fileSelection);
               if (selection != null) {
-                return new DynamicDrillTable(plugin, storageEngineName, selection);
+                return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
               }
             } catch (IOException e) {
               logger.debug("File read failed.", e);
@@ -268,11 +301,19 @@ public class WorkspaceSchemaFactory {
         for (FormatMatcher m : fileMatchers) {
           Object selection = m.isReadable(fs, fileSelection);
           if (selection != null) {
-            return new DynamicDrillTable(plugin, storageEngineName, selection);
+            return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
           }
         }
         return null;
 
+      } catch (AccessControlException e) {
+        if (!schemaConfig.getIgnoreAuthErrors()) {
+          logger.debug(e.getMessage());
+          throw UserException
+              .permissionError(e)
+              .message("Not authorized to read table [%s] in schema [%s]", key, getFullSchemaName())
+              .build();
+        }
       } catch (IOException e) {
         logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
       }

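The recurring pattern in the hunks above converts an HDFS AccessControlException into a Drill permission error unless the schema was built with ignoreAuthErrors set (as INFORMATION_SCHEMA queries presumably are); condensed:

    try {
      // ... any filesystem access performed as the impersonated user ...
    } catch (AccessControlException e) {
      if (!schemaConfig.getIgnoreAuthErrors()) {
        throw UserException.permissionError(e)
            .message("Not authorized to read table [%s] in schema [%s]", key, getFullSchemaName())
            .build();
      }
      // otherwise swallow the error: the inaccessible object is simply not visible
    }
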
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 5c7152a..f1271b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
@@ -147,7 +148,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
         newColumns.add(AbstractRecordReader.STAR_COLUMN);
       }
       // Create a new sub scan object with the new set of columns;
-      scan = new EasySubScan(scan.getWorkUnits(), scan.getFormatPlugin(), newColumns, scan.getSelectionRoot());
+      scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), newColumns,
+          scan.getSelectionRoot());
     }
 
     int numParts = 0;
@@ -203,13 +205,9 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   }
 
   @Override
-  public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException {
-    return new EasyGroupScan(selection, this, selection.selectionRoot);
-  }
-
-  @Override
-  public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
-    return new EasyGroupScan(selection, this, columns, selection.selectionRoot);
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+      throws IOException {
+    return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 7c70df3..1b333ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -49,6 +49,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.util.ImpersonationUtil;
 
 @JsonTypeName("fs-scan")
 public class EasyGroupScan extends AbstractFileGroupScan{
@@ -66,6 +67,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
 
   @JsonCreator
   public EasyGroupScan(
+      @JsonProperty("userName") String userName,
       @JsonProperty("files") List<String> files, //
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
@@ -73,23 +75,26 @@ public class EasyGroupScan extends AbstractFileGroupScan{
       @JsonProperty("columns") List<SchemaPath> columns,
       @JsonProperty("selectionRoot") String selectionRoot
       ) throws IOException, ExecutionSetupException {
-        this(new FileSelection(files, true),
+        this(ImpersonationUtil.resolveUserName(userName),
+            new FileSelection(files, true),
             (EasyFormatPlugin<?>)engineRegistry.getFormatPlugin(storageConfig, formatConfig),
             columns,
             selectionRoot);
   }
 
-  public EasyGroupScan(FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot)
+  public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot)
       throws IOException {
-    this(selection, formatPlugin, ALL_COLUMNS, selectionRoot);
+    this(userName, selection, formatPlugin, ALL_COLUMNS, selectionRoot);
   }
 
   public EasyGroupScan(
+      String userName,
       FileSelection selection, //
       EasyFormatPlugin<?> formatPlugin, //
       List<SchemaPath> columns,
       String selectionRoot
       ) throws IOException{
+    super(userName);
     this.selection = Preconditions.checkNotNull(selection);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
@@ -98,7 +103,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
   }
 
   private EasyGroupScan(EasyGroupScan that) {
-    Preconditions.checkNotNull(that, "Unable to clone: source is null.");
+    super(that.getUserName());
     selection = that.selection;
     formatPlugin = that.formatPlugin;
     columns = that.columns;
@@ -110,7 +115,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
   }
 
   private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
-    final DrillFileSystem dfs = new DrillFileSystem(formatPlugin.getFsConf());
+    final DrillFileSystem dfs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf());
     this.selection = selection;
     BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
     this.chunks = b.generateFileWork(selection.getFileStatusList(dfs), formatPlugin.isBlockSplittable());
@@ -203,7 +208,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
     Preconditions.checkArgument(!filesForMinor.isEmpty(),
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
-    return new EasySubScan(convert(filesForMinor), formatPlugin, columns, selectionRoot);
+    return new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot);
   }
 
   private List<FileWorkImpl> convert(List<CompleteFileWork> list) {

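With the new leading userName parameter, constructing a group scan directly looks like this (the format plugin and selection variables are placeholders):

    EasyGroupScan scan = new EasyGroupScan(
        "alice",                          // query user to impersonate during reads
        fileSelection,                    // FileSelection of the chosen files
        jsonFormatPlugin,                 // some EasyFormatPlugin implementation
        AbstractGroupScan.ALL_COLUMNS,    // project all columns
        fileSelection.selectionRoot);
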
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index e78ba0b..5fd5039 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -47,6 +47,7 @@ public class EasySubScan extends AbstractSubScan{
 
   @JsonCreator
   public EasySubScan(
+      @JsonProperty("userName") String userName,
       @JsonProperty("files") List<FileWorkImpl> files, //
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
@@ -54,7 +55,7 @@ public class EasySubScan extends AbstractSubScan{
       @JsonProperty("columns") List<SchemaPath> columns, //
       @JsonProperty("selectionRoot") String selectionRoot
       ) throws IOException, ExecutionSetupException {
-
+    super(userName);
     this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(this.formatPlugin);
     this.files = files;
@@ -62,7 +63,9 @@ public class EasySubScan extends AbstractSubScan{
     this.selectionRoot = selectionRoot;
   }
 
-  public EasySubScan(List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns, String selectionRoot){
+  public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns,
+      String selectionRoot){
+    super(userName);
     this.formatPlugin = plugin;
     this.files = files;
     this.columns = columns;

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index 262c6be..e08fe71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -38,7 +38,7 @@ public class DirectGroupScan extends AbstractGroupScan{
   private final RecordReader reader;
 
   public DirectGroupScan(RecordReader reader) {
-    super();
+    super((String)null);
     this.reader = reader;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
index 89694f8..763ecba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
@@ -27,7 +27,7 @@ public class DirectSubScan extends AbstractSubScan{
   private final RecordReader reader;
 
   public DirectSubScan(RecordReader reader) {
-    super();
+    super(null);
     this.reader = reader;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 237589c..722650d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -77,8 +77,9 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
-  public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
-    return new EasyGroupScan(selection, this, columns, selection.selectionRoot); //TODO : textformat supports project?
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+      throws IOException {
+    return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
index 22cc483..bd0d582 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
@@ -54,11 +54,13 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{
   @JsonCreator
   public InfoSchemaGroupScan(@JsonProperty("table") SelectedTable table,
                              @JsonProperty("filter") InfoSchemaFilter filter) {
+    super((String)null);
     this.table = table;
     this.filter = filter;
   }
 
   private InfoSchemaGroupScan(InfoSchemaGroupScan that) {
+    super(that);
     this.table = that.table;
     this.filter = that.filter;
     this.isFilterPushedDown = that.isFilterPushedDown;

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index a1249e6..4dfde7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -29,7 +29,6 @@ import net.hydromatic.optiq.Table;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -37,6 +36,7 @@ import org.apache.drill.exec.store.AbstractStoragePlugin;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 
 public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements InfoSchemaConstants {
@@ -58,7 +58,8 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements In
   }
 
   @Override
-  public InfoSchemaGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public InfoSchemaGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+      throws IOException {
     SelectedTable table = selection.getWith(context.getConfig(),  SelectedTable.class);
     return new InfoSchemaGroupScan(table);
   }
@@ -69,7 +70,7 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements In
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     ISchema s = new ISchema(parent, this);
     parent.add(s.getName(), s);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
index b9349b0..7a479d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
@@ -32,6 +32,7 @@ public class InfoSchemaSubScan extends AbstractSubScan{
   @JsonCreator
   public InfoSchemaSubScan(@JsonProperty("table") SelectedTable table,
                            @JsonProperty("filter") InfoSchemaFilter filter) {
+    super(null);
     this.table = table;
     this.filter = filter;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
index 947998d..5b132c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
@@ -116,8 +116,14 @@ public abstract class RecordGenerator implements InfoSchemaConstants {
       // ... do for each of the schema's tables.
       for (String tableName: schema.getTableNames()) {
         Table table = schema.getTable(tableName);
-        // Visit the table, and if requested ...
 
+        if (table == null) {
+          // Schema may return null for a table if the query user doesn't have permission to load it. Ignore such
+          // tables, as INFO SCHEMA is about showing tables which the user has access to query.
+          continue;
+        }
+
+        // Visit the table, and if requested ...
         if (shouldVisitTable(schemaPath, tableName) && visitTable(schemaPath,  tableName, table)) {
           // ... do for each of the table's fields.
           RelDataType tableRow = table.getRowType(new JavaTypeFactoryImpl());

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
index dc90a33..bb71c31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -53,8 +53,8 @@ public class MockGroupScanPOP extends AbstractGroupScan {
 
   @JsonCreator
   public MockGroupScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
+    super((String)null);
     this.readEntries = readEntries;
-
     this.url = url;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 96226a1..1689300 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -27,9 +27,9 @@ import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -45,7 +45,8 @@ public class MockStorageEngine extends AbstractStoragePlugin {
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+      throws IOException {
 
     ArrayList<MockScanEntry> readEntries = selection.getListWith(new ObjectMapper(),
         new TypeReference<ArrayList<MockScanEntry>>() {
@@ -55,7 +56,7 @@ public class MockStorageEngine extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 9c83ea0..7298f53 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -161,14 +161,9 @@ public class ParquetFormatPlugin implements FormatPlugin{
   }
 
   @Override
-  public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
-    return getGroupScan(selection, null);
-  }
-
-  @Override
-  public ParquetGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
-    final DrillFileSystem dfs = new DrillFileSystem(fsConf);
-    return new ParquetGroupScan(selection.getFileStatusList(dfs), this, selection.selectionRoot, columns);
+  public ParquetGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+      throws IOException {
+    return new ParquetGroupScan(userName, selection, this, selection.selectionRoot, columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index a59f2c9..21b9b48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,12 +18,14 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -50,17 +52,17 @@ import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.BlockMapBuilder;
 import org.apache.drill.exec.store.schedule.CompleteWork;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import parquet.hadoop.Footer;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.org.codehaus.jackson.annotate.JsonCreator;
 
-import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.fasterxml.jackson.annotation.JacksonInject;
@@ -78,17 +80,17 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
 
-  private ListMultimap<Integer, RowGroupInfo> mappings;
-  private List<RowGroupInfo> rowGroupInfos;
   private final List<ReadEntryWithPath> entries;
   private final Stopwatch watch = new Stopwatch();
   private final ParquetFormatPlugin formatPlugin;
   private final ParquetFormatConfig formatConfig;
-  private final FileSystem fs;
-  private List<EndpointAffinity> endpointAffinities;
-  private String selectionRoot;
+  private final DrillFileSystem fs;
+  private final String selectionRoot;
 
+  private List<EndpointAffinity> endpointAffinities;
   private List<SchemaPath> columns;
+  private ListMultimap<Integer, RowGroupInfo> mappings;
+  private List<RowGroupInfo> rowGroupInfos;
 
   /*
    * total number of rows (obtained from parquet footer)
@@ -100,22 +102,9 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    */
   private Map<SchemaPath, Long> columnValueCounts;
 
-  public List<ReadEntryWithPath> getEntries() {
-    return entries;
-  }
-
-  @JsonProperty("format")
-  public ParquetFormatConfig getFormatConfig() {
-    return this.formatConfig;
-  }
-
-  @JsonProperty("storage")
-  public StoragePluginConfig getEngineConfig() {
-    return this.formatPlugin.getStorageConfig();
-  }
-
   @JsonCreator
   public ParquetGroupScan( //
+      @JsonProperty("userName") String userName,
       @JsonProperty("entries") List<ReadEntryWithPath> entries, //
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
@@ -123,6 +112,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       @JsonProperty("columns") List<SchemaPath> columns, //
       @JsonProperty("selectionRoot") String selectionRoot //
       ) throws IOException, ExecutionSetupException {
+    super(ImpersonationUtil.resolveUserName(userName));
     this.columns = columns;
     if (formatConfig == null) {
       formatConfig = new ParquetFormatConfig();
@@ -131,29 +121,28 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     Preconditions.checkNotNull(formatConfig);
     this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(formatPlugin);
-    this.fs = new DrillFileSystem(formatPlugin.getFsConf());
+    this.fs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf());
     this.formatConfig = formatPlugin.getConfig();
     this.entries = entries;
     this.selectionRoot = selectionRoot;
     this.readFooterFromEntries();
-
-  }
-
-  public String getSelectionRoot() {
-    return selectionRoot;
   }
 
-  public ParquetGroupScan(List<FileStatus> files, //
+  public ParquetGroupScan( //
+      String userName,
+      FileSelection selection, //
       ParquetFormatPlugin formatPlugin, //
       String selectionRoot,
       List<SchemaPath> columns) //
           throws IOException {
+    super(userName);
     this.formatPlugin = formatPlugin;
     this.columns = columns;
     this.formatConfig = formatPlugin.getConfig();
-    this.fs = new DrillFileSystem(formatPlugin.getFsConf());
+    this.fs = ImpersonationUtil.createFileSystem(userName, formatPlugin.getFsConf());
 
     this.entries = Lists.newArrayList();
+    List<FileStatus> files = selection.getFileStatusList(fs);
     for (FileStatus file : files) {
       entries.add(new ReadEntryWithPath(file.getPath().toString()));
     }
@@ -167,6 +156,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    * This is used to clone another copy of the group scan.
    */
   private ParquetGroupScan(ParquetGroupScan that) {
+    super(that);
     this.columns = that.columns;
     this.endpointAffinities = that.endpointAffinities;
     this.entries = that.entries;
@@ -180,6 +170,25 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.columnValueCounts = that.columnValueCounts;
   }
 
+
+  public List<ReadEntryWithPath> getEntries() {
+    return entries;
+  }
+
+  @JsonProperty("format")
+  public ParquetFormatConfig getFormatConfig() {
+    return this.formatConfig;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getEngineConfig() {
+    return this.formatPlugin.getStorageConfig();
+  }
+
+  public String getSelectionRoot() {
+    return selectionRoot;
+  }
+
   private void readFooterFromEntries()  throws IOException {
     List<FileStatus> files = Lists.newArrayList();
     for (ReadEntryWithPath e : entries) {
@@ -188,12 +197,27 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     readFooter(files);
   }
 
-  private void readFooter(List<FileStatus> statuses) throws IOException {
+  private void readFooter(final List<FileStatus> statuses) {
+    final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(getUserName());
+    try {
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          readFooterHelper(statuses);
+          return null;
+        }
+      });
+    } catch (InterruptedException | IOException e) {
+      final String errMsg = String.format("Failed to read footer entries from parquet input files: %s", e.getMessage());
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    }
+  }
+
+  private void readFooterHelper(List<FileStatus> statuses) throws IOException {
     watch.reset();
     watch.start();
     Timer.Context tContext = metrics.timer(READ_FOOTER_TIMER).time();
 
-
     rowGroupInfos = Lists.newArrayList();
     long start = 0, length = 0;
     rowCount = 0;
@@ -373,7 +397,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
-    return new ParquetRowGroupScan(formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot);
+    return new ParquetRowGroupScan(
+        getUserName(), formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot);
   }
 
   private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) {

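For context on the hunk above: readFooter() now wraps the existing footer-reading
logic in a Hadoop proxy-user doAs() block. A minimal standalone sketch of that
pattern, assuming an illustrative user name "alice" (not taken from the patch):

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserSketch {
  public static void main(String[] args) throws Exception {
    // Build a proxy UGI for "alice" on top of the process (login) user.
    final UserGroupInformation ugi =
        UserGroupInformation.createProxyUser("alice", UserGroupInformation.getLoginUser());

    // Work done inside run() is authorized as "alice", subject to the
    // cluster's hadoop.proxyuser.* settings.
    ugi.doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        // e.g. open files and read Parquet footers here
        return null;
      }
    });
  }
}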
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index fd40f41..987f792 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -55,22 +55,26 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   @JsonCreator
   public ParquetRowGroupScan( //
       @JacksonInject StoragePluginRegistry registry, //
+      @JsonProperty("userName") String userName, //
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
       @JsonProperty("entries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, //
       @JsonProperty("columns") List<SchemaPath> columns, //
       @JsonProperty("selectionRoot") String selectionRoot //
   ) throws ExecutionSetupException {
-    this((ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
+    this(userName, (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
             formatConfig == null ? new ParquetFormatConfig() : formatConfig),
         rowGroupReadEntries, columns, selectionRoot);
   }
 
   public ParquetRowGroupScan( //
+      String userName, //
       ParquetFormatPlugin formatPlugin, //
       List<RowGroupReadEntry> rowGroupReadEntries, //
-      List<SchemaPath> columns,
-      String selectionRoot) {
+      List<SchemaPath> columns, //
+      String selectionRoot //
+  ) {
+    super(userName);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin);
     this.formatConfig = formatPlugin.getConfig();
     this.rowGroupReadEntries = rowGroupReadEntries;
@@ -110,7 +114,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(formatPlugin, rowGroupReadEntries, columns, selectionRoot);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index b1c725c..52dccd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -90,7 +91,8 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       }
       final int id = rowGroupScan.getOperatorId();
       // Create the new row group scan with the new columns
-      rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getStorageEngine(), rowGroupScan.getRowGroupReadEntries(), newColumns, rowGroupScan.getSelectionRoot());
+      rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(),
+          rowGroupScan.getRowGroupReadEntries(), newColumns, rowGroupScan.getSelectionRoot());
       rowGroupScan.setOperatorId(id);
     }
 
@@ -100,7 +102,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
     } catch(IOException e) {
       throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
     }
-    Configuration conf = fs.getConf();
+    Configuration conf = new Configuration(fs.getConf());
     conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
     conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
     conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
@@ -119,7 +121,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       try {
         if ( ! footers.containsKey(e.getPath())){
           footers.put(e.getPath(),
-              ParquetFileReader.readFooter( fs.getConf(), new Path(e.getPath())));
+              ParquetFileReader.readFooter(conf, new Path(e.getPath())));
         }
         if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
           readers.add(

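A note on the Configuration change above: copying with new Configuration(fs.getConf())
means the counter flags are set on a private copy, so the Configuration owned by the
shared DrillFileSystem instance is not mutated. A small sketch of the idiom (the
property key is illustrative):

import org.apache.hadoop.conf.Configuration;

public class ConfCopySketch {
  public static void main(String[] args) {
    Configuration shared = new Configuration();
    shared.setBoolean("example.counter.enabled", true);

    // Copy constructor: 'local' starts with the same properties but is independent.
    Configuration local = new Configuration(shared);
    local.setBoolean("example.counter.enabled", false);

    System.out.println(shared.getBoolean("example.counter.enabled", false)); // true
    System.out.println(local.getBoolean("example.counter.enabled", true));   // false
  }
}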
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index 4a3b97b..a13c945 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -31,11 +31,11 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.pojo.PojoDataType;
+import org.apache.drill.exec.store.SchemaConfig;
 
 /**
  * A "storage" plugin for system tables.
@@ -67,12 +67,13 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     parent.add(schema.getName(), schema);
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+      throws IOException {
     SystemTable table = selection.getWith(context.getConfig(), SystemTable.class);
     return new SystemTableScan(table, this);
   }

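The UserSession-to-SchemaConfig migration above recurs in the HBase, Hive, and Mongo
plugins below. A hypothetical factory sketch showing the new hook; the getUserName()
accessor is assumed from SchemaConfig's userName field, and the class itself is
illustrative, not part of the patch:

import java.io.IOException;

import net.hydromatic.optiq.SchemaPlus;

import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.SchemaFactory;

public class ExampleSchemaFactory implements SchemaFactory {
  @Override
  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
    // The query user is now available at schema-registration time, so a
    // factory can resolve schemas with that user's filesystem permissions.
    final String queryUser = schemaConfig.getUserName(); // assumed accessor
    // a real factory would build a schema here and parent.add(...) it
  }
}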
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index f8baf97..22bd7df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -54,11 +54,13 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
       @JsonProperty("table") SystemTable table, //
       @JacksonInject StoragePluginRegistry engineRegistry //
   ) throws IOException, ExecutionSetupException {
+    super((String)null);
     this.table = table;
     this.plugin = (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE);
   }
 
   public SystemTableScan(SystemTable table, SystemTablePlugin plugin) {
+    super((String)null);
     this.table = table;
     this.plugin = plugin;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
new file mode 100644
index 0000000..9997178
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import com.google.common.base.Strings;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Utilities for impersonation purposes.
+ */
+public class ImpersonationUtil {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImpersonationUtil.class);
+
+  /**
+   * Create and return a proxy user {@link org.apache.hadoop.security.UserGroupInformation} for the operator owner if
+   * the operator owner is valid. Otherwise create and return a proxy user
+   * {@link org.apache.hadoop.security.UserGroupInformation} for the query user.
+   *
+   * @param opUserName Name of the user to impersonate while setting up the operator.
+   * @param queryUserName Name of the user who issued the query. If <i>opUserName</i> is invalid,
+   *                      then this parameter must be a valid user name.
+   * @return Proxy user {@link org.apache.hadoop.security.UserGroupInformation}.
+   */
+  public static UserGroupInformation createProxyUgi(String opUserName, String queryUserName) {
+    if (!Strings.isNullOrEmpty(opUserName)) {
+      return createProxyUgi(opUserName);
+    }
+
+    if (Strings.isNullOrEmpty(queryUserName)) {
+      // TODO(DRILL-2097): Tests that use SimpleRootExec don't assign any query user name in FragmentContext.
+      // Disable throwing the exception to avoid modifying the long list of test files.
+      // throw new DrillRuntimeException("Invalid value for query user name");
+      return getProcessUserUGI();
+    }
+
+    return createProxyUgi(queryUserName);
+  }
+
+  /**
+   * Create and return proxy user {@link org.apache.hadoop.security.UserGroupInformation} for the given user name.
+   *
+   * TODO: we may want to cache the {@link org.apache.hadoop.security.UserGroupInformation} instances, as we currently
+   * create a new instance for each request for the same user, which is unnecessary overhead.
+   *
+   * @param proxyUserName Proxy user name (must be valid)
+   * @return Proxy user {@link org.apache.hadoop.security.UserGroupInformation}.
+   */
+  public static UserGroupInformation createProxyUgi(String proxyUserName) {
+    try {
+      if (Strings.isNullOrEmpty(proxyUserName)) {
+        throw new DrillRuntimeException("Invalid value for proxy user name");
+      }
+
+      return UserGroupInformation.createProxyUser(proxyUserName, UserGroupInformation.getLoginUser());
+    } catch(IOException e) {
+      final String errMsg = "Failed to create proxy user UserGroupInformation object: " + e.getMessage();
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    }
+  }
+
+  /**
+   * If the given user name is empty, return the current process user name. This is a temporary measure to avoid
+   * modifying the long list of test files that define a GroupScan operator with no user name property.
+   * @param userName User name found in the GroupScan POP definition.
+   */
+  public static String resolveUserName(String userName) {
+    if (!Strings.isNullOrEmpty(userName)) {
+      return userName;
+    }
+    return getProcessUserName();
+  }
+
+  /**
+   * Return the name of the user who is running the Drillbit.
+   *
+   * @return Drillbit process user.
+   */
+  public static String getProcessUserName() {
+    return getProcessUserUGI().getUserName();
+  }
+
+  /**
+   * Return the {@link org.apache.hadoop.security.UserGroupInformation} of the user who is running the Drillbit.
+   *
+   * @return Drillbit process user {@link org.apache.hadoop.security.UserGroupInformation}.
+   */
+  public static UserGroupInformation getProcessUserUGI() {
+    try {
+      return UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      final String errMsg = "Failed to get process user UserGroupInformation object.";
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    }
+  }
+
+  /**
+   * Create DrillFileSystem for given <i>proxyUserName</i> and configuration.
+   *
+   * @param proxyUserName Name of the user to impersonate while accessing the FileSystem contents.
+   * @param fsConf FileSystem configuration.
+   * @return DrillFileSystem created as the given proxy user.
+   */
+  public static DrillFileSystem createFileSystem(String proxyUserName, Configuration fsConf) {
+    return createFileSystem(proxyUserName, fsConf, null);
+  }
+
+  /**
+   * Create DrillFileSystem for given <i>proxyUserName</i>, configuration and stats.
+   *
+   * @param proxyUserName Name of the user to impersonate while accessing the FileSystem contents.
+   * @param fsConf FileSystem configuration.
+   * @param stats OperatorStats for DrillFileSystem (optional).
+   * @return DrillFileSystem created as the given proxy user.
+   */
+  public static DrillFileSystem createFileSystem(String proxyUserName, Configuration fsConf, OperatorStats stats) {
+    return createFileSystem(createProxyUgi(proxyUserName), fsConf, stats);
+  }
+
+  /** Helper method to create DrillFileSystem */
+  private static DrillFileSystem createFileSystem(UserGroupInformation proxyUserUgi, final Configuration fsConf,
+      final OperatorStats stats) {
+    DrillFileSystem fs;
+    try {
+      fs = proxyUserUgi.doAs(new PrivilegedExceptionAction<DrillFileSystem>() {
+        public DrillFileSystem run() throws Exception {
+          logger.debug("Creating DrillFileSystem for proxy user: " + UserGroupInformation.getCurrentUser());
+          return new DrillFileSystem(fsConf, stats);
+        }
+      });
+    } catch (InterruptedException | IOException e) {
+      final String errMsg = "Failed to create DrillFileSystem for proxy user: " + e.getMessage();
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    }
+
+    return fs;
+  }
+}

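A short usage sketch for the new utility (the null user name is deliberately resolved
to the process user, mirroring the fallback documented above):

import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;

public class ImpersonationUtilSketch {
  public static void main(String[] args) {
    // Resolve a possibly-empty user name to the Drillbit process user.
    String user = ImpersonationUtil.resolveUserName(null);

    // Construction happens inside UserGroupInformation.doAs() as the proxy
    // user, as implemented in the helper above.
    DrillFileSystem fs = ImpersonationUtil.createFileSystem(user, new Configuration());
  }
}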
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index cb2753c..edbcfde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -779,7 +779,7 @@ public class Foreman implements Runnable {
     if (logger.isDebugEnabled()) {
       logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getConfig()));
     }
-    return new BasicOptimizer(queryContext).optimize(
+    return new BasicOptimizer(queryContext, initiatingClient).optimize(
         new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index be798ec..0701252 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.work.fragment;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -38,7 +39,9 @@ import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.testing.ExecutionControlsInjector;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request
@@ -182,13 +185,23 @@ public class FragmentExecutor implements Runnable {
         }
       }
 
-      injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
-      /*
-       * Run the query until root.next returns false OR we no longer need to continue.
-       */
-      while (shouldContinue() && root.next()) {
-        // loop
-      }
+      final UserGroupInformation queryUserUgi = fragmentContext.isImpersonationEnabled() ?
+          ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName()) :
+          ImpersonationUtil.getProcessUserUGI();
+
+      queryUserUgi.doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
+          /*
+           * Run the query until root.next returns false OR we no longer need to continue.
+           */
+          while (shouldContinue() && root.next()) {
+            // loop
+          }
+
+          return null;
+        }
+      });
 
       updateState(FragmentState.FINISHED);
     } catch (AssertionError | Exception e) {

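One operational note on the doAs() wrapping above: for createProxyUser() to be
authorized by a secured Hadoop cluster, the Drillbit process user generally needs
proxy-user privileges in core-site.xml. A typical fragment, assuming "drill" is the
process user (narrow the wildcards in production):

<property>
  <name>hadoop.proxyuser.drill.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.drill.groups</name>
  <value>*</value>
</property>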
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 6bd8db0..8006533 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -101,6 +101,10 @@ drill.exec: {
       write: true
     }
   },
+  impersonation: {
+    enabled: false,
+    max_chained_user_hops: 3
+  },
   security.user.auth {
     enabled: false,
     packages += "org.apache.drill.exec.rpc.user.security",

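The defaults above can be overridden per installation. A minimal drill-override.conf
fragment to turn impersonation on (the hop limit shown is the shipped default):

drill.exec.impersonation: {
  enabled: true,
  max_chained_user_hops: 3
}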
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index c3223b8..e7f6896 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.QueryOptionManager;
 import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
 import org.apache.drill.exec.testing.ExecutionControls;
@@ -102,7 +103,7 @@ public class PlanningBase extends ExecTest{
     final FunctionImplementationRegistry functionRegistry = new FunctionImplementationRegistry(config);
     final DrillOperatorTable table = new DrillOperatorTable(functionRegistry);
     final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
-    registry.getSchemaFactory().registerSchemas(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), root);
+    registry.getSchemaFactory().registerSchemas(SchemaConfig.newBuilder("foo", context).build(), root);
 
     new NonStrictExpectations() {
       {

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index ba905c4..dc37071 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest {
         bitContext);
     PhysicalPlanReader reader = bitContext.getPlanReader();
     LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
-    PhysicalPlan pp = new BasicOptimizer(qc).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
+    PhysicalPlan pp = new BasicOptimizer(qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
 
 
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
index 2cba992..604f375 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -38,8 +39,9 @@ public class TestExceptionInjection extends BaseTestQuery {
   private static final String NO_THROW_FAIL = "Didn't throw expected exception";
 
   private static final UserSession session = UserSession.Builder.newBuilder()
-    .withOptionManager(bits[0].getContext().getOptionManager())
-    .build();
+      .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+      .withOptionManager(bits[0].getContext().getOptionManager())
+      .build();
 
   /**
   * Class whose methods we want to simulate runtime exceptions at run-time for testing
@@ -248,8 +250,9 @@ public class TestExceptionInjection extends BaseTestQuery {
     final DrillbitContext drillbitContext2 = drillbit2.getContext();
 
     final UserSession session = UserSession.Builder.newBuilder()
-      .withOptionManager(drillbitContext1.getOptionManager())
-      .build();
+        .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+        .withOptionManager(drillbitContext1.getOptionManager())
+        .build();
 
     final String passthroughDesc = "<<injected from descPassthrough>>";
     final int nSkip = 7;

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
index 1c219f0..508b10c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.testing;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -29,8 +30,9 @@ import static org.junit.Assert.fail;
 public class TestPauseInjection extends BaseTestQuery {
 
   private static final UserSession session = UserSession.Builder.newBuilder()
-    .withOptionManager(bits[0].getContext().getOptionManager())
-    .build();
+      .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+      .withOptionManager(bits[0].getContext().getOptionManager())
+      .build();
 
   /**
    * Class whose methods we want to simulate pauses at run-time for testing

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3b246d9..91707fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -945,7 +945,7 @@
           <dependency>
             <groupId>net.hydromatic</groupId>
             <artifactId>optiq-core</artifactId>
-            <version>0.9-drill-r21</version>
+            <version>0.9-drill-r21.1</version>
             <exclusions>
               <exclusion>
                 <groupId>org.jgrapht</groupId>


[2/4] drill git commit: DRILL-2514: Add support for impersonation in FileSystem storage plugin.

Posted by ve...@apache.org.
DRILL-2514: Add support for impersonation in FileSystem storage plugin.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/40c90403
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/40c90403
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/40c90403

Branch: refs/heads/master
Commit: 40c90403255d55b412efe2c3a78289bd75325119
Parents: 117b749
Author: vkorukanti <ve...@gmail.com>
Authored: Wed Mar 18 15:51:44 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Apr 21 13:16:00 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/store/hbase/HBaseGroupScan.java  |  17 +-
 .../store/hbase/HBasePushFilterIntoScan.java    |   3 +-
 .../exec/store/hbase/HBaseSchemaFactory.java    |   4 +-
 .../exec/store/hbase/HBaseStoragePlugin.java    |  10 +-
 .../drill/exec/store/hbase/HBaseSubScan.java    |   7 +-
 .../HivePushPartitionFilterIntoScan.java        |   2 +-
 .../apache/drill/exec/store/hive/HiveScan.java  |  11 +-
 .../exec/store/hive/HiveStoragePlugin.java      |  10 +-
 .../drill/exec/store/hive/HiveSubScan.java      |   6 +-
 .../store/hive/schema/HiveSchemaFactory.java    |   4 +-
 .../drill/exec/store/mongo/MongoGroupScan.java  |  13 +-
 .../store/mongo/MongoPushDownFilterForScan.java |   2 +-
 .../exec/store/mongo/MongoStoragePlugin.java    |  15 +-
 .../drill/exec/store/mongo/MongoSubScan.java    |   7 +-
 .../store/mongo/schema/MongoSchemaFactory.java  |   6 +-
 .../src/resources/drill-override-example.conf   |   4 +
 .../org/apache/drill/exec/ExecConstants.java    |   6 +
 .../drill/exec/dotdrill/DotDrillFile.java       |  10 +-
 .../apache/drill/exec/ops/FragmentContext.java  |  30 +++-
 .../org/apache/drill/exec/ops/QueryContext.java |  61 ++++++-
 .../drill/exec/ops/ViewExpansionContext.java    | 175 +++++++++++++++++++
 .../apache/drill/exec/opt/BasicOptimizer.java   |   8 +-
 .../drill/exec/physical/base/AbstractBase.java  |  20 +++
 .../physical/base/AbstractFileGroupScan.java    |   7 +
 .../exec/physical/base/AbstractGroupScan.java   |   8 +
 .../exec/physical/base/AbstractSubScan.java     |   4 +
 .../exec/physical/base/PhysicalOperator.java    |   8 +
 .../drill/exec/physical/impl/ImplCreator.java   | 128 +++++++++-----
 .../drill/exec/planner/logical/DrillTable.java  |  39 ++++-
 .../exec/planner/logical/DrillViewTable.java    |  37 ++--
 .../exec/planner/logical/DynamicDrillTable.java |   9 +
 .../drill/exec/planner/sql/DrillSqlWorker.java  |   3 +
 .../sql/handlers/CreateTableHandler.java        |   4 +-
 .../exec/planner/torel/ConversionContext.java   |   6 +
 .../server/options/SystemOptionManager.java     |   1 +
 .../drill/exec/store/AbstractStoragePlugin.java |   6 +-
 .../apache/drill/exec/store/SchemaConfig.java   |  93 ++++++++++
 .../apache/drill/exec/store/SchemaFactory.java  |  14 +-
 .../apache/drill/exec/store/StoragePlugin.java  |  18 +-
 .../drill/exec/store/StoragePluginRegistry.java |   6 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java  |  12 +-
 .../exec/store/dfs/FileSystemSchemaFactory.java |  11 +-
 .../drill/exec/store/dfs/FormatPlugin.java      |   4 +-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |  85 ++++++---
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |  14 +-
 .../exec/store/dfs/easy/EasyGroupScan.java      |  17 +-
 .../drill/exec/store/dfs/easy/EasySubScan.java  |   7 +-
 .../exec/store/direct/DirectGroupScan.java      |   2 +-
 .../drill/exec/store/direct/DirectSubScan.java  |   2 +-
 .../exec/store/easy/text/TextFormatPlugin.java  |   5 +-
 .../exec/store/ischema/InfoSchemaGroupScan.java |   2 +
 .../store/ischema/InfoSchemaStoragePlugin.java  |   7 +-
 .../exec/store/ischema/InfoSchemaSubScan.java   |   1 +
 .../exec/store/ischema/RecordGenerator.java     |   8 +-
 .../drill/exec/store/mock/MockGroupScanPOP.java |   2 +-
 .../exec/store/mock/MockStorageEngine.java      |   7 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |  11 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  89 ++++++----
 .../exec/store/parquet/ParquetRowGroupScan.java |  12 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   8 +-
 .../drill/exec/store/sys/SystemTablePlugin.java |   7 +-
 .../drill/exec/store/sys/SystemTableScan.java   |   2 +
 .../drill/exec/util/ImpersonationUtil.java      | 162 +++++++++++++++++
 .../apache/drill/exec/work/foreman/Foreman.java |   2 +-
 .../exec/work/fragment/FragmentExecutor.java    |  27 ++-
 .../src/main/resources/drill-module.conf        |   4 +
 .../java/org/apache/drill/PlanningBase.java     |   3 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   2 +-
 .../exec/testing/TestExceptionInjection.java    |  11 +-
 .../drill/exec/testing/TestPauseInjection.java  |   6 +-
 pom.xml                                         |   2 +-
 71 files changed, 1097 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 6d18d12..e52e2e4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -101,14 +101,17 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
   private long scanSizeInBytes = 0;
 
   @JsonCreator
-  public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
+  public HBaseGroupScan(@JsonProperty("userName") String userName,
+                        @JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
                         @JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
                         @JsonProperty("columns") List<SchemaPath> columns,
                         @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
-    this ((HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
+    this (userName, (HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
   }
 
-  public HBaseGroupScan(HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
+  public HBaseGroupScan(String userName, HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec,
+      List<SchemaPath> columns) {
+    super(userName);
     this.storagePlugin = storagePlugin;
     this.storagePluginConfig = storagePlugin.getConfig();
     this.hbaseScanSpec = scanSpec;
@@ -121,6 +124,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
    * @param that The HBaseGroupScan to clone
    */
   private HBaseGroupScan(HBaseGroupScan that) {
+    super(that);
     this.columns = that.columns;
     this.hbaseScanSpec = that.hbaseScanSpec;
     this.endpointFragmentMapping = that.endpointFragmentMapping;
@@ -342,7 +346,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     assert minorFragmentId < endpointFragmentMapping.size() : String.format(
         "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
         minorFragmentId);
-    return new HBaseSubScan(storagePlugin, storagePluginConfig, endpointFragmentMapping.get(minorFragmentId), columns);
+    return new HBaseSubScan(getUserName(), storagePlugin, storagePluginConfig,
+        endpointFragmentMapping.get(minorFragmentId), columns);
   }
 
   @Override
@@ -427,7 +432,9 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
    * Empty constructor, do not use, only for testing.
    */
   @VisibleForTesting
-  public HBaseGroupScan() { }
+  public HBaseGroupScan() {
+    super((String)null);
+  }
 
   /**
    * Do not use, only for testing.

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
index 2b419d4..c395b43 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -62,7 +62,8 @@ public class HBasePushFilterIntoScan extends StoragePluginOptimizerRule {
       return; //no filter pushdown ==> No transformation.
     }
 
-    final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns());
+    final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
+        newScanSpec, groupScan.getColumns());
     newGroupsScan.setFilterPushedDown(true);
 
     final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 1c407e1..47d08b4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -25,8 +25,8 @@ import net.hydromatic.optiq.Schema;
 import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.Table;
 
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -46,7 +46,7 @@ public class HBaseSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     HBaseSchema schema = new HBaseSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index 948d462..2214c50 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -23,9 +23,9 @@ import java.util.Set;
 import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.common.JSONOptions;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -60,14 +60,14 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public HBaseGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
+  public HBaseGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
     HBaseScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<HBaseScanSpec>() {});
-    return new HBaseGroupScan(this, scanSpec, null);
+    return new HBaseGroupScan(userName, this, scanSpec, null);
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
-    schemaFactory.registerSchemas(session, parent);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 23d8c5a..96ae257 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -55,17 +55,20 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
 
   @JsonCreator
   public HBaseSubScan(@JacksonInject StoragePluginRegistry registry,
+                      @JsonProperty("userName") String userName,
                       @JsonProperty("storage") StoragePluginConfig storage,
                       @JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList,
                       @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+    super(userName);
     hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
     this.regionScanSpecList = regionScanSpecList;
     this.storage = (HBaseStoragePluginConfig) storage;
     this.columns = columns;
   }
 
-  public HBaseSubScan(HBaseStoragePlugin plugin, HBaseStoragePluginConfig config,
+  public HBaseSubScan(String userName, HBaseStoragePlugin plugin, HBaseStoragePluginConfig config,
       List<HBaseSubScanSpec> regionInfoList, List<SchemaPath> columns) {
+    super(userName);
     hbaseStoragePlugin = plugin;
     storage = config;
     this.regionScanSpecList = regionInfoList;
@@ -103,7 +106,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new HBaseSubScan(hbaseStoragePlugin, storage, regionScanSpecList, columns);
+    return new HBaseSubScan(getUserName(), hbaseStoragePlugin, storage, regionScanSpecList, columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
index 374c486..9b93ac0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
@@ -132,7 +132,7 @@ public abstract class HivePushPartitionFilterIntoScan extends StoragePluginOptim
 
     try {
       HiveScan oldScan = (HiveScan) scanRel.getGroupScan();
-      HiveScan hiveScan = new HiveScan(newReadEntry, oldScan.storagePlugin, oldScan.columns);
+      HiveScan hiveScan = new HiveScan(oldScan.getUserName(), newReadEntry, oldScan.storagePlugin, oldScan.columns);
       PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, hiveScan, builder);
     } catch (ExecutionSetupException e) {
       throw new DrillRuntimeException(e);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 92635a8..8a2e498 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -94,10 +94,12 @@ public class HiveScan extends AbstractGroupScan {
   private long rowCount = 0;
 
   @JsonCreator
-  public HiveScan(@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
+  public HiveScan(@JsonProperty("userName") final String userName,
+                  @JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
                   @JsonProperty("storage-plugin") final String storagePluginName,
                   @JsonProperty("columns") final List<SchemaPath> columns,
                   @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+    super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.storagePluginName = storagePluginName;
     this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
@@ -106,7 +108,8 @@ public class HiveScan extends AbstractGroupScan {
     endpoints = storagePlugin.getContext().getBits();
   }
 
-  public HiveScan(final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
+  public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
+    super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.storagePlugin = storagePlugin;
@@ -116,6 +119,7 @@ public class HiveScan extends AbstractGroupScan {
   }
 
   private HiveScan(final HiveScan that) {
+    super(that);
     this.columns = that.columns;
     this.endpoints = that.endpoints;
     this.hiveReadEntry = that.hiveReadEntry;
@@ -226,8 +230,9 @@ public class HiveScan extends AbstractGroupScan {
       if (parts.contains(null)) {
         parts = null;
       }
+
       final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
-      return new HiveSubScan(encodedInputSplits, subEntry, splitTypes, columns);
+      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 91e7a92..a19ebb8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -29,9 +29,9 @@ import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;
 
@@ -67,7 +67,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public HiveScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
     HiveReadEntry hiveReadEntry = selection.getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
     try {
       if (hiveReadEntry.getJdbcTableType() == TableType.VIEW) {
@@ -75,15 +75,15 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
             "Querying views created in Hive from Drill is not supported in current version.");
       }
 
-      return new HiveScan(hiveReadEntry, this, columns);
+      return new HiveScan(userName, hiveReadEntry, this, columns);
     } catch (ExecutionSetupException e) {
       throw new IOException(e);
     }
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
-    schemaFactory.registerSchemas(session, parent);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
   }
   public Set<StoragePluginOptimizerRule> getOptimizerRules() {
     return ImmutableSet.of(HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_PROJECT, HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_SCAN);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 1233202..2181c2a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -62,10 +62,12 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   private List<Partition> partitions;
 
   @JsonCreator
-  public HiveSubScan(@JsonProperty("splits") List<String> splits,
+  public HiveSubScan(@JsonProperty("userName") String userName,
+                     @JsonProperty("splits") List<String> splits,
                      @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
                      @JsonProperty("splitClasses") List<String> splitClasses,
                      @JsonProperty("columns") List<SchemaPath> columns) throws IOException, ReflectiveOperationException {
+    super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.table = hiveReadEntry.getTable();
     this.partitions = hiveReadEntry.getPartitions();
@@ -126,7 +128,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     try {
-      return new HiveSubScan(splits, hiveReadEntry, splitClasses, columns);
+      return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 587e90d..ec30f01 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -30,8 +30,8 @@ import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveStoragePlugin;
@@ -187,7 +187,7 @@ public class HiveSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     HiveSchema schema = new HiveSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index b086786..54d34f9 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -115,17 +115,20 @@ public class MongoGroupScan extends AbstractGroupScan implements
   private boolean filterPushedDown = false;
 
   @JsonCreator
-  public MongoGroupScan(@JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
+  public MongoGroupScan(
+      @JsonProperty("userName") String userName,
+      @JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
       @JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
       @JsonProperty("columns") List<SchemaPath> columns,
       @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException,
       ExecutionSetupException {
-    this((MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
+    this(userName, (MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
         scanSpec, columns);
   }
 
-  public MongoGroupScan(MongoStoragePlugin storagePlugin,
+  public MongoGroupScan(String userName, MongoStoragePlugin storagePlugin,
       MongoScanSpec scanSpec, List<SchemaPath> columns) throws IOException {
+    super(userName);
     this.storagePlugin = storagePlugin;
     this.storagePluginConfig = storagePlugin.getConfig();
     this.scanSpec = scanSpec;
@@ -140,6 +143,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
    *          The MongoGroupScan to clone
    */
   private MongoGroupScan(MongoGroupScan that) {
+    super(that);
     this.scanSpec = that.scanSpec;
     this.columns = that.columns;
     this.storagePlugin = that.storagePlugin;
@@ -446,7 +450,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
   @Override
   public MongoSubScan getSpecificScan(int minorFragmentId)
       throws ExecutionSetupException {
-    return new MongoSubScan(storagePlugin, storagePluginConfig,
+    return new MongoSubScan(getUserName(), storagePlugin, storagePluginConfig,
         endpointFragmentMapping.get(minorFragmentId), columns);
   }
 
@@ -554,6 +558,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @VisibleForTesting
   MongoGroupScan() {
+    super((String)null);
   }
 
   @JsonIgnore

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
index 9af49b1..1d3b292 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
@@ -68,7 +68,7 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
 
     MongoGroupScan newGroupsScan = null;
     try {
-      newGroupsScan = new MongoGroupScan(groupScan.getStoragePlugin(),
+      newGroupsScan = new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
           newScanSpec, groupScan.getColumns());
     } catch (IOException e) {
       logger.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index dfad5ef..d291325 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -25,9 +25,9 @@ import net.hydromatic.optiq.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
 import org.slf4j.Logger;
@@ -63,8 +63,8 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
-    schemaFactory.registerSchemas(session, parent);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
   @Override
@@ -73,12 +73,9 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection)
-      throws IOException {
-    MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(),
-        new TypeReference<MongoScanSpec>() {
-        });
-    return new MongoGroupScan(this, mongoScanSpec, null);
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<MongoScanSpec>() {});
+    return new MongoGroupScan(userName, this, mongoScanSpec, null);
   }
 
   public Set<StoragePluginOptimizerRule> getOptimizerRules() {

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index 36008cf..fb6e095 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -57,10 +57,12 @@ public class MongoSubScan extends AbstractBase implements SubScan {
   @JsonCreator
   public MongoSubScan(
       @JacksonInject StoragePluginRegistry registry,
+      @JsonProperty("userName") String userName,
       @JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig,
       @JsonProperty("chunkScanSpecList") LinkedList<MongoSubScanSpec> chunkScanSpecList,
       @JsonProperty("columns") List<SchemaPath> columns)
       throws ExecutionSetupException {
+    super(userName);
     this.columns = columns;
     this.mongoPluginConfig = (MongoStoragePluginConfig) mongoPluginConfig;
     this.mongoStoragePlugin = (MongoStoragePlugin) registry
@@ -68,9 +70,10 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     this.chunkScanSpecList = chunkScanSpecList;
   }
 
-  public MongoSubScan(MongoStoragePlugin storagePlugin,
+  public MongoSubScan(String userName, MongoStoragePlugin storagePlugin,
       MongoStoragePluginConfig storagePluginConfig,
       List<MongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
+    super(userName);
     this.mongoStoragePlugin = storagePlugin;
     this.mongoPluginConfig = storagePluginConfig;
     this.columns = columns;
@@ -105,7 +108,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    return new MongoSubScan(mongoStoragePlugin, mongoPluginConfig,
+    return new MongoSubScan(getUserName(), mongoStoragePlugin, mongoPluginConfig,
         chunkScanSpecList, columns);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index f650ccc..c941176 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -34,8 +34,8 @@ import net.hydromatic.optiq.SchemaPlus;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.mongo.MongoCnxnManager;
 import org.apache.drill.exec.store.mongo.MongoScanSpec;
@@ -120,7 +120,7 @@ public class MongoSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     MongoSchema schema = new MongoSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);
@@ -188,7 +188,7 @@ public class MongoSchemaFactory implements SchemaFactory {
 
     DrillTable getDrillTable(String dbName, String collectionName) {
       MongoScanSpec mongoScanSpec = new MongoScanSpec(dbName, collectionName);
-      return new DynamicDrillTable(plugin, schemaName, mongoScanSpec);
+      return new DynamicDrillTable(plugin, schemaName, null, mongoScanSpec);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index 943d644..805d6e9 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -106,6 +106,10 @@ drill.exec: {
       write: true
     }
   },
+  impersonation: {
+    enabled: false,
+    max_chained_user_hops: 3
+  },
   security.user.auth {
     enabled: false,
     packages += "org.apache.drill.exec.rpc.user.security",
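
For context, a minimal sketch of how these new settings are read at runtime (assuming a
DrillConfig handle named "config"; DrillConfig extends the Typesafe Config API, so getBoolean/getInt
are available):

    // Hedged sketch: the key strings match the new ExecConstants entries below.
    boolean impersonationEnabled = config.getBoolean("drill.exec.impersonation.enabled");
    int maxChainedUserHops = config.getInt("drill.exec.impersonation.max_chained_user_hops");

This mirrors the lookups done in FragmentContext.isImpersonationEnabled() and in
ViewExpansionContext later in this patch.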

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index f7648b1..bafbbc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -75,6 +75,8 @@ public interface ExecConstants {
   public static final String SYS_STORE_PROVIDER_CLASS = "drill.exec.sys.store.provider.class";
   public static final String SYS_STORE_PROVIDER_LOCAL_PATH = "drill.exec.sys.store.provider.local.path";
   public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
+  public static final String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled";
+  public static final String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops";
   public static final String USER_AUTHENTICATOR_IMPL_PACKAGES = "drill.exec.security.user.auth.packages";
   public static final String USER_AUTHENTICATION_ENABLED = "drill.exec.security.user.auth.enabled";
   public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
@@ -220,4 +222,8 @@ public interface ExecConstants {
   public static final String DRILLBIT_CONTROL_INJECTIONS = "drill.exec.testing.controls";
   public static final OptionValidator DRILLBIT_CONTROLS_VALIDATOR =
     new ExecutionControls.ControlsOptionValidator(DRILLBIT_CONTROL_INJECTIONS, ExecutionControls.DEFAULT_CONTROLS, 1);
+
+  public static final String NEW_VIEW_DEFAULT_PERMS_KEY = "new_view_default_permissions";
+  public static final OptionValidator NEW_VIEW_DEFAULT_PERMS_VALIDATOR =
+      new StringValidator(NEW_VIEW_DEFAULT_PERMS_KEY, "700");
 }
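
The "new_view_default_permissions" option is registered as a system/session option (see the
SystemOptionManager change later in this patch), so it can also be adjusted per session with
ALTER SESSION. A hedged sketch of reading it, assuming an OptionManager handle named "options"
and the public string_val accessor on OptionValue as it exists in this codebase:

    // Hypothetical consumer of the new option; "options" is a placeholder handle.
    String viewPerms = options.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val;  // e.g. "700"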

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
index f9a8ff5..efa8cc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileStatus;
 
 import com.google.common.base.Preconditions;
 
+import java.io.IOException;
 import java.io.InputStream;
 
 public class DotDrillFile {
@@ -51,6 +52,13 @@ public class DotDrillFile {
   }
 
   /**
+   * @return Owner of the file in the underlying file system.
+   */
+  public String getOwner() {
+    return status.getOwner();
+  }
+
+  /**
    * Return base file name without the parent directory and extensions.
    * @return Base file name.
    */
@@ -59,7 +67,7 @@ public class DotDrillFile {
     return fileName.substring(0, fileName.lastIndexOf(type.getEnding()));
   }
 
-  public View getView(DrillConfig config) throws Exception{
+  public View getView(DrillConfig config) throws IOException {
     Preconditions.checkArgument(type == DotDrillType.VIEW);
     try(InputStream is = fs.open(status.getPath())){
       return config.getMapper().readValue(is, View.class);
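
Taken together with the getOwner() addition above, a hedged sketch of how a caller might pair the
two ("dotDrillFile" and "config" are placeholder handles for this illustration):

    // Hedged sketch: the file's owner becomes the view owner used for impersonated expansion.
    String viewOwner = dotDrillFile.getOwner();
    View view = dotDrillFile.getView(config);  // now declares IOException instead of Exception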

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 7dfd0e6..c566a10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -28,6 +28,7 @@ import net.hydromatic.optiq.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
@@ -48,7 +49,9 @@ import org.apache.drill.exec.server.options.FragmentOptionManager;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -201,7 +204,18 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
       return null;
     }
 
-    return queryContext.getRootSchema();
+    final boolean isImpersonationEnabled = isImpersonationEnabled();
+    // If impersonation is enabled, view the schema as the query user and suppress authorization errors, since for
+    // InfoSchema purposes we want to show only the tables the user has permission to list or query. If impersonation
+    // is disabled, view the schema as the Drillbit process user and surface authorization errors to the client.
+    SchemaConfig schemaConfig = SchemaConfig
+        .newBuilder(
+            isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(),
+            queryContext)
+        .setIgnoreAuthErrors(isImpersonationEnabled)
+        .build();
+
+    return queryContext.getRootSchema(schemaConfig);
   }
 
   /**
@@ -327,6 +341,20 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return executionControls;
   }
 
+  public String getQueryUserName() {
+    return fragment.getCredentials().getUserName();
+  }
+
+  public boolean isImpersonationEnabled() {
+    // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to consider impersonation disabled if there is
+    // no config
+    if (getConfig() == null) {
+      return false;
+    }
+
+    return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+  }
+
   @Override
   public void close() {
     waitForSendComplete();

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index cd5c054..cc02658 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -25,6 +25,7 @@ import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -39,8 +40,10 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.QueryOptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
 import org.apache.drill.exec.store.PartitionExplorerImpl;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.util.ImpersonationUtil;
 
 // TODO except for a couple of tests, this is only created by Foreman
 // TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext()
@@ -49,6 +52,9 @@ import org.apache.drill.exec.testing.ExecutionControls;
 public class QueryContext implements AutoCloseable, UdfUtilities {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
 
+  private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
+  private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024;
+
   private final DrillbitContext drillbitContext;
   private final UserSession session;
   private final OptionManager queryOptions;
@@ -59,8 +65,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
   private final BufferAllocator allocator;
   private final BufferManager bufferManager;
   private final QueryDateTimeInfo queryDateTimeInfo;
-  private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
-  private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024;
+  private final ViewExpansionContext viewExpansionContext;
 
   /*
    * Flag to indicate if close has been called, after calling close the first
@@ -89,6 +94,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     }
     // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available
     bufferManager = new BufferManager(this.allocator, null);
+    viewExpansionContext = new ViewExpansionContext(this);
   }
 
   public PlannerSettings getPlannerSettings() {
@@ -103,6 +109,13 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     return allocator;
   }
 
+  /**
+   * Return a reference to the default schema instance in a schema tree. Each {@link net.hydromatic.optiq.SchemaPlus}
+   * instance refers to its parent and its children, so from the returned default schema clients can traverse the
+   * entire schema tree and know which schema to consult first when looking up tables.
+   *
+   * @return Reference to default schema instance in a schema tree.
+   */
   public SchemaPlus getNewDefaultSchema() {
     final SchemaPlus rootSchema = getRootSchema();
     final SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
@@ -113,18 +126,52 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     return defaultSchema;
   }
 
+  /**
+   * Get the root schema, with the schema owner set to the user who issued the query managed by this QueryContext.
+   * @return Root of the schema tree.
+   */
   public SchemaPlus getRootSchema() {
+    return getRootSchema(getQueryUserName());
+  }
+
+  /**
+   * Return the root schema, with the schema owner set to the given user.
+   *
+   * @param userName User who owns the schema tree.
+   * @return Root of the schema tree.
+   */
+  public SchemaPlus getRootSchema(String userName) {
+    final String schemaUser = isImpersonationEnabled() ? userName : ImpersonationUtil.getProcessUserName();
+    final SchemaConfig schemaConfig = SchemaConfig.newBuilder(schemaUser, this).build();
+    return getRootSchema(schemaConfig);
+  }
+
+  /**
+   * Create and return a schema tree built with the given <i>schemaConfig</i>.
+   * @param schemaConfig Configuration (schema user, auth-error handling) to use while building the schema tree.
+   * @return Root of the schema tree.
+   */
+  public SchemaPlus getRootSchema(SchemaConfig schemaConfig) {
     try {
       final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
-      drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
+      drillbitContext.getSchemaFactory().registerSchemas(schemaConfig, rootSchema);
       return rootSchema;
     } catch(IOException e) {
+      // We can't proceed further without a schema; throw a runtime exception.
       final String errMsg = String.format("Failed to create schema tree: %s", e.getMessage());
       logger.error(errMsg, e);
       throw new DrillRuntimeException(errMsg, e);
     }
   }
 
+  /**
+   * Get the name of the user who issued the query that is managed by this QueryContext.
+   * @return Name of the query user.
+   */
+  public String getQueryUserName() {
+    return session.getCredentials().getUserName();
+  }
+
   public OptionManager getOptions() {
     return queryOptions;
   }
@@ -153,6 +200,14 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     return drillbitContext.getFunctionImplementationRegistry();
   }
 
+  public ViewExpansionContext getViewExpansionContext() {
+    return viewExpansionContext;
+  }
+
+  public boolean isImpersonationEnabled() {
+    return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+  }
+
   public DrillOperatorTable getDrillOperatorTable() {
     return table;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
new file mode 100644
index 0000000..9d04ab9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import com.carrotsearch.hppc.ObjectIntOpenHashMap;
+import com.google.common.base.Preconditions;
+import net.hydromatic.optiq.SchemaPlus;
+import org.apache.drill.common.exceptions.UserException;
+import org.eigenbase.relopt.RelOptTable;
+import org.eigenbase.relopt.RelOptTable.ToRelContext;
+
+import static org.apache.drill.exec.ExecConstants.IMPERSONATION_MAX_CHAINED_USER_HOPS;
+
+/**
+ * Contains context information about view expansion(s) in a query. Part of
+ * {@link org.apache.drill.exec.ops.QueryContext}. Before a view is expanded into its definition, as part of
+ * {@link org.apache.drill.exec.planner.logical.DrillViewTable#toRel(ToRelContext, RelOptTable)}, a
+ * {@link ViewExpansionToken} is requested from the ViewExpansionContext through {@link #reserveViewExpansionToken(String)}.
+ * Once view expansion is complete, the token is released through {@link ViewExpansionToken#release()}. A view definition
+ * may itself contain zero or more views; a token is obtained for expanding each of those nested views as well.
+ *
+ * Ex:
+ *   Following are the available view tables: { "view1", "view2", "view3", "view4" }. Corresponding owners are
+ *   {"view1Owner", "view2Owner", "view3Owner", "view4Owner"}.
+ *   Definition of "view4" : "SELECT field4 FROM view3"
+ *   Definition of "view3" : "SELECT field4, field3 FROM view2"
+ *   Definition of "view2" : "SELECT field4, field3, field2 FROM view1"
+ *   Definition of "view1" : "SELECT field4, field3, field2, field1 FROM someTable"
+ *
+ *   Query is: "SELECT * FROM view4".
+ *   Steps:
+ *     1. "view4" comes for expanding it into its definition
+ *     2. A token "view4Token" is requested through {@link #reserveViewExpansionToken(String view4Owner)}
+ *     3. "view4" is called for expansion. As part of it
+ *       3.1 "view3" comes for expansion
+ *       3.2 A token "view3Token" is requested through {@link #reserveViewExpansionToken(String view3Owner)}
+ *       3.3 "view3" is called for expansion. As part of it
+ *           3.3.1 "view2" comes for expansion
+ *           3.3.2 A token "view2Token" is requested through {@link #reserveViewExpansionToken(String view2Owner)}
+ *           3.3.3 "view2" is called for expansion. As part of it
+ *                 3.3.3.1 "view1" comes for expansion
+ *                 3.3.3.2 A token "view1Token" is requested through {@link #reserveViewExpansionToken(String view1Owner)}
+ *                 3.3.3.3 "view1" is called for expansion
+ *                 3.3.3.4 "view1" expansion is complete
+ *                 3.3.3.5 Token "view1Token" is released
+ *           3.3.4 "view2" expansion is complete
+ *           3.3.5 Token "view2Token" is released
+ *       3.4 "view3" expansion is complete
+ *       3.5 Token "view3Token" is released
+ *    4. "view4" expansion is complete
+ *    5. Token "view4Token" is released.
+ *
+ */
+public class ViewExpansionContext {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ViewExpansionContext.class);
+
+  private final QueryContext queryContext;
+  private final int maxChainedUserHops;
+  private final String queryUser;
+  private final ObjectIntOpenHashMap<String> userTokens = new ObjectIntOpenHashMap<>();
+
+  public ViewExpansionContext(QueryContext queryContext) {
+    this.queryContext = queryContext;
+    this.maxChainedUserHops =
+        queryContext.getConfig().getInt(IMPERSONATION_MAX_CHAINED_USER_HOPS);
+    this.queryUser = queryContext.getQueryUserName();
+  }
+
+  public boolean isImpersonationEnabled() {
+    return queryContext.isImpersonationEnabled();
+  }
+
+  /**
+   * Reserve a token for expansion of a view owned by the given user. If no more tokens can be issued,
+   * throws {@link UserException}.
+   *
+   * @param viewOwner Name of the user who owns the view.
+   * @return An instance of {@link org.apache.drill.exec.ops.ViewExpansionContext.ViewExpansionToken} which must be
+   *         released when done using the token.
+   */
+  public ViewExpansionToken reserveViewExpansionToken(String viewOwner) {
+    int totalTokens = 1;
+    if (!viewOwner.equals(queryUser)) {
+      // We want to track the tokens only if the "viewOwner" is not the same as the "queryUser".
+      if (userTokens.containsKey(viewOwner)) {
+        // If the user already exists, we don't need to validate the limit on maximum user hops in chained impersonation
+        // as the limit is for number of unique users.
+        totalTokens += userTokens.get(viewOwner);
+      } else {
+        // Make sure we are not exceeding the maximum number of unique user hops allowed in chained impersonation.
+        if (userTokens.size() == maxChainedUserHops) {
+          final String errMsg =
+              String.format("Cannot issue token for view expansion as issuing the token exceeds the " +
+                  "maximum allowed number of user hops (%d) in chained impersonation.", maxChainedUserHops);
+          logger.error(errMsg);
+          throw UserException.permissionError().message(errMsg).build();
+        }
+      }
+
+      userTokens.put(viewOwner, totalTokens);
+
+      logger.debug("Issued view expansion token for user '{}'", viewOwner);
+    }
+
+    return new ViewExpansionToken(viewOwner);
+  }
+
+  private void releaseViewExpansionToken(ViewExpansionToken token) {
+    final String viewOwner = token.viewOwner;
+
+    if (viewOwner.equals(queryUser)) {
+      // If the token owner and queryUser are same, no need to track the token release.
+      return;
+    }
+
+    Preconditions.checkState(userTokens.containsKey(token.viewOwner),
+        "Given user doesn't exist in User Token store. Make sure token for this user is obtained first.");
+
+    final int userTokenCount = userTokens.get(viewOwner);
+    if (userTokenCount == 1) {
+      // Remove the user from collection, when there are no more tokens issued to the user.
+      userTokens.remove(viewOwner);
+    } else {
+      userTokens.put(viewOwner, userTokenCount - 1);
+    }
+    logger.debug("Released view expansion token issued for user '{}'", viewOwner);
+  }
+
+  /**
+   * Represents token issued to a view owner for expanding the view.
+   */
+  public class ViewExpansionToken {
+    private final String viewOwner;
+
+    private boolean released;
+
+    ViewExpansionToken(String viewOwner) {
+      this.viewOwner = viewOwner;
+    }
+
+    /**
+     * Get the schema tree for the view owner to whom this token was issued.
+     * @return Root of schema tree.
+     */
+    public SchemaPlus getSchemaTree() {
+      Preconditions.checkState(!released, "Trying to use released token.");
+      return queryContext.getRootSchema(viewOwner);
+    }
+
+    /**
+     * Release the token. Once released, all method calls (except release) throw {@link java.lang.IllegalStateException}.
+     */
+    public void release() {
+      if (!released) {
+        released = true;
+        releaseViewExpansionToken(this);
+      }
+    }
+  }
+}
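
A minimal usage sketch of the token API above, mirroring the DrillViewTable.toRel() change later in
this patch ("viewOwner" and "viewExpansionContext" are assumed to be in scope):

    ViewExpansionToken token = null;
    try {
      // Reserve a token before expanding a view owned by another user.
      token = viewExpansionContext.reserveViewExpansionToken(viewOwner);
      SchemaPlus schemaTree = token.getSchemaTree();  // schema tree rooted as the view owner
      // ... expand the view definition against schemaTree ...
    } finally {
      if (token != null) {
        token.release();  // release() is idempotent, so a second call is a no-op
      }
    }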

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index b1a71a5..9e60f21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -53,6 +53,7 @@ import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.eigenbase.rel.RelFieldCollation.Direction;
@@ -69,9 +70,11 @@ public class BasicOptimizer extends Optimizer {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
 
   private final QueryContext queryContext;
+  private final UserClientConnection userSession;
 
-  public BasicOptimizer(final QueryContext queryContext) {
+  public BasicOptimizer(final QueryContext queryContext, final UserClientConnection userSession) {
     this.queryContext = queryContext;
+    this.userSession = userSession;
   }
 
   @Override
@@ -208,7 +211,8 @@ public class BasicOptimizer extends Optimizer {
       }
       try {
         final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config);
-        return storagePlugin.getPhysicalScan(scan.getSelection());
+        final String user = userSession.getSession().getCredentials().getUserName();
+        return storagePlugin.getPhysicalScan(user, scan.getSelection());
       } catch (IOException | ExecutionSetupException e) {
         throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index defb4e4..c7b0e7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -25,11 +25,26 @@ import com.google.common.base.Preconditions;
 public abstract class AbstractBase implements PhysicalOperator{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);
 
+  private final String userName;
+
   protected long initialAllocation = 1000000L;
   protected long maxAllocation = 10000000000L;
   private int id;
   private double cost;
 
+  public AbstractBase() {
+    userName = null;
+  }
+
+  public AbstractBase(String userName) {
+    this.userName = userName;
+  }
+
+  public AbstractBase(AbstractBase that) {
+    Preconditions.checkNotNull(that, "Unable to clone: source is null.");
+    this.userName = that.userName;
+  }
+
   @Override
   public void accept(GraphVisitor<PhysicalOperator> visitor) {
     visitor.enter(this);
@@ -48,6 +63,7 @@ public abstract class AbstractBase implements PhysicalOperator{
     return true;
   }
 
+  @Override
   public final void setOperatorId(int id) {
     this.id = id;
   }
@@ -80,4 +96,8 @@ public abstract class AbstractBase implements PhysicalOperator{
     return maxAllocation;
   }
 
+  @Override
+  public String getUserName() {
+    return userName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
index ee809fc..606aa4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
@@ -24,6 +24,13 @@ import org.apache.drill.exec.store.dfs.FileSelection;
 public abstract class AbstractFileGroupScan extends AbstractGroupScan implements FileGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFileGroupScan.class);
 
+  public AbstractFileGroupScan(String userName) {
+    super(userName);
+  }
+
+  public AbstractFileGroupScan(AbstractFileGroupScan that) {
+    super(that);
+  }
 
   @Override
   public void modifyFileSelection(FileSelection selection) {

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 8fe21e6..242bd5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -30,6 +30,14 @@ import org.apache.drill.exec.physical.EndpointAffinity;
 public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
 
+  public AbstractGroupScan(String userName) {
+    super(userName);
+  }
+
+  public AbstractGroupScan(AbstractGroupScan that) {
+    super(that);
+  }
+
   @Override
   public Iterator<PhysicalOperator> iterator() {
     return Iterators.emptyIterator();

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
index a36a46e..5ec5698 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
@@ -29,6 +29,10 @@ import com.google.common.collect.Iterators;
 public abstract class AbstractSubScan extends AbstractBase implements SubScan{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSubScan.class);
 
+  public AbstractSubScan(String userName) {
+    super(userName);
+  }
+
   @Override
   public boolean isExecutable() {
     return true;

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index a5518ca..b1954ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -95,6 +95,14 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
   @JsonProperty("cost")
   public double getCost();
 
+  /**
+   * Name of the user to impersonate while setting up the implementation (RecordBatch) of this
+   * PhysicalOperator. The default is null, in which case the user who launched the query is impersonated.
+   * @return Name of the user to impersonate, or null to impersonate the query user.
+   */
+  @JsonProperty("userName")
+  public String getUserName();
+
   @JsonIgnore
   public int getOperatorType();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index e25f1c0..912dfd7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -17,12 +17,13 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector;
@@ -32,65 +33,112 @@ import org.apache.drill.exec.util.AssertionUtil;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
- * Implementation of the physical operator visitor
+ * Creates the RecordBatch tree (the runtime implementations of the physical operators) for a given PhysicalOperator tree.
  */
-public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException> {
+public class ImplCreator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
 
-  private RootExec root = null;
+  private static final ImplCreator INSTANCE = new ImplCreator();
 
   private ImplCreator() {}
 
-  private RootExec getRoot() {
-    return root;
+  /**
+   * Create and return fragment RootExec for given FragmentRoot. RootExec has one or more RecordBatches as children
+   * (which may contain child RecordBatches and so on).
+   * @param context FragmentContext.
+   * @param root FragmentRoot.
+   * @return RootExec of fragment.
+   * @throws ExecutionSetupException
+   */
+  public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
+    Preconditions.checkNotNull(root);
+    Preconditions.checkNotNull(context);
+
+    if (AssertionUtil.isAssertionsEnabled()) {
+      root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
+    }
+
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    final RootExec rootExec = INSTANCE.getRootExec(root, context);
+    logger.debug("Took {} ms to create RecordBatch tree", watch.elapsed(TimeUnit.MILLISECONDS));
+    if (rootExec == null) {
+      throw new ExecutionSetupException(
+          "The provided fragment did not have a root node that correctly created a RootExec value.");
+    }
+
+    return rootExec;
   }
 
-  @Override
-  @SuppressWarnings("unchecked")
-  public RecordBatch visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
-    Preconditions.checkNotNull(op);
-    Preconditions.checkNotNull(context);
+  /** Create RootExec and its children (RecordBatches) for given FragmentRoot */
+  private RootExec getRootExec(final FragmentRoot root, final FragmentContext context) throws ExecutionSetupException {
+    final List<RecordBatch> childRecordBatches = getChildren(root, context);
 
-    Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(op.getClass());
-    if (opCreator != null) {
-      if (op instanceof FragmentRoot ) {
-        root = ((RootCreator<PhysicalOperator>)opCreator).getRoot(context, op, getChildren(op, context));
-        return null;
-      } else {
-        return ((BatchCreator<PhysicalOperator>)opCreator).getBatch(context, op, getChildren(op, context));
+    if (context.isImpersonationEnabled()) {
+      final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(root.getUserName(), context.getQueryUserName());
+      try {
+        return proxyUgi.doAs(new PrivilegedExceptionAction<RootExec>() {
+          public RootExec run() throws Exception {
+            return ((RootCreator<PhysicalOperator>) getOpCreator(root, context)).getRoot(context, root, childRecordBatches);
+          }
+        });
+      } catch (InterruptedException | IOException e) {
+        final String errMsg = String.format("Failed to create RootExec for operator with id '%d'", root.getOperatorId());
+        logger.error(errMsg, e);
+        throw new ExecutionSetupException(errMsg, e);
       }
     } else {
-      throw new UnsupportedOperationException(String.format(
-          "The PhysicalVisitor of type %s does not currently support visiting the PhysicalOperator type %s.",
-          this.getClass().getCanonicalName(), op.getClass().getCanonicalName()));
+      return ((RootCreator<PhysicalOperator>) getOpCreator(root, context)).getRoot(context, root, childRecordBatches);
     }
   }
 
-  private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
-    List<RecordBatch> children = Lists.newArrayList();
-    for (PhysicalOperator child : op) {
-      children.add(child.accept(this, context));
+  /** Create a RecordBatch and its children for given PhysicalOperator */
+  private RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+    Preconditions.checkNotNull(op);
+
+    final List<RecordBatch> childRecordBatches = getChildren(op, context);
+
+    if (context.isImpersonationEnabled()) {
+      final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(op.getUserName(), context.getQueryUserName());
+      try {
+        return proxyUgi.doAs(new PrivilegedExceptionAction<RecordBatch>() {
+          public RecordBatch run() throws Exception {
+            return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches);
+          }
+        });
+      } catch (InterruptedException | IOException e) {
+        final String errMsg = String.format("Failed to create RecordBatch for operator with id '%d'", op.getOperatorId());
+        logger.error(errMsg, e);
+        throw new ExecutionSetupException(errMsg, e);
+      }
+    } else {
+      return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches);
     }
-    return children;
   }
 
-  public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
-    ImplCreator i = new ImplCreator();
-    if (AssertionUtil.isAssertionsEnabled()) {
-      root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
+  /** Helper method to get OperatorCreator (RootCreator or BatchCreator) for given PhysicalOperator (root or non-root) */
+  private Object getOpCreator(PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+    final Class<?> opClass = op.getClass();
+    Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(opClass);
+    if (opCreator == null) {
+      throw new UnsupportedOperationException(
+          String.format("BatchCreator for PhysicalOperator type '%s' not found.", opClass.getCanonicalName()));
     }
 
-    Stopwatch watch = new Stopwatch();
-    watch.start();
-    root.accept(i, context);
-    logger.debug("Took {} ms to accept", watch.elapsed(TimeUnit.MILLISECONDS));
-    if (i.root == null) {
-      throw new ExecutionSetupException(
-          "The provided fragment did not have a root node that correctly created a RootExec value.");
-    }
-    return i.getRoot();
+    return opCreator;
   }
 
-}
+  /** Helper method to traverse the children of given PhysicalOperator and create RecordBatches for children recursively */
+  private List<RecordBatch> getChildren(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+    List<RecordBatch> children = Lists.newArrayList();
+    for (PhysicalOperator child : op) {
+      children.add(getRecordBatch(child, context));
+    }
+
+    return children;
+  }
+}
\ No newline at end of file
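
The impersonation branches above rely on Hadoop's proxy-user facility; a hedged sketch of the
underlying pattern ("opOwner" is a placeholder for the operator's user name, while the actual code
goes through ImpersonationUtil.createProxyUgi() as shown in the diff):

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    // Build a UGI that proxies "opOwner" through the Drillbit's login (process) user,
    // then run the setup work under that identity.
    final UserGroupInformation proxyUgi =
        UserGroupInformation.createProxyUser("opOwner", UserGroupInformation.getLoginUser());
    proxyUgi.doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        // Any file system access here is performed as "opOwner".
        return null;
      }
    });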

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index c8f872e..5451ca0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -28,29 +28,48 @@ import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptTable;
 
 public abstract class DrillTable implements Table {
 
   private final String storageEngineName;
-  public final StoragePluginConfig storageEngineConfig;
-  private Object selection;
-  private StoragePlugin plugin;
+  private final StoragePluginConfig storageEngineConfig;
+  private final Object selection;
+  private final StoragePlugin plugin;
+  private final String userName;
+
   private GroupScan scan;
 
-  /** Creates a DrillTable. */
-  public DrillTable(String storageEngineName, StoragePlugin plugin, Object selection) {
+  /**
+   * Creates a DrillTable instance.
+   * @param storageEngineName StorageEngine name.
+   * @param plugin Reference to StoragePlugin.
+   * @param userName Whom to impersonate while reading the contents of the table.
+   * @param selection Table contents (type and contents depend on type of StoragePlugin).
+   */
+  public DrillTable(String storageEngineName, StoragePlugin plugin, String userName, Object selection) {
     this.selection = selection;
     this.plugin = plugin;
 
     this.storageEngineConfig = plugin.getConfig();
     this.storageEngineName = storageEngineName;
+    this.userName = userName;
+  }
+
+  /**
+   * TODO: Same purpose as other constructor except the impersonation user is the user who is running the Drillbit
+   * process. Once we add impersonation to non-FileSystem storage plugins such as Hive, HBase etc,
+   * we can remove this constructor.
+   */
+  public DrillTable(String storageEngineName, StoragePlugin plugin, Object selection) {
+    this(storageEngineName, plugin, ImpersonationUtil.getProcessUserName(), selection);
   }
 
   public GroupScan getGroupScan() throws IOException{
     if (scan == null) {
-      this.scan = plugin.getPhysicalScan(new JSONOptions(selection));
+      this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection));
     }
     return scan;
   }
@@ -94,6 +113,7 @@ public abstract class DrillTable implements Table {
     result = prime * result + ((selection == null) ? 0 : selection.hashCode());
     result = prime * result + ((storageEngineConfig == null) ? 0 : storageEngineConfig.hashCode());
     result = prime * result + ((storageEngineName == null) ? 0 : storageEngineName.hashCode());
+    result = prime * result + ((userName == null) ? 0 : userName.hashCode());
     return result;
   }
 
@@ -130,6 +150,13 @@ public abstract class DrillTable implements Table {
     } else if (!storageEngineName.equals(other.storageEngineName)) {
       return false;
     }
+    if (userName == null) {
+      if (other.userName != null) {
+        return false;
+      }
+    } else if (!userName.equals(other.userName)) {
+      return false;
+    }
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
index 68e666a..9c5a94f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.exec.planner.logical;
 
-import java.util.List;
-
 import net.hydromatic.optiq.Schema.TableType;
 import net.hydromatic.optiq.Statistic;
 import net.hydromatic.optiq.Statistics;
 import net.hydromatic.optiq.TranslatableTable;
 
 import org.apache.drill.exec.dotdrill.View;
+import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.ops.ViewExpansionContext.ViewExpansionToken;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptTable;
 import org.eigenbase.relopt.RelOptTable.ToRelContext;
@@ -36,9 +36,13 @@ public class DrillViewTable implements TranslatableTable, DrillViewInfoProvider
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillViewTable.class);
 
   private final View view;
+  private final String viewOwner;
+  private final ViewExpansionContext viewExpansionContext;
 
-  public DrillViewTable(List<String> path, View view){
+  public DrillViewTable(View view, String viewOwner, ViewExpansionContext viewExpansionContext){
     this.view = view;
+    this.viewOwner = viewOwner;
+    this.viewExpansionContext = viewExpansionContext;
   }
 
   @Override
@@ -53,15 +57,28 @@ public class DrillViewTable implements TranslatableTable, DrillViewInfoProvider
 
   @Override
   public RelNode toRel(ToRelContext context, RelOptTable relOptTable) {
-    RelDataType rowType = relOptTable.getRowType();
-    RelNode rel = context.expandView(rowType, view.getSql(), view.getWorkspaceSchemaPath());
+    ViewExpansionToken token = null;
+    try {
+      RelDataType rowType = relOptTable.getRowType();
+      RelNode rel;
+
+      if (viewExpansionContext.isImpersonationEnabled()) {
+        token = viewExpansionContext.reserveViewExpansionToken(viewOwner);
+        rel = context.expandView(rowType, view.getSql(), token.getSchemaTree(), view.getWorkspaceSchemaPath());
+      } else {
+        rel = context.expandView(rowType, view.getSql(), view.getWorkspaceSchemaPath());
+      }
+
+      // If the View's field list is not "*", create a cast.
+      if (!view.isDynamic() && !view.hasStar()) {
+        rel = RelOptUtil.createCastRel(rel, rowType, true);
+      }
 
-    if (view.isDynamic() || view.hasStar()){
-      // if View's field has "*", return rel directly.
       return rel;
-    }else{
-      // if the View's field list is not "*", try to create a cast.
-      return RelOptUtil.createCastRel(rel, rowType, true);
+    } finally {
+      if (token != null) {
+        token.release();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
index 843db58..24917f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
@@ -28,6 +28,15 @@ public class DynamicDrillTable extends DrillTable{
 
   private RelDataTypeHolder holder = new RelDataTypeHolder();
 
+  public DynamicDrillTable(StoragePlugin plugin, String storageEngineName, String userName, Object selection) {
+    super(storageEngineName, plugin, userName, selection);
+  }
+
+  /**
+   * TODO: Same as the constructor above, except the impersonation user is the user who is running the Drillbit
+   * process. Once we add impersonation to non-FileSystem storage plugins such as Hive and HBase,
+   * we can remove this constructor.
+   */
   public DynamicDrillTable(StoragePlugin plugin, String storageEngineName, Object selection) {
     super(storageEngineName, plugin, selection);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 097b7bb..d56f1db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.foreman.ForemanException;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.hadoop.security.AccessControlException;
 import org.eigenbase.rel.RelCollationTraitDef;
 import org.eigenbase.rel.rules.ReduceExpressionsRule;
 import org.eigenbase.rel.rules.WindowedAggSplitterRule;
@@ -159,6 +160,8 @@ public class DrillSqlWorker {
     } catch(ValidationException e) {
       String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
       throw UserException.parseError(e).message(errorMessage).build();
+    } catch (AccessControlException e) {
+      throw UserException.permissionError(e).build();
     } catch (IOException | RelConversionException e) {
       throw new QueryInputException("Failure handling SQL.", e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index a17a604..3e990c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -58,8 +58,8 @@ public class CreateTableHandler extends DefaultSqlHandler {
       AbstractSchema drillSchema = getDrillSchema(schema);
 
       if (!drillSchema.isMutable()) {
-        return DirectPlan.createDirectPlan(context, false, String.format("Current schema '%s' is not a mutable schema. " +
-            "Can't create tables in this schema.", drillSchema.getFullSchemaName()));
+        return DirectPlan.createDirectPlan(context, false, String.format("Unable to create table. " +
+            "Schema [%s] is immutable.", drillSchema.getFullSchemaName()));
       }
 
       final String newTblName = sqlCreateTable.getName();

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
index a486369..5f9061a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.torel;
 import java.util.List;
 import java.util.Map;
 
+import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.prepare.Prepare;
 
 import org.apache.drill.common.expression.LogicalExpression;
@@ -114,6 +115,11 @@ public class ConversionContext implements ToRelContext {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public RelNode expandView(RelDataType rowType, String queryString, SchemaPlus rootSchema, List<String> schemaPath) {
+    throw new UnsupportedOperationException();
+  }
+
   private static class ConverterVisitor extends AbstractLogicalVisitor<RelNode, ConversionContext, InvalidRelException>{
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a745479..3dc7c14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -96,6 +96,7 @@ public class SystemOptionManager extends BaseOptionManager {
       ExecConstants.HASH_JOIN_TABLE_FACTOR,
       ExecConstants.HASH_AGG_TABLE_FACTOR,
       ExecConstants.AVERAGE_FIELD_WIDTH,
+      ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR,
       QueryClassLoader.JAVA_COMPILER_VALIDATOR,
       QueryClassLoader.JAVA_COMPILER_JANINO_MAXSIZE,
       QueryClassLoader.JAVA_COMPILER_DEBUG,
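
With the validator registered, the option becomes settable at session scope; the impersonation
tests below rely on this, roughly as in the following sketch (the literal option name is an
assumption based on ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY):

    // Make view files created in this session use 750 permissions.
    test("ALTER SESSION SET `new_view_default_permissions` = '750';");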


[4/4] drill git commit: DRILL-2514: Part2 - Add impersonation tests using Hadoop MiniDFSCluster.

Posted by ve...@apache.org.
DRILL-2514: Part2 - Add impersonation tests using Hadoop MiniDFSCluster.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2a484251
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2a484251
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2a484251

Branch: refs/heads/master
Commit: 2a484251be48b0443318626b1364044db5473124
Parents: 40c9040
Author: vkorukanti <ve...@gmail.com>
Authored: Tue Mar 24 15:12:01 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Apr 21 13:16:01 2015 -0700

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |  12 +
 .../java/org/apache/drill/BaseTestQuery.java    |  12 +
 .../impersonation/BaseTestImpersonation.java    |  89 ++++++
 .../TestImpersonationMetadata.java              | 300 ++++++++++++++++++
 .../impersonation/TestImpersonationQueries.java | 304 +++++++++++++++++++
 pom.xml                                         | 121 +++++++-
 6 files changed, 837 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index f5313ca..82426ef 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -351,6 +351,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
     </dependency>
     <dependency>
@@ -363,6 +369,12 @@
       <artifactId>avro-mapred</artifactId>
       <version>1.7.7</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 2ff4de7..b02051b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -186,6 +186,7 @@ public class BaseTestQuery extends ExecTest {
    * @param properties
    */
   public static void updateClient(Properties properties) throws Exception {
+    Preconditions.checkState(bits != null && bits[0] != null, "Drillbits are not setup.");
     if (client != null) {
       client.close();
       client = null;
@@ -194,6 +195,17 @@ public class BaseTestQuery extends ExecTest {
     client = QueryTestUtil.createClient(config, serviceSet, MAX_WIDTH_PER_NODE, properties);
   }
 
+  /**
+   * Close the current <i>client</i> and open a new client for the given user. All tests executed
+   * after this method call use the new <i>client</i>.
+   * @param user name of the user to connect as
+   */
+  public static void updateClient(String user) throws Exception {
+    final Properties props = new Properties();
+    props.setProperty("user", user);
+    updateClient(props);
+  }
+
   protected static BufferAllocator getAllocator() {
     return allocator;
   }
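
A minimal sketch of switching the query user with the new overload ("someTestUser" is an
illustrative name):

    // All queries issued after this call run as "someTestUser".
    updateClient("someTestUser");
    test("SHOW SCHEMAS");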

http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
new file mode 100644
index 0000000..274f5f7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.impersonation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Properties;
+
+public class BaseTestImpersonation extends PlanTestBase {
+  protected static final String processUser = System.getProperty("user.name");
+
+  protected static MiniDFSCluster dfsCluster;
+  protected static Configuration conf;
+  protected static String miniDfsStoragePath;
+
+  protected static void startMiniDfsCluster(String testClass) throws Exception {
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(testClass), "Expected a non-null and non-empty test class name");
+    conf = new Configuration();
+
+    // Set the MiniDfs base dir to the temp directory of the test, so that all files created within the MiniDfs
+    // are properly cleaned up when the test exits.
+    miniDfsStoragePath = System.getProperty("java.io.tmpdir") + Path.SEPARATOR + testClass;
+    conf.set("hdfs.minidfs.basedir", miniDfsStoragePath);
+
+    // Set the proxyuser settings so that the user who is running the Drillbits/MiniDfs can impersonate other users.
+    conf.set(String.format("hadoop.proxyuser.%s.hosts", processUser), "*");
+    conf.set(String.format("hadoop.proxyuser.%s.groups", processUser), "*");
+
+    // Start the MiniDfs cluster
+    dfsCluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .format(true)
+        .build();
+
+    final Properties props = cloneDefaultTestConfigProperties();
+    props.setProperty(ExecConstants.IMPERSONATION_ENABLED, "true");
+
+    updateTestCluster(1, DrillConfig.create(props));
+  }
+
+  protected static void createAndAddWorkspace(FileSystem fs, String name, String path, short permissions, String owner,
+      String group,
+      Map<String, WorkspaceConfig> workspaces) throws Exception {
+    final Path dirPath = new Path(path);
+    FileSystem.mkdirs(fs, dirPath, new FsPermission(permissions));
+    fs.setOwner(dirPath, owner, group);
+    final WorkspaceConfig ws = new WorkspaceConfig(path, true, "parquet");
+    workspaces.put(name, ws);
+  }
+
+  protected static void stopMiniDfsCluster() throws Exception {
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+
+    if (miniDfsStoragePath != null) {
+      FileUtils.deleteQuietly(new File(miniDfsStoragePath));
+    }
+  }
+}
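
Subclasses are expected to bracket the MiniDFS lifecycle around the test class, roughly as in
this sketch (the class name is hypothetical):

    public class MyImpersonationTest extends BaseTestImpersonation {
      @BeforeClass
      public static void setup() throws Exception {
        // The test class name keeps the MiniDfs storage dir unique per test class.
        startMiniDfsCluster(MyImpersonationTest.class.getSimpleName());
      }

      @AfterClass
      public static void cleanup() throws Exception {
        stopMiniDfsCluster();
      }
    }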

http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
new file mode 100644
index 0000000..411660f
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.impersonation;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests impersonation on metadata-related queries such as SHOW FILES, SHOW TABLES, CREATE VIEW and CREATE TABLE.
+ */
+public class TestImpersonationMetadata extends BaseTestImpersonation {
+  private static final String MINIDFS_STORAGE_PLUGIN_NAME = "minidfs" + TestImpersonationMetadata.class.getSimpleName();
+
+  private static final String user1 = "drillTestUser1";
+  private static final String user2 = "drillTestUser2";
+
+  private static final String group0 = "drillTestGrp0";
+  private static final String group1 = "drillTestGrp1";
+
+  static {
+    UserGroupInformation.createUserForTesting(user1, new String[]{ group1, group0 });
+    UserGroupInformation.createUserForTesting(user2, new String[]{ group1 });
+  }
+
+  @BeforeClass
+  public static void addMiniDfsBasedStorage() throws Exception {
+    startMiniDfsCluster(TestImpersonationMetadata.class.getSimpleName());
+
+    final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    final FileSystemConfig lfsPluginConfig = (FileSystemConfig) pluginRegistry.getPlugin("dfs").getConfig();
+
+    final FileSystemConfig miniDfsPluginConfig = new FileSystemConfig();
+    miniDfsPluginConfig.connection = conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
+
+    Map<String, WorkspaceConfig> workspaces = Maps.newHashMap(lfsPluginConfig.workspaces);
+
+    createTestWorkspaces(workspaces);
+
+    miniDfsPluginConfig.workspaces = workspaces;
+    miniDfsPluginConfig.formats = ImmutableMap.copyOf(lfsPluginConfig.formats);
+    miniDfsPluginConfig.setEnabled(true);
+
+    pluginRegistry.createOrUpdate(MINIDFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true);
+  }
+
+  private static void createTestWorkspaces(Map<String, WorkspaceConfig> workspaces) throws Exception {
+    // Create "/tmp" folder and set permissions to "777"
+    final FileSystem fs = dfsCluster.getFileSystem();
+    final Path tmpPath = new Path("/tmp");
+    fs.delete(tmpPath, true);
+    FileSystem.mkdirs(fs, tmpPath, new FsPermission((short)0777));
+
+    // Create /drillTestGrp0_700 directory with permissions 700 (owned by user running the tests)
+    createAndAddWorkspace(fs, "drillTestGrp0_700", "/drillTestGrp0_700", (short)0700, processUser, group0, workspaces);
+
+    // Create /drillTestGrp0_750 directory with permissions 750 (owned by user running the tests)
+    createAndAddWorkspace(fs, "drillTestGrp0_750", "/drillTestGrp0_750", (short)0750, processUser, group0, workspaces);
+
+    // Create /drillTestGrp0_755 directory with permissions 755 (owned by user running the tests)
+    createAndAddWorkspace(fs, "drillTestGrp0_755", "/drillTestGrp0_755", (short)0755, processUser, group0, workspaces);
+
+    // Create /drillTestGrp0_770 directory with permissions 770 (owned by user running the tests)
+    createAndAddWorkspace(fs, "drillTestGrp0_770", "/drillTestGrp0_770", (short)0770, processUser, group0, workspaces);
+
+    // Create /drillTestGrp0_777 directory with permissions 777 (owned by user running the tests)
+    createAndAddWorkspace(fs, "drillTestGrp0_777", "/drillTestGrp0_777", (short)0777, processUser, group0, workspaces);
+
+    // Create /drillTestGrp1_700 directory with permissions 700 (owned by user1)
+    createAndAddWorkspace(fs, "drillTestGrp1_700", "/drillTestGrp1_700", (short)0700, user1, group1, workspaces);
+  }
+
+  @Test
+  public void testShowFilesInWSWithUserAndGroupPermissionsForQueryUser() throws Exception {
+    updateClient(user1);
+
+    // Try listing files (SHOW FILES) in schema "drillTestGrp1_700", which is owned by "user1"
+    test(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+
+    // Try listing files in schema "drillTestGrp0_750", which is owned by "processUser" and has group permissions for
+    // "user1"
+    test(String.format("SHOW FILES IN %s.drillTestGrp0_750", MINIDFS_STORAGE_PLUGIN_NAME));
+  }
+
+  @Test
+  public void testShowFilesInWSWithOtherPermissionsForQueryUser() throws Exception {
+    updateClient(user2);
+    // Try listing files in schema "drillTestGrp0_755", which is owned by "processUser" and "group0". "user2" is not
+    // part of "group0"
+    test(String.format("SHOW FILES IN %s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME));
+  }
+
+  @Test
+  public void testShowFilesInWSWithNoPermissionsForQueryUser() throws Exception {
+    UserRemoteException ex = null;
+
+    updateClient(user2);
+    try {
+      // Try listing files in schema "drillTestGrp1_700", which is owned by "user1"
+      test(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+    } catch(UserRemoteException e) {
+      ex = e;
+    }
+
+    assertNotNull("UserRemoteException is expected", ex);
+    assertThat(ex.getMessage(),
+        containsString("Permission denied: user=drillTestUser2, " +
+        "access=READ_EXECUTE, inode=\"/drillTestGrp1_700\":drillTestUser1:drillTestGrp1:drwx------"));
+  }
+
+  @Test
+  public void testShowSchemasSanityCheck() throws Exception {
+    test("SHOW SCHEMAS");
+  }
+
+  @Test
+  public void testCreateViewInDirWithUserPermissionsForQueryUser() throws Exception {
+    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp1_700"; // Workspace dir owned by "user1"
+    testCreateViewTestHelper(user1, viewSchema, "view1");
+  }
+
+  @Test
+  public void testCreateViewInDirWithGroupPermissionsForQueryUser() throws Exception {
+    // Workspace dir owned by "processUser", workspace group is "group0" and "user1" is part of "group0"
+    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_770";
+    testCreateViewTestHelper(user1, viewSchema, "view1");
+  }
+
+  @Test
+  public void testCreateViewInDirWithOtherPermissionsForQueryUser() throws Exception {
+    // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
+    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_777";
+    testCreateViewTestHelper(user2, viewSchema, "view1");
+  }
+
+  private static void testCreateViewTestHelper(String user, String viewSchema,
+      String viewName) throws Exception {
+    try {
+      updateClient(user);
+
+      test("USE " + viewSchema);
+
+      test("CREATE VIEW " + viewName + " AS SELECT " +
+          "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
+
+      testBuilder()
+          .sqlQuery("SHOW TABLES")
+          .unOrdered()
+          .baselineColumns("TABLE_SCHEMA", "TABLE_NAME")
+          .baselineValues(viewSchema, viewName)
+          .go();
+
+      test("SHOW FILES");
+
+      testBuilder()
+          .sqlQuery("SELECT * FROM " + viewName + " LIMIT 1")
+          .ordered()
+          .baselineColumns("c_custkey", "c_nationkey")
+          .baselineValues(1, 15)
+          .go();
+
+    } finally {
+      test("DROP VIEW " + viewSchema + "." + viewName);
+    }
+  }
+
+  @Test
+  public void testCreateViewInWSWithNoPermissionsForQueryUser() throws Exception {
+    // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
+    final String viewSchema = MINIDFS_STORAGE_PLUGIN_NAME + ".drillTestGrp0_755";
+    final String viewName = "view1";
+
+    updateClient(user2);
+
+    test("USE " + viewSchema);
+
+    test("CREATE VIEW " + viewName + " AS SELECT " +
+        "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
+
+    // SHOW TABLES is expected to return no records, as the view creation above fails.
+    testBuilder()
+        .sqlQuery("SHOW TABLES")
+        .expectsEmptyResultSet()
+        .go();
+
+    test("SHOW FILES");
+  }
+
+  @Test
+  public void testCreateTableInDirWithUserPermissionsForQueryUser() throws Exception {
+    final String tableWS = "drillTestGrp1_700"; // Workspace dir owned by "user1"
+    testCreateTableTestHelper(user1, tableWS, "table1");
+  }
+
+  @Test
+  public void testCreateTableInDirWithGroupPermissionsForQueryUser() throws Exception {
+    // Workspace dir owned by "processUser", workspace group is "group0" and "user1" is part of "group0"
+    final String tableWS = "drillTestGrp0_770";
+    testCreateTableTestHelper(user1, tableWS, "table1");
+  }
+
+  @Test
+  public void testCreateTableInDirWithOtherPermissionsForQueryUser() throws Exception {
+    // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
+    final String tableWS = "drillTestGrp0_777";
+    testCreateTableTestHelper(user2, tableWS, "table1");
+  }
+
+  private static void testCreateTableTestHelper(String user, String tableWS,
+      String tableName) throws Exception {
+    try {
+      updateClient(user);
+
+      test("USE " + Joiner.on(".").join(MINIDFS_STORAGE_PLUGIN_NAME, tableWS));
+
+      test("CREATE TABLE " + tableName + " AS SELECT " +
+          "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
+
+      test("SHOW FILES");
+
+      testBuilder()
+          .sqlQuery("SELECT * FROM " + tableName + " LIMIT 1")
+          .ordered()
+          .baselineColumns("c_custkey", "c_nationkey")
+          .baselineValues(1, 15)
+          .go();
+
+    } finally {
+      // There is no DROP TABLE yet, so we delete the table directory through the FileSystem object
+      final FileSystem fs = dfsCluster.getFileSystem();
+      final Path tablePath = new Path(Path.SEPARATOR + tableWS + Path.SEPARATOR + tableName);
+      if (fs.exists(tablePath)) {
+        fs.delete(tablePath, true);
+      }
+    }
+  }
+
+  @Test
+  public void testCreateTableInWSWithNoPermissionsForQueryUser() throws Exception {
+    // Workspace dir owned by "processUser", workspace group is "group0" and "user2" is not part of "group0"
+    final String tableWS = "drillTestGrp0_755";
+    final String tableName = "table1";
+
+    UserRemoteException ex = null;
+
+    try {
+      updateClient(user2);
+
+      test("USE " + Joiner.on(".").join(MINIDFS_STORAGE_PLUGIN_NAME, tableWS));
+
+      test("CREATE TABLE " + tableName + " AS SELECT " +
+          "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
+    } catch(UserRemoteException e) {
+      ex = e;
+    }
+
+    assertNotNull("UserRemoteException is expected", ex);
+    assertThat(ex.getMessage(),
+        containsString("Permission denied: user=drillTestUser2, access=WRITE, inode=\"/drillTestGrp0_755\""));
+  }
+
+  @AfterClass
+  public static void removeMiniDfsBasedStorage() throws Exception {
+    getDrillbitContext().getStorage().deletePlugin(MINIDFS_STORAGE_PLUGIN_NAME);
+    stopMiniDfsCluster();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
new file mode 100644
index 0000000..14392c9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.impersonation;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.dotdrill.DotDrillType;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test queries involving direct impersonation and multilevel impersonation, including join queries where each side
+ * is a nested view.
+ */
+public class TestImpersonationQueries extends BaseTestImpersonation {
+  private static final String MINIDFS_STORAGE_PLUGIN_NAME = "minidfs" + TestImpersonationQueries.class.getSimpleName();
+
+  private static final String[] org1Users = { "user0_1", "user1_1", "user2_1", "user3_1", "user4_1", "user5_1" };
+  private static final String[] org1Groups = { "group0_1", "group1_1", "group2_1", "group3_1", "group4_1", "group5_1" };
+  private static final String[] org2Users = { "user0_2", "user1_2", "user2_2", "user3_2", "user4_2", "user5_2" };
+  private static final String[] org2Groups = { "group0_2", "group1_2", "group2_2", "group3_2", "group4_2", "group5_2" };
+
+  static {
+    // "user0_1" belongs to "groups0_1". From "user1_1" onwards each user belongs to corresponding group and the group
+    // before it, i.e "user1_1" belongs to "group1_1" and "group0_1" and so on.
+    UserGroupInformation.createUserForTesting(org1Users[0], new String[] { org1Groups[0] });
+    for(int i=1; i<org1Users.length; i++) {
+      UserGroupInformation.createUserForTesting(org1Users[i], new String[] { org1Groups[i], org1Groups[i-1] });
+    }
+
+    UserGroupInformation.createUserForTesting(org2Users[0], new String[] { org2Groups[0] });
+    for(int i=1; i<org2Users.length; i++) {
+      UserGroupInformation.createUserForTesting(org2Users[i], new String[] { org2Groups[i], org2Groups[i-1] });
+    }
+  }
+
+  @BeforeClass
+  public static void addMiniDfsBasedStorageAndGenerateTestData() throws Exception {
+    startMiniDfsCluster(TestImpersonationQueries.class.getSimpleName());
+
+    final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    final FileSystemConfig lfsPluginConfig = (FileSystemConfig) pluginRegistry.getPlugin("dfs").getConfig();
+
+    final FileSystemConfig miniDfsPluginConfig = new FileSystemConfig();
+    miniDfsPluginConfig.connection = conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
+
+    Map<String, WorkspaceConfig> workspaces = Maps.newHashMap(lfsPluginConfig.workspaces);
+
+    createTestWorkspaces(workspaces);
+
+    miniDfsPluginConfig.workspaces = workspaces;
+    miniDfsPluginConfig.formats = ImmutableMap.copyOf(lfsPluginConfig.formats);
+    miniDfsPluginConfig.setEnabled(true);
+
+    pluginRegistry.createOrUpdate(MINIDFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true);
+
+    // Create test tables/views
+
+    // Create a copy of the "lineitem" table in /user/user0_1, owned by user0_1:group0_1 with permissions 750. Only
+    // user0_1 and members of "group0_1" have access to the data in the created "lineitem" table.
+    createTestTable(org1Users[0], org1Groups[0], "lineitem");
+
+    // Create a copy of the "orders" table in /user/user0_2, owned by user0_2:group0_2 with permissions 750. Only
+    // user0_2 and members of "group0_2" have access to the data in the created "orders" table.
+    createTestTable(org2Users[0], org2Groups[0], "orders");
+
+    createNestedTestViewsOnLineItem();
+    createNestedTestViewsOnOrders();
+  }
+
+  private static String getUserHome(String user) {
+    return "/user/" + user;
+  }
+
+  // Returns the workspace schema name for the given user.
+  private static String getWSSchema(String user) {
+    return MINIDFS_STORAGE_PLUGIN_NAME + "." + user;
+  }
+
+  private static void createTestWorkspaces(Map<String, WorkspaceConfig> workspaces) throws Exception {
+    // Create "/tmp" folder and set permissions to "777"
+    final FileSystem fs = dfsCluster.getFileSystem();
+    final Path tmpPath = new Path("/tmp");
+    fs.delete(tmpPath, true);
+    FileSystem.mkdirs(fs, tmpPath, new FsPermission((short)0777));
+
+    // Create a user directory (ex. "/user/user0_1", owned by "user0_1:group0_1" with perms 755) for every org1 user.
+    for(int i=0; i<org1Users.length; i++) {
+      final String user = org1Users[i];
+      final String group = org1Groups[i];
+      createAndAddWorkspace(fs, user, getUserHome(user), (short)0755, user, group, workspaces);
+    }
+
+    // Create a user directory (ex. "/user/user0_2", owned by "user0_2:group0_2" with perms 755) for every org2 user.
+    for(int i=0; i<org2Users.length; i++) {
+      final String user = org2Users[i];
+      final String group = org2Groups[i];
+      createAndAddWorkspace(fs, user, getUserHome(user), (short)0755, user, group, workspaces);
+    }
+  }
+
+  private static void createTestTable(String user, String group, String tableName) throws Exception {
+    updateClient(user);
+    test("USE " + getWSSchema(user));
+    test(String.format("CREATE TABLE %s as SELECT * FROM cp.`tpch/%s.parquet`;", tableName, tableName));
+
+    // Change the ownership and permissions manually. Currently there is no option to specify the default permissions
+    // and ownership for new tables.
+    final Path tablePath = new Path(getUserHome(user), tableName);
+    final FileSystem fs = dfsCluster.getFileSystem();
+
+    fs.setOwner(tablePath, user, group);
+    fs.setPermission(tablePath, new FsPermission((short)0750));
+  }
+
+  private static void createNestedTestViewsOnLineItem() throws Exception {
+    // Input table "lineitem"
+    // /user/user0_1     lineitem      750    user0_1:group0_1
+
+    // Create a view on top of lineitem table
+    // /user/user1_1    u1_lineitem    750    user1_1:group1_1
+    createView(org1Users[1], org1Groups[1], (short)0750, "u1_lineitem", getWSSchema(org1Users[0]), "lineitem");
+
+    // Create a view on top of u1_lineitem view
+    // /user/user2_1    u2_lineitem    750    user2_1:group2_1
+    createView(org1Users[2], org1Groups[2], (short)0750, "u2_lineitem", getWSSchema(org1Users[1]), "u1_lineitem");
+
+    // Create a view on top of u2_lineitem view
+    // /user/user2_1    u22_lineitem    750    user2_1:group2_1
+    createView(org1Users[2], org1Groups[2], (short)0750, "u22_lineitem", getWSSchema(org1Users[2]), "u2_lineitem");
+
+    // Create a view on top of u22_lineitem view
+    // /user/user3_1    u3_lineitem    750    user3_1:group3_1
+    createView(org1Users[3], org1Groups[3], (short)0750, "u3_lineitem", getWSSchema(org1Users[2]), "u22_lineitem");
+
+    // Create a view on top of u3_lineitem view
+    // /user/user4_1    u4_lineitem    755    user4_1:group4_1
+    createView(org1Users[4], org1Groups[4], (short)0755, "u4_lineitem", getWSSchema(org1Users[3]), "u3_lineitem");
+  }
+
+  private static void createNestedTestViewsOnOrders() throws Exception {
+    // Input table "orders"
+    // /user/user0_2     orders      750    user0_2:group0_2
+
+    // Create a view on top of orders table
+    // /user/user1_2    u1_orders    750    user1_2:group1_2
+    createView(org2Users[1], org2Groups[1], (short)0750, "u1_orders", getWSSchema(org2Users[0]), "orders");
+
+    // Create a view on top of u1_orders view
+    // /user/user2_2    u2_orders    750    user2_2:group2_2
+    createView(org2Users[2], org2Groups[2], (short)0750, "u2_orders", getWSSchema(org2Users[1]), "u1_orders");
+
+    // Create a view on top of u2_orders view
+    // /user/user2_2    u22_orders    750    user2_2:group2_2
+    createView(org2Users[2], org2Groups[2], (short)0750, "u22_orders", getWSSchema(org2Users[2]), "u2_orders");
+
+    // Create a view on top of u22_orders view. The permissions of this view (755) differ from those of the
+    // corresponding view in the "lineitem" nested views, so that a join query over the "lineitem" and "orders"
+    // nested views can run without exceeding the maximum number of allowed user hops in chained impersonation.
+    // /user/user3_2    u3_orders    755    user3_2:group3_2
+    createView(org2Users[3], org2Groups[3], (short)0755, "u3_orders", getWSSchema(org2Users[2]), "u22_orders");
+
+    // Create a view on top of u3_orders view
+    // /user/user4_2    u4_orders    755    user4_2:group4_2
+    createView(org2Users[4], org2Groups[4], (short)0755, "u4_orders", getWSSchema(org2Users[3]), "u3_orders");
+  }
+
+  private static void createView(final String viewOwner, final String viewGroup, final short viewPerms,
+      final String newViewName, final String fromSourceSchema, final String fromSourceTableName) throws Exception {
+    updateClient(viewOwner);
+    test(String.format("ALTER SESSION SET `%s`='%o';", ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY, viewPerms));
+    test(String.format("CREATE VIEW %s.%s AS SELECT * FROM %s.%s;",
+        getWSSchema(viewOwner), newViewName, fromSourceSchema, fromSourceTableName));
+
+    // Verify the view file created has the expected permissions and ownership
+    Path viewFilePath = new Path(getUserHome(viewOwner), newViewName + DotDrillType.VIEW.getEnding());
+    FileStatus status = dfsCluster.getFileSystem().getFileStatus(viewFilePath);
+    assertEquals(viewGroup, status.getGroup());
+    assertEquals(viewOwner, status.getOwner());
+    assertEquals(viewPerms, status.getPermission().toShort());
+  }
+
+  @Test
+  public void testDirectImpersonation_HasUserReadPermissions() throws Exception {
+    // Table lineitem is owned by "user0_1:group0_1" with permissions 750. Try to read the table as "user0_1". We
+    // shouldn't expect any errors.
+    updateClient(org1Users[0]);
+    test(String.format("SELECT * FROM %s.lineitem ORDER BY l_orderkey LIMIT 1", getWSSchema(org1Users[0])));
+  }
+
+  @Test
+  public void testDirectImpersonation_HasGroupReadPermissions() throws Exception {
+    // Table lineitem is owned by "user0_1:group0_1" with permissions 750. Try to read the table as "user1_1". We
+    // shouldn't expect any errors, as "user1_1" is part of "group0_1"
+    updateClient(org1Users[1]);
+    test(String.format("SELECT * FROM %s.lineitem ORDER BY l_orderkey LIMIT 1", getWSSchema(org1Users[0])));
+  }
+
+  @Test
+  public void testDirectImpersonation_NoReadPermissions() throws Exception {
+    UserRemoteException ex = null;
+    try {
+      // Table lineitem is owned by "user0_1:group0_1" with permissions 750. Now try to read the table as "user2_1". We
+      // should expect a permission-denied error, as "user2_1" is not part of "group0_1"
+      updateClient(org1Users[2]);
+      test(String.format("SELECT * FROM %s.lineitem ORDER BY l_orderkey LIMIT 1", getWSSchema(org1Users[0])));
+    } catch(UserRemoteException e) {
+      ex = e;
+    }
+
+    assertNotNull("UserRemoteException is expected", ex);
+    assertThat(ex.getMessage(), containsString("PERMISSION ERROR: " +
+            "Not authorized to read table [lineitem] in schema [minidfsTestImpersonationQueries.user0_1]"));
+  }
+
+
+  @Test
+  public void testMultiLevelImpersonationEqualToMaxUserHops() throws Exception {
+    updateClient(org1Users[4]);
+    test(String.format("SELECT * from %s.u4_lineitem LIMIT 1;", getWSSchema(org1Users[4])));
+  }
+
+  @Test
+  public void testMultiLevelImpersonationExceedsMaxUserHops() throws Exception {
+    UserRemoteException ex = null;
+
+    try {
+      updateClient(org1Users[5]);
+      test(String.format("SELECT * from %s.u4_lineitem LIMIT 1;", getWSSchema(org1Users[4])));
+    } catch(UserRemoteException e) {
+      ex = e;
+    }
+
+    assertNotNull("UserRemoteException is expected", ex);
+    assertThat(ex.getMessage(),
+        containsString("Cannot issue token for view expansion as issuing the token exceeds the maximum allowed number " +
+            "of user hops (3) in chained impersonation"));
+  }
+
+  @Test
+  public void testMultiLevelImpersonationJoinEachSideReachesMaxUserHops() throws Exception {
+    updateClient(org1Users[4]);
+    test(String.format("SELECT * from %s.u4_lineitem l JOIN %s.u3_orders o ON l.l_orderkey = o.o_orderkey LIMIT 1;",
+        getWSSchema(org1Users[4]), getWSSchema(org2Users[3])));
+  }
+
+  @Test
+  public void testMultiLevelImpersonationJoinOneSideExceedsMaxUserHops() throws Exception {
+    UserRemoteException ex = null;
+
+    try {
+      updateClient(org1Users[4]);
+      test(String.format("SELECT * from %s.u4_lineitem l JOIN %s.u4_orders o ON l.l_orderkey = o.o_orderkey LIMIT 1;",
+          getWSSchema(org1Users[4]), getWSSchema(org2Users[4])));
+    } catch(UserRemoteException e) {
+      ex = e;
+    }
+
+    assertNotNull("UserRemoteException is expected", ex);
+    assertThat(ex.getMessage(),
+        containsString("Cannot issue token for view expansion as issuing the token exceeds the maximum allowed number " +
+            "of user hops (3) in chained impersonation"));
+  }
+
+  @AfterClass
+  public static void removeMiniDfsBasedStorage() throws Exception {
+    getDrillbitContext().getStorage().deletePlugin(MINIDFS_STORAGE_PLUGIN_NAME);
+    stopMiniDfsCluster();
+  }
+}
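
The "(3)" in the error messages above is the Drillbit's limit on user hops in chained
impersonation. A sketch of how a test config might set it explicitly (the config key name is an
assumption based on the error text, not taken from this patch):

    final Properties props = cloneDefaultTestConfigProperties();
    props.setProperty(ExecConstants.IMPERSONATION_ENABLED, "true");
    // Assumed key name; controls the maximum user hops allowed in chained impersonation.
    props.setProperty("drill.exec.impersonation.max_chained_user_hops", "3");
    updateTestCluster(1, DrillConfig.create(props));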

http://git-wip-us.apache.org/repos/asf/drill/blob/2a484251/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 91707fa..558646e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,7 +32,7 @@
     <target.gen.source.path>${project.basedir}/target/generated-sources</target.gen.source.path>
     <proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
     <dep.junit.version>4.11</dep.junit.version>
-    <dep.slf4j.version>1.7.5</dep.slf4j.version>
+    <dep.slf4j.version>1.7.6</dep.slf4j.version>
     <parquet.version>1.6.0rc3-drill-r0.1</parquet.version>
   </properties>
 
@@ -705,6 +705,95 @@
           </dependency>
           <dependency>
             <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.4.1</version>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+            <exclusions>
+              <exclusion>
+                <groupId>org.mortbay.jetty</groupId>
+                <artifactId>servlet-api</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.mortbay.jetty</groupId>
+                <artifactId>servlet-api-2.5</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>javax.servlet</groupId>
+                <artifactId>servlet-api</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.mortbay.jetty</groupId>
+                <artifactId>jetty-util</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-api</artifactId>
+              </exclusion>
+              <exclusion>
+                <artifactId>jets3t</artifactId>
+                <groupId>net.java.dev.jets3t</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>log4j</artifactId>
+                <groupId>log4j</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>slf4j-log4j12</artifactId>
+                <groupId>org.slf4j</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>mockito-all</artifactId>
+                <groupId>org.mockito</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>commons-logging-api</artifactId>
+                <groupId>commons-logging</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>commons-logging</artifactId>
+                <groupId>commons-logging</groupId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.sun.jersey</groupId>
+                <artifactId>jersey-core</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.sun.jersey</groupId>
+                <artifactId>jersey-server</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.sun.jersey</groupId>
+                <artifactId>jersey-json</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.sun.jersey</groupId>
+                <artifactId>jersey-client</artifactId>
+              </exclusion>
+              <exclusion>
+                <artifactId>core</artifactId>
+                <groupId>org.eclipse.jdt</groupId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.codehaus.jackson</groupId>
+                <artifactId>jackson-core-asl</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.codehaus.jackson</groupId>
+                <artifactId>jackson-mapper-asl</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.codehaus.jackson</groupId>
+                <artifactId>jackson-xc</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.codehaus.jackson</groupId>
+                <artifactId>jackson-jaxrs</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
             <version>2.4.1</version>
             <exclusions>
@@ -999,6 +1092,32 @@
               </exclusion>
             </exclusions>
           </dependency>
+          <!-- Test Dependencies -->
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>2.4.1</version>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+            <exclusions>
+              <exclusion>
+                <groupId>commons-logging</groupId>
+                <artifactId>commons-logging</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.mortbay.jetty</groupId>
+                <artifactId>servlet-api</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>javax.servlet</groupId>
+                <artifactId>servlet-api</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>log4j</groupId>
+                <artifactId>log4j</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
         </dependencies>
       </dependencyManagement>
     </profile>


[3/4] drill git commit: DRILL-2413: FileSystemPlugin refactoring: avoid sharing DrillFileSystem across schemas

Posted by ve...@apache.org.
DRILL-2413: FileSystemPlugin refactoring: avoid sharing DrillFileSystem across schemas


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/117b7497
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/117b7497
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/117b7497

Branch: refs/heads/master
Commit: 117b749744d1775816fc5f9591dced3aae551352
Parents: fbb405b
Author: vkorukanti <ve...@gmail.com>
Authored: Tue Mar 10 10:19:35 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Apr 21 13:16:00 2015 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseSchemaFactory.java    |   2 +-
 .../exec/store/hbase/HBaseStoragePlugin.java    |   2 +-
 .../exec/store/hive/HiveStoragePlugin.java      |   2 +-
 .../store/hive/schema/HiveSchemaFactory.java    |   3 +-
 .../exec/store/mongo/MongoStoragePlugin.java    |   2 +-
 .../store/mongo/schema/MongoSchemaFactory.java  |   4 +-
 .../apache/drill/exec/ops/FragmentContext.java  |  43 ++++++-
 .../org/apache/drill/exec/ops/QueryContext.java |  15 ++-
 .../apache/drill/exec/store/SchemaFactory.java  |   4 +-
 .../drill/exec/store/StoragePluginRegistry.java |   2 +-
 .../drill/exec/store/avro/AvroFormatPlugin.java |   9 +-
 .../exec/store/dfs/BasicFormatMatcher.java      |  25 ++--
 .../drill/exec/store/dfs/DrillFileSystem.java   |   6 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java  |  34 +++---
 .../exec/store/dfs/FileSystemSchemaFactory.java |  12 +-
 .../drill/exec/store/dfs/FormatCreator.java     |  17 +--
 .../drill/exec/store/dfs/FormatMatcher.java     |   2 +-
 .../drill/exec/store/dfs/FormatPlugin.java      |   3 +-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  | 115 ++++++++++---------
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |  27 +++--
 .../exec/store/dfs/easy/EasyGroupScan.java      |   8 +-
 .../exec/store/easy/json/JSONFormatPlugin.java  |  15 ++-
 .../exec/store/easy/text/TextFormatPlugin.java  |  18 ++-
 .../store/ischema/InfoSchemaStoragePlugin.java  |   2 +-
 .../exec/store/mock/MockStorageEngine.java      |   2 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |  76 ++++++------
 .../exec/store/parquet/ParquetGroupScan.java    |  17 +--
 .../store/parquet/ParquetScanBatchCreator.java  |   8 +-
 .../drill/exec/store/sys/SystemTablePlugin.java |   2 +-
 .../exec/store/text/DrillTextRecordReader.java  |   6 +-
 .../exec/work/batch/ControlHandlerImpl.java     |   2 +-
 .../apache/drill/exec/work/foreman/Foreman.java |  11 +-
 .../work/fragment/NonRootFragmentManager.java   |   2 +-
 .../exec/store/dfs/TestDrillFileSystem.java     |   2 +-
 34 files changed, 279 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 7a0a64b..1c407e1 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -46,7 +46,7 @@ public class HBaseSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     HBaseSchema schema = new HBaseSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index c10b0ab..948d462 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -66,7 +66,7 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index f4baf3b..91e7a92 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -82,7 +82,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
   public Set<StoragePluginOptimizerRule> getOptimizerRules() {

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 0e16e6f..587e90d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.hive.schema;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -186,7 +187,7 @@ public class HiveSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     HiveSchema schema = new HiveSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index e46d8ec..dfad5ef 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -63,7 +63,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index 3c70638..f650ccc 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mongo.schema;
 
+import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,7 +37,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
 import org.apache.drill.exec.store.mongo.MongoCnxnManager;
 import org.apache.drill.exec.store.mongo.MongoScanSpec;
 import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
@@ -120,7 +120,7 @@ public class MongoSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     MongoSchema schema = new MongoSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 9400355..7dfd0e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 
 import net.hydromatic.optiq.SchemaPlus;
-import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
@@ -64,7 +63,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
 
   private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
   private final DrillbitContext context;
-  private final UserClientConnection connection; // is null if attached to non-root fragment
+  private final UserClientConnection connection; // is null if this context is for non-root fragment
+  private final QueryContext queryContext; // is null if this context is for non-root fragment
   private final FragmentStats stats;
   private final FunctionImplementationRegistry funcRegistry;
   private final BufferAllocator allocator;
@@ -87,10 +87,34 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
   private final AccountingUserConnection accountingUserConnection;
 
+  /**
+   * Create a FragmentContext instance for non-root fragment.
+   *
+   * @param dbContext DrillbitContext.
+   * @param fragment Fragment implementation.
+   * @param funcRegistry FunctionImplementationRegistry.
+   * @throws ExecutionSetupException
+   */
   public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
+      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+    this(dbContext, fragment, null, null, funcRegistry);
+  }
+
+  /**
+   * Create a FragmentContext instance for a root fragment.
+   *
+   * @param dbContext DrillbitContext.
+   * @param fragment Fragment implementation.
+   * @param queryContext QueryContext.
+   * @param connection UserClientConnection.
+   * @param funcRegistry FunctionImplementationRegistry.
+   * @throws ExecutionSetupException
+   */
+  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
       final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
     throws ExecutionSetupException {
     this.context = dbContext;
+    this.queryContext = queryContext;
     this.connection = connection;
     this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
     this.fragment = fragment;
@@ -128,6 +152,15 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     bufferManager = new BufferManager(this.allocator, this);
   }
 
+  /**
+   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
+   * the long list of test files.
+   */
+  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
+      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+    this(dbContext, fragment, null, connection, funcRegistry);
+  }
+
   public OptionManager getOptions() {
     return fragmentOptions;
   }
@@ -162,15 +195,13 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   }
 
   public SchemaPlus getRootSchema() {
-    if (connection == null) {
+    if (queryContext == null) {
       fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
           "This is a non-root fragment."));
       return null;
     }
 
-    final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
-    context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root);
-    return root;
+    return queryContext.getRootSchema();
   }
 
   /**

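A note on the FragmentContext hunk above: the root schema is now obtained from the QueryContext rather than rebuilt from the user connection, and only root fragments carry a QueryContext. A minimal sketch of how the two public constructors divide that responsibility, based only on the signatures visible in this hunk (dbContext, fragment, queryContext, connection and funcRegistry are illustrative stand-ins for values the caller already holds; both constructors throw ExecutionSetupException):

  // Root fragment: carries the QueryContext, so getRootSchema() works.
  FragmentContext rootCtx =
      new FragmentContext(dbContext, fragment, queryContext, connection, funcRegistry);

  // Non-root fragment: no QueryContext or connection is passed, so
  // getRootSchema() fails by design for this context.
  FragmentContext leafCtx =
      new FragmentContext(dbContext, fragment, funcRegistry);
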
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 2dcac25..cd5c054 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.ops;
 
+import java.io.IOException;
 import java.util.Collection;
 
 import io.netty.buffer.DrillBuf;
@@ -46,7 +47,7 @@ import org.apache.drill.exec.testing.ExecutionControls;
 // TODO - consider re-name to PlanningContext, as the query execution context actually appears
 // in fragment contexts
 public class QueryContext implements AutoCloseable, UdfUtilities {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
 
   private final DrillbitContext drillbitContext;
   private final UserSession session;
@@ -113,9 +114,15 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
   }
 
   public SchemaPlus getRootSchema() {
-    final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
-    drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
-    return rootSchema;
+    try {
+      final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
+      drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
+      return rootSchema;
+    } catch(IOException e) {
+      final String errMsg = String.format("Failed to create schema tree: %s", e.getMessage());
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    }
   }
 
   public OptionManager getOptions() {

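Because SchemaFactory.registerSchemas() now throws IOException (next hunk), QueryContext.getRootSchema() converts that checked failure into an unchecked DrillRuntimeException so existing planner call sites keep compiling. A hedged sketch of what a caller can now expect (variable names illustrative):

  try {
    SchemaPlus root = queryContext.getRootSchema();
    // ... resolve tables against root during planning ...
  } catch (DrillRuntimeException e) {
    // e.getCause() carries the IOException thrown during schema registration.
  }
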
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
index feadabd..14d2fab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
@@ -21,8 +21,10 @@ import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.exec.rpc.user.UserSession;
 
+import java.io.IOException;
+
 public interface SchemaFactory {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class);
 
-  public void registerSchemas(UserSession session, SchemaPlus parent);
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException;
 }

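For third-party storage plugins, the interface change above means registerSchemas() implementations may now propagate I/O failures instead of swallowing them. A minimal sketch of a compliant implementation, assuming a hypothetical MySchema subclass of AbstractSchema:

  import java.io.IOException;

  import net.hydromatic.optiq.SchemaPlus;

  import org.apache.drill.exec.rpc.user.UserSession;
  import org.apache.drill.exec.store.SchemaFactory;

  public class MySchemaFactory implements SchemaFactory {
    @Override
    public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
      // Any FileSystem access performed here may throw IOException;
      // it now surfaces to QueryContext instead of being hidden.
      MySchema schema = new MySchema("my"); // hypothetical AbstractSchema subclass
      parent.add(schema.getName(), schema);
    }
  }
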
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 5d0eed6..cb9ee0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -301,7 +301,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
   public class DrillSchemaFactory implements SchemaFactory {
 
     @Override
-    public void registerSchemas(UserSession session, SchemaPlus parent) {
+    public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
       Stopwatch watch = new Stopwatch();
       watch.start();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 2f487d6..30c45fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
 import java.util.List;
@@ -40,13 +41,13 @@ import java.util.List;
  */
 public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
 
-  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
+  public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
                           StoragePluginConfig storagePluginConfig) {
-    this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
+    this(name, context, fsConf, storagePluginConfig, new AvroFormatConfig());
   }
 
-  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
-    super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
+  public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
+    super(name, context, fsConf, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
index 9756f3c..3768aea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -36,34 +37,32 @@ import com.google.common.collect.Range;
 public class BasicFormatMatcher extends FormatMatcher{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class);
 
-  private final List<Pattern> patterns;
-  private final MagicStringMatcher matcher;
-  protected final DrillFileSystem fs;
   protected final FormatPlugin plugin;
   protected final boolean compressible;
   protected final CompressionCodecFactory codecFactory;
 
-  public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<Pattern> patterns, List<MagicString> magicStrings) {
+  private final List<Pattern> patterns;
+  private final MagicStringMatcher matcher;
+
+  public BasicFormatMatcher(FormatPlugin plugin, List<Pattern> patterns, List<MagicString> magicStrings) {
     super();
     this.patterns = ImmutableList.copyOf(patterns);
     this.matcher = new MagicStringMatcher(magicStrings);
-    this.fs = fs;
     this.plugin = plugin;
     this.compressible = false;
     this.codecFactory = null;
   }
 
-  public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<String> extensions, boolean compressible) {
+  public BasicFormatMatcher(FormatPlugin plugin, Configuration fsConf, List<String> extensions, boolean compressible) {
     List<Pattern> patterns = Lists.newArrayList();
     for (String extension : extensions) {
       patterns.add(Pattern.compile(".*\\." + extension));
     }
     this.patterns = patterns;
     this.matcher = new MagicStringMatcher(new ArrayList<MagicString>());
-    this.fs = fs;
     this.plugin = plugin;
     this.compressible = compressible;
-    this.codecFactory = new CompressionCodecFactory(fs.getConf());
+    this.codecFactory = new CompressionCodecFactory(fsConf);
   }
 
   @Override
@@ -72,8 +71,8 @@ public class BasicFormatMatcher extends FormatMatcher{
   }
 
   @Override
-  public FormatSelection isReadable(FileSelection selection) throws IOException {
-    if (isReadable(selection.getFirstPath(fs))) {
+  public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
+    if (isReadable(fs, selection.getFirstPath(fs))) {
       if (plugin.getName() != null) {
         NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
         namedConfig.name = plugin.getName();
@@ -85,7 +84,7 @@ public class BasicFormatMatcher extends FormatMatcher{
     return null;
   }
 
-  protected final boolean isReadable(FileStatus status) throws IOException {
+  protected final boolean isReadable(DrillFileSystem fs, FileStatus status) throws IOException {
     CompressionCodec codec = null;
     if (compressible) {
       codec = codecFactory.getCodec(status.getPath());
@@ -103,7 +102,7 @@ public class BasicFormatMatcher extends FormatMatcher{
       }
     }
 
-    if (matcher.matches(status)) {
+    if (matcher.matches(fs, status)) {
       return true;
     }
     return false;
@@ -128,7 +127,7 @@ public class BasicFormatMatcher extends FormatMatcher{
       }
     }
 
-    public boolean matches(FileStatus status) throws IOException{
+    public boolean matches(DrillFileSystem fs, FileStatus status) throws IOException{
       if (ranges.isEmpty()) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index 2683cca..f8afe3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -96,11 +96,11 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
   private final OperatorStats operatorStats;
 
   public DrillFileSystem(Configuration fsConf) throws IOException {
-    this(FileSystem.get(fsConf), null);
+    this(fsConf, null);
   }
 
-  public DrillFileSystem(FileSystem fs, OperatorStats operatorStats) {
-    this.underlyingFs = fs;
+  public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException {
+    this.underlyingFs = FileSystem.get(fsConf);
     this.operatorStats = operatorStats;
   }
 

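DrillFileSystem now resolves its own underlying FileSystem from a Configuration rather than accepting a pre-built FileSystem. That makes construction fallible (hence the new IOException), but it lets every consumer build a fresh instance at the point of use, which the rest of this patch relies on. A sketch under those assumptions (the URI is illustrative; FS_DEFAULT_NAME_KEY is the same key FileSystemPlugin sets below):

  import java.io.IOException;

  import org.apache.drill.exec.store.dfs.DrillFileSystem;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;

  class DrillFileSystemSketch {
    static DrillFileSystem open(String fsUri) throws IOException {
      Configuration fsConf = new Configuration();
      fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, fsUri); // e.g. "hdfs://namenode:8020"
      // Each call gets its own handle; nothing is shared plugin-wide.
      return new DrillFileSystem(fsConf);
    }
  }
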
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index c5ca41b..775b402 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -44,6 +44,8 @@ import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import static org.apache.drill.exec.store.dfs.FileSystemSchemaFactory.DEFAULT_WS_NAME;
+
 /**
  * A Storage engine associated with a Hadoop FileSystem Implementation. Examples include HDFS, MapRFS, QuantacastFileSystem,
 * LocalFileSystem, as well as Apache Drill specific CachedFileSystem, ClassPathFileSystem and LocalSyncableFileSystem.
@@ -51,26 +53,26 @@ import com.google.common.collect.Maps;
  * references to the FileSystem configuration and path management.
  */
 public class FileSystemPlugin extends AbstractStoragePlugin{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
 
   private final FileSystemSchemaFactory schemaFactory;
-  private Map<String, FormatPlugin> formatPluginsByName;
-  private Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
-  private FileSystemConfig config;
-  private DrillbitContext context;
-  private final DrillFileSystem fs;
+  private final Map<String, FormatPlugin> formatPluginsByName;
+  private final Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
+  private final FileSystemConfig config;
+  private final DrillbitContext context;
+  private final Configuration fsConf;
 
   public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException{
     try {
       this.config = config;
       this.context = context;
 
-      Configuration fsConf = new Configuration();
+      fsConf = new Configuration();
       fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.connection);
       fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
       fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
-      fs = new DrillFileSystem(fsConf);
-      formatPluginsByName = FormatCreator.getFormatPlugins(context, fs, config);
+
+      formatPluginsByName = FormatCreator.getFormatPlugins(context, fsConf, config);
       List<FormatMatcher> matchers = Lists.newArrayList();
       formatPluginsByConfig = Maps.newHashMap();
       for (FormatPlugin p : formatPluginsByName.values()) {
@@ -78,17 +80,17 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
         formatPluginsByConfig.put(p.getConfig(), p);
       }
 
-      boolean noWorkspace = config.workspaces == null || config.workspaces.isEmpty();
+      final boolean noWorkspace = config.workspaces == null || config.workspaces.isEmpty();
       List<WorkspaceSchemaFactory> factories = Lists.newArrayList();
       if (!noWorkspace) {
         for (Map.Entry<String, WorkspaceConfig> space : config.workspaces.entrySet()) {
-          factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, space.getKey(), name, fs, space.getValue(), matchers));
+          factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, space.getKey(), name, space.getValue(), matchers));
         }
       }
 
       // if the "default" workspace is not given add one.
-      if (noWorkspace || !config.workspaces.containsKey("default")) {
-        factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, "default", name, fs, WorkspaceConfig.DEFAULT, matchers));
+      if (noWorkspace || !config.workspaces.containsKey(DEFAULT_WS_NAME)) {
+        factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, DEFAULT_WS_NAME, name, WorkspaceConfig.DEFAULT, matchers));
       }
 
       this.schemaFactory = new FileSystemSchemaFactory(name, factories);
@@ -123,7 +125,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
 
@@ -151,5 +153,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
     return setBuilder.build();
   }
 
-
+  public Configuration getFsConf() {
+    return fsConf;
+  }
 }

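With FileSystemPlugin holding only a Configuration, the shared plugin-wide DrillFileSystem is gone; consumers derive one on demand through the new getFsConf() accessor. The intended call pattern, as used in the ParquetGroupScan and EasyGroupScan hunks later in this patch (the plugin variable is illustrative):

  // 'plugin' is a FileSystemPlugin or FormatPlugin obtained elsewhere;
  // the DrillFileSystem(Configuration) constructor may throw IOException.
  DrillFileSystem fs = new DrillFileSystem(plugin.getFsConf());
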
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 44132d0..e11712e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -44,12 +44,12 @@ import org.apache.hadoop.fs.Path;
  * This is the top level schema that responds to root level path requests. Also supports
  */
 public class FileSystemSchemaFactory implements SchemaFactory{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class);
+
+  public static final String DEFAULT_WS_NAME = "default";
 
   private List<WorkspaceSchemaFactory> factories;
   private String schemaName;
-  private final String defaultSchemaName = "default";
-
 
   public FileSystemSchemaFactory(String schemaName, List<WorkspaceSchemaFactory> factories) {
     super();
@@ -58,7 +58,7 @@ public class FileSystemSchemaFactory implements SchemaFactory{
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     FileSystemSchema schema = new FileSystemSchema(schemaName, session);
     SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
     schema.setPlus(plusOfThis);
@@ -69,14 +69,14 @@ public class FileSystemSchemaFactory implements SchemaFactory{
     private final WorkspaceSchema defaultSchema;
     private final Map<String, WorkspaceSchema> schemaMap = Maps.newHashMap();
 
-    public FileSystemSchema(String name, UserSession session) {
+    public FileSystemSchema(String name, UserSession session) throws IOException {
       super(ImmutableList.<String>of(), name);
       for(WorkspaceSchemaFactory f :  factories){
         WorkspaceSchema s = f.createSchema(getSchemaPath(), session);
         schemaMap.put(s.getName(), s);
       }
 
-      defaultSchema = schemaMap.get(defaultSchemaName);
+      defaultSchema = schemaMap.get(DEFAULT_WS_NAME);
     }
 
     void setPlus(SchemaPlus plusOfThis){

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
index c164ed5..d2a903b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
@@ -31,20 +31,23 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.DrillbitContext;
 
 import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
 
 public class FormatCreator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
 
-  static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class, FormatPluginConfig.class);
-  static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class);
+  private static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class,
+      Configuration.class, StoragePluginConfig.class, FormatPluginConfig.class);
+  private static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class,
+      Configuration.class, StoragePluginConfig.class);
 
-  static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, DrillFileSystem fileSystem, FileSystemConfig storageConfig) {
+  static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, Configuration fsConf,
+      FileSystemConfig storageConfig) {
     final DrillConfig config = context.getConfig();
     Map<String, FormatPlugin> plugins = Maps.newHashMap();
 
     Collection<Class<? extends FormatPlugin>> pluginClasses = PathScanner.scanForImplementations(FormatPlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
 
-
     if (storageConfig.formats == null || storageConfig.formats.isEmpty()) {
 
       for (Class<? extends FormatPlugin> pluginClass: pluginClasses) {
@@ -53,7 +56,7 @@ public class FormatCreator {
             if (!DEFAULT_BASED.check(c)) {
               continue;
             }
-            FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fileSystem, storageConfig);
+            FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fsConf, storageConfig);
             plugins.put(plugin.getName(), plugin);
           } catch (Exception e) {
             logger.warn(String.format("Failure while trying instantiate FormatPlugin %s.", pluginClass.getName()), e);
@@ -84,7 +87,7 @@ public class FormatCreator {
           continue;
         }
         try {
-          plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fileSystem, storageConfig, e.getValue()));
+          plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fsConf, storageConfig, e.getValue()));
         } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
           logger.warn("Failure initializing storage config named '{}' of type '{}'.", e.getKey(), e.getValue().getClass().getName(), e1);
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
index 92e3d0a..0b8c7a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
@@ -23,6 +23,6 @@ public abstract class FormatMatcher {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatMatcher.class);
 
   public abstract boolean supportDirectoryReads();
-  public abstract FormatSelection isReadable(FileSelection selection) throws IOException;
+  public abstract FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException;
   public abstract FormatPlugin getFormatPlugin();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 58d5b42..955dfeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Similar to a storage engine but built specifically to work within a FileSystem context.
@@ -51,7 +52,7 @@ public interface FormatPlugin {
 
   public FormatPluginConfig getConfig();
   public StoragePluginConfig getStorageConfig();
-  public DrillFileSystem getFileSystem();
+  public Configuration getFsConf();
   public DrillbitContext getContext();
   public String getName();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 45e9129..a536350 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
@@ -52,14 +53,14 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
+public class WorkspaceSchemaFactory {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
 
   private final List<FormatMatcher> fileMatchers;
   private final List<FormatMatcher> dirMatchers;
 
   private final WorkspaceConfig config;
-  private final DrillFileSystem fs;
+  private final Configuration fsConf;
   private final DrillConfig drillConfig;
   private final String storageEngineName;
   private final String schemaName;
@@ -67,9 +68,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
   private final ObjectMapper mapper;
 
   public WorkspaceSchemaFactory(DrillConfig drillConfig, FileSystemPlugin plugin, String schemaName,
-      String storageEngineName, DrillFileSystem fileSystem, WorkspaceConfig config,
-      List<FormatMatcher> formatMatchers) throws ExecutionSetupException, IOException {
-    this.fs = fileSystem;
+      String storageEngineName, WorkspaceConfig config, List<FormatMatcher> formatMatchers)
+    throws ExecutionSetupException, IOException {
+    this.fsConf = plugin.getFsConf();
     this.plugin = plugin;
     this.drillConfig = drillConfig;
     this.config = config;
@@ -95,7 +96,7 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
             defaultInputFormat, storageEngineName, schemaName);
         throw new ExecutionSetupException(message);
       }
-      final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin, fs,
+      final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin,
           ImmutableList.of(Pattern.compile(".*")), ImmutableList.<MagicString>of());
       fileMatchers.add(fallbackMatcher);
     }
@@ -105,54 +106,21 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
     return DotDrillType.VIEW.getPath(config.getLocation(), name);
   }
 
-  public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) {
+  public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) throws IOException {
     return new WorkspaceSchema(parentSchemaPath, schemaName, session);
   }
 
-  @Override
-  public DrillTable create(String key) {
-    try {
+  public class WorkspaceSchema extends AbstractSchema implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
+    private final ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(this);
+    private final UserSession session;
+    private final DrillFileSystem fs;
 
-      FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key);
-      if (fileSelection == null) {
-        return null;
-      }
-
-      if (fileSelection.containsDirectories(fs)) {
-        for (FormatMatcher m : dirMatchers) {
-          try {
-            Object selection = m.isReadable(fileSelection);
-            if (selection != null) {
-              return new DynamicDrillTable(plugin, storageEngineName, selection);
-            }
-          } catch (IOException e) {
-            logger.debug("File read failed.", e);
-          }
-        }
-        fileSelection = fileSelection.minusDirectories(fs);
-      }
-
-      for (FormatMatcher m : fileMatchers) {
-        Object selection = m.isReadable(fileSelection);
-        if (selection != null) {
-          return new DynamicDrillTable(plugin, storageEngineName, selection);
-        }
-      }
-      return null;
-
-    } catch (IOException e) {
-      logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
+    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, UserSession session) throws IOException {
+      super(parentSchemaPath, wsName);
+      this.session = session;
+      this.fs = new DrillFileSystem(fsConf);
     }
 
-    return null;
-  }
-
-  @Override
-  public void destroy(DrillTable value) {
-  }
-
-  public class WorkspaceSchema extends AbstractSchema {
-
     public boolean createView(View view) throws Exception {
       Path viewPath = getViewPath(view.getName());
       boolean replaced = fs.exists(viewPath);
@@ -186,15 +154,6 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
       fs.delete(getViewPath(viewName), false);
     }
 
-    private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(WorkspaceSchemaFactory.this);
-
-    private UserSession session;
-
-    public WorkspaceSchema(List<String> parentSchemaPath, String name, UserSession session) {
-      super(parentSchemaPath, name);
-      this.session = session;
-    }
-
     private Set<String> getViews() {
       Set<String> viewSet = Sets.newHashSet();
       // Look for files with ".view.drill" extension.
@@ -282,5 +241,47 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
     public String getTypeName() {
       return FileSystemConfig.NAME;
     }
+
+    @Override
+    public DrillTable create(String key) {
+      try {
+
+        FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key);
+        if (fileSelection == null) {
+          return null;
+        }
+
+        if (fileSelection.containsDirectories(fs)) {
+          for (FormatMatcher m : dirMatchers) {
+            try {
+              Object selection = m.isReadable(fs, fileSelection);
+              if (selection != null) {
+                return new DynamicDrillTable(plugin, storageEngineName, selection);
+              }
+            } catch (IOException e) {
+              logger.debug("File read failed.", e);
+            }
+          }
+          fileSelection = fileSelection.minusDirectories(fs);
+        }
+
+        for (FormatMatcher m : fileMatchers) {
+          Object selection = m.isReadable(fs, fileSelection);
+          if (selection != null) {
+            return new DynamicDrillTable(plugin, storageEngineName, selection);
+          }
+        }
+        return null;
+
+      } catch (IOException e) {
+        logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
+      }
+
+      return null;
+    }
+
+    @Override
+    public void destroy(DrillTable value) {
+    }
   }
 }

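The WorkspaceSchemaFactory rewrite above is the core of this change: the table-creation logic moved from the factory, which is shared across all users, into WorkspaceSchema, and each schema instance now opens its own DrillFileSystem when created for a session. A sketch of the resulting lifecycle, based on the signatures in this hunk (parentPath and the table key are illustrative; tables are normally materialized through the ExpandingConcurrentMap, which calls back into create(key) on first access):

  // One WorkspaceSchema per registerSchemas() call, each with its own
  // DrillFileSystem built from the plugin's Configuration.
  WorkspaceSchema ws = workspaceSchemaFactory.createSchema(parentPath, session); // may throw IOException

  // Table lookups now run against that per-schema file system.
  DrillTable table = ws.create("nation.parquet"); // illustrative table key
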
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 6e1e0cc..5c7152a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -54,38 +54,39 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
 public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
 
   private final BasicFormatMatcher matcher;
   private final DrillbitContext context;
   private final boolean readable;
   private final boolean writable;
   private final boolean blockSplittable;
-  private final DrillFileSystem fs;
+  private final Configuration fsConf;
   private final StoragePluginConfig storageConfig;
   protected final FormatPluginConfig formatConfig;
   private final String name;
   protected final CompressionCodecFactory codecFactory;
   private final boolean compressible;
 
-  protected EasyFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig,
-                             T formatConfig, boolean readable, boolean writable, boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName){
-    this.matcher = new BasicFormatMatcher(this, fs, extensions, compressible);
+  protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+      StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, boolean blockSplittable,
+      boolean compressible, List<String> extensions, String defaultName){
+    this.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
     this.readable = readable;
     this.writable = writable;
     this.context = context;
     this.blockSplittable = blockSplittable;
     this.compressible = compressible;
-    this.fs = fs;
+    this.fsConf = fsConf;
     this.storageConfig = storageConfig;
     this.formatConfig = formatConfig;
     this.name = name == null ? defaultName : name;
-    this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getConf()));
+    this.codecFactory = new CompressionCodecFactory(new Configuration(fsConf));
   }
 
   @Override
-  public DrillFileSystem getFileSystem() {
-    return fs;
+  public Configuration getFsConf() {
+    return fsConf;
   }
 
   @Override
@@ -152,7 +153,13 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     int numParts = 0;
     OperatorContext oContext = new OperatorContext(scan, context,
         false /* ScanBatch is not subject to fragment memory limit */);
-    DrillFileSystem dfs = new DrillFileSystem(fs, oContext.getStats());
+    DrillFileSystem dfs;
+    try {
+      dfs = new DrillFileSystem(fsConf, oContext.getStats());
+    } catch (IOException e) {
+      throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
+    }
+
     for(FileWork work : scan.getWorkUnits()){
       readers.add(getRecordReader(context, dfs, work, scan.getColumns()));
       if (scan.getSelectionRoot() != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 54cad56..7c70df3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
@@ -51,7 +52,7 @@ import com.google.common.collect.Lists;
 
 @JsonTypeName("fs-scan")
 public class EasyGroupScan extends AbstractFileGroupScan{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
 
   private FileSelection selection;
   private final EasyFormatPlugin<?> formatPlugin;
@@ -109,9 +110,10 @@ public class EasyGroupScan extends AbstractFileGroupScan{
   }
 
   private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
+    final DrillFileSystem dfs = new DrillFileSystem(formatPlugin.getFsConf());
     this.selection = selection;
-    BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem(), formatPlugin.getContext().getBits());
-    this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
+    BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
+    this.chunks = b.generateFileWork(selection.getFileStatusList(dfs), formatPlugin.isBlockSplittable());
     this.maxWidth = chunks.size();
     this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 2e65466..32e34b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -38,22 +39,26 @@ import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
 
   private static final boolean IS_COMPRESSIBLE = true;
+  private static final String DEFAULT_NAME = "json";
+  private static final List<String> DEFAULT_EXTS = ImmutableList.of("json");
 
-  public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
-    this(name, context, fs, storageConfig, new JSONFormatConfig());
+  public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+    this(name, context, fsConf, storageConfig, new JSONFormatConfig());
   }
 
-  public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
-    super(name, context, fs, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, Lists.newArrayList("json"), "json");
+  public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+      JSONFormatConfig formatPluginConfig) {
+    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE,
+        DEFAULT_EXTS, DEFAULT_NAME);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index bf46395..237589c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.store.easy.text;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -42,6 +42,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
 import org.apache.drill.exec.store.text.DrillTextRecordWriter;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -51,13 +52,17 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
+  private static final String DEFAULT_NAME = "text";
 
-  public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
-    super(name, context, fs, storageConfig, new TextFormatConfig(), true, false, true, true, new ArrayList<String>(), "text");
+  public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+    super(name, context, fsConf, storageConfig, new TextFormatConfig(), true, false, true, true,
+        Collections.<String>emptyList(), DEFAULT_NAME);
   }
 
-  public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, TextFormatConfig formatPluginConfig) {
-    super(name, context, fs, config, formatPluginConfig, true, false, true, true, formatPluginConfig.getExtensions(), "text");
+  public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+      TextFormatConfig formatPluginConfig) {
+    super(name, context, fsConf, config, formatPluginConfig, true, false, true, true,
+        formatPluginConfig.getExtensions(), DEFAULT_NAME);
   }
 
 
@@ -67,7 +72,8 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     Path path = dfs.makeQualified(new Path(fileWork.getPath()));
     FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
     Preconditions.checkArgument(((TextFormatConfig)formatConfig).getDelimiter().length() == 1, "Only single character delimiter supported");
-    return new DrillTextRecordReader(split, context, ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
+    return new DrillTextRecordReader(split, getFsConf(), context,
+        ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index 77c6b9a..a1249e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -69,7 +69,7 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements In
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     ISchema s = new ISchema(parent, this);
     parent.add(s.getName(), s);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 51b2208..96226a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -55,7 +55,7 @@ public class MockStorageEngine extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index e204a2c..9c83ea0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -62,37 +62,44 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class ParquetFormatPlugin implements FormatPlugin{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetFormatPlugin.class);
 
-  private final DrillbitContext context;
   public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-  private CodecFactoryExposer codecFactoryExposer;
-  private final DrillFileSystem fs;
+
+  private static final String DEFAULT_NAME = "parquet";
+
+  private static final List<Pattern> PATTERNS = Lists.newArrayList(
+      Pattern.compile(".*\\.parquet$"),
+      Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE));
+  private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC));
+
+  private final DrillbitContext context;
+  private final CodecFactoryExposer codecFactoryExposer;
+  private final Configuration fsConf;
   private final ParquetFormatMatcher formatMatcher;
   private final ParquetFormatConfig config;
   private final StoragePluginConfig storageConfig;
   private final String name;
 
-  public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig){
-    this(name, context, fs, storageConfig, new ParquetFormatConfig());
+  public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+      StoragePluginConfig storageConfig){
+    this(name, context, fsConf, storageConfig, new ParquetFormatConfig());
   }
 
-  public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
+  public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+      StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
     this.context = context;
-    this.codecFactoryExposer = new CodecFactoryExposer(fs.getConf());
+    this.codecFactoryExposer = new CodecFactoryExposer(fsConf);
     this.config = formatConfig;
-    this.formatMatcher = new ParquetFormatMatcher(this, fs);
+    this.formatMatcher = new ParquetFormatMatcher(this);
     this.storageConfig = storageConfig;
-    this.fs = fs;
-    this.name = name == null ? "parquet" : name;
-  }
-
-  Configuration getHadoopConfig() {
-    return fs.getConf();
+    this.fsConf = fsConf;
+    this.name = name == null ? DEFAULT_NAME : name;
   }
 
-  public DrillFileSystem getFileSystem() {
-    return fs;
+  @Override
+  public Configuration getFsConf() {
+    return fsConf;
   }
 
   @Override
@@ -155,12 +162,13 @@ public class ParquetFormatPlugin implements FormatPlugin{
 
   @Override
   public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
-    return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
+    return getGroupScan(selection, null);
   }
 
   @Override
   public ParquetGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
-    return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
+    final DrillFileSystem dfs = new DrillFileSystem(fsConf);
+    return new ParquetGroupScan(selection.getFileStatusList(dfs), this, selection.selectionRoot, columns);
   }
 
   @Override
@@ -190,20 +198,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
 
   private static class ParquetFormatMatcher extends BasicFormatMatcher{
 
-    private final DrillFileSystem fs;
-
-    public ParquetFormatMatcher(ParquetFormatPlugin plugin, DrillFileSystem fs) {
-      super(plugin, fs, //
-          Lists.newArrayList( //
-              Pattern.compile(".*\\.parquet$"), //
-              Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE) //
-              //
-              ),
-          Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC))
-
-          );
-      this.fs = fs;
-
+    public ParquetFormatMatcher(ParquetFormatPlugin plugin) {
+      super(plugin, PATTERNS, MAGIC_STRINGS);
     }
 
     @Override
@@ -212,17 +208,17 @@ public class ParquetFormatPlugin implements FormatPlugin{
     }
 
     @Override
-    public FormatSelection isReadable(FileSelection selection) throws IOException {
+    public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
       // TODO: we only check the first file for directory reading.  This is because
       if(selection.containsDirectories(fs)){
-        if(isDirReadable(selection.getFirstPath(fs))){
+        if(isDirReadable(fs, selection.getFirstPath(fs))){
           return new FormatSelection(plugin.getConfig(), selection);
         }
       }
-      return super.isReadable(selection);
+      return super.isReadable(fs, selection);
     }
 
-    boolean isDirReadable(FileStatus dir) {
+    boolean isDirReadable(DrillFileSystem fs, FileStatus dir) {
       Path p = new Path(dir.getPath(), ParquetFileWriter.PARQUET_METADATA_FILE);
       try {
         if (fs.exists(p)) {
@@ -235,16 +231,12 @@ public class ParquetFormatPlugin implements FormatPlugin{
           if (files.length == 0) {
             return false;
           }
-          return super.isReadable(files[0]);
+          return super.isReadable(fs, files[0]);
         }
       } catch (IOException e) {
         logger.info("Failure while attempting to check for Parquet metadata file.", e);
         return false;
       }
     }
-
-
-
   }
-
 }
\ No newline at end of file

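A small cleanup worth noting in the Parquet hunk: the file-name patterns and magic strings were hoisted into static finals. java.util.regex.Pattern is immutable and safe to share across threads, so compiling once per class instead of once per plugin instance is both safe and cheaper. The idiom in isolation, as a standalone sketch:

  import java.util.regex.Pattern;

  class PatternHoistSketch {
    // Compiled once and shared; Pattern instances are thread-safe.
    private static final Pattern PARQUET_FILE = Pattern.compile(".*\\.parquet$");

    static boolean looksLikeParquet(String path) {
      return PARQUET_FILE.matcher(path).matches();
    }
  }
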
http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index acac61f..a59f2c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
 import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
@@ -76,11 +77,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
-  static final String ENDPOINT_BYTES_TIMER = MetricRegistry.name(ParquetGroupScan.class, "endpointBytes");
-  static final String ASSIGNMENT_TIMER = MetricRegistry.name(ParquetGroupScan.class, "applyAssignments");
-  static final String ASSIGNMENT_AFFINITY_HIST = MetricRegistry.name(ParquetGroupScan.class, "assignmentAffinity");
-
-  final Histogram assignmentAffinityStats = metrics.histogram(ASSIGNMENT_AFFINITY_HIST);
 
   private ListMultimap<Integer, RowGroupInfo> mappings;
   private List<RowGroupInfo> rowGroupInfos;
@@ -135,7 +131,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     Preconditions.checkNotNull(formatConfig);
     this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(formatPlugin);
-    this.fs = formatPlugin.getFileSystem();
+    this.fs = new DrillFileSystem(formatPlugin.getFsConf());
     this.formatConfig = formatPlugin.getConfig();
     this.entries = entries;
     this.selectionRoot = selectionRoot;
@@ -155,7 +151,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.formatPlugin = formatPlugin;
     this.columns = columns;
     this.formatConfig = formatPlugin.getConfig();
-    this.fs = formatPlugin.getFileSystem();
+    this.fs = new DrillFileSystem(formatPlugin.getFsConf());
 
     this.entries = Lists.newArrayList();
     for (FileStatus file : files) {
@@ -205,7 +201,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     ColumnChunkMetaData columnChunkMetaData;
 
-    List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getHadoopConfig(), statuses, 16);
+    List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getFsConf(), statuses, 16);
     for (Footer footer : footers) {
       int index = 0;
       ParquetMetadata metadata = footer.getParquetMetadata();
@@ -260,11 +256,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
   }
 
-  @JsonIgnore
-  public FileSystem getFileSystem() {
-    return this.fs;
-  }
-
   @Override
   public void modifyFileSelection(FileSelection selection) {
     entries.clear();
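
Dropping getFileSystem() is the heart of the impersonation change in this
file: ParquetGroupScan no longer reuses one FileSystem cached by the plugin,
and instead builds its own DrillFileSystem from the plugin's Hadoop
Configuration. A minimal sketch of the new pattern (formatPlugin is assumed
in scope, inside a method that declares throws IOException):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.drill.exec.store.dfs.DrillFileSystem;

    // One DrillFileSystem per group scan, created from configuration rather
    // than shared, so each query can get a file system bound to its user.
    Configuration fsConf = formatPlugin.getFsConf();
    DrillFileSystem fs = new DrillFileSystem(fsConf);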

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index c1f815e..b1c725c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import parquet.column.ColumnDescriptor;
@@ -95,7 +94,12 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       rowGroupScan.setOperatorId(id);
     }
 
-    DrillFileSystem fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFileSystem(), oContext.getStats());
+    DrillFileSystem fs;
+    try {
+      fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFsConf(), oContext.getStats());
+    } catch(IOException e) {
+      throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
+    }
     Configuration conf = fs.getConf();
     conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
     conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
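
Creating a DrillFileSystem from a Configuration can fail with a checked
IOException (the underlying Hadoop FileSystem has to be instantiated first),
hence the new try/catch that rethrows as ExecutionSetupException. The same
wrapping, pulled into a hypothetical helper for illustration:

    import java.io.IOException;
    import org.apache.drill.common.exceptions.ExecutionSetupException;
    import org.apache.drill.exec.ops.OperatorStats;
    import org.apache.drill.exec.store.dfs.DrillFileSystem;
    import org.apache.hadoop.conf.Configuration;

    // Hypothetical helper mirroring the try/catch added above; the stats
    // argument lets the file system report IO time against the operator.
    private static DrillFileSystem createFileSystem(Configuration conf,
        OperatorStats stats) throws ExecutionSetupException {
      try {
        return new DrillFileSystem(conf, stats);
      } catch (IOException e) {
        throw new ExecutionSetupException(
            String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
      }
    }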

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index b92f98c..4a3b97b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -67,7 +67,7 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     parent.add(schema.getName(), schema);
   }
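
registerSchemas() now declares IOException because file-system-backed plugins
may need to touch storage while mounting their workspaces; SystemTablePlugin
itself never throws it. A hypothetical plugin override showing the widened
signature (MySchema is a placeholder type, not part of this patch):

    @Override
    public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
      // A dfs-style plugin could check or create workspace directories here,
      // which is file system access that can fail with IOException.
      parent.add("myschema", new MySchema());
    }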
 

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 1ad053d..3368412 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.vector.RepeatedVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileSplit;
@@ -69,7 +70,8 @@ public class DrillTextRecordReader extends AbstractRecordReader {
   private FileSplit split;
   private long totalRecordsRead;
 
-  public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) {
+  public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext context, char delimiter,
+      List<SchemaPath> columns) {
     this.fragmentContext = context;
     this.delimiter = (byte) delimiter;
     this.split = split;
@@ -95,7 +97,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
     targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE);
 
     TextInputFormat inputFormat = new TextInputFormat();
-    JobConf job = new JobConf();
+    JobConf job = new JobConf(fsConf);
     job.setInt("io.file.buffer.size", context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BUFFER_SIZE));
     job.setInputFormat(inputFormat.getClass());
     try {
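
Seeding the JobConf with the reader's Configuration matters for impersonation:
a bare JobConf() would fall back to the JVM-default Hadoop settings, while
JobConf(fsConf) makes TextInputFormat resolve splits against the workspace's
file system. A minimal sketch of the effect (the fs.defaultFS value below is
a stand-in for whatever the storage plugin actually configures):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;

    Configuration fsConf = new Configuration();
    fsConf.set("fs.defaultFS", "file:///");  // plugin-provided in real use

    JobConf job = new JobConf(fsConf);       // inherits fs.defaultFS and the rest
    // Reader-specific tuning such as io.file.buffer.size is then layered on
    // top, as in the constructor above.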

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 3fb0775..b6c6852 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -124,7 +124,7 @@ public class ControlHandlerImpl implements ControlMessageHandler {
     try {
       // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
       if (fragment.getLeafFragment()) {
-        final FragmentContext context = new FragmentContext(drillbitContext, fragment, null,
+        final FragmentContext context = new FragmentContext(drillbitContext, fragment,
             drillbitContext.getFunctionImplementationRegistry());
         final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
         final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index d94ffba..cb2753c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -348,7 +348,7 @@ public class Foreman implements Runnable {
     injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan", logger);
 
     // set up the root fragment first so we'll have incoming buffers available.
-    setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator());
+    setupRootFragment(rootPlanFragment, work.getRootOperator());
 
     setupNonRootFragments(planFragments);
     drillbitContext.getAllocator().resetFragmentLimits(); // TODO a global effect for this query?!?
@@ -791,15 +791,14 @@ public class Foreman implements Runnable {
    * Set up the root fragment (which will run locally), and submit it for execution.
    *
    * @param rootFragment
-   * @param rootClient
    * @param rootOperator
    * @throws ExecutionSetupException
    */
-  private void setupRootFragment(final PlanFragment rootFragment, final UserClientConnection rootClient,
-      final FragmentRoot rootOperator) throws ExecutionSetupException {
+  private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator)
+      throws ExecutionSetupException {
     @SuppressWarnings("resource")
-    final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, rootClient,
-        drillbitContext.getFunctionImplementationRegistry());
+    final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, queryContext,
+        initiatingClient, drillbitContext.getFunctionImplementationRegistry());
     @SuppressWarnings("resource")
     final IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
     rootContext.setBuffers(buffers);
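
The root fragment's context is now built from the Foreman's own queryContext
and initiatingClient fields, which is why setupRootFragment() loses its
rootClient parameter. Side by side, the two FragmentContext constructions this
patch leaves in the tree (signatures as they appear in these diffs):

    // Root fragment (Foreman): carries the query context, and with it the
    // identity of the user who submitted the query.
    final FragmentContext rootContext = new FragmentContext(drillbitContext,
        rootFragment, queryContext, initiatingClient,
        drillbitContext.getFunctionImplementationRegistry());

    // Non-root fragments (ControlHandlerImpl, NonRootFragmentManager): no
    // client connection, and the old explicit null argument is gone.
    final FragmentContext context = new FragmentContext(drillbitContext,
        fragment, drillbitContext.getFunctionImplementationRegistry());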

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index a5b928b..f526fbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -56,7 +56,7 @@ public class NonRootFragmentManager implements FragmentManager {
     try {
       this.fragment = fragment;
       this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
-      this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry());
+      this.context = new FragmentContext(context, fragment, context.getFunctionImplementationRegistry());
       this.buffers = new IncomingBuffers(root, this.context);
       final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel(
           fragment.getForeman()));

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
index ab6639e..550f56f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
@@ -74,7 +74,7 @@ public class TestDrillFileSystem {
     stats.startProcessing();
 
     try {
-      dfs = new DrillFileSystem(FileSystem.get(conf), stats);
+      dfs = new DrillFileSystem(conf, stats);
       is = dfs.open(new Path(tempFilePath));
 
       byte[] buf = new byte[8000];