You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/04/23 05:14:48 UTC

[10/10] git commit: DRILL-468 Support for FileSystem partitions

DRILL-468 Support for FileSystem partitions


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/69c571cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/69c571cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/69c571cc

Branch: refs/heads/master
Commit: 69c571ccd841b7bcda1c38979716862690cba696
Parents: 54287d0
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Mar 26 11:50:04 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Apr 22 20:06:03 2014 -0700

----------------------------------------------------------------------
 distribution/src/resources/drill-override.conf  |  5 +-
 .../org/apache/drill/exec/ExecConstants.java    |  5 +-
 .../drill/exec/physical/impl/ScanBatch.java     | 69 +++++++++++++++++++-
 .../drill/exec/store/dfs/DrillPathFilter.java   | 31 +++++++++
 .../drill/exec/store/dfs/FileSelection.java     | 31 ++++++---
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |  2 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   | 52 +++++++++++++--
 .../exec/store/dfs/easy/EasyGroupScan.java      | 18 +++--
 .../drill/exec/store/dfs/easy/EasySubScan.java  | 14 +++-
 .../dfs/shim/fallback/FallbackFileSystem.java   |  9 ++-
 .../exec/store/parquet/ParquetFormatPlugin.java | 20 +-----
 .../exec/store/parquet/ParquetGroupScan.java    | 19 ++++--
 .../exec/store/parquet/ParquetRowGroupScan.java | 17 +++--
 .../store/parquet/ParquetScanBatchCreator.java  | 60 +++++++++++++----
 .../src/main/resources/drill-module.conf        |  3 +-
 .../org/apache/drill/TestExampleQueries.java    | 15 +++++
 .../exec/store/text/TextRecordReaderTest.java   |  2 +-
 .../src/test/resources/storage-engines.json     | 13 ----
 .../resources/store/text/data/d1/regions.csv    |  5 ++
 .../test/resources/store/text/data/regions.csv  |  5 ++
 .../src/test/resources/store/text/regions.csv   |  5 --
 pom.xml                                         |  2 +-
 .../apache/drill/jdbc/test/TestJdbcQuery.java   |  1 +
 23 files changed, 313 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf
index a5e5522..a9316a9 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -56,7 +56,8 @@ drill.exec: {
       text: {
         buffer.size: 262144,
         batch.size: 4000
-      }
+      },
+      partition.column.label: "dir"
     }
   },
   metrics : {
@@ -89,7 +90,7 @@ drill.exec: {
     executor.threads: 4
   },
   trace: {
-    directory: "/var/log/drill",
+    directory: "/tmp/drill-trace",
     filesystem: "file:///"
   },
   tmp: {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index f88b1b4..34bde9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -58,7 +58,8 @@ public interface ExecConstants {
   public static final String EXTERNAL_SORT_SPILL_THRESHOLD = "drill.exec.sort.external.spill.threshold";
   public static final String EXTERNAL_SORT_SPILL_DIRS = "drill.exec.sort.external.spill.directories";
   public static final String EXTERNAL_SORT_SPILL_FILESYSTEM = "drill.exec.sort.external.spill.fs";
-  public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
   public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
-
+  public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
+  public static final String FILESYSTEM_PARTITION_COLUMN_LABEL = "drill.exec.storage.file.partition.column.label";
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index ace2677..e93fbcc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -17,11 +17,19 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -36,9 +44,13 @@ import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Maps;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 /**
  * Record batch used for a particular scan. Operators against one or more
@@ -56,14 +68,29 @@ public class ScanBatch implements RecordBatch {
   private RecordReader currentReader;
   private BatchSchema schema;
   private final Mutator mutator = new Mutator();
+  private Iterator<String[]> partitionColumns;
+  private String[] partitionValues;
+  List<ValueVector> partitionVectors;
+  List<Integer> selectedPartitionColumns;
+  private String partitionColumnDesignator;
 
-  public ScanBatch(FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
+  public ScanBatch(FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
     this.context = context;
     this.readers = readers;
     if (!readers.hasNext())
       throw new ExecutionSetupException("A scan batch must contain at least one reader.");
     this.currentReader = readers.next();
     this.currentReader.setup(mutator);
+    this.partitionColumns = partitionColumns.iterator();
+    this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null; // values for the first reader, or null if none were supplied
+    this.selectedPartitionColumns = selectedPartitionColumns;
+    DrillConfig config = context.getConfig(); // config can be null in unit tests that use SimpleRootExec, so fall back to "dir" below
+    this.partitionColumnDesignator = config == null ? "dir" : config.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+    addPartitionVectors();
+  }
+
+  public ScanBatch(FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException { // scan without partition columns
+    this(context, readers, Collections.<String[]>emptyList(), Collections.<Integer>emptyList()); // typed empties instead of raw EMPTY_LIST
+  }
 
   @Override
@@ -101,7 +128,10 @@ public class ScanBatch implements RecordBatch {
         }
         currentReader.cleanup();
         currentReader = readers.next();
+        partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
+        mutator.removeAllFields();
         currentReader.setup(mutator);
+        addPartitionVectors();
       } catch (ExecutionSetupException e) {
         this.context.fail(e);
         releaseAssets();
@@ -109,6 +139,7 @@ public class ScanBatch implements RecordBatch {
       }
     }
 
+    populatePartitionVectors();
     if (schemaChanged) {
       schemaChanged = false;
       return IterOutcome.OK_NEW_SCHEMA;
@@ -117,6 +148,42 @@ public class ScanBatch implements RecordBatch {
     }
   }
 
+  private void addPartitionVectors() { // rebuilds one VARCHAR vector per selected partition column and registers it with the mutator
+    partitionVectors = Lists.newArrayList();
+    for (int i : selectedPartitionColumns) {
+      MaterializedField field;
+      ValueVector v;
+      if (partitionValues.length > i) { // the current reader has a partition value at index i
+        field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.required(MinorType.VARCHAR));
+        v = new VarCharVector(field, context.getAllocator());
+      } else { // no value at index i for this reader: use a nullable column instead
+        field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR));
+        v = new NullableVarCharVector(field, context.getAllocator());
+      }
+      mutator.addField(v);
+      partitionVectors.add(v); // appended in selection order, not keyed by i
+    }
+  }
+
+  private void populatePartitionVectors() { // fills every partition vector with the current reader's value for recordCount rows
+    for (int pos = 0; pos < selectedPartitionColumns.size(); pos++) { // partitionVectors is in selection order, so index it by position
+      int i = selectedPartitionColumns.get(pos); // partition column number, used only to look up the value
+      if (partitionValues.length > i) {
+        VarCharVector v = (VarCharVector) partitionVectors.get(pos);
+        byte[] bytes = partitionValues[i].getBytes();
+        AllocationHelper.allocate(v, recordCount, bytes.length); // size by encoded bytes, not by character count
+        for (int j = 0; j < recordCount; j++) {
+          v.getMutator().set(j, bytes);
+        }
+        v.getMutator().setValueCount(recordCount);
+      } else {
+        NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(pos);
+        AllocationHelper.allocate(v, recordCount, 0);
+        v.getMutator().setValueCount(recordCount);
+      }
+    }
+  }
+
   @Override
   public SelectionVector2 getSelectionVector2() {
     throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
new file mode 100644
index 0000000..81c8779
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Utils;
+
+public class DrillPathFilter extends Utils.OutputFileUtils.OutputFilesFilter { // OutputFilesFilter plus a "_metadata" exclusion
+  @Override
+  public boolean accept(Path path) {
+    if (path.toString().contains("_metadata")) { // substring test on the whole path string, so parent directory names match too
+      return false;
+    }
+    return super.accept(path); // otherwise defer to the inherited filter
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 5ab2c1a..14c5ad8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -20,8 +20,10 @@ package org.apache.drill.exec.store.dfs;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.regex.Pattern;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -41,21 +43,31 @@ public class FileSelection {
   private List<FileStatus> statuses;
 
   public List<String> files;
+  public String selectionRoot;
 
   public FileSelection() {
   }
-  
+
+  public FileSelection(List<String> files, String selectionRoot, boolean dummy) {
+    this.files = files;
+    this.selectionRoot = selectionRoot;
+  }
   
   public FileSelection(List<String> files, boolean dummy){
     this.files = files;
   }
 
   public FileSelection(List<FileStatus> statuses) {
+    this(statuses, null);
+  }
+
+  public FileSelection(List<FileStatus> statuses, String selectionRoot) {
     this.statuses = statuses;
     this.files = Lists.newArrayList();
     for (FileStatus f : statuses) {
       files.add(f.getPath().toString());
     }
+    this.selectionRoot = selectionRoot;
   }
 
   public boolean containsDirectories(DrillFileSystem fs) throws IOException {
@@ -66,7 +78,7 @@ public class FileSelection {
     return false;
   }
 
-  public FileSelection minusDirectorries(DrillFileSystem fs) throws IOException {
+  public FileSelection minusDirectories(DrillFileSystem fs) throws IOException {
     init(fs);
     List<FileStatus> newList = Lists.newArrayList();
     for (FileStatus p : statuses) {
@@ -75,12 +87,11 @@ public class FileSelection {
         for (FileStatus s : statuses) {
           newList.add(s);
         }
-
       } else {
         newList.add(p);
       }
     }
-    return new FileSelection(newList);
+    return new FileSelection(newList, selectionRoot);
   }
 
   public FileStatus getFirstPath(DrillFileSystem fs) throws IOException {
@@ -116,11 +127,15 @@ public class FileSelection {
     if ( !(path.contains("*") || path.contains("?")) ) {
       Path p = new Path(parent, path);
       FileStatus status = fs.getFileStatus(p);
-      return new FileSelection(Collections.singletonList(status));
+      return new FileSelection(Collections.singletonList(status), p.toUri().getPath());
     } else {
-      FileStatus[] status = fs.getUnderlying().globStatus(new Path(parent, path));
+      Path p = new Path(parent, path);
+      FileStatus[] status = fs.getUnderlying().globStatus(p);
       if(status == null || status.length == 0) return null;
-      return new FileSelection(Lists.newArrayList(status));
+      String[] s = p.toUri().getPath().split("/");
+      String newPath = StringUtils.join(ArrayUtils.subarray(s, 0, s.length - 1), "/");
+      Preconditions.checkState(!newPath.contains("*") && !newPath.contains("?"), String.format("Unsupported selection path: %s", p));
+      return new FileSelection(Lists.newArrayList(status), newPath);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index c77bd92..1551e5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -81,7 +81,7 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
             logger.debug("File read failed.", e);
           }
         }
-        fileSelection = fileSelection.minusDirectorries(fs);
+        fileSelection = fileSelection.minusDirectories(fs);
       }
 
       for (FormatMatcher m : fileMatchers) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 9c1dc74..6e87da5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -20,12 +20,16 @@ package org.apache.drill.exec.store.dfs.easy;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -41,8 +45,6 @@ import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 
 import com.beust.jcommander.internal.Lists;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
@@ -108,17 +110,55 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
   
   RecordBatch getBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
+    String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+    List<SchemaPath> columns = scan.getColumns();
     List<RecordReader> readers = Lists.newArrayList();
+    List<String[]> partitionColumns = Lists.newArrayList();
+    List<Integer> selectedPartitionColumns = Lists.newArrayList();
+    boolean selectAllColumns = false;
+
+    if (columns == null || columns.size() == 0) {
+      selectAllColumns = true;
+    } else {
+      Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
+      for (SchemaPath column : new java.util.ArrayList<SchemaPath>(columns)) { // iterate a snapshot: the live list is modified below
+        Matcher m = pattern.matcher(column.getAsUnescapedPath());
+        if (m.matches()) {
+          scan.getColumns().remove(column); // partition columns are not passed on to the record readers
+          selectedPartitionColumns.add(Integer.parseInt(column.getAsUnescapedPath().toString().substring(partitionDesignator.length())));
+        }
+      }
+    }
+    int numParts = 0;
     for(FileWork work : scan.getWorkUnits()){
-      readers.add(getRecordReader(context, work, scan.getColumns())); 
+      readers.add(getRecordReader(context, work, scan.getColumns()));
+      if (scan.getSelectionRoot() != null) {
+        String[] r = scan.getSelectionRoot().split("/");
+        String[] p = work.getPath().split("/");
+        if (p.length > r.length) {
+          String[] q = ArrayUtils.subarray(p, r.length, p.length - 1); // directory names between the selection root and the file name
+          partitionColumns.add(q);
+          numParts = Math.max(numParts, q.length);
+        } else {
+          partitionColumns.add(new String[] {});
+        }
+      } else {
+        partitionColumns.add(new String[] {});
+      }
+    }
+
+    if (selectAllColumns) {
+      for (int i = 0; i < numParts; i++) {
+        selectedPartitionColumns.add(i);
+      }
     }
-    
-    return new ScanBatch(context, readers.iterator());
+
+    return new ScanBatch(context, readers.iterator(), partitionColumns, selectedPartitionColumns);
   }
   
   @Override
   public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException {
-    return new EasyGroupScan(selection, this, null);
+    return new EasyGroupScan(selection, this, null, selection.selectionRoot);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index fc2ae2c..68fee34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -32,11 +32,8 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.BlockMapBuilder;
@@ -64,6 +61,7 @@ public class EasyGroupScan extends AbstractGroupScan{
   private ListMultimap<Integer, CompleteFileWork> mappings;
   private List<CompleteFileWork> chunks;
   private List<EndpointAffinity> endpointAffinities;
+  private String selectionRoot;
 
   @JsonCreator
   public EasyGroupScan(
@@ -71,7 +69,8 @@ public class EasyGroupScan extends AbstractGroupScan{
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
       @JacksonInject StoragePluginRegistry engineRegistry, // 
-      @JsonProperty("columns") List<SchemaPath> columns
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JsonProperty("selectionRoot") String selectionRoot
       ) throws IOException, ExecutionSetupException {
 
     this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
@@ -87,12 +86,14 @@ public class EasyGroupScan extends AbstractGroupScan{
     }
     maxWidth = chunks.size();
     this.columns = columns;
+    this.selectionRoot = selectionRoot;
   }
   
   public EasyGroupScan(
       FileSelection selection, //
       EasyFormatPlugin<?> formatPlugin, // 
-      List<SchemaPath> columns
+      List<SchemaPath> columns,
+      String selectionRoot
       ) throws IOException{
     this.selection = selection;
     this.formatPlugin = formatPlugin;
@@ -106,6 +107,11 @@ public class EasyGroupScan extends AbstractGroupScan{
       this.endpointAffinities = Collections.emptyList();
     }
     maxWidth = chunks.size();
+    this.selectionRoot = selectionRoot;
+  }
+
+  public String getSelectionRoot() {
+    return selectionRoot;
   }
 
   @Override
@@ -170,7 +176,7 @@ public class EasyGroupScan extends AbstractGroupScan{
     Preconditions.checkArgument(!filesForMinor.isEmpty(),
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
-    return new EasySubScan(convert(filesForMinor), formatPlugin, columns);
+    return new EasySubScan(convert(filesForMinor), formatPlugin, columns, selectionRoot);
   }
   
   private List<FileWorkImpl> convert(List<CompleteFileWork> list){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index c01fb84..0b3fe0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -25,7 +25,6 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.physical.base.AbstractSubScan;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
@@ -45,6 +44,7 @@ public class EasySubScan extends AbstractSubScan{
   private final List<FileWorkImpl> files;
   private final EasyFormatPlugin<?> formatPlugin;
   private final List<SchemaPath> columns;
+  private String selectionRoot;
   
   @JsonCreator
   public EasySubScan(
@@ -52,19 +52,27 @@ public class EasySubScan extends AbstractSubScan{
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
       @JacksonInject StoragePluginRegistry engineRegistry, // 
-      @JsonProperty("columns") List<SchemaPath> columns //
+      @JsonProperty("columns") List<SchemaPath> columns, //
+      @JsonProperty("selectionRoot") String selectionRoot
       ) throws IOException, ExecutionSetupException {
 
     this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(this.formatPlugin);
     this.files = files;
     this.columns = columns;
+    this.selectionRoot = selectionRoot;
   }
   
-  public EasySubScan(List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns){
+  public EasySubScan(List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns, String selectionRoot){
     this.formatPlugin = plugin;
     this.files = files;
     this.columns = columns;
+    this.selectionRoot = selectionRoot;
+  }
+
+  @JsonProperty
+  public String getSelectionRoot() {
+    return selectionRoot;
   }
   
   @JsonIgnore

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
index 340919d..0c18e71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.shim.DrillInputStream;
 import org.apache.drill.exec.store.dfs.shim.DrillOutputStream;
@@ -65,9 +66,13 @@ public class FallbackFileSystem extends DrillFileSystem {
   private void addRecursiveStatus(FileStatus parent, List<FileStatus> listToFill) throws IOException {
     if (parent.isDir()) {
       Path pattern = new Path(parent.getPath(), "*");
-      FileStatus[] sub = fs.globStatus(pattern);
+      FileStatus[] sub = fs.globStatus(pattern, new DrillPathFilter());
       for(FileStatus s : sub){
-        listToFill.add(s);
+        if (s.isDir()) {
+          addRecursiveStatus(s, listToFill);
+        } else {
+          listToFill.add(s);
+        }
       }
     } else {
       listToFill.add(parent);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index cde9b08..d9e6795 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -26,12 +26,7 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.QueryOptimizerRule;
-import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FormatMatcher;
-import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.drill.exec.store.dfs.MagicString;
+import org.apache.drill.exec.store.dfs.*;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.drill.exec.store.mock.MockStorageEngine;
 import org.apache.hadoop.conf.Configuration;
@@ -39,7 +34,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.Utils;
 import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileWriter;
@@ -101,7 +95,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
 
   @Override
   public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
-    return new ParquetGroupScan( selection.getFileStatusList(fs), this);
+    return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot);
   }
 
   @Override
@@ -170,15 +164,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
           return true;
         } else {
 
-          PathFilter filter = new Utils.OutputFileUtils.OutputFilesFilter() {
-            @Override
-            public boolean accept(Path path) {
-              if (path.toString().contains("_metadata")) {
-                return false;
-              }
-              return super.accept(path);
-            }
-          };
+          PathFilter filter = new DrillPathFilter();
 
           FileStatus[] files = fs.getUnderlying().listStatus(dir.getPath(), filter);
           if (files.length == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index bcee2be..cd7575d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
 import org.apache.drill.common.expression.FieldReference;
@@ -38,7 +37,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
 import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
@@ -89,6 +87,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
   private final ParquetFormatConfig formatConfig;
   private final FileSystem fs;
   private List<EndpointAffinity> endpointAffinities;
+  private String selectionRoot;
 
   private List<SchemaPath> columns;
 
@@ -112,7 +111,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
       @JacksonInject StoragePluginRegistry engineRegistry, // 
-      @JsonProperty("columns") List<SchemaPath> columns //
+      @JsonProperty("columns") List<SchemaPath> columns, //
+      @JsonProperty("selectionRoot") String selectionRoot //
       ) throws IOException, ExecutionSetupException {
     this.columns = columns;
     if(formatConfig == null) formatConfig = new ParquetFormatConfig();
@@ -123,12 +123,18 @@ public class ParquetGroupScan extends AbstractGroupScan {
     this.fs = formatPlugin.getFileSystem().getUnderlying();
     this.formatConfig = formatPlugin.getConfig();
     this.entries = entries;
+    this.selectionRoot = selectionRoot;
     this.readFooterFromEntries();
 
   }
 
+  public String getSelectionRoot() {
+    return selectionRoot;
+  }
+
   public ParquetGroupScan(List<FileStatus> files, //
-      ParquetFormatPlugin formatPlugin) //
+      ParquetFormatPlugin formatPlugin, //
+      String selectionRoot) //
       throws IOException {
     this.formatPlugin = formatPlugin;
     this.columns = null;
@@ -140,6 +146,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
       entries.add(new ReadEntryWithPath(file.getPath().toString()));
     }
     
+    this.selectionRoot = selectionRoot;
+
     readFooter(files);
   }
 
@@ -202,6 +210,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
 
     private EndpointByteMap byteMap;
     private int rowGroupIndex;
+    private String root;
 
     @JsonCreator
     public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
@@ -282,7 +291,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
     Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
-    return new ParquetRowGroupScan(formatPlugin, convertToReadEntries(rowGroupsForMinor), columns);
+    return new ParquetRowGroupScan(formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot);
   }
 
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index 0b1a788..dd5c91c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -34,7 +33,6 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -53,6 +51,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   private final ParquetFormatPlugin formatPlugin;
   private final List<RowGroupReadEntry> rowGroupReadEntries;
   private final List<SchemaPath> columns;
+  private String selectionRoot;
 
   @JsonCreator
   public ParquetRowGroupScan( //
@@ -60,7 +59,8 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
       @JsonProperty("entries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, //
-      @JsonProperty("columns") List<SchemaPath> columns //
+      @JsonProperty("columns") List<SchemaPath> columns, //
+      @JsonProperty("selectionRoot") String selectionRoot //
   ) throws ExecutionSetupException {
 
     if(formatConfig == null) formatConfig = new ParquetFormatConfig();
@@ -71,16 +71,19 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.formatConfig = formatPlugin.getConfig();
     this.columns = columns;
+    this.selectionRoot = selectionRoot;
   }
 
   public ParquetRowGroupScan( //
       ParquetFormatPlugin formatPlugin, //
       List<RowGroupReadEntry> rowGroupReadEntries, //
-      List<SchemaPath> columns) {
+      List<SchemaPath> columns,
+      String selectionRoot) {
     this.formatPlugin = formatPlugin;
     this.formatConfig = formatPlugin.getConfig();
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.columns = columns;
+    this.selectionRoot = selectionRoot;
   }
 
   @JsonProperty("entries")
@@ -93,6 +96,10 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
     return formatPlugin.getStorageConfig();
   }
 
+  public String getSelectionRoot() {
+    return selectionRoot;
+  }
+
   @Override
   public OperatorCost getCost() {
     return null;
@@ -121,7 +128,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(formatPlugin, rowGroupReadEntries, columns);
+    return new ParquetRowGroupScan(formatPlugin, rowGroupReadEntries, columns, selectionRoot);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index d36dbc0..6278a79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -18,33 +18,29 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Stopwatch;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.mock.MockScanBatchCreator;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.ParquetMetadata;
 
@@ -54,12 +50,34 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
   @Override
   public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
+    String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+    List<SchemaPath> columns = rowGroupScan.getColumns();
+
     List<RecordReader> readers = Lists.newArrayList();
-    
+
+    List<String[]> partitionColumns = Lists.newArrayList();
+    List<Integer> selectedPartitionColumns = Lists.newArrayList();
+    boolean selectAllColumns = false;
+
+    if (columns == null || columns.size() == 0) {
+      selectAllColumns = true;
+    } else {
+      Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
+      for (java.util.Iterator<SchemaPath> it = columns.iterator(); it.hasNext();) {
+        String name = it.next().getAsUnescapedPath().toString();
+        if (pattern.matcher(name).matches()) {
+          it.remove(); // List.remove inside a for-each would throw ConcurrentModificationException or skip entries
+          selectedPartitionColumns.add(Integer.parseInt(name.substring(partitionDesignator.length())));
+        }
+      }
+    }
+
+
     FileSystem fs = rowGroupScan.getStorageEngine().getFileSystem().getUnderlying();
     
     // keep footers in a map to avoid re-reading them
     Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>();
+    int numParts = 0;
     for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
       /*
       Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
@@ -81,10 +99,30 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
                 rowGroupScan.getColumns()
             )
         );
+        if (rowGroupScan.getSelectionRoot() != null) {
+          String[] r = rowGroupScan.getSelectionRoot().split("/");
+          String[] p = e.getPath().split("/");
+          if (p.length > r.length) {
+            String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
+            partitionColumns.add(q);
+            numParts = Math.max(numParts, q.length);
+          } else {
+            partitionColumns.add(new String[] {});
+          }
+        } else {
+          partitionColumns.add(new String[] {});
+        }
       } catch (IOException e1) {
         throw new ExecutionSetupException(e1);
       }
     }
-    return new ScanBatch(context, readers.iterator());
+
+    if (selectAllColumns) {
+      for (int i = 0; i < numParts; i++) {
+        selectedPartitionColumns.add(i);
+      }
+    }
+
+    return new ScanBatch(context, readers.iterator(), partitionColumns, selectedPartitionColumns);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index f3b04b5..2f145a7 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -41,7 +41,8 @@ drill.exec: {
       text: {
         buffer.size: 262144,
         batch.size: 4000
-      }
+      },
+      partition.column.label: "dir"
     }
   },
   metrics : { 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 191115b..e0e874b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill;
 
+import org.apache.drill.common.util.FileUtils;
 import org.junit.Test;
 
 public class TestExampleQueries extends BaseTestQuery{
@@ -43,6 +44,20 @@ public class TestExampleQueries extends BaseTestQuery{
   }
 
   @Test
+  public void testText() throws Exception {
+    String root = FileUtils.getResourceAsFile("/store/text/data/regions.csv").toURI().toString();
+    String query = String.format("select * from dfs.`%s`", root);
+    test(query);
+  }
+
+  @Test
+  public void testTextPartitions() throws Exception {
+    String root = FileUtils.getResourceAsFile("/store/text/data/").toURI().toString();
+    String query = String.format("select * from dfs.`%s`", root);
+    test(query);
+  }
+
+  @Test
   public void testJoin() throws Exception{
     test("SELECT\n" +
         "  nations.N_NAME,\n" +

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
index 5fbcc8b..0155690 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
@@ -71,7 +71,7 @@ public class TextRecordReaderTest extends PopUnitTestBase {
       List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
               Files.toString(
                       FileUtils.getResourceAsFile("/store/text/test.json"), Charsets.UTF_8)
-                      .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/store/text/regions.csv").toURI().toString()));
+                      .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/store/text/data/regions.csv").toURI().toString()));
       int count = 0;
       RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
       for(QueryResultBatch b : results) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/resources/storage-engines.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/storage-engines.json b/exec/java-exec/src/test/resources/storage-engines.json
deleted file mode 100644
index 73899ee..0000000
--- a/exec/java-exec/src/test/resources/storage-engines.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
-  "storage":{
-    dfs: {
-      type: "file",
-      connection: "file:///"
-    },  
-    cp: {
-      type: "file",
-      connection: "classpath:///"
-    }      
-
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/resources/store/text/data/d1/regions.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/text/data/d1/regions.csv b/exec/java-exec/src/test/resources/store/text/data/d1/regions.csv
new file mode 100644
index 0000000..e97d2ed
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/text/data/d1/regions.csv
@@ -0,0 +1,5 @@
+0,AFRICA,lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to ,
+1,AMERICA,hs use ironic, even requests. s,
+2,ASIA,ges. thinly even pinto beans ca,
+3,EUROPE,ly final courts cajole furiously final excuse,
+4,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl,
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/resources/store/text/data/regions.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/text/data/regions.csv b/exec/java-exec/src/test/resources/store/text/data/regions.csv
new file mode 100644
index 0000000..e97d2ed
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/text/data/regions.csv
@@ -0,0 +1,5 @@
+0,AFRICA,lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to ,
+1,AMERICA,hs use ironic, even requests. s,
+2,ASIA,ges. thinly even pinto beans ca,
+3,EUROPE,ly final courts cajole furiously final excuse,
+4,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl,
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/resources/store/text/regions.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/text/regions.csv b/exec/java-exec/src/test/resources/store/text/regions.csv
deleted file mode 100644
index e97d2ed..0000000
--- a/exec/java-exec/src/test/resources/store/text/regions.csv
+++ /dev/null
@@ -1,5 +0,0 @@
-0,AFRICA,lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to ,
-1,AMERICA,hs use ironic, even requests. s,
-2,ASIA,ges. thinly even pinto beans ca,
-3,EUROPE,ly final courts cajole furiously final excuse,
-4,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl,
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5d1123d..7166ee4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,7 +262,7 @@
             <forkCount>8</forkCount>
             <reuseForks>true</reuseForks>
             <additionalClasspathElements>
-              <additionalClasspathElement>./sqlparser/src/test/resources/storage-engines.json</additionalClasspathElement>
+              <additionalClasspathElement>./sqlparser/src/test/resources/storage-plugins.json</additionalClasspathElement>
             </additionalClasspathElements>
           </configuration>
         </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index b454b52..ef1674f 100644
--- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -26,6 +26,7 @@ import java.sql.Statement;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Function;
+import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
 import org.apache.drill.jdbc.Driver;