You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/02/06 02:18:27 UTC

[3/3] drill git commit: DRILL-2080: Add IO Stats for JSON/Parquet type input files

DRILL-2080: Add IO Stats for JSON/Parquet type input files

Refactoring: Remove FileSystem shim loader


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c54bd6ac
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c54bd6ac
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c54bd6ac

Branch: refs/heads/master
Commit: c54bd6acb81daa9e1c3ce358e9d3a4ce909f02aa
Parents: 8e8b181
Author: vkorukanti <ve...@gmail.com>
Authored: Mon Dec 15 08:59:25 2014 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Thu Feb 5 15:12:37 2015 -0800

----------------------------------------------------------------------
 .../drill/exec/dotdrill/DotDrillFile.java       |   9 +-
 .../drill/exec/dotdrill/DotDrillUtil.java       |   6 +-
 .../drill/exec/physical/impl/ScanBatch.java     |  49 +-
 .../planner/sql/handlers/ShowFileHandler.java   |   5 +-
 .../exec/store/dfs/BasicFormatMatcher.java      |   5 +-
 .../exec/store/dfs/DrillFSDataInputStream.java  | 176 +++++
 .../drill/exec/store/dfs/DrillFileSystem.java   | 682 +++++++++++++++++++
 .../drill/exec/store/dfs/FileSelection.java     |   4 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java  |   4 +-
 .../drill/exec/store/dfs/FormatCreator.java     |   1 -
 .../drill/exec/store/dfs/FormatPlugin.java      |   1 -
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |  14 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |  15 +-
 .../exec/store/dfs/easy/EasyGroupScan.java      |   2 +-
 .../exec/store/dfs/shim/DrillFileSystem.java    |  44 --
 .../exec/store/dfs/shim/DrillInputStream.java   |  32 -
 .../exec/store/dfs/shim/DrillOutputStream.java  |  29 -
 .../exec/store/dfs/shim/FileSystemCreator.java  |  35 -
 .../dfs/shim/fallback/FallbackFileSystem.java   | 159 -----
 .../exec/store/easy/json/JSONFormatPlugin.java  |   6 +-
 .../exec/store/easy/json/JSONRecordReader.java  |   5 +-
 .../exec/store/easy/text/TextFormatPlugin.java  |   6 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |  10 +-
 .../exec/store/parquet/ParquetGroupScan.java    |   4 +-
 .../store/parquet/ParquetScanBatchCreator.java  |  11 +-
 .../exec/store/parquet2/DrillParquetReader.java |  13 +-
 .../drill/exec/store/sys/local/FilePStore.java  |  20 +-
 .../store/sys/local/LocalPStoreProvider.java    |   3 +-
 .../drill/exec/store/sys/zk/ZkPStore.java       |  10 -
 .../exec/store/sys/zk/ZkPStoreProvider.java     |   2 +-
 .../exec/store/dfs/TestDrillFileSystem.java     |  91 +++
 31 files changed, 1048 insertions(+), 405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
index 009cd00..6a5934b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
@@ -18,12 +18,13 @@
 package org.apache.drill.exec.dotdrill;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.DrillInputStream;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.hadoop.fs.FileStatus;
 
 import com.google.common.base.Preconditions;
 
