You are viewing a plain-text version of this content. The canonical link to it (shown as "here" on the original page) is not preserved in this plain-text copy.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/04/23 05:14:48 UTC
[10/10] git commit: DRILL-468 Support for FileSystem partitions
DRILL-468 Support for FileSystem partitions
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/69c571cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/69c571cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/69c571cc
Branch: refs/heads/master
Commit: 69c571ccd841b7bcda1c38979716862690cba696
Parents: 54287d0
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Mar 26 11:50:04 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Apr 22 20:06:03 2014 -0700
----------------------------------------------------------------------
distribution/src/resources/drill-override.conf | 5 +-
.../org/apache/drill/exec/ExecConstants.java | 5 +-
.../drill/exec/physical/impl/ScanBatch.java | 69 +++++++++++++++++++-
.../drill/exec/store/dfs/DrillPathFilter.java | 31 +++++++++
.../drill/exec/store/dfs/FileSelection.java | 31 ++++++---
.../exec/store/dfs/WorkspaceSchemaFactory.java | 2 +-
.../exec/store/dfs/easy/EasyFormatPlugin.java | 52 +++++++++++++--
.../exec/store/dfs/easy/EasyGroupScan.java | 18 +++--
.../drill/exec/store/dfs/easy/EasySubScan.java | 14 +++-
.../dfs/shim/fallback/FallbackFileSystem.java | 9 ++-
.../exec/store/parquet/ParquetFormatPlugin.java | 20 +-----
.../exec/store/parquet/ParquetGroupScan.java | 19 ++++--
.../exec/store/parquet/ParquetRowGroupScan.java | 17 +++--
.../store/parquet/ParquetScanBatchCreator.java | 60 +++++++++++++----
.../src/main/resources/drill-module.conf | 3 +-
.../org/apache/drill/TestExampleQueries.java | 15 +++++
.../exec/store/text/TextRecordReaderTest.java | 2 +-
.../src/test/resources/storage-engines.json | 13 ----
.../resources/store/text/data/d1/regions.csv | 5 ++
.../test/resources/store/text/data/regions.csv | 5 ++
.../src/test/resources/store/text/regions.csv | 5 --
pom.xml | 2 +-
.../apache/drill/jdbc/test/TestJdbcQuery.java | 1 +
23 files changed, 313 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf
index a5e5522..a9316a9 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -56,7 +56,8 @@ drill.exec: {
text: {
buffer.size: 262144,
batch.size: 4000
- }
+ },
+ partition.column.label: "dir"
}
},
metrics : {
@@ -89,7 +90,7 @@ drill.exec: {
executor.threads: 4
},
trace: {
- directory: "/var/log/drill",
+ directory: "/tmp/drill-trace",
filesystem: "file:///"
},
tmp: {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index f88b1b4..34bde9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -58,7 +58,8 @@ public interface ExecConstants {
public static final String EXTERNAL_SORT_SPILL_THRESHOLD = "drill.exec.sort.external.spill.threshold";
public static final String EXTERNAL_SORT_SPILL_DIRS = "drill.exec.sort.external.spill.directories";
public static final String EXTERNAL_SORT_SPILL_FILESYSTEM = "drill.exec.sort.external.spill.fs";
- public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
-
+ public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
+ public static final String FILESYSTEM_PARTITION_COLUMN_LABEL = "drill.exec.storage.file.partition.column.label";
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index ace2677..e93fbcc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -17,11 +17,19 @@
*/
package org.apache.drill.exec.physical.impl;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
@@ -36,9 +44,13 @@ import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Maps;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
/**
* Record batch used for a particular scan. Operators against one or more
@@ -56,14 +68,29 @@ public class ScanBatch implements RecordBatch {
private RecordReader currentReader;
private BatchSchema schema;
private final Mutator mutator = new Mutator();
+ private Iterator<String[]> partitionColumns;
+ private String[] partitionValues;
+ List<ValueVector> partitionVectors;
+ List<Integer> selectedPartitionColumns;
+ private String partitionColumnDesignator;
- public ScanBatch(FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
+ public ScanBatch(FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
this.context = context;
this.readers = readers;
if (!readers.hasNext())
throw new ExecutionSetupException("A scan batch must contain at least one reader.");
this.currentReader = readers.next();
this.currentReader.setup(mutator);
+ this.partitionColumns = partitionColumns.iterator();
+ this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
+ this.selectedPartitionColumns = selectedPartitionColumns;
+ DrillConfig config = context.getConfig(); //This nonsense it is to not break all the stupid unit tests using SimpleRootExec
+ this.partitionColumnDesignator = config == null ? "dir" : config.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+ addPartitionVectors();
+ }
+
+ public ScanBatch(FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
+ this(context, readers, Collections.EMPTY_LIST, Collections.EMPTY_LIST);
}
@Override
@@ -101,7 +128,10 @@ public class ScanBatch implements RecordBatch {
}
currentReader.cleanup();
currentReader = readers.next();
+ partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
+ mutator.removeAllFields();
currentReader.setup(mutator);
+ addPartitionVectors();
} catch (ExecutionSetupException e) {
this.context.fail(e);
releaseAssets();
@@ -109,6 +139,7 @@ public class ScanBatch implements RecordBatch {
}
}
+ populatePartitionVectors();
if (schemaChanged) {
schemaChanged = false;
return IterOutcome.OK_NEW_SCHEMA;
@@ -117,6 +148,42 @@ public class ScanBatch implements RecordBatch {
}
}
+ private void addPartitionVectors() {
+ partitionVectors = Lists.newArrayList();
+ for (int i : selectedPartitionColumns) {
+ MaterializedField field;
+ ValueVector v;
+ if (partitionValues.length > i) {
+ field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.required(MinorType.VARCHAR));
+ v = new VarCharVector(field, context.getAllocator());
+ } else {
+ field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR));
+ v = new NullableVarCharVector(field, context.getAllocator());
+ }
+ mutator.addField(v);
+ partitionVectors.add(v);
+ }
+ }
+
+ private void populatePartitionVectors() {
+ for (int i : selectedPartitionColumns) {
+ if (partitionValues.length > i) {
+ VarCharVector v = (VarCharVector) partitionVectors.get(i);
+ String val = partitionValues[i];
+ byte[] bytes = val.getBytes();
+ AllocationHelper.allocate(v, recordCount, val.length());
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, bytes);
+ }
+ v.getMutator().setValueCount(recordCount);
+ } else {
+ NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(i);
+ AllocationHelper.allocate(v, recordCount, 0);
+ v.getMutator().setValueCount(recordCount);
+ }
+ }
+ }
+
@Override
public SelectionVector2 getSelectionVector2() {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
new file mode 100644
index 0000000..81c8779
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Utils;
+
+public class DrillPathFilter extends Utils.OutputFileUtils.OutputFilesFilter {
+ @Override
+ public boolean accept(Path path) {
+ if (path.toString().contains("_metadata")) {
+ return false;
+ }
+ return super.accept(path);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 5ab2c1a..14c5ad8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -20,8 +20,10 @@ package org.apache.drill.exec.store.dfs;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.regex.Pattern;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -41,21 +43,31 @@ public class FileSelection {
private List<FileStatus> statuses;
public List<String> files;
+ public String selectionRoot;
public FileSelection() {
}
-
+
+ public FileSelection(List<String> files, String selectionRoot, boolean dummy) {
+ this.files = files;
+ this.selectionRoot = selectionRoot;
+ }
public FileSelection(List<String> files, boolean dummy){
this.files = files;
}
public FileSelection(List<FileStatus> statuses) {
+ this(statuses, null);
+ }
+
+ public FileSelection(List<FileStatus> statuses, String selectionRoot) {
this.statuses = statuses;
this.files = Lists.newArrayList();
for (FileStatus f : statuses) {
files.add(f.getPath().toString());
}
+ this.selectionRoot = selectionRoot;
}
public boolean containsDirectories(DrillFileSystem fs) throws IOException {
@@ -66,7 +78,7 @@ public class FileSelection {
return false;
}
- public FileSelection minusDirectorries(DrillFileSystem fs) throws IOException {
+ public FileSelection minusDirectories(DrillFileSystem fs) throws IOException {
init(fs);
List<FileStatus> newList = Lists.newArrayList();
for (FileStatus p : statuses) {
@@ -75,12 +87,11 @@ public class FileSelection {
for (FileStatus s : statuses) {
newList.add(s);
}
-
} else {
newList.add(p);
}
}
- return new FileSelection(newList);
+ return new FileSelection(newList, selectionRoot);
}
public FileStatus getFirstPath(DrillFileSystem fs) throws IOException {
@@ -116,11 +127,15 @@ public class FileSelection {
if ( !(path.contains("*") || path.contains("?")) ) {
Path p = new Path(parent, path);
FileStatus status = fs.getFileStatus(p);
- return new FileSelection(Collections.singletonList(status));
+ return new FileSelection(Collections.singletonList(status), p.toUri().getPath());
} else {
- FileStatus[] status = fs.getUnderlying().globStatus(new Path(parent, path));
+ Path p = new Path(parent, path);
+ FileStatus[] status = fs.getUnderlying().globStatus(p);
if(status == null || status.length == 0) return null;
- return new FileSelection(Lists.newArrayList(status));
+ String[] s = p.toUri().getPath().split("/");
+ String newPath = StringUtils.join(ArrayUtils.subarray(s, 0, s.length - 1), "/");
+ Preconditions.checkState(!newPath.contains("*") && !newPath.contains("?"), String.format("Unsupported selection path: %s", p));
+ return new FileSelection(Lists.newArrayList(status), newPath);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index c77bd92..1551e5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -81,7 +81,7 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
logger.debug("File read failed.", e);
}
}
- fileSelection = fileSelection.minusDirectorries(fs);
+ fileSelection = fileSelection.minusDirectories(fs);
}
for (FormatMatcher m : fileMatchers) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 9c1dc74..6e87da5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -20,12 +20,16 @@ package org.apache.drill.exec.store.dfs.easy;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -41,8 +45,6 @@ import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import com.beust.jcommander.internal.Lists;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
@@ -108,17 +110,55 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
RecordBatch getBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
+ String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+ List<SchemaPath> columns = scan.getColumns();
List<RecordReader> readers = Lists.newArrayList();
+ List<String[]> partitionColumns = Lists.newArrayList();
+ List<Integer> selectedPartitionColumns = Lists.newArrayList();
+ boolean selectAllColumns = false;
+
+ if (columns == null || columns.size() == 0) {
+ selectAllColumns = true;
+ } else {
+ Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
+ for (SchemaPath column : columns) {
+ Matcher m = pattern.matcher(column.getAsUnescapedPath());
+ if (m.matches()) {
+ scan.getColumns().remove(column);
+ selectedPartitionColumns.add(Integer.parseInt(column.getAsUnescapedPath().toString().substring(partitionDesignator.length())));
+ }
+ }
+ }
+ int numParts = 0;
for(FileWork work : scan.getWorkUnits()){
- readers.add(getRecordReader(context, work, scan.getColumns()));
+ readers.add(getRecordReader(context, work, scan.getColumns()));
+ if (scan.getSelectionRoot() != null) {
+ String[] r = scan.getSelectionRoot().split("/");
+ String[] p = work.getPath().split("/");
+ if (p.length > r.length) {
+ String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
+ partitionColumns.add(q);
+ numParts = Math.max(numParts, q.length);
+ } else {
+ partitionColumns.add(new String[] {});
+ }
+ } else {
+ partitionColumns.add(new String[] {});
+ }
+ }
+
+ if (selectAllColumns) {
+ for (int i = 0; i < numParts; i++) {
+ selectedPartitionColumns.add(i);
+ }
}
-
- return new ScanBatch(context, readers.iterator());
+
+ return new ScanBatch(context, readers.iterator(), partitionColumns, selectedPartitionColumns);
}
@Override
public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException {
- return new EasyGroupScan(selection, this, null);
+ return new EasyGroupScan(selection, this, null, selection.selectionRoot);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index fc2ae2c..68fee34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -32,11 +32,8 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
import org.apache.drill.exec.store.schedule.BlockMapBuilder;
@@ -64,6 +61,7 @@ public class EasyGroupScan extends AbstractGroupScan{
private ListMultimap<Integer, CompleteFileWork> mappings;
private List<CompleteFileWork> chunks;
private List<EndpointAffinity> endpointAffinities;
+ private String selectionRoot;
@JsonCreator
public EasyGroupScan(
@@ -71,7 +69,8 @@ public class EasyGroupScan extends AbstractGroupScan{
@JsonProperty("storage") StoragePluginConfig storageConfig, //
@JsonProperty("format") FormatPluginConfig formatConfig, //
@JacksonInject StoragePluginRegistry engineRegistry, //
- @JsonProperty("columns") List<SchemaPath> columns
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("selectionRoot") String selectionRoot
) throws IOException, ExecutionSetupException {
this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
@@ -87,12 +86,14 @@ public class EasyGroupScan extends AbstractGroupScan{
}
maxWidth = chunks.size();
this.columns = columns;
+ this.selectionRoot = selectionRoot;
}
public EasyGroupScan(
FileSelection selection, //
EasyFormatPlugin<?> formatPlugin, //
- List<SchemaPath> columns
+ List<SchemaPath> columns,
+ String selectionRoot
) throws IOException{
this.selection = selection;
this.formatPlugin = formatPlugin;
@@ -106,6 +107,11 @@ public class EasyGroupScan extends AbstractGroupScan{
this.endpointAffinities = Collections.emptyList();
}
maxWidth = chunks.size();
+ this.selectionRoot = selectionRoot;
+ }
+
+ public String getSelectionRoot() {
+ return selectionRoot;
}
@Override
@@ -170,7 +176,7 @@ public class EasyGroupScan extends AbstractGroupScan{
Preconditions.checkArgument(!filesForMinor.isEmpty(),
String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
- return new EasySubScan(convert(filesForMinor), formatPlugin, columns);
+ return new EasySubScan(convert(filesForMinor), formatPlugin, columns, selectionRoot);
}
private List<FileWorkImpl> convert(List<CompleteFileWork> list){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index c01fb84..0b3fe0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -25,7 +25,6 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.physical.base.AbstractSubScan;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
@@ -45,6 +44,7 @@ public class EasySubScan extends AbstractSubScan{
private final List<FileWorkImpl> files;
private final EasyFormatPlugin<?> formatPlugin;
private final List<SchemaPath> columns;
+ private String selectionRoot;
@JsonCreator
public EasySubScan(
@@ -52,19 +52,27 @@ public class EasySubScan extends AbstractSubScan{
@JsonProperty("storage") StoragePluginConfig storageConfig, //
@JsonProperty("format") FormatPluginConfig formatConfig, //
@JacksonInject StoragePluginRegistry engineRegistry, //
- @JsonProperty("columns") List<SchemaPath> columns //
+ @JsonProperty("columns") List<SchemaPath> columns, //
+ @JsonProperty("selectionRoot") String selectionRoot
) throws IOException, ExecutionSetupException {
this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
Preconditions.checkNotNull(this.formatPlugin);
this.files = files;
this.columns = columns;
+ this.selectionRoot = selectionRoot;
}
- public EasySubScan(List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns){
+ public EasySubScan(List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns, String selectionRoot){
this.formatPlugin = plugin;
this.files = files;
this.columns = columns;
+ this.selectionRoot = selectionRoot;
+ }
+
+ @JsonProperty
+ public String getSelectionRoot() {
+ return selectionRoot;
}
@JsonIgnore
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
index 340919d..0c18e71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.store.dfs.DrillPathFilter;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.drill.exec.store.dfs.shim.DrillInputStream;
import org.apache.drill.exec.store.dfs.shim.DrillOutputStream;
@@ -65,9 +66,13 @@ public class FallbackFileSystem extends DrillFileSystem {
private void addRecursiveStatus(FileStatus parent, List<FileStatus> listToFill) throws IOException {
if (parent.isDir()) {
Path pattern = new Path(parent.getPath(), "*");
- FileStatus[] sub = fs.globStatus(pattern);
+ FileStatus[] sub = fs.globStatus(pattern, new DrillPathFilter());
for(FileStatus s : sub){
- listToFill.add(s);
+ if (s.isDir()) {
+ addRecursiveStatus(s, listToFill);
+ } else {
+ listToFill.add(s);
+ }
}
} else {
listToFill.add(parent);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index cde9b08..d9e6795 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -26,12 +26,7 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.QueryOptimizerRule;
-import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FormatMatcher;
-import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.drill.exec.store.dfs.MagicString;
+import org.apache.drill.exec.store.dfs.*;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.drill.exec.store.mock.MockStorageEngine;
import org.apache.hadoop.conf.Configuration;
@@ -39,7 +34,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.Utils;
import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ParquetFileWriter;
@@ -101,7 +95,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
@Override
public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
- return new ParquetGroupScan( selection.getFileStatusList(fs), this);
+ return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot);
}
@Override
@@ -170,15 +164,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
return true;
} else {
- PathFilter filter = new Utils.OutputFileUtils.OutputFilesFilter() {
- @Override
- public boolean accept(Path path) {
- if (path.toString().contains("_metadata")) {
- return false;
- }
- return super.accept(path);
- }
- };
+ PathFilter filter = new DrillPathFilter();
FileStatus[] files = fs.getUnderlying().listStatus(dir.getPath(), filter);
if (files.length == 0) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index bcee2be..cd7575d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -22,7 +22,6 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
import org.apache.drill.common.expression.FieldReference;
@@ -38,7 +37,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
import org.apache.drill.exec.store.dfs.easy.FileWork;
@@ -89,6 +87,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
private final ParquetFormatConfig formatConfig;
private final FileSystem fs;
private List<EndpointAffinity> endpointAffinities;
+ private String selectionRoot;
private List<SchemaPath> columns;
@@ -112,7 +111,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
@JsonProperty("storage") StoragePluginConfig storageConfig, //
@JsonProperty("format") FormatPluginConfig formatConfig, //
@JacksonInject StoragePluginRegistry engineRegistry, //
- @JsonProperty("columns") List<SchemaPath> columns //
+ @JsonProperty("columns") List<SchemaPath> columns, //
+ @JsonProperty("selectionRoot") String selectionRoot //
) throws IOException, ExecutionSetupException {
this.columns = columns;
if(formatConfig == null) formatConfig = new ParquetFormatConfig();
@@ -123,12 +123,18 @@ public class ParquetGroupScan extends AbstractGroupScan {
this.fs = formatPlugin.getFileSystem().getUnderlying();
this.formatConfig = formatPlugin.getConfig();
this.entries = entries;
+ this.selectionRoot = selectionRoot;
this.readFooterFromEntries();
}
+ public String getSelectionRoot() {
+ return selectionRoot;
+ }
+
public ParquetGroupScan(List<FileStatus> files, //
- ParquetFormatPlugin formatPlugin) //
+ ParquetFormatPlugin formatPlugin, //
+ String selectionRoot) //
throws IOException {
this.formatPlugin = formatPlugin;
this.columns = null;
@@ -140,6 +146,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
entries.add(new ReadEntryWithPath(file.getPath().toString()));
}
+ this.selectionRoot = selectionRoot;
+
readFooter(files);
}
@@ -202,6 +210,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
private EndpointByteMap byteMap;
private int rowGroupIndex;
+ private String root;
@JsonCreator
public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
@@ -282,7 +291,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
- return new ParquetRowGroupScan(formatPlugin, convertToReadEntries(rowGroupsForMinor), columns);
+ return new ParquetRowGroupScan(formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index 0b1a788..dd5c91c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -34,7 +33,6 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -53,6 +51,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
private final ParquetFormatPlugin formatPlugin;
private final List<RowGroupReadEntry> rowGroupReadEntries;
private final List<SchemaPath> columns;
+ private String selectionRoot;
@JsonCreator
public ParquetRowGroupScan( //
@@ -60,7 +59,8 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
@JsonProperty("storage") StoragePluginConfig storageConfig, //
@JsonProperty("format") FormatPluginConfig formatConfig, //
@JsonProperty("entries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, //
- @JsonProperty("columns") List<SchemaPath> columns //
+ @JsonProperty("columns") List<SchemaPath> columns, //
+ @JsonProperty("selectionRoot") String selectionRoot //
) throws ExecutionSetupException {
if(formatConfig == null) formatConfig = new ParquetFormatConfig();
@@ -71,16 +71,19 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
this.rowGroupReadEntries = rowGroupReadEntries;
this.formatConfig = formatPlugin.getConfig();
this.columns = columns;
+ this.selectionRoot = selectionRoot;
}
public ParquetRowGroupScan( //
ParquetFormatPlugin formatPlugin, //
List<RowGroupReadEntry> rowGroupReadEntries, //
- List<SchemaPath> columns) {
+ List<SchemaPath> columns,
+ String selectionRoot) {
this.formatPlugin = formatPlugin;
this.formatConfig = formatPlugin.getConfig();
this.rowGroupReadEntries = rowGroupReadEntries;
this.columns = columns;
+ this.selectionRoot = selectionRoot;
}
@JsonProperty("entries")
@@ -93,6 +96,10 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
return formatPlugin.getStorageConfig();
}
+ public String getSelectionRoot() {
+ return selectionRoot;
+ }
+
@Override
public OperatorCost getCost() {
return null;
@@ -121,7 +128,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
- return new ParquetRowGroupScan(formatPlugin, rowGroupReadEntries, columns);
+ return new ParquetRowGroupScan(formatPlugin, rowGroupReadEntries, columns, selectionRoot);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index d36dbc0..6278a79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -18,33 +18,29 @@
package org.apache.drill.exec.store.parquet;
import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Stopwatch;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.mock.MockScanBatchCreator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
@@ -54,12 +50,34 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
@Override
public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
+ String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+ List<SchemaPath> columns = rowGroupScan.getColumns();
+
List<RecordReader> readers = Lists.newArrayList();
-
+
+ List<String[]> partitionColumns = Lists.newArrayList();
+ List<Integer> selectedPartitionColumns = Lists.newArrayList();
+ boolean selectAllColumns = false;
+
+ if (columns == null || columns.size() == 0) {
+ selectAllColumns = true;
+ } else {
+ Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
+ for (SchemaPath column : columns) {
+ Matcher m = pattern.matcher(column.getAsUnescapedPath());
+ if (m.matches()) {
+ columns.remove(column);
+ selectedPartitionColumns.add(Integer.parseInt(column.getAsUnescapedPath().toString().substring(partitionDesignator.length())));
+ }
+ }
+ }
+
+
FileSystem fs = rowGroupScan.getStorageEngine().getFileSystem().getUnderlying();
// keep footers in a map to avoid re-reading them
Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>();
+ int numParts = 0;
for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
/*
Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
@@ -81,10 +99,30 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
rowGroupScan.getColumns()
)
);
+ if (rowGroupScan.getSelectionRoot() != null) {
+ String[] r = rowGroupScan.getSelectionRoot().split("/");
+ String[] p = e.getPath().split("/");
+ if (p.length > r.length) {
+ String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
+ partitionColumns.add(q);
+ numParts = Math.max(numParts, q.length);
+ } else {
+ partitionColumns.add(new String[] {});
+ }
+ } else {
+ partitionColumns.add(new String[] {});
+ }
} catch (IOException e1) {
throw new ExecutionSetupException(e1);
}
}
- return new ScanBatch(context, readers.iterator());
+
+ if (selectAllColumns) {
+ for (int i = 0; i < numParts; i++) {
+ selectedPartitionColumns.add(i);
+ }
+ }
+
+ return new ScanBatch(context, readers.iterator(), partitionColumns, selectedPartitionColumns);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index f3b04b5..2f145a7 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -41,7 +41,8 @@ drill.exec: {
text: {
buffer.size: 262144,
batch.size: 4000
- }
+ },
+ partition.column.label: "dir"
}
},
metrics : {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 191115b..e0e874b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill;
+import org.apache.drill.common.util.FileUtils;
import org.junit.Test;
public class TestExampleQueries extends BaseTestQuery{
@@ -43,6 +44,20 @@ public class TestExampleQueries extends BaseTestQuery{
}
@Test
+ public void testText() throws Exception {
+ String root = FileUtils.getResourceAsFile("/store/text/data/regions.csv").toURI().toString();
+ String query = String.format("select * from dfs.`%s`", root);
+ test(query);
+ }
+
+ @Test
+ public void testTextPartitions() throws Exception {
+ String root = FileUtils.getResourceAsFile("/store/text/data/").toURI().toString();
+ String query = String.format("select * from dfs.`%s`", root);
+ test(query);
+ }
+
+ @Test
public void testJoin() throws Exception{
test("SELECT\n" +
" nations.N_NAME,\n" +
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
index 5fbcc8b..0155690 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
@@ -71,7 +71,7 @@ public class TextRecordReaderTest extends PopUnitTestBase {
List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
Files.toString(
FileUtils.getResourceAsFile("/store/text/test.json"), Charsets.UTF_8)
- .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/store/text/regions.csv").toURI().toString()));
+ .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/store/text/data/regions.csv").toURI().toString()));
int count = 0;
RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
for(QueryResultBatch b : results) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/resources/storage-engines.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/storage-engines.json b/exec/java-exec/src/test/resources/storage-engines.json
deleted file mode 100644
index 73899ee..0000000
--- a/exec/java-exec/src/test/resources/storage-engines.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
- "storage":{
- dfs: {
- type: "file",
- connection: "file:///"
- },
- cp: {
- type: "file",
- connection: "classpath:///"
- }
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/resources/store/text/data/d1/regions.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/text/data/d1/regions.csv b/exec/java-exec/src/test/resources/store/text/data/d1/regions.csv
new file mode 100644
index 0000000..e97d2ed
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/text/data/d1/regions.csv
@@ -0,0 +1,5 @@
+0,AFRICA,lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to ,
+1,AMERICA,hs use ironic, even requests. s,
+2,ASIA,ges. thinly even pinto beans ca,
+3,EUROPE,ly final courts cajole furiously final excuse,
+4,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl,
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/resources/store/text/data/regions.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/text/data/regions.csv b/exec/java-exec/src/test/resources/store/text/data/regions.csv
new file mode 100644
index 0000000..e97d2ed
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/text/data/regions.csv
@@ -0,0 +1,5 @@
+0,AFRICA,lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to ,
+1,AMERICA,hs use ironic, even requests. s,
+2,ASIA,ges. thinly even pinto beans ca,
+3,EUROPE,ly final courts cajole furiously final excuse,
+4,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl,
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/exec/java-exec/src/test/resources/store/text/regions.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/text/regions.csv b/exec/java-exec/src/test/resources/store/text/regions.csv
deleted file mode 100644
index e97d2ed..0000000
--- a/exec/java-exec/src/test/resources/store/text/regions.csv
+++ /dev/null
@@ -1,5 +0,0 @@
-0,AFRICA,lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to ,
-1,AMERICA,hs use ironic, even requests. s,
-2,ASIA,ges. thinly even pinto beans ca,
-3,EUROPE,ly final courts cajole furiously final excuse,
-4,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl,
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5d1123d..7166ee4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,7 +262,7 @@
<forkCount>8</forkCount>
<reuseForks>true</reuseForks>
<additionalClasspathElements>
- <additionalClasspathElement>./sqlparser/src/test/resources/storage-engines.json</additionalClasspathElement>
+ <additionalClasspathElement>./sqlparser/src/test/resources/storage-plugins.json</additionalClasspathElement>
</additionalClasspathElements>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69c571cc/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index b454b52..ef1674f 100644
--- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -26,6 +26,7 @@ import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Function;
+import org.apache.drill.common.util.FileUtils;
import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
import org.apache.drill.jdbc.Driver;