You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/04/23 05:14:46 UTC
[08/10] DRILL-442: Implement text format plugin
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 807c67e..8bdb1ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.store.easy.json;
import java.util.List;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
@@ -41,7 +43,7 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
}
public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
- super(name, context, fs, config, formatPluginConfig, true, false, false, "json", "json");
+ super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("json"), "json");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
new file mode 100644
index 0000000..850f248
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.text.DrillTextRecordReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
+
+ public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
+ super(name, context, fs, storageConfig, new TextFormatConfig(), true, false, true, true, new ArrayList<String>(), "text");
+ }
+
+ public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, TextFormatConfig formatPluginConfig) {
+ super(name, context, fs, config, formatPluginConfig, true, false, true, true, formatPluginConfig.getExtensions(), "text");
+ }
+
+
+ @Override
+ public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
+ List<SchemaPath> columns) throws ExecutionSetupException {
+ Path path = getFileSystem().getUnderlying().makeQualified(new Path(fileWork.getPath()));
+ FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
+ Preconditions.checkArgument(((TextFormatConfig)formatConfig).getDelimiter().length() == 1, "Only single character delimiter supported");
+ return new DrillTextRecordReader(split, context, ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
+ }
+
+ @JsonTypeName("text")
+ public static class TextFormatConfig implements FormatPluginConfig {
+
+ public List<String> extensions;
+ public String delimiter;
+
+ public List<String> getExtensions() {
+ return extensions;
+ }
+
+ public String getDelimiter() {
+ return delimiter;
+ }
+
+ @Override
+ public int hashCode() {
+ return 33;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (!(obj instanceof TextFormatConfig))
+ return false;
+ TextFormatConfig that = (TextFormatConfig) obj;
+ if (this.delimiter.equals(that.delimiter))
+ return true;
+ return false;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index a1e8f1d..bf33805 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -79,25 +79,25 @@ public class HiveScan extends AbstractGroupScan {
@JsonCreator
public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry, @JsonProperty("storage-plugin") String storagePluginName,
@JsonProperty("columns") List<FieldReference> columns,
- @JacksonInject StoragePluginRegistry engineRegistry) throws ExecutionSetupException {
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
this.hiveReadEntry = hiveReadEntry;
this.table = hiveReadEntry.getTable();
this.storagePluginName = storagePluginName;
- this.storagePlugin = (HiveStoragePlugin) engineRegistry.getEngine(storagePluginName);
+ this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
this.columns = columns;
this.partitions = hiveReadEntry.getPartitions();
getSplits();
endpoints = storagePlugin.getContext().getBits();
}
- public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storageEngine, List<FieldReference> columns) throws ExecutionSetupException {
+ public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, List<FieldReference> columns) throws ExecutionSetupException {
this.table = hiveReadEntry.getTable();
this.hiveReadEntry = hiveReadEntry;
this.columns = columns;
this.partitions = hiveReadEntry.getPartitions();
getSplits();
- endpoints = storageEngine.getContext().getBits();
- this.storagePluginName = storageEngine.getName();
+ endpoints = storagePlugin.getContext().getBits();
+ this.storagePluginName = storagePlugin.getName();
}
public List<FieldReference> getColumns() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
index 33eb68a..d3200fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.parquet;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.common.logical.FormatPluginConfig;
import com.fasterxml.jackson.annotation.JsonTypeName;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index c20c134..bcee2be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -29,6 +29,7 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
@@ -113,7 +114,6 @@ public class ParquetGroupScan extends AbstractGroupScan {
@JacksonInject StoragePluginRegistry engineRegistry, //
@JsonProperty("columns") List<SchemaPath> columns //
) throws IOException, ExecutionSetupException {
- engineRegistry.init(DrillConfig.create());
this.columns = columns;
if(formatConfig == null) formatConfig = new ParquetFormatConfig();
Preconditions.checkNotNull(storageConfig);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index eaa4f17..4ae84fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -61,6 +61,7 @@ public class AssignmentCreator<T extends CompleteWork> {
}
private AssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+ logger.debug("Assigning {} units to {} endpoints", units.size(), incomingEndpoints.size());
Stopwatch watch = new Stopwatch();
Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), String.format("Incoming endpoints %d "
@@ -116,6 +117,7 @@ public class AssignmentCreator<T extends CompleteWork> {
.get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {
mappings.put(minorFragmentId, unit);
+ logger.debug("Assigned unit: {} to minorFragmentId: {}", unit, minorFragmentId);
// logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(),
// minorFragmentId, endpoints.get(minorFragmentId).getAddress());
// if (bytesPerEndpoint.get(currentEndpoint) != null) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
index 432c1d7..1ad134e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -40,6 +40,7 @@ import com.codahale.metrics.Timer;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableRangeMap;
import com.google.common.collect.Range;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
public class BlockMapBuilder {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BlockMapBuilder.class);
@@ -50,19 +51,24 @@ public class BlockMapBuilder {
private Collection<DrillbitEndpoint> endpoints;
private FileSystem fs;
private HashMap<String,DrillbitEndpoint> endPointMap;
+ private CompressionCodecFactory codecFactory;
public BlockMapBuilder(FileSystem fs, Collection<DrillbitEndpoint> endpoints) {
this.fs = fs;
this.endpoints = endpoints;
+ codecFactory = new CompressionCodecFactory(fs.getConf());
buildEndpointMap();
}
-
+ private boolean compressed(FileStatus fileStatus) {
+ return codecFactory.getCodec(fileStatus.getPath()) != null;
+ }
+
public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException{
List<CompleteFileWork> work = Lists.newArrayList();
for(FileStatus f : files){
ImmutableRangeMap<Long,BlockLocation> rangeMap = getBlockMap(f);
- if(!blockify){
+ if(!blockify || compressed(f)){
work.add(new CompleteFileWork(this.getEndpointByteMap(new FileStatusWork(f)), 0, f.getLen(), f.getPath().toString()));
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
index 30b08f6..cfa4fcb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
@@ -102,4 +102,9 @@ public class CompleteFileWork implements FileWork, CompleteWork{
}
}
+
+ @Override
+ public String toString() {
+ return String.format("File: %s start: %d length: %d", path, start, length);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
new file mode 100644
index 0000000..17d2adb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.text;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.RepeatedVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.util.List;
+
+public class DrillTextRecordReader implements RecordReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class);
+
+ private org.apache.hadoop.mapred.RecordReader<LongWritable, Text> reader;
+ private List<ValueVector> vectors = Lists.newArrayList();
+ private byte delimiter;
+ private int targetRecordCount;
+ private FieldReference ref = new FieldReference("columns");
+ private FragmentContext context;
+ private RepeatedVarCharVector vector;
+ private List<Integer> columnIds = Lists.newArrayList();
+ private LongWritable key;
+ private Text value;
+ private int numCols = 0;
+ private boolean redoRecord = false;
+
+ public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) {
+ this.context = context;
+ this.delimiter = (byte) delimiter;
+ if(columns != null) {
+ for (SchemaPath path : columns) {
+ assert path.getRootSegment().isNamed();
+ Preconditions.checkArgument(path.getRootSegment().getChild().isArray(),"Selected column must be an array index");
+ int index = path.getRootSegment().getChild().getArraySegment().getIndex();
+ columnIds.add(index);
+ }
+ }
+ targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE);
+ numCols = columnIds.size();
+ TextInputFormat inputFormat = new TextInputFormat();
+ JobConf job = new JobConf();
+ job.setInt("io.file.buffer.size", context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BUFFER_SIZE));
+ job.setInputFormat(inputFormat.getClass());
+ try {
+ reader = inputFormat.getRecordReader(split, job, Reporter.NULL);
+ key = reader.createKey();
+ value = reader.createValue();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+ output.removeAllFields();
+ MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR));
+ vector = new RepeatedVarCharVector(field, context.getAllocator());
+ try {
+ output.addField(vector);
+ output.setNewSchema();
+ } catch (SchemaChangeException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ @Override
+ public int next() {
+ AllocationHelper.allocate(vector, targetRecordCount, 50);
+ try {
+ int recordCount = 0;
+ while (redoRecord || (recordCount < targetRecordCount && reader.next(key, value))) {
+ redoRecord = false;
+ int start;
+ int end = -1;
+ int p = 0;
+ int i = 0;
+ vector.getMutator().startNewGroup(recordCount);
+ while (end < value.getLength() - 1) {
+ if(numCols > 0 && p >= numCols) {
+ break;
+ }
+ start = end;
+ end = find(value, delimiter, start + 1);
+ if (end == -1) {
+ end = value.getLength();
+ }
+ if (numCols > 0 && i++ < columnIds.get(p)) {
+ if (!vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, start + 1)) {
+ redoRecord = true;
+ vector.getMutator().setValueCount(recordCount);
+ return recordCount;
+ }
+ continue;
+ }
+ p++;
+ if (!vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, end - start - 1)) {
+ redoRecord = true;
+ vector.getMutator().setValueCount(recordCount);
+ return recordCount;
+ }
+ }
+ recordCount++;
+ }
+ for (ValueVector v : vectors) {
+ v.getMutator().setValueCount(recordCount);
+ }
+ vector.getMutator().setValueCount(recordCount);
+ return recordCount;
+ } catch (IOException e) {
+ cleanup();
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ public int find(Text text, byte what, int start) {
+ int len = text.getLength();
+ int p = start;
+ byte[] bytes = text.getBytes();
+ while (p < len) {
+ if (bytes[p] == what) {
+ return p;
+ }
+ p++;
+ }
+ return -1;
+ }
+
+ @Override
+ public void cleanup() {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ logger.warn("Exception closing reader: {}", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
index 1942f62..b48d3bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.util;
+import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang.StringUtils;
@@ -101,7 +102,9 @@ public class VectorUtil {
}
else if (o instanceof byte[]) {
String value = new String((byte[]) o);
- System.out.printf(format, value.length() <= columnWidth ? value : value.substring(0, columnWidth - 1));
+ System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
+ } else if (o instanceof List) {
+ System.out.printf("| %s", o);
} else {
String value = o.toString();
System.out.printf(format, value.length() <= columnWidth ? value : value.substring(0,columnWidth - 1));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
index 556c9a9..bd03038 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
@@ -28,7 +28,13 @@ public interface RepeatedVariableWidthVector extends ValueVector{
* @param childValueCount Number of supported values in the vector.
*/
public void allocateNew(int totalBytes, int parentValueCount, int childValueCount);
-
+
+ /**
+ * Provide the maximum amount of variable width bytes that can be stored int his vector.
+ * @return
+ */
+ public int getByteCapacity();
+
/**
* Load the records in the provided buffer based on the given number of values.
* @param dataBytes The number of bytes associated with the data array.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/RepeatedVariableEstimatedAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/RepeatedVariableEstimatedAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/RepeatedVariableEstimatedAllocator.java
new file mode 100644
index 0000000..2a0ca65
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/RepeatedVariableEstimatedAllocator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.allocator;
+
+import org.apache.drill.exec.vector.RepeatedVariableWidthVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+
+public class RepeatedVariableEstimatedAllocator extends VectorAllocator{
+ RepeatedVariableWidthVector out;
+ int avgWidth;
+
+ public RepeatedVariableEstimatedAllocator(RepeatedVariableWidthVector out, int avgWidth) {
+ super();
+ this.out = out;
+ this.avgWidth = avgWidth;
+ }
+
+ public void alloc(int recordCount){
+ out.allocateNew(avgWidth * recordCount, recordCount, recordCount);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/RepeatedVectorAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/RepeatedVectorAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/RepeatedVectorAllocator.java
new file mode 100644
index 0000000..d9be306
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/RepeatedVectorAllocator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.allocator;
+
+import org.apache.drill.exec.vector.RepeatedVariableWidthVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+
+class RepeatedVectorAllocator extends VectorAllocator{
+ RepeatedVariableWidthVector in;
+ RepeatedVariableWidthVector out;
+
+ public RepeatedVectorAllocator(RepeatedVariableWidthVector in, RepeatedVariableWidthVector out) {
+ super();
+ this.in = in;
+ this.out = out;
+ }
+
+ public void alloc(int recordCount){
+ out.allocateNew(in.getByteCapacity(), in.getAccessor().getValueCount(), in.getAccessor().getValueCount());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java
index fcd09cd..77b6e1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.vector.allocator;
import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.RepeatedVariableWidthVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
@@ -29,6 +30,8 @@ public abstract class VectorAllocator{
return new FixedVectorAllocator((FixedWidthVector) outgoing);
}else if(outgoing instanceof VariableWidthVector && in instanceof VariableWidthVector){
return new VariableVectorAllocator( (VariableWidthVector) in, (VariableWidthVector) outgoing);
+ } else if (outgoing instanceof RepeatedVariableWidthVector && in instanceof RepeatedVariableWidthVector) {
+ return new RepeatedVectorAllocator((RepeatedVariableWidthVector) in, (RepeatedVariableWidthVector) outgoing);
}else{
throw new UnsupportedOperationException();
}
@@ -40,7 +43,9 @@ public abstract class VectorAllocator{
return new FixedVectorAllocator((FixedWidthVector) outgoing);
}else if(outgoing instanceof VariableWidthVector){
return new VariableEstimatedVector( (VariableWidthVector) outgoing, averageBytesPerVariable);
- }else{
+ }else if (outgoing instanceof RepeatedVariableWidthVector) {
+ return new RepeatedVariableEstimatedAllocator((RepeatedVariableWidthVector) outgoing, averageBytesPerVariable);
+ } else {
throw new UnsupportedOperationException();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 81e6135..f3b04b5 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -36,7 +36,13 @@ drill.exec: {
},
functions: ["org.apache.drill.expr.fn.impl"],
storage: {
- packages += "org.apache.drill.exec.store"
+ packages += "org.apache.drill.exec.store",
+ file: {
+ text: {
+ buffer.size: 262144,
+ batch.size: 4000
+ }
+ }
},
metrics : {
context: "drillbit",
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index a9c8e69..3a492c5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -27,6 +27,8 @@ import net.hydromatic.optiq.tools.Frameworks;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.LocalCache;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.ops.QueryContext;
@@ -63,6 +65,9 @@ public class PlanningBase {
protected void testSqlPlan(String sqlCommands) throws Exception{
String[] sqlStrings = sqlCommands.split(";");
+ final DistributedCache cache = new LocalCache();
+ cache.run();
+
new NonStrictExpectations() {
{
dbContext.getMetrics();
@@ -71,10 +76,13 @@ public class PlanningBase {
result = new TopLevelAllocator();
dbContext.getConfig();
result = config;
+ dbContext.getCache();
+ result = cache;
}
};
StoragePluginRegistry registry = new StoragePluginRegistry(dbContext);
+ registry.init();
final FunctionImplementationRegistry functionRegistry = new FunctionImplementationRegistry(config);
final SchemaPlus root = Frameworks.createRootSchema();
registry.getSchemaFactory().registerSchemas(null, root);
@@ -96,6 +104,8 @@ public class PlanningBase {
result = new PlannerSettings();
context.getConfig();
result = config;
+ context.getCache();
+ result = cache;
}
};
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
index f5c9884..06d8a32 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -117,6 +117,7 @@ public class TestMergeJoin extends PopUnitTestBase {
}
@Test
+ @Ignore
public void orderedEqualityLeftJoin(@Injectable final DrillbitContext bitContext,
@Injectable UserServer.UserClientConnection connection) throws Throwable {
@@ -170,6 +171,7 @@ public class TestMergeJoin extends PopUnitTestBase {
}
@Test
+ @Ignore
public void orderedEqualityInnerJoin(@Injectable final DrillbitContext bitContext,
@Injectable UserServer.UserClientConnection connection) throws Throwable {
@@ -223,6 +225,7 @@ public class TestMergeJoin extends PopUnitTestBase {
}
@Test
+ @Ignore
public void orderedEqualityMultiBatchJoin(@Injectable final DrillbitContext bitContext,
@Injectable UserServer.UserClientConnection connection) throws Throwable {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index 95bee87..1cc77f9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -32,6 +32,7 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.IfExpression;
import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -82,7 +83,7 @@ public class ExpressionTreeMaterializerTest {
new NonStrictExpectations() {
{
- batch.getValueVectorId(new FieldReference("test", ExpressionPosition.UNKNOWN));
+ batch.getValueVectorId(new SchemaPath("test", ExpressionPosition.UNKNOWN));
result = new TypedFieldId(Types.required(MinorType.BIGINT), -5);
}
};
@@ -98,9 +99,9 @@ public class ExpressionTreeMaterializerTest {
public void testMaterializingLateboundTree(final @Injectable RecordBatch batch) throws SchemaChangeException {
new NonStrictExpectations() {
{
- batch.getValueVectorId(new FieldReference("test", ExpressionPosition.UNKNOWN));
+ batch.getValueVectorId(SchemaPath.getSimplePath("test"));
result = new TypedFieldId(Types.required(MinorType.BIT), -4);
- batch.getValueVectorId(new FieldReference("test1", ExpressionPosition.UNKNOWN));
+ batch.getValueVectorId(SchemaPath.getSimplePath("test1"));
result = new TypedFieldId(Types.required(MinorType.BIGINT), -5);
}
};
@@ -196,7 +197,7 @@ public class ExpressionTreeMaterializerTest {
new NonStrictExpectations() {
{
- batch.getValueVectorId(new FieldReference("test", ExpressionPosition.UNKNOWN));
+ batch.getValueVectorId(new SchemaPath("test", ExpressionPosition.UNKNOWN));
result = new TypedFieldId(Types.required(MinorType.BIGINT), -5);
}
};
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
index ffe05a4..f61f4ee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
@@ -22,6 +22,8 @@ import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.tools.Frameworks;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.cache.HazelCache;
+import org.apache.drill.exec.cache.LocalCache;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.server.DrillbitContext;
import org.junit.Test;
@@ -33,7 +35,7 @@ public class TestOrphanSchema {
@Test
- public void test(final DrillbitContext bitContext){
+ public void test(final DrillbitContext bitContext) throws Exception {
final DrillConfig c = DrillConfig.create();
new NonStrictExpectations() {
@@ -44,11 +46,16 @@ public class TestOrphanSchema {
result = new TopLevelAllocator();
bitContext.getConfig();
result = c;
+ bitContext.getCache();
+ result = new LocalCache();
}
};
+ bitContext.getCache().run();
+
StoragePluginRegistry r = new StoragePluginRegistry(bitContext);
SchemaPlus plus = Frameworks.createRootSchema();
+ r.init();
r.getSchemaFactory().registerSchemas(null, plus);
printSchema(plus, 0);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
index 22e4a14..1f1b367 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
@@ -23,6 +23,7 @@ import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.tools.Frameworks;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.cache.LocalCache;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -37,8 +38,8 @@ public class OrphanSchema {
* Create an orphan schema to be used for testing.
* @return root node of the created schema.
*/
- public static SchemaPlus create(){
-
+ public static SchemaPlus create() throws Exception {
+
final DrillConfig c = DrillConfig.create();
// Mock up a context which will allow us to create a schema.
@@ -46,9 +47,13 @@ public class OrphanSchema {
when(bitContext.getMetrics()).thenReturn(new MetricRegistry());
when(bitContext.getAllocator()).thenReturn(new TopLevelAllocator());
when(bitContext.getConfig()).thenReturn(c);
+ when(bitContext.getCache()).thenReturn(new LocalCache());
+ bitContext.getCache().run();
+
// Using the mock context, get the orphan schema.
StoragePluginRegistry r = new StoragePluginRegistry(bitContext);
+ r.init();
SchemaPlus plus = Frameworks.createRootSchema();
r.getSchemaFactory().registerSchemas(null, plus);
return plus;
@@ -61,7 +66,7 @@ public class OrphanSchema {
*/
@Test
- public void test() {
+ public void test() throws Exception {
printSchema(create(), 0);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
index e76c609..d4755fa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.store.ischema.RowProvider;
import org.apache.drill.exec.store.ischema.RowRecordReader;
import org.apache.drill.exec.vector.ValueVector;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
/**
@@ -46,7 +47,12 @@ import org.junit.Test;
* An "orphan schema" is a stand alone schema which is not (yet?) connected to Optiq.
*/
public class TestOrphanSchema {
- SchemaPlus root = OrphanSchema.create();
+ static SchemaPlus root;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ root = OrphanSchema.create();
+ }
@Test
public void testTables() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
new file mode 100644
index 0000000..5fbcc8b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.text;
+
+import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.util.VectorUtil;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.junit.AfterClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TextRecordReaderTest extends PopUnitTestBase {
+
+ @Test
+ public void testFullExecution() throws Exception {
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+ try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+ DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+ bit1.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+ Files.toString(
+ FileUtils.getResourceAsFile("/store/text/test.json"), Charsets.UTF_8)
+ .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/store/text/regions.csv").toURI().toString()));
+ int count = 0;
+ RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
+ for(QueryResultBatch b : results) {
+ if (b.getHeader().getRowCount() != 0) {
+ count += b.getHeader().getRowCount();
+ }
+ loader.load(b.getHeader().getDef(), b.getData());
+ VectorUtil.showVectorAccessibleContent(loader);
+ }
+ assertEquals(5, count);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/test/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/storage-plugins.json b/exec/java-exec/src/test/resources/storage-plugins.json
new file mode 100644
index 0000000..33f4fac
--- /dev/null
+++ b/exec/java-exec/src/test/resources/storage-plugins.json
@@ -0,0 +1,40 @@
+{
+ "storage":{
+ dfs: {
+ type: "file",
+ connection: "file:///",
+ formats: {
+ "psv" : {
+ type: "text",
+ extensions: [ "tbl" ],
+ delimiter: "|"
+ },
+ "csv" : {
+ type: "text",
+ extensions: [ "csv", "bcp" ],
+ delimiter: ","
+ },
+ "tsv" : {
+ type: "text",
+ extensions: [ "tsv" ],
+ delimiter: "\t"
+ },
+ "parquet" : {
+ type: "parquet"
+ }
+ }
+ },
+ cp: {
+ type: "file",
+ connection: "classpath:///",
+ formats: {
+ "json" : {
+ type: "json"
+ },
+ "parquet" : {
+ type: "parquet"
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/test/resources/store/text/regions.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/text/regions.csv b/exec/java-exec/src/test/resources/store/text/regions.csv
new file mode 100644
index 0000000..e97d2ed
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/text/regions.csv
@@ -0,0 +1,5 @@
+0,AFRICA,lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to ,
+1,AMERICA,hs use ironic, even requests. s,
+2,ASIA,ges. thinly even pinto beans ca,
+3,EUROPE,ly final courts cajole furiously final excuse,
+4,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl,
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/test/resources/store/text/test.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/text/test.json b/exec/java-exec/src/test/resources/store/text/test.json
new file mode 100644
index 0000000..ee33b5d
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/text/test.json
@@ -0,0 +1,40 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"fs-scan",
+ files:[
+ "#{DATA_FILE}"
+ ],
+ storage : {
+ type : "named",
+ name: "dfs"
+ },
+ format: {
+ type: "named",
+ name: "csv"
+ }
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"project",
+ exprs: [
+ { ref: "col1", expr:"columns[0]" },
+ { ref: "col2", expr:"columns[1]" }
+ ]
+ },
+ {
+ @id: 3,
+ child: 2,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2cac418..5d1123d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,6 +127,7 @@
<exclude>**/*.json</exclude>
<exclude>**/*.sql</exclude>
<exclude>**/git.properties</exclude>
+ <exclude>**/*.csv</exclude>
<exclude>**/drill-*.conf</exclude>
<exclude>**/.buildpath</exclude>
<exclude>**/*.proto</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/sqlparser/src/test/resources/storage-engines.json
----------------------------------------------------------------------
diff --git a/sqlparser/src/test/resources/storage-engines.json b/sqlparser/src/test/resources/storage-engines.json
deleted file mode 100644
index c16a971..0000000
--- a/sqlparser/src/test/resources/storage-engines.json
+++ /dev/null
@@ -1,27 +0,0 @@
-{
- "storage":{
- dfs: {
- type: "file",
- connection: "file:///",
- workspaces: {
- default: "/",
- home: "/"
- }
- },
- cp: {
- type: "file",
- connection: "classpath:///"
- },
- hive : {
- type:"hive",
- config :
- {
- "hive.metastore.uris" : "",
- "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=/tmp/drill_hive_db;create=true",
- "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
- "fs.default.name" : "file:///",
- "hive.metastore.sasl.enabled" : "false"
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/sqlparser/src/test/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/sqlparser/src/test/resources/storage-plugins.json b/sqlparser/src/test/resources/storage-plugins.json
new file mode 100644
index 0000000..b9a4bd8
--- /dev/null
+++ b/sqlparser/src/test/resources/storage-plugins.json
@@ -0,0 +1,47 @@
+{
+ "storage":{
+ dfs: {
+ type: "file",
+ connection: "file:///",
+ workspaces: {
+ default: "/",
+ home: "/"
+ },
+ formats: {
+ "psv" : {
+ type: "text",
+ extensions: [ "tbl" ],
+ delimiter: "|"
+ },
+ "csv" : {
+ type: "text",
+ extensions: [ "csv" ],
+ delimiter: ","
+ },
+ "tsv" : {
+ type: "text",
+ extensions: [ "tsv" ],
+ delimiter: "\t"
+ },
+ "parquet" : {
+ type: "parquet"
+ }
+ }
+ },
+ cp: {
+ type: "file",
+ connection: "classpath:///"
+ },
+ hive : {
+ type:"hive",
+ config :
+ {
+ "hive.metastore.uris" : "",
+ "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=/tmp/drill_hive_db;create=true",
+ "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
+ "fs.default.name" : "file:///",
+ "hive.metastore.sasl.enabled" : "false"
+ }
+ }
+ }
+}
\ No newline at end of file