+import java.io.InputStream;
+
 public class DotDrillFile {
 
   private FileStatus status;
@@ -51,8 +52,8 @@ public class DotDrillFile {
 
   public View getView(DrillConfig config) throws Exception{
     Preconditions.checkArgument(type == DotDrillType.VIEW);
-    try(DrillInputStream is = fs.open(status.getPath())){
-      return config.getMapper().readValue(is.getInputStream(), View.class);
+    try(InputStream is = fs.open(status.getPath())){
+      return config.getMapper().readValue(is, View.class);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
index 63b22e9..cef4359 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.dotdrill;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
@@ -50,7 +50,7 @@ public class DotDrillUtil {
   }
 
   public static List<DotDrillFile> getDotDrills(DrillFileSystem fs, Path root, DotDrillType... types) throws IOException{
-    return getDrillFiles(fs, fs.getUnderlying().globStatus(new Path(root, "*.drill")), types);
+    return getDrillFiles(fs, fs.globStatus(new Path(root, "*.drill")), types);
   }
 
     public static List<DotDrillFile> getDotDrills(DrillFileSystem fs, Path root, String name, DotDrillType... types) throws IOException{
@@ -58,6 +58,6 @@ public class DotDrillUtil {
         name = name + DotDrillType.DOT_DRILL_GLOB;
       }
 
-      return getDrillFiles(fs, fs.getUnderlying().globStatus(new Path(root, name)), types);
+      return getDrillFiles(fs, fs.globStatus(new Path(root, name)), types);
     }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index d68a5b5..6b7294d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,17 +83,23 @@ public class ScanBatch implements RecordBatch {
   private boolean done = false;
   private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
 
-  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
+  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, OperatorContext oContext,
+                   Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
     this.context = context;
     this.readers = readers;
     if (!readers.hasNext()) {
       throw new ExecutionSetupException("A scan batch must contain at least one reader.");
     }
     this.currentReader = readers.next();
-    // Scan Batch is not subject to fragment memory limit
-    this.oContext = new OperatorContext(subScanConfig, context, false);
+    this.oContext = oContext;
     this.currentReader.setOperatorContext(this.oContext);
-    this.currentReader.setup(mutator);
+
+    try {
+      oContext.getStats().startProcessing();
+      this.currentReader.setup(mutator);
+    } finally {
+      oContext.getStats().stopProcessing();
+    }
     this.partitionColumns = partitionColumns.iterator();
     this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
     this.selectedPartitionColumns = selectedPartitionColumns;
@@ -103,7 +109,9 @@ public class ScanBatch implements RecordBatch {
   }
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
-    this(subScanConfig, context, readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
+    this(subScanConfig, context,
+        new OperatorContext(subScanConfig, context, false /* ScanBatch is not subject to fragment memory limit */),
+        readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
   }
 
   public FragmentContext getContext() {
@@ -166,26 +174,23 @@ public class ScanBatch implements RecordBatch {
             }
             return IterOutcome.NONE;
           }
-          oContext.getStats().startSetup();
+
+          currentReader.cleanup();
+          currentReader = readers.next();
+          partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
+          currentReader.setup(mutator);
+          currentReader.setOperatorContext(oContext);
           try {
-            currentReader.cleanup();
-            currentReader = readers.next();
-            partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
-            currentReader.setup(mutator);
-            currentReader.setOperatorContext(oContext);
-            try {
-              currentReader.allocate(fieldVectorMap);
-            } catch (OutOfMemoryException e) {
-              logger.debug("Caught OutOfMemoryException");
-              for (ValueVector v : fieldVectorMap.values()) {
-                v.clear();
-              }
-              return IterOutcome.OUT_OF_MEMORY;
+            currentReader.allocate(fieldVectorMap);
+          } catch (OutOfMemoryException e) {
+            logger.debug("Caught OutOfMemoryException");
+            for (ValueVector v : fieldVectorMap.values()) {
+              v.clear();
             }
-            addPartitionVectors();
-          } finally {
-            oContext.getStats().stopSetup();
+            return IterOutcome.OUT_OF_MEMORY;
           }
+          addPartitionVectors();
+
         } catch (ExecutionSetupException e) {
           this.context.fail(e);
           releaseAssets();

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
index ff3542d..2504ed9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
@@ -22,20 +22,17 @@ import java.util.ArrayList;
 import java.util.List;
 
 import net.hydromatic.optiq.SchemaPlus;
-import net.hydromatic.optiq.tools.Planner;
 import net.hydromatic.optiq.tools.RelConversionException;
 import net.hydromatic.optiq.tools.ValidationException;
 
-import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.eigenbase.relopt.hep.HepPlanner;
 import org.eigenbase.sql.SqlIdentifier;
 import org.eigenbase.sql.SqlNode;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
index 2ba2910..9756f3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Pattern;
 
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -64,7 +63,7 @@ public class BasicFormatMatcher extends FormatMatcher{
     this.fs = fs;
     this.plugin = plugin;
     this.compressible = compressible;
-    this.codecFactory = new CompressionCodecFactory(fs.getUnderlying().getConf());
+    this.codecFactory = new CompressionCodecFactory(fs.getConf());
   }
 
   @Override
@@ -135,7 +134,7 @@ public class BasicFormatMatcher extends FormatMatcher{
       }
       final Range<Long> fileRange = Range.closedOpen( 0L, status.getLen());
 
-      try (FSDataInputStream is = fs.open(status.getPath()).getInputStream()) {
+      try (FSDataInputStream is = fs.open(status.getPath())) {
         for(RangeMagics rMagic : ranges) {
           Range<Long> r = rMagic.range;
           if (!fileRange.encloses(r)) {

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
new file mode 100644
index 0000000..44ef8a3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+
+/**
+ * Wrapper around FSDataInputStream to collect IO Stats.
+ */
+public class DrillFSDataInputStream extends FSDataInputStream {
+  private FSDataInputStream underlyingIs;
+
+  public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) throws IOException {
+    super(new WrappedInputStream(in, operatorStats));
+    this.underlyingIs = in;
+  }
+
+  @Override
+  public synchronized void seek(long desired) throws IOException {
+    underlyingIs.seek(desired);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return underlyingIs.getPos();
+  }
+
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    return underlyingIs.read(position, buffer, offset, length);
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    underlyingIs.readFully(position, buffer, offset, length);
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+    underlyingIs.readFully(position, buffer);
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return underlyingIs.seekToNewSource(targetPos);
+  }
+
+  @Override
+  public InputStream getWrappedStream() {
+    return underlyingIs.getWrappedStream();
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    return underlyingIs.read(buf);
+  }
+
+  @Override
+  public FileDescriptor getFileDescriptor() throws IOException {
+    return underlyingIs.getFileDescriptor();
+  }
+
+  @Override
+  public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
+    underlyingIs.setReadahead(readahead);
+  }
+
+  @Override
+  public void setDropBehind(Boolean dropBehind) throws IOException, UnsupportedOperationException {
+    underlyingIs.setDropBehind(dropBehind);
+  }
+
+  @Override
+  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
+    return underlyingIs.read(bufferPool, maxLength, opts);
+  }
+
+  @Override
+  public void releaseBuffer(ByteBuffer buffer) {
+    underlyingIs.releaseBuffer(buffer);
+  }
+
+  /**
+   * We need to wrap the FSDataInputStream inside a InputStream, because read() method in InputStream is
+   * overridden in FilterInputStream (super class of FSDataInputStream) as final, so we can not override in
+   * DrillFSDataInputStream.
+   */
+  private static class WrappedInputStream extends InputStream implements Seekable, PositionedReadable {
+    final FSDataInputStream is;
+    final OperatorStats operatorStats;
+
+    WrappedInputStream(FSDataInputStream is, OperatorStats operatorStats) {
+      this.is = is;
+      this.operatorStats = operatorStats;
+    }
+
+    /**
+     * Most of the read are going to be block reads which use {@link #read(byte[], int,
+     * int)}. So not adding stats for single byte reads.
+     */
+    @Override
+    public int read() throws IOException {
+      return is.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      operatorStats.startWait();
+      int numBytesRead;
+      try {
+        numBytesRead = is.read(b, off, len);
+      } finally {
+        operatorStats.stopWait();
+      }
+
+      return numBytesRead;
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+      return is.read(position, buffer, offset, length);
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+      is.readFully(position, buffer, offset, length);
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+      is.readFully(position, buffer);
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      is.seek(pos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return is.getPos();
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return is.seekToNewSource(targetPos);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
new file mode 100644
index 0000000..f5730a1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -0,0 +1,682 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.EnumSet;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * DrillFileSystem is the wrapper around the actual FileSystem implementation.
+ *
+ * If {@link org.apache.drill.exec.ops.OperatorStats} are provided it returns a instrumented FSDataInputStream to
+ * measure IO wait time.
+ */
+public class DrillFileSystem extends FileSystem {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
+
+  private final FileSystem underlyingFs;
+  private final OperatorStats operatorStats;
+
+  public DrillFileSystem(Configuration fsConf) throws IOException {
+    this.underlyingFs = FileSystem.get(fsConf);
+    this.operatorStats = null;
+  }
+
+  public DrillFileSystem(FileSystem fs, OperatorStats operatorStats) {
+    this.underlyingFs = fs;
+    this.operatorStats = operatorStats;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    // Guard against setConf(null) call that is called as part of superclass constructor (Configured) of the
+    // DrillFileSystem, at which point underlyingFs is null.
+    if (conf != null && underlyingFs != null) {
+      underlyingFs.setConf(conf);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return underlyingFs.getConf();
+  }
+
+  /**
+   * If OperatorStats are provided return a instrumented {@link org.apache.hadoop.fs.FSDataInputStream}.
+   */
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    if (operatorStats != null) {
+      return new DrillFSDataInputStream(underlyingFs.open(f, bufferSize), operatorStats);
+    }
+
+    return underlyingFs.open(f, bufferSize);
+  }
+
+  /**
+   * If OperatorStats are provided return a instrumented {@link org.apache.hadoop.fs.FSDataInputStream}.
+   */
+  @Override
+  public FSDataInputStream open(Path f) throws IOException {
+    if (operatorStats != null) {
+      return new DrillFSDataInputStream(underlyingFs.open(f), operatorStats);
+    }
+
+    return underlyingFs.open(f);
+  }
+
+  @Override
+  public void initialize(URI name, Configuration conf) throws IOException {
+    underlyingFs.initialize(name, conf);
+  }
+
+  @Override
+  public String getScheme() {
+    return underlyingFs.getScheme();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f) throws IOException {
+    return underlyingFs.create(f);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+    return underlyingFs.create(f, overwrite);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
+    return underlyingFs.create(f, progress);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, short replication) throws IOException {
+    return underlyingFs.create(f, replication);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, short replication, Progressable progress) throws IOException {
+    return underlyingFs.create(f, replication, progress);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
+    return underlyingFs.create(f, overwrite, bufferSize);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress) throws IOException {
+    return underlyingFs.create(f, overwrite, bufferSize, progress);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
+      long blockSize) throws IOException {
+    return underlyingFs.create(f, overwrite, bufferSize, replication, blockSize);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+    return underlyingFs.create(f, overwrite, bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    return underlyingFs.getFileStatus(f);
+  }
+
+  @Override
+  public void createSymlink(Path target, Path link, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, IOException {
+    underlyingFs.createSymlink(target, link, createParent);
+  }
+
+  @Override
+  public FileStatus getFileLinkStatus(Path f) throws AccessControlException, FileNotFoundException,
+      UnsupportedFileSystemException, IOException {
+    return underlyingFs.getFileLinkStatus(f);
+  }
+
+  @Override
+  public boolean supportsSymlinks() {
+    return underlyingFs.supportsSymlinks();
+  }
+
+  @Override
+  public Path getLinkTarget(Path f) throws IOException {
+    return underlyingFs.getLinkTarget(f);
+  }
+
+  @Override
+  public FileChecksum getFileChecksum(Path f) throws IOException {
+    return underlyingFs.getFileChecksum(f);
+  }
+
+  @Override
+  public void setVerifyChecksum(boolean verifyChecksum) {
+    underlyingFs.setVerifyChecksum(verifyChecksum);
+  }
+
+  @Override
+  public void setWriteChecksum(boolean writeChecksum) {
+    underlyingFs.setWriteChecksum(writeChecksum);
+  }
+
+  @Override
+  public FsStatus getStatus() throws IOException {
+    return underlyingFs.getStatus();
+  }
+
+  @Override
+  public FsStatus getStatus(Path p) throws IOException {
+    return underlyingFs.getStatus(p);
+  }
+
+  @Override
+  public void setPermission(Path p, FsPermission permission) throws IOException {
+    underlyingFs.setPermission(p, permission);
+  }
+
+  @Override
+  public void setOwner(Path p, String username, String groupname) throws IOException {
+    underlyingFs.setOwner(p, username, groupname);
+  }
+
+  @Override
+  public void setTimes(Path p, long mtime, long atime) throws IOException {
+    underlyingFs.setTimes(p, mtime, atime);
+  }
+
+  @Override
+  public Path createSnapshot(Path path, String snapshotName) throws IOException {
+    return underlyingFs.createSnapshot(path, snapshotName);
+  }
+
+  @Override
+  public void renameSnapshot(Path path, String snapshotOldName, String snapshotNewName) throws IOException {
+    underlyingFs.renameSnapshot(path, snapshotOldName, snapshotNewName);
+  }
+
+  @Override
+  public void deleteSnapshot(Path path, String snapshotName) throws IOException {
+    underlyingFs.deleteSnapshot(path, snapshotName);
+  }
+
+  @Override
+  public void modifyAclEntries(Path path, List<AclEntry> aclSpec) throws IOException {
+    underlyingFs.modifyAclEntries(path, aclSpec);
+  }
+
+  @Override
+  public void removeAclEntries(Path path, List<AclEntry> aclSpec) throws IOException {
+    underlyingFs.removeAclEntries(path, aclSpec);
+  }
+
+  @Override
+  public void removeDefaultAcl(Path path) throws IOException {
+    underlyingFs.removeDefaultAcl(path);
+  }
+
+  @Override
+  public void removeAcl(Path path) throws IOException {
+    underlyingFs.removeAcl(path);
+  }
+
+  @Override
+  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
+    underlyingFs.setAcl(path, aclSpec);
+  }
+
+  @Override
+  public AclStatus getAclStatus(Path path) throws IOException {
+    return underlyingFs.getAclStatus(path);
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return underlyingFs.getWorkingDirectory();
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+    return underlyingFs.append(f, bufferSize, progress);
+  }
+
+  @Override
+  public void concat(Path trg, Path[] psrcs) throws IOException {
+    underlyingFs.concat(trg, psrcs);
+  }
+
+  @Override
+  @Deprecated
+  public short getReplication(Path src) throws IOException {
+    return underlyingFs.getReplication(src);
+  }
+
+  @Override
+  public boolean setReplication(Path src, short replication) throws IOException {
+    return underlyingFs.setReplication(src, replication);
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    return underlyingFs.mkdirs(f, permission);
+  }
+
+  @Override
+  public void copyFromLocalFile(Path src, Path dst) throws IOException {
+    underlyingFs.copyFromLocalFile(src, dst);
+  }
+
+  @Override
+  public void moveFromLocalFile(Path[] srcs, Path dst) throws IOException {
+    underlyingFs.moveFromLocalFile(srcs, dst);
+  }
+
+  @Override
+  public void moveFromLocalFile(Path src, Path dst) throws IOException {
+    underlyingFs.moveFromLocalFile(src, dst);
+  }
+
+  @Override
+  public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+    underlyingFs.copyFromLocalFile(delSrc, src, dst);
+  }
+
+  @Override
+  public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst) throws IOException {
+    underlyingFs.copyFromLocalFile(delSrc, overwrite, srcs, dst);
+  }
+
+  @Override
+  public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException {
+    underlyingFs.copyFromLocalFile(delSrc, overwrite, src, dst);
+  }
+
+  @Override
+  public void copyToLocalFile(Path src, Path dst) throws IOException {
+    underlyingFs.copyToLocalFile(src, dst);
+  }
+
+  @Override
+  public void moveToLocalFile(Path src, Path dst) throws IOException {
+    underlyingFs.moveToLocalFile(src, dst);
+  }
+
+  @Override
+  public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+    underlyingFs.copyToLocalFile(delSrc, src, dst);
+  }
+
+  @Override
+  public void copyToLocalFile(boolean delSrc, Path src, Path dst, boolean useRawLocalFileSystem) throws IOException {
+    underlyingFs.copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem);
+  }
+
+  @Override
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) throws IOException {
+    return underlyingFs.startLocalOutput(fsOutputFile, tmpLocalFile);
+  }
+
+  @Override
+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) throws IOException {
+    underlyingFs.completeLocalOutput(fsOutputFile, tmpLocalFile);
+  }
+
+  @Override
+  public void close() throws IOException {
+    underlyingFs.close();
+  }
+
+  @Override
+  public long getUsed() throws IOException {
+    return underlyingFs.getUsed();
+  }
+
+  @Override
+  @Deprecated
+  public long getBlockSize(Path f) throws IOException {
+    return underlyingFs.getBlockSize(f);
+  }
+
+  @Override
+  @Deprecated
+  public long getDefaultBlockSize() {
+    return underlyingFs.getDefaultBlockSize();
+  }
+
+  @Override
+  public long getDefaultBlockSize(Path f) {
+    return underlyingFs.getDefaultBlockSize(f);
+  }
+
+  @Override
+  @Deprecated
+  public short getDefaultReplication() {
+    return underlyingFs.getDefaultReplication();
+  }
+
+  @Override
+  public short getDefaultReplication(Path path) {
+    return underlyingFs.getDefaultReplication(path);
+  }
+
+  @Override
+  public boolean mkdirs(Path folderPath) throws IOException {
+    if (!underlyingFs.exists(folderPath)) {
+      return underlyingFs.mkdirs(folderPath);
+    } else if (!underlyingFs.getFileStatus(folderPath).isDir()) {
+      throw new IOException("The specified folder path exists and is not a folder.");
+    }
+    return false;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
+      short replication, long blockSize, Progressable progress, ChecksumOpt checksumOpt) throws IOException {
+    return underlyingFs.create(f, permission, flags, bufferSize, replication, blockSize, progress, checksumOpt);
+  }
+
+  @Override
+  @Deprecated
+  public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize, short replication,
+      long blockSize, Progressable progress) throws IOException {
+    return underlyingFs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  @Deprecated
+  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, boolean overwrite, int bufferSize,
+      short replication, long blockSize, Progressable progress) throws IOException {
+    return underlyingFs.createNonRecursive(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  @Deprecated
+  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+    return underlyingFs.createNonRecursive(f, permission, flags, bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  public boolean createNewFile(Path f) throws IOException {
+    return underlyingFs.createNewFile(f);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f) throws IOException {
+    return underlyingFs.append(f);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
+    return underlyingFs.append(f, bufferSize);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short
+      replication, long blockSize, Progressable progress) throws IOException {
+    return underlyingFs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
+      short replication, long blockSize, Progressable progress) throws IOException {
+    return underlyingFs.create(f, permission, flags, bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+    return underlyingFs.listStatus(f);
+  }
+
+  @Override
+  public RemoteIterator<Path> listCorruptFileBlocks(Path path) throws IOException {
+    return underlyingFs.listCorruptFileBlocks(path);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f, PathFilter filter) throws FileNotFoundException, IOException {
+    return underlyingFs.listStatus(f, filter);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path[] files) throws FileNotFoundException, IOException {
+    return underlyingFs.listStatus(files);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path[] files, PathFilter filter) throws FileNotFoundException, IOException {
+    return underlyingFs.listStatus(files, filter);
+  }
+
+  @Override
+  public FileStatus[] globStatus(Path pathPattern) throws IOException {
+    return underlyingFs.globStatus(pathPattern);
+  }
+
+  @Override
+  public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
+    return underlyingFs.globStatus(pathPattern, filter);
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) throws FileNotFoundException, IOException {
+    return underlyingFs.listLocatedStatus(f);
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws FileNotFoundException, IOException {
+    return underlyingFs.listFiles(f, recursive);
+  }
+
+  @Override
+  public Path getHomeDirectory() {
+    return underlyingFs.getHomeDirectory();
+  }
+
+  @Override
+  public void setWorkingDirectory(Path new_dir) {
+    underlyingFs.setWorkingDirectory(new_dir);
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    return underlyingFs.rename(src, dst);
+  }
+
+  @Override
+  @Deprecated
+  public boolean delete(Path f) throws IOException {
+    return underlyingFs.delete(f);
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    return underlyingFs.delete(f, recursive);
+  }
+
+  @Override
+  public boolean deleteOnExit(Path f) throws IOException {
+    return underlyingFs.deleteOnExit(f);
+  }
+
+  @Override
+  public boolean cancelDeleteOnExit(Path f) {
+    return underlyingFs.cancelDeleteOnExit(f);
+  }
+
+  @Override
+  public boolean exists(Path f) throws IOException {
+    return underlyingFs.exists(f);
+  }
+
+  @Override
+  public boolean isDirectory(Path f) throws IOException {
+    return underlyingFs.isDirectory(f);
+  }
+
+  @Override
+  public boolean isFile(Path f) throws IOException {
+    return underlyingFs.isFile(f);
+  }
+
+  @Override
+  @Deprecated
+  public long getLength(Path f) throws IOException {
+    return underlyingFs.getLength(f);
+  }
+
+  @Override
+  public ContentSummary getContentSummary(Path f) throws IOException {
+    return underlyingFs.getContentSummary(f);
+  }
+
+  @Override
+  public URI getUri() {
+    return underlyingFs.getUri();
+  }
+
+  @Override
+  @LimitedPrivate({"HDFS", "MapReduce"})
+  public String getCanonicalServiceName() {
+    return underlyingFs.getCanonicalServiceName();
+  }
+
+  @Override
+  @Deprecated
+  public String getName() {
+    return underlyingFs.getName();
+  }
+
+  @Override
+  public Path makeQualified(Path path) {
+    return underlyingFs.makeQualified(path);
+  }
+
+  @Override
+  @Private
+  public Token<?> getDelegationToken(String renewer) throws IOException {
+    return underlyingFs.getDelegationToken(renewer);
+  }
+
+  @Override
+  @LimitedPrivate({"HDFS", "MapReduce"})
+  public Token<?>[] addDelegationTokens(String renewer, Credentials credentials) throws IOException {
+    return underlyingFs.addDelegationTokens(renewer, credentials);
+  }
+
+  @Override
+  @LimitedPrivate({"HDFS"})
+  @VisibleForTesting
+  public FileSystem[] getChildFileSystems() {
+    return underlyingFs.getChildFileSystems();
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+    return underlyingFs.getFileBlockLocations(file, start, len);
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException {
+    return underlyingFs.getFileBlockLocations(p, start, len);
+  }
+
+  @Override
+  @Deprecated
+  public FsServerDefaults getServerDefaults() throws IOException {
+    return underlyingFs.getServerDefaults();
+  }
+
+  @Override
+  public FsServerDefaults getServerDefaults(Path p) throws IOException {
+    return underlyingFs.getServerDefaults(p);
+  }
+
+  @Override
+  public Path resolvePath(Path p) throws IOException {
+    return underlyingFs.resolvePath(p);
+  }
+
+  public List<FileStatus> list(boolean recursive, Path... paths) throws IOException {
+    if (recursive) {
+      List<FileStatus> statuses = Lists.newArrayList();
+      for (Path p : paths) {
+        addRecursiveStatus(underlyingFs.getFileStatus(p), statuses);
+      }
+      return statuses;
+
+    } else {
+      return Lists.newArrayList(underlyingFs.listStatus(paths));
+    }
+  }
+
+
+  private void addRecursiveStatus(FileStatus parent, List<FileStatus> listToFill) throws IOException {
+    if (parent.isDir()) {
+      Path pattern = new Path(parent.getPath(), "*");
+      FileStatus[] sub = underlyingFs.globStatus(pattern, new DrillPathFilter());
+      for(FileStatus s : sub){
+        if (s.isDir()) {
+          addRecursiveStatus(s, listToFill);
+        } else {
+          listToFill.add(s);
+        }
+      }
+    } else {
+      listToFill.add(parent);
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index cf8937f..b9ae303 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -23,12 +23,10 @@ import java.util.List;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -136,7 +134,7 @@ public class FileSelection {
       return new FileSelection(Collections.singletonList(status), p.toUri().getPath());
     } else {
       Path p = new Path(parent,removeLeadingSlash(path));
-      FileStatus[] status = fs.getUnderlying().globStatus(p);
+      FileStatus[] status = fs.globStatus(p);
       if (status == null || status.length == 0) {
         return null;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index db6c0c7..7a1f61e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -36,8 +36,6 @@ import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.ClassPathFileSystem;
 import org.apache.drill.exec.store.LocalSyncableFileSystem;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
@@ -71,7 +69,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
       fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.connection);
       fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
       fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
-      fs = FileSystemCreator.getFileSystem(context.getConfig(), fsConf);
+      fs = new DrillFileSystem(fsConf);
       formatPluginsByName = FormatCreator.getFormatPlugins(context, fs, config);
       List<FormatMatcher> matchers = Lists.newArrayList();
       formatPluginsByConfig = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
index e5c0487..c164ed5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
@@ -29,7 +29,6 @@ import org.apache.drill.common.util.ConstructorChecker;
 import org.apache.drill.common.util.PathScanner;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 
 import com.google.common.collect.Maps;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 27f83f0..58d5b42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 
 /**
  * Similar to a storage engine but built specifically to work within a FileSystem context.

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 7b9d52c..7c8d9b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.dfs;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,7 +29,6 @@ import com.google.common.collect.ImmutableList;
 import net.hydromatic.optiq.Table;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.dotdrill.DotDrillFile;
@@ -43,8 +43,6 @@ import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
 import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.DrillOutputStream;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
@@ -172,9 +170,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
 
     public boolean createView(View view) throws Exception {
       Path viewPath = getViewPath(view.getName());
-      boolean replaced = fs.getUnderlying().exists(viewPath);
-      try (DrillOutputStream stream = fs.create(viewPath)) {
-        mapper.writeValue(stream.getOuputStream(), view);
+      boolean replaced = fs.exists(viewPath);
+      try (OutputStream stream = fs.create(viewPath)) {
+        mapper.writeValue(stream, view);
       }
       if (knownViews != null) {
         knownViews.put(view.getName(), viewPath.toString());
@@ -184,11 +182,11 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
 
     public boolean viewExists(String viewName) throws Exception {
       Path viewPath = getViewPath(viewName);
-      return fs.getUnderlying().exists(viewPath);
+      return fs.exists(viewPath);
     }
 
     public void dropView(String viewName) throws IOException {
-      fs.getUnderlying().delete(getViewPath(viewName), false);
+      fs.delete(getViewPath(viewName), false);
       if (knownViews != null) {
         knownViews.delete(viewName);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 9cc1808..431b362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -30,6 +30,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -45,7 +46,7 @@ import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
@@ -79,7 +80,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     this.storageConfig = storageConfig;
     this.formatConfig = formatConfig;
     this.name = name == null ? defaultName : name;
-    this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getUnderlying().getConf()));
+    this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getConf()));
   }
 
   @Override
@@ -113,7 +114,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     return compressible;
   }
 
-  public abstract RecordReader getRecordReader(FragmentContext context, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException;
+  public abstract RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+      List<SchemaPath> columns) throws ExecutionSetupException;
 
   RecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
     String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
@@ -147,8 +149,11 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     }
 
     int numParts = 0;
+    OperatorContext oContext = new OperatorContext(scan, context,
+        false /* ScanBatch is not subject to fragment memory limit */);
+    DrillFileSystem dfs = new DrillFileSystem(fs, oContext.getStats());
     for(FileWork work : scan.getWorkUnits()){
-      readers.add(getRecordReader(context, work, scan.getColumns()));
+      readers.add(getRecordReader(context, dfs, work, scan.getColumns()));
       if (scan.getSelectionRoot() != null) {
         String[] r = scan.getSelectionRoot().split("/");
         String[] p = work.getPath().split("/");
@@ -170,7 +175,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
       }
     }
 
-    return new ScanBatch(scan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
+    return new ScanBatch(scan, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
   }
 
   public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index b505535..9902443 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -110,7 +110,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
 
   private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
     this.selection = selection;
-    BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
+    BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem(), formatPlugin.getContext().getBits());
     this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
     this.maxWidth = chunks.size();
     this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
deleted file mode 100644
index d3f9134..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.shim;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Wraps the underlying filesystem to provide advanced file system features. Delegates to underlying file system if
- * those features are exposed.
- */
-public abstract class DrillFileSystem implements AutoCloseable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
-
-  public abstract FileSystem getUnderlying();
-
-  public abstract BlockLocation[] getBlockLocations(FileStatus status, long start, long length) throws IOException;
-  public abstract List<FileStatus> list(boolean recursive, Path... paths) throws IOException;
-  public abstract FileStatus getFileStatus(Path p) throws IOException;
-  public abstract DrillOutputStream create(Path p) throws IOException;
-  public abstract DrillInputStream open(Path p) throws IOException;
-  public abstract boolean mkdirs(Path f) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java
deleted file mode 100644
index 8c3b5ae..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.shim;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-
-public abstract class DrillInputStream implements AutoCloseable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillInputStream.class);
-
-//  public abstract AccountingByteBuf readNow(long start, long length) throws IOException;
-//  public abstract void readNow(AccountingByteBuf b, long start, long length) throws IOException;
-//  public abstract AccountingByteBuf readNow() throws IOException;
-
-  public abstract FSDataInputStream getInputStream();
-//  public abstract CheckedFuture<Long, IOException> readFuture(AccountingByteBuf b, long start, long length) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java
deleted file mode 100644
index 8e56232..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.shim;
-
-import java.io.OutputStream;
-
-
-public abstract class DrillOutputStream implements AutoCloseable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOutputStream.class);
-
-  public abstract OutputStream getOuputStream();
-//  public abstract CheckedFuture<Long, IOException> writeFuture(AccountingByteBuf b);
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java
deleted file mode 100644
index a5ad257..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.shim;
-
-import java.io.IOException;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.dfs.shim.fallback.FallbackFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
-public class FileSystemCreator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemCreator.class);
-
-  public static DrillFileSystem getFileSystem(DrillConfig config, Configuration fsConf) throws IOException{
-    FileSystem fs = FileSystem.get(fsConf);
-    return new FallbackFileSystem(config, fs);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
deleted file mode 100644
index 959529a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.shim.fallback;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.dfs.DrillPathFilter;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.DrillInputStream;
-import org.apache.drill.exec.store.dfs.shim.DrillOutputStream;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Lists;
-
-public class FallbackFileSystem extends DrillFileSystem {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FallbackFileSystem.class);
-
-  final FileSystem fs;
-
-  public FallbackFileSystem(DrillConfig config, FileSystem fs) {
-    this.fs = fs;
-  }
-
-  @Override
-  public FileSystem getUnderlying() {
-    return fs;
-  }
-
-  @Override
-  public List<FileStatus> list(boolean recursive, Path... paths) throws IOException {
-    if (recursive) {
-      List<FileStatus> statuses = Lists.newArrayList();
-      for (Path p : paths) {
-        addRecursiveStatus(fs.getFileStatus(p), statuses);
-      }
-      return statuses;
-
-    } else {
-      return Lists.newArrayList(fs.listStatus(paths));
-    }
-  }
-
-
-  private void addRecursiveStatus(FileStatus parent, List<FileStatus> listToFill) throws IOException {
-    if (parent.isDir()) {
-      Path pattern = new Path(parent.getPath(), "*");
-      FileStatus[] sub = fs.globStatus(pattern, new DrillPathFilter());
-      for(FileStatus s : sub){
-        if (s.isDir()) {
-          addRecursiveStatus(s, listToFill);
-        } else {
-          listToFill.add(s);
-        }
-      }
-    } else {
-      listToFill.add(parent);
-    }
-  }
-
-  @Override
-  public FileStatus getFileStatus(Path p) throws IOException {
-    return fs.getFileStatus(p);
-  }
-
-  @Override
-  public DrillOutputStream create(Path p) throws IOException {
-    return new Out(fs.create(p));
-  }
-
-  @Override
-  public DrillInputStream open(Path p) throws IOException {
-    return new In(fs.open(p));
-  }
-
-  @Override
-  public void close() throws Exception {
-    fs.close();
-  }
-
-  @Override
-  public BlockLocation[] getBlockLocations(FileStatus status, long start, long len) throws IOException {
-    return fs.getFileBlockLocations(status, start, len);
-  }
-
-  private class Out extends DrillOutputStream {
-
-    private final FSDataOutputStream out;
-
-    public Out(FSDataOutputStream out) {
-      super();
-      this.out = out;
-    }
-
-    @Override
-    public void close() throws Exception {
-      out.close();
-    }
-
-    @Override
-    public FSDataOutputStream getOuputStream() {
-      return out;
-    }
-
-  }
-
-  private class In extends DrillInputStream {
-
-    private final FSDataInputStream in;
-
-    public In(FSDataInputStream in) {
-      super();
-      this.in = in;
-    }
-
-    @Override
-    public FSDataInputStream getInputStream() {
-      return in;
-    }
-
-    @Override
-    public void close() throws Exception {
-      in.close();
-    }
-
-  }
-
-  @Override
-  public boolean mkdirs(Path folderPath) throws IOException {
-    if (!fs.exists(folderPath)) {
-      return fs.mkdirs(folderPath);
-    } else if (!fs.getFileStatus(folderPath).isDirectory()) {
-      throw new IOException("The specified folder path exists and is not a folder.");
-    }
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index d41243d..6cf1ce5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -35,7 +35,7 @@ import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
 import org.apache.hadoop.fs.FileSystem;
 
@@ -56,9 +56,9 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
   }
 
   @Override
-  public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
+  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
       List<SchemaPath> columns) throws ExecutionSetupException {
-    return new JSONRecordReader(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
+    return new JSONRecordReader(context, fileWork.getPath(), dfs, columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 0070d18..557c0f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.fn.JsonReader.ReadState;
@@ -54,8 +55,8 @@ public class JSONRecordReader extends AbstractRecordReader {
   private OutputMutator mutator;
   private VectorContainerWriter writer;
   private Path hadoopPath;
-  private FileSystem fileSystem;
   private InputStream stream;
+  private DrillFileSystem fileSystem;
   private JsonReader jsonReader;
   private int recordCount;
   private FragmentContext fragmentContext;
@@ -63,7 +64,7 @@ public class JSONRecordReader extends AbstractRecordReader {
   private List<SchemaPath> columns;
   private boolean enableAllTextMode;
 
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
                           List<SchemaPath> columns) throws OutOfMemoryException {
     this.hadoopPath = new Path(inputPath);
     this.fileSystem = fileSystem;

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index b64a032..bf46395 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -39,7 +39,7 @@ import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
 import org.apache.drill.exec.store.text.DrillTextRecordWriter;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,9 +62,9 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
 
 
   @Override
-  public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
+  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
       List<SchemaPath> columns) throws ExecutionSetupException {
-    Path path = getFileSystem().getUnderlying().makeQualified(new Path(fileWork.getPath()));
+    Path path = dfs.makeQualified(new Path(fileWork.getPath()));
     FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
     Preconditions.checkArgument(((TextFormatConfig)formatConfig).getDelimiter().length() == 1, "Only single character delimiter supported");
     return new DrillTextRecordReader(split, context, ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 109033a..ab864db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -45,7 +45,7 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.MagicString;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.mock.MockStorageEngine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -79,7 +79,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
 
   public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
     this.context = context;
-    this.codecFactoryExposer = new CodecFactoryExposer(fs.getUnderlying().getConf());
+    this.codecFactoryExposer = new CodecFactoryExposer(fs.getConf());
     this.config = formatConfig;
     this.formatMatcher = new ParquetFormatMatcher(this, fs);
     this.storageConfig = storageConfig;
@@ -88,7 +88,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
   }
 
   Configuration getHadoopConfig() {
-    return fs.getUnderlying().getConf();
+    return fs.getConf();
   }
 
   public DrillFileSystem getFileSystem() {
@@ -219,13 +219,13 @@ public class ParquetFormatPlugin implements FormatPlugin{
     boolean isDirReadable(FileStatus dir) {
       Path p = new Path(dir.getPath(), ParquetFileWriter.PARQUET_METADATA_FILE);
       try {
-        if (fs.getUnderlying().exists(p)) {
+        if (fs.exists(p)) {
           return true;
         } else {
 
           PathFilter filter = new DrillPathFilter();
 
-          FileStatus[] files = fs.getUnderlying().listStatus(dir.getPath(), filter);
+          FileStatus[] files = fs.listStatus(dir.getPath(), filter);
           if (files.length == 0) {
             return false;
           }

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 8ddf5fd..28e9f4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -135,7 +135,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     Preconditions.checkNotNull(formatConfig);
     this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(formatPlugin);
-    this.fs = formatPlugin.getFileSystem().getUnderlying();
+    this.fs = formatPlugin.getFileSystem();
     this.formatConfig = formatPlugin.getConfig();
     this.entries = entries;
     this.selectionRoot = selectionRoot;
@@ -155,7 +155,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.formatPlugin = formatPlugin;
     this.columns = columns;
     this.formatConfig = formatPlugin.getConfig();
-    this.fs = formatPlugin.getFileSystem().getUnderlying();
+    this.fs = formatPlugin.getFileSystem();
 
     this.entries = Lists.newArrayList();
     for (FileStatus file : files) {

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index dc1d892..ad1bf32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -29,12 +29,14 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
@@ -64,6 +66,8 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
     String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
     List<SchemaPath> columns = rowGroupScan.getColumns();
     List<RecordReader> readers = Lists.newArrayList();
+    OperatorContext oContext = new OperatorContext(rowGroupScan, context,
+        false /* ScanBatch is not subject to fragment memory limit */);
 
     List<String[]> partitionColumns = Lists.newArrayList();
     List<Integer> selectedPartitionColumns = Lists.newArrayList();
@@ -90,7 +94,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       rowGroupScan.setOperatorId(id);
     }
 
-    FileSystem fs = rowGroupScan.getStorageEngine().getFileSystem().getUnderlying();
+    DrillFileSystem fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFileSystem(), oContext.getStats());
     Configuration conf = fs.getConf();
     conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
     conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
@@ -123,7 +127,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
           );
         } else {
           ParquetMetadata footer = footers.get(e.getPath());
-          readers.add(new DrillParquetReader(context, footer, e, newColumns, conf));
+          readers.add(new DrillParquetReader(context, footer, e, newColumns, fs));
         }
         if (rowGroupScan.getSelectionRoot() != null) {
           String[] r = rowGroupScan.getSelectionRoot().split("/");
@@ -149,7 +153,8 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       }
     }
 
-    ScanBatch s = new ScanBatch(rowGroupScan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
+    ScanBatch s =
+        new ScanBatch(rowGroupScan, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
 
     for(RecordReader r  : readers){
       r.setOperatorContext(s.getOperatorContext());

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 25f383f..8778ef8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.BaseValueVector;
@@ -82,7 +83,7 @@ public class DrillParquetReader extends AbstractRecordReader {
 
   private ParquetMetadata footer;
   private MessageType schema;
-  private Configuration conf;
+  private DrillFileSystem fileSystem;
   private RowGroupReadEntry entry;
   private VectorContainerWriter writer;
   private ColumnChunkIncReadStore pageReadStore;
@@ -111,9 +112,10 @@ public class DrillParquetReader extends AbstractRecordReader {
   boolean noColumnsFound = false; // true if none of the columns in the projection list is found in the schema
 
 
-  public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) {
+  public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry,
+      List<SchemaPath> columns, DrillFileSystem fileSystem) {
     this.footer = footer;
-    this.conf = conf;
+    this.fileSystem = fileSystem;
     this.entry = entry;
     setColumns(columns);
     this.fragmentContext = fragmentContext;
@@ -243,8 +245,7 @@ public class DrillParquetReader extends AbstractRecordReader {
         paths.put(md.getPath(), md);
       }
 
-      CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(conf);
-      FileSystem fs = FileSystem.get(conf);
+      CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(fileSystem.getConf());
       Path filePath = new Path(entry.getPath());
 
       BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex());
@@ -252,7 +253,7 @@ public class DrillParquetReader extends AbstractRecordReader {
       recordCount = (int) blockMetaData.getRowCount();
 
       pageReadStore = new ColumnChunkIncReadStore(recordCount,
-              codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fs, filePath);
+              codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fileSystem, filePath);
 
       for (String[] path : schema.getPaths()) {
         Type type = schema.getType(path);

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
index 40f25e7..baa998d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.URI;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -31,8 +30,7 @@ import java.util.Map.Entry;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.hadoop.conf.Configuration;
@@ -65,7 +63,7 @@ public class FilePStore<V> implements PStore<V> {
   }
 
   private void mk(Path path) throws IOException{
-    fs.getUnderlying().mkdirs(path);
+    fs.mkdirs(path);
   }
 
   public static Path getLogDir(){
@@ -84,8 +82,8 @@ public class FilePStore<V> implements PStore<V> {
     }
 
 
-    DrillFileSystem fs = FileSystemCreator.getFileSystem(config, fsConf);
-    fs.getUnderlying().mkdirs(blobRoot);
+    DrillFileSystem fs = new DrillFileSystem(fsConf);
+    fs.mkdirs(blobRoot);
     return fs;
   }
 
@@ -128,14 +126,14 @@ public class FilePStore<V> implements PStore<V> {
   public V get(String key) {
     try{
       Path path = p(key);
-      if(!fs.getUnderlying().exists(path)){
+      if(!fs.exists(path)){
         return null;
       }
     }catch(IOException e){
       throw new RuntimeException(e);
     }
 
-    try (InputStream is = fs.open(p(key)).getInputStream()) {
+    try (InputStream is = fs.open(p(key))) {
       return config.getSerializer().deserialize(IOUtils.toByteArray(is));
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -143,7 +141,7 @@ public class FilePStore<V> implements PStore<V> {
   }
 
   public void put(String key, V value) {
-    try (OutputStream os = fs.create(p(key)).getOuputStream()) {
+    try (OutputStream os = fs.create(p(key))) {
       IOUtils.write(config.getSerializer().serialize(value), os);
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -154,7 +152,7 @@ public class FilePStore<V> implements PStore<V> {
   public boolean putIfAbsent(String key, V value) {
     try {
       Path p = p(key);
-      if (fs.getUnderlying().exists(p)) {
+      if (fs.exists(p)) {
         return false;
       } else {
         put(key, value);
@@ -167,7 +165,7 @@ public class FilePStore<V> implements PStore<V> {
 
   public void delete(String key) {
     try {
-      fs.getUnderlying().delete(p(key), false);
+      fs.delete(p(key), false);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
index ac53a61..3131290 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
@@ -23,8 +23,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.sys.EStore;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;