You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/04/23 05:14:47 UTC
[09/10] git commit: DRILL-442: Implement text format plugin
DRILL-442: Implement text format plugin
rename storage-engines.json to storage-plugins.json
allow reading a particular value in a repeated vector
fix test failure caused by the change that allows selecting an element of a repeated record
set default recordCount for explain query
fix bug loading repeated vectors
storage plugin/format plugin changes. store storage plugin configuration in distributed cache.
add repeated vector allocators
add support for reading compressed files.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/54287d07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/54287d07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/54287d07
Branch: refs/heads/master
Commit: 54287d0761f97f337035aa8988faf380178aba08
Parents: 7b6c7a1
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Mar 16 18:56:50 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Apr 22 20:05:15 2014 -0700
----------------------------------------------------------------------
.../drill/common/util/DataInputInputStream.java | 15 +-
.../drill/exec/store/hbase/HBaseGroupScan.java | 9 +-
.../drill/exec/store/hbase/HBaseSubScan.java | 2 +-
...base_scan_screen_physical_column_select.json | 2 +-
distribution/src/assemble/bin.xml | 2 +-
distribution/src/resources/drill-override.conf | 36 +++-
distribution/src/resources/storage-engines.json | 26 ---
distribution/src/resources/storage-plugins.json | 49 ++++++
.../codegen/templates/RepeatedValueVectors.java | 60 ++++++-
.../templates/VariableLengthVectors.java | 4 +-
.../org/apache/drill/exec/ExecConstants.java | 2 +
.../drill/exec/cache/DrillSerializable.java | 8 +-
.../exec/cache/JacksonDrillSerializable.java | 86 ++++++++++
.../org/apache/drill/exec/cache/LocalCache.java | 18 +-
.../cache/VectorAccessibleSerializable.java | 6 +-
.../drill/exec/expr/EvaluationVisitor.java | 8 +-
.../exec/expr/ExpressionTreeMaterializer.java | 31 +++-
.../exec/expr/ValueVectorReadExpression.java | 28 ++-
.../drill/exec/expr/fn/impl/Alternator.java | 4 +
.../apache/drill/exec/opt/BasicOptimizer.java | 6 +-
.../impl/project/ProjectRecordBatch.java | 8 +-
.../impl/svremover/RemovingRecordBatch.java | 8 +-
.../drill/exec/planner/PhysicalPlanReader.java | 4 +-
.../drill/exec/planner/logical/DrillOptiq.java | 2 +-
.../exec/planner/logical/StorageEngines.java | 74 --------
.../exec/planner/logical/StoragePlugins.java | 93 ++++++++++
.../org/apache/drill/exec/server/Drillbit.java | 1 +
.../drill/exec/store/StoragePluginRegistry.java | 119 ++++++++-----
.../exec/store/dfs/BasicFormatMatcher.java | 44 ++++-
.../drill/exec/store/dfs/FileSystemConfig.java | 13 +-
.../drill/exec/store/dfs/FileSystemPlugin.java | 2 +
.../drill/exec/store/dfs/FormatCreator.java | 5 +-
.../exec/store/dfs/NamedFormatPluginConfig.java | 3 +-
.../exec/store/dfs/easy/EasyFormatPlugin.java | 24 ++-
.../exec/store/dfs/easy/EasyGroupScan.java | 36 ++--
.../drill/exec/store/dfs/easy/EasySubScan.java | 12 +-
.../exec/store/easy/json/JSONFormatPlugin.java | 4 +-
.../exec/store/easy/text/TextFormatPlugin.java | 98 +++++++++++
.../apache/drill/exec/store/hive/HiveScan.java | 10 +-
.../exec/store/parquet/ParquetFormatConfig.java | 1 +
.../exec/store/parquet/ParquetGroupScan.java | 2 +-
.../exec/store/schedule/AssignmentCreator.java | 2 +
.../exec/store/schedule/BlockMapBuilder.java | 10 +-
.../exec/store/schedule/CompleteFileWork.java | 5 +
.../exec/store/text/DrillTextRecordReader.java | 169 +++++++++++++++++++
.../org/apache/drill/exec/util/VectorUtil.java | 5 +-
.../vector/RepeatedVariableWidthVector.java | 8 +-
.../RepeatedVariableEstimatedAllocator.java | 36 ++++
.../allocator/RepeatedVectorAllocator.java | 36 ++++
.../exec/vector/allocator/VectorAllocator.java | 7 +-
.../src/main/resources/drill-module.conf | 8 +-
.../java/org/apache/drill/PlanningBase.java | 10 ++
.../exec/physical/impl/join/TestMergeJoin.java | 3 +
.../record/ExpressionTreeMaterializerTest.java | 9 +-
.../drill/exec/store/TestOrphanSchema.java | 9 +-
.../drill/exec/store/ischema/OrphanSchema.java | 11 +-
.../exec/store/ischema/TestOrphanSchema.java | 8 +-
.../exec/store/text/TextRecordReaderTest.java | 88 ++++++++++
.../src/test/resources/storage-plugins.json | 40 +++++
.../src/test/resources/store/text/regions.csv | 5 +
.../src/test/resources/store/text/test.json | 40 +++++
pom.xml | 1 +
.../src/test/resources/storage-engines.json | 27 ---
.../src/test/resources/storage-plugins.json | 47 ++++++
64 files changed, 1280 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/common/src/main/java/org/apache/drill/common/util/DataInputInputStream.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/util/DataInputInputStream.java b/common/src/main/java/org/apache/drill/common/util/DataInputInputStream.java
index c4c2282..f61b301 100644
--- a/common/src/main/java/org/apache/drill/common/util/DataInputInputStream.java
+++ b/common/src/main/java/org/apache/drill/common/util/DataInputInputStream.java
@@ -17,6 +17,9 @@
*/
package org.apache.drill.common.util;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
import java.io.*;
@@ -62,7 +65,17 @@ public class DataInputInputStream extends InputStream {
@Override
public int read(byte[] b, int off, int len) throws IOException {
- in.readFully(b, off, len);
+ for (int i = off; i < off + len; i++) {
+ try {
+ b[i] = in.readByte();
+ } catch(Exception e) {
+ if (ExceptionUtils.getRootCause(e) instanceof EOFException) {
+ return i - off;
+ } else {
+ throw e;
+ }
+ }
+ }
return len;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index bb0adcc..b8b6af4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -76,14 +76,13 @@ public class HBaseGroupScan extends AbstractGroupScan {
@JsonCreator
public HBaseGroupScan(@JsonProperty("entries") List<HTableReadEntry> entries,
- @JsonProperty("storage") HBaseStoragePluginConfig storageEngineConfig,
+ @JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
- @JacksonInject StoragePluginRegistry engineRegistry
+ @JacksonInject StoragePluginRegistry pluginRegistry
)throws IOException, ExecutionSetupException {
Preconditions.checkArgument(entries.size() == 1);
- engineRegistry.init(DrillConfig.create());
- this.storagePlugin = (HBaseStoragePlugin) engineRegistry.getEngine(storageEngineConfig);
- this.storagePluginConfig = storageEngineConfig;
+ this.storagePlugin = (HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig);
+ this.storagePluginConfig = storagePluginConfig;
this.tableName = entries.get(0).getTableName();
this.columns = columns;
getRegionInfos();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 0e8a934..81a8af5 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -57,7 +57,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
public HBaseSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("storage") StoragePluginConfig storage,
@JsonProperty("rowGroupReadEntries") LinkedList<HBaseSubScanReadEntry> rowGroupReadEntries,
@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
- hbaseStoragePlugin = (HBaseStoragePlugin) registry.getEngine(storage);
+ hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
this.rowGroupReadEntries = rowGroupReadEntries;
this.storage = storage;
this.columns = columns;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
index c3d356d..7940c65 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
@@ -20,7 +20,7 @@
"zookeeperPort" : 2181
},
columns: [
- "f2.c1", "f2.c2", "row_key"
+ "`f2`.c1", "`f2`.c2", "row_key"
]
},
{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index 1579832..b981355 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -137,7 +137,7 @@
<outputDirectory>conf</outputDirectory>
</file>
<file>
- <source>src/resources/storage-engines.json</source>
+ <source>src/resources/storage-plugins.json</source>
<outputDirectory>conf</outputDirectory>
</file>
</files>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf
index e837e74..a5e5522 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -51,9 +51,15 @@ drill.exec: {
},
functions: ["org.apache.drill.expr.fn.impl"],
storage: {
- packages += "org.apache.drill.exec.store"
+ packages += "org.apache.drill.exec.store",
+ file: {
+ text: {
+ buffer.size: 262144,
+ batch.size: 4000
+ }
+ }
},
- metrics : {
+ metrics : {
context: "drillbit",
jmx: {
enabled : true
@@ -71,7 +77,7 @@ drill.exec: {
retry: {
count: 7200,
delay: 500
- }
+ }
},
functions: ["org.apache.drill.expr.fn.impl"],
network: {
@@ -90,10 +96,26 @@ drill.exec: {
directories: ["/tmp/drill"],
filesystem: "drill-local:///"
},
- spooling: {
+ buffer:{
impl: "org.apache.drill.exec.work.batch.SpoolingRawBatchBuffer",
- delete: false,
- size: 100000000
+ size: "20000",
+ spooling: {
+ delete: false,
+ size: 100000000
+ }
},
- sort.purge.threshold : 100
+ cache.hazel.subnets: ["*.*.*.*"],
+ sort: {
+ purge.threshold : 100,
+ external: {
+ batch.size : 4000,
+ spill: {
+ batch.size : 4000,
+ group.size : 100,
+ threshold : 200,
+ directories : [ "/tmp/drill/spill" ],
+ fs : "file:///"
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/distribution/src/resources/storage-engines.json
----------------------------------------------------------------------
diff --git a/distribution/src/resources/storage-engines.json b/distribution/src/resources/storage-engines.json
deleted file mode 100644
index 8b22858..0000000
--- a/distribution/src/resources/storage-engines.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
- "storage":{
- dfs: {
- type: "file",
- connection: "file:///"
- },
- cp: {
- type: "file",
- connection: "classpath:///"
- }
-
- /*,
- hive : {
- type:"hive",
- config :
- {
- "hive.metastore.uris" : "",
- "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../../sample-data/drill_hive_db;create=true",
- "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
- "fs.default.name" : "file:///",
- "hive.metastore.sasl.enabled" : "false"
- }
- }
- */
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/distribution/src/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/distribution/src/resources/storage-plugins.json b/distribution/src/resources/storage-plugins.json
new file mode 100644
index 0000000..6f2c015
--- /dev/null
+++ b/distribution/src/resources/storage-plugins.json
@@ -0,0 +1,49 @@
+{
+ "storage":{
+ dfs: {
+ type: "file",
+ connection: "file:///",
+ formats: {
+ "psv" : {
+ type: "text",
+ extensions: [ "tbl" ],
+ delimiter: "|"
+ },
+ "csv" : {
+ type: "text",
+ extensions: [ "csv" ],
+ delimiter: ","
+ },
+ "tsv" : {
+ type: "text",
+ extensions: [ "tsv" ],
+ delimiter: "\t"
+ },
+ "parquet" : {
+ type: "parquet"
+ },
+ "json" : {
+ type: "json"
+ }
+ }
+ },
+ cp: {
+ type: "file",
+ connection: "classpath:///"
+ }
+
+ /*,
+ hive : {
+ type:"hive",
+ config :
+ {
+ "hive.metastore.uris" : "",
+ "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../../sample-data/drill_hive_db;create=true",
+ "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
+ "fs.default.name" : "file:///",
+ "hive.metastore.sasl.enabled" : "false"
+ }
+ }
+ */
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 8d5e90a..35bd480 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -108,15 +108,37 @@ package org.apache.drill.exec.vector;
to.copyFrom(fromIndex, toIndex, Repeated${minor.class}Vector.this);
}
}
-
- public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v){
- throw new UnsupportedOperationException();
- }
-
- public boolean copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){
- throw new UnsupportedOperationException();
- }
-
+
+<#if type.major == "VarLen">
+ public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v){
+ int count = v.getAccessor().getCount(inIndex);
+ getMutator().startNewGroup(outIndex);
+ for (int i = 0; i < count; i++) {
+ getMutator().add(outIndex, v.getAccessor().get(inIndex, i));
+ }
+ }
+
+ public boolean copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){
+ int count = v.getAccessor().getCount(inIndex);
+ getMutator().startNewGroup(outIndex);
+ for (int i = 0; i < count; i++) {
+ if (!getMutator().addSafe(outIndex, v.getAccessor().get(inIndex, i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+<#else>
+
+ public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v){
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){
+ throw new UnsupportedOperationException();
+ }
+</#if>
+
<#if type.major == "VarLen">
@Override
public FieldMetadata getMetadata() {
@@ -131,6 +153,7 @@ package org.apache.drill.exec.vector;
public void allocateNew(int totalBytes, int parentValueCount, int childValueCount) {
offsets.allocateNew(parentValueCount+1);
+ offsets.getMutator().set(0,0);
values.allocateNew(totalBytes, childValueCount);
mutator.reset();
accessor.reset();
@@ -261,6 +284,12 @@ package org.apache.drill.exec.vector;
holder.vector = values;
}
+ public void get(int index, int positionIndex, ${minor.class}Holder holder) {
+ int offset = offsets.getAccessor().get(index);
+ assert offset >= 0;
+ values.getAccessor().get(offset + positionIndex, holder);
+ }
+
public MaterializedField getField() {
return field;
}
@@ -301,6 +330,19 @@ package org.apache.drill.exec.vector;
offsets.getMutator().set(index+1, nextOffset+1);
}
+ <#if type.major == "VarLen">
+ public boolean addSafe(int index, byte[] bytes) {
+ return addSafe(index, bytes, 0, bytes.length);
+ }
+
+ public boolean addSafe(int index, byte[] bytes, int start, int length) {
+ int nextOffset = offsets.getAccessor().get(index+1);
+ boolean b1 = values.getMutator().setSafe(nextOffset, bytes, start, length);
+ boolean b2 = offsets.getMutator().setSafe(index+1, nextOffset+1);
+ return (b1 && b2);
+ }
+ </#if>
+
public void add(int index, ${minor.class}Holder holder){
int nextOffset = offsets.getAccessor().get(index+1);
values.getMutator().set(nextOffset, holder);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index dcc7135..9cec943 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -323,7 +323,9 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
if (data.capacity() < currentOffset + length) return false;
- offsetVector.getMutator().set(index + 1, currentOffset + length);
+ if (!offsetVector.getMutator().setSafe(index + 1, currentOffset + length)) {
+ return false;
+ }
data.setBytes(currentOffset, bytes, start, length);
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be59ea6..f88b1b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -58,5 +58,7 @@ public interface ExecConstants {
public static final String EXTERNAL_SORT_SPILL_THRESHOLD = "drill.exec.sort.external.spill.threshold";
public static final String EXTERNAL_SORT_SPILL_DIRS = "drill.exec.sort.external.spill.directories";
public static final String EXTERNAL_SORT_SPILL_FILESYSTEM = "drill.exec.sort.external.spill.fs";
+ public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
+ public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
index 875e8b6..4f266f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
@@ -17,6 +17,10 @@
*/
package org.apache.drill.exec.cache;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.DataSerializable;
+
import java.io.*;
/**
@@ -24,8 +28,8 @@ import java.io.*;
*/
public interface DrillSerializable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSerializable.class);
- public void read(DataInput input) throws IOException;
+ public void readData(ObjectDataInput input) throws IOException;
public void readFromStream(InputStream input) throws IOException;
- public void write(DataOutput output) throws IOException;
+ public void writeData(ObjectDataOutput output) throws IOException;
public void writeToStream(OutputStream output) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
new file mode 100644
index 0000000..a7b0be2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.DataSerializable;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import java.io.*;
+
+public abstract class JacksonDrillSerializable<T> implements DrillSerializable, DataSerializable{
+ private ObjectMapper mapper;
+ private T obj;
+
+ public JacksonDrillSerializable(DrillbitContext context, T obj) {
+ this.mapper = context.getConfig().getMapper();
+ this.obj = obj;
+ }
+
+ public JacksonDrillSerializable() {
+ }
+
+ @Override
+ public void readData(ObjectDataInput input) throws IOException {
+ readFromStream(DataInputInputStream.constructInputStream(input));
+ }
+
+ public void readFromStream(InputStream input, Class clazz) throws IOException {
+ mapper = DrillConfig.create().getMapper();
+ obj = (T) mapper.readValue(input, clazz);
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput output) throws IOException {
+ writeToStream(DataOutputOutputStream.constructOutputStream(output));
+ }
+
+ @Override
+ public void writeToStream(OutputStream output) throws IOException {
+ output.write(mapper.writeValueAsBytes(obj));
+ }
+
+ public T getObj() {
+ return obj;
+ }
+
+ public static class StoragePluginsSerializable extends JacksonDrillSerializable<StoragePlugins> {
+
+ public StoragePluginsSerializable(DrillbitContext context, StoragePlugins obj) {
+ super(context, obj);
+ }
+
+ public StoragePluginsSerializable(BufferAllocator allocator) {
+ }
+
+ public StoragePluginsSerializable() {
+ }
+
+ @Override
+ public void readFromStream(InputStream input) throws IOException {
+ readFromStream(input, StoragePlugins.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index 38de688..119764b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -17,7 +17,10 @@
*/
package org.apache.drill.exec.cache;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.List;
@@ -27,6 +30,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
@@ -109,8 +114,14 @@ public class LocalCache implements DistributedCache {
public static ByteArrayDataOutput serialize(DrillSerializable obj) {
ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ OutputStream outputStream = DataOutputOutputStream.constructOutputStream(out);
try {
- obj.write(out);
+ obj.writeToStream(outputStream);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ outputStream.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -119,9 +130,10 @@ public class LocalCache implements DistributedCache {
public static <V extends DrillSerializable> V deserialize(byte[] bytes, Class<V> clazz) {
ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
+ InputStream inputStream = DataInputInputStream.constructInputStream(in);
try {
V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
- obj.read(in);
+ obj.readFromStream(inputStream);
return obj;
} catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) {
throw new RuntimeException(e);
@@ -164,6 +176,8 @@ public class LocalCache implements DistributedCache {
@Override
public V get(String key) {
if (m.get(key) == null) return null;
+ ByteArrayDataOutput b = m.get(key);
+ byte[] bytes = b.toByteArray();
return (V) deserialize(m.get(key).toByteArray(), this.clazz);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index ff7ab02..9511992 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
import io.netty.buffer.ByteBuf;
import org.apache.drill.common.util.DataInputInputStream;
import org.apache.drill.common.util.DataOutputOutputStream;
@@ -82,7 +84,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
}
@Override
- public void read(DataInput input) throws IOException {
+ public void readData(ObjectDataInput input) throws IOException {
readFromStream(DataInputInputStream.constructInputStream(input));
}
@@ -125,7 +127,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
}
@Override
- public void write(DataOutput output) throws IOException {
+ public void writeData(ObjectDataOutput output) throws IOException {
writeToStream(DataOutputOutputStream.constructOutputStream(output));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 2e632a3..fd547e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Set;
import io.netty.buffer.ByteBuf;
+import com.google.common.collect.Lists;
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.FunctionHolderExpression;
@@ -45,6 +46,7 @@ import org.apache.drill.common.expression.ValueExpressions.Decimal28Expression;
import org.apache.drill.common.expression.ValueExpressions.Decimal38Expression;
import org.apache.drill.common.expression.ValueExpressions.QuotedString;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
@@ -332,7 +334,11 @@ public class EvaluationVisitor {
}
} else {
if (Types.usesHolderForGet(e.getMajorType())) {
- generator.getEvalBlock().add(getValueAccessor.arg(indexVariable).arg(out.getHolder()));
+ if (e.isArrayElement()) {
+ generator.getEvalBlock().add(getValueAccessor.arg(indexVariable).arg(JExpr.lit(e.getIndex())).arg(out.getHolder()));
+ } else {
+ generator.getEvalBlock().add(getValueAccessor.arg(indexVariable).arg(out.getHolder()));
+ }
} else {
generator.getEvalBlock().assign(out.getValue(), getValueAccessor.arg(indexVariable));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index f9572db..e3b1002 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.expr;
import java.util.List;
+import com.google.common.base.Joiner;
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -26,6 +27,7 @@ import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.FunctionHolderExpression;
import org.apache.drill.common.expression.IfExpression;
import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.TypedNullConstant;
import org.apache.drill.common.expression.ValueExpressions;
@@ -228,12 +230,37 @@ public class ExpressionTreeMaterializer {
@Override
public LogicalExpression visitSchemaPath(SchemaPath path, FunctionImplementationRegistry value) {
// logger.debug("Visiting schema path {}", path);
- TypedFieldId tfId = batch.getValueVectorId(path);
+ PathSegment seg = path.getRootSegment();
+ List<String> segments = Lists.newArrayList();
+ segments.add(seg.getNameSegment().getPath().toString());
+ boolean isArrayElement = false;
+ int index = -1;
+ while((seg = seg.getChild()) != null) {
+ if (seg.isNamed()) {
+ segments.add(seg.getNameSegment().getPath().toString());
+ if (seg.isLastPath()) {
+ break;
+ }
+ } else {
+ if (!seg.isLastPath()) {
+ throw new UnsupportedOperationException("Repeated map type not supported");
+ }
+ index = seg.getArraySegment().getIndex();
+ isArrayElement = true;
+ break;
+ }
+ }
+ SchemaPath newPath = SchemaPath.getCompoundPath((String[]) segments.toArray(new String[0]));
+ TypedFieldId tfId = batch.getValueVectorId(newPath);
if (tfId == null) {
logger.warn("Unable to find value vector of path {}, returning null instance.", path);
return NullExpression.INSTANCE;
} else {
- return new ValueVectorReadExpression(tfId);
+ ValueVectorReadExpression e = new ValueVectorReadExpression(tfId, index, isArrayElement);
+ if (isArrayElement) {
+ e.required();
+ }
+ return e;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
index 5e251a1..4ba503d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
@@ -22,23 +22,43 @@ import java.util.Iterator;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.record.TypedFieldId;
import com.google.common.collect.Iterators;
+import javax.sound.sampled.FloatControl;
+
public class ValueVectorReadExpression implements LogicalExpression{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueVectorReadExpression.class);
- private final MajorType type;
+ private MajorType type;
private final TypedFieldId fieldId;
private final boolean superReader;
+ private final int index;
+ private final boolean isArrayElement;
- public ValueVectorReadExpression(TypedFieldId tfId){
+ public ValueVectorReadExpression(TypedFieldId tfId, int index, boolean isArrayElement){
this.type = tfId.getType();
this.fieldId = tfId;
this.superReader = tfId.isHyperReader();
+ this.index = index;
+ this.isArrayElement = isArrayElement;
+ }
+
+ public void required() {
+ type = Types.required(type.getMinorType());
+ }
+
+ public boolean isArrayElement() {
+ return isArrayElement;
+ }
+
+ public ValueVectorReadExpression(TypedFieldId tfId) {
+ this(tfId, -1, false);
}
public TypedFieldId getTypedFieldId(){
@@ -62,6 +82,10 @@ public class ValueVectorReadExpression implements LogicalExpression{
return fieldId;
}
+ public int getIndex() {
+ return index;
+ }
+
@Override
public ExpressionPosition getPosition() {
return ExpressionPosition.UNKNOWN;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
index bdd227c..641063b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
@@ -17,12 +17,16 @@
*/
package org.apache.drill.exec.expr.fn.impl;
+import org.apache.drill.exec.expr.DrillAggFunc;
import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.record.RecordBatch;
public class Alternator {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 97ec026..d7d5ccb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -184,10 +184,10 @@ public class BasicOptimizer extends Optimizer{
public PhysicalOperator visitScan(Scan scan, Object obj) throws OptimizerException {
StoragePluginConfig config = logicalPlan.getStorageEngineConfig(scan.getStorageEngine());
if(config == null) throw new OptimizerException(String.format("Logical plan referenced the storage engine config %s but the logical plan didn't have that available as a config.", scan.getStorageEngine()));
- StoragePlugin engine;
+ StoragePlugin storagePlugin;
try {
- engine = context.getStorage().getEngine(config);
- return engine.getPhysicalScan(scan.getSelection());
+ storagePlugin = context.getStorage().getPlugin(config);
+ return storagePlugin.getPhysicalScan(scan.getSelection());
} catch (IOException | ExecutionSetupException e) {
throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index aaee8e7..e8ee3cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -30,6 +30,7 @@ import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
@@ -140,9 +141,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
// add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
- if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE &&
- !isAnyWildcard &&
- !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldId())) {
+ if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
+ && !isAnyWildcard
+ &&!transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldId())
+ && !((ValueVectorReadExpression) expr).isArrayElement()) {
ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
Preconditions.checkNotNull(incoming);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 1377881..499f4d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -28,8 +28,14 @@ import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index a100163..167a992 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -48,7 +48,7 @@ public class PhysicalPlanReader {
private final ObjectReader logicalPlanReader;
public PhysicalPlanReader(DrillConfig config, ObjectMapper mapper, final DrillbitEndpoint endpoint,
- final StoragePluginRegistry engineRegistry) {
+ final StoragePluginRegistry pluginRegistry) {
// Endpoint serializer/deserializer.
SimpleModule deserModule = new SimpleModule("PhysicalOperatorModule") //
@@ -61,7 +61,7 @@ public class PhysicalPlanReader {
mapper.registerModule(deserModule);
mapper.registerSubtypes(PhysicalOperatorUtil.getSubTypes(config));
InjectableValues injectables = new InjectableValues.Std() //
- .addValue(StoragePluginRegistry.class, engineRegistry) //
+ .addValue(StoragePluginRegistry.class, pluginRegistry) //
.addValue(DrillbitEndpoint.class, endpoint); //
this.mapper = mapper;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
index bc2178b..46eed45 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
@@ -157,7 +157,7 @@ public class DrillOptiq {
if (call.getOperator() == SqlStdOperatorTable.ITEM) {
SchemaPath left = (SchemaPath) call.getOperands().get(0).accept(this);
final RexLiteral literal = (RexLiteral) call.getOperands().get(1);
- return left.getChild((String) literal.getValue2());
+ return left.getChild(literal.getValue2().toString());
}
// fall through
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java
deleted file mode 100644
index d298040..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.logical;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.logical.StoragePluginConfig;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Charsets;
-import com.google.common.io.Resources;
-
-public class StorageEngines implements Iterable<Map.Entry<String, StoragePluginConfig>>{
-
- private Map<String, StoragePluginConfig> storage;
-
- @JsonCreator
- public StorageEngines(@JsonProperty("storage") Map<String, StoragePluginConfig> storage){
- this.storage = storage;
- }
-
- public static void main(String[] args) throws Exception{
- DrillConfig config = DrillConfig.create();
- String data = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
- StorageEngines se = config.getMapper().readValue(data, StorageEngines.class);
- System.out.println(se);
- }
-
- @Override
- public String toString() {
- final int maxLen = 10;
- return "StorageEngines [storage=" + (storage != null ? toString(storage.entrySet(), maxLen) : null) + "]";
- }
-
- @Override
- public Iterator<Entry<String, StoragePluginConfig>> iterator() {
- return storage.entrySet().iterator();
- }
-
- private String toString(Collection<?> collection, int maxLen) {
- StringBuilder builder = new StringBuilder();
- builder.append("[");
- int i = 0;
- for (Iterator<?> iterator = collection.iterator(); iterator.hasNext() && i < maxLen; i++) {
- if (i > 0)
- builder.append(", ");
- builder.append(iterator.next());
- }
- builder.append("]");
- return builder.toString();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
new file mode 100644
index 0000000..939b77c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+
+/**
+ * Jackson-mapped holder for the named storage plugin configurations. The configurations are
+ * read from and written to the JSON property {@code "storage"} as a map of plugin name to
+ * {@link StoragePluginConfig}. Iterating an instance iterates the entries of that map.
+ */
+public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginConfig>>{
+
+  private Map<String, StoragePluginConfig> storage;
+
+  /**
+   * @param storage plugin name to plugin configuration; bound to the JSON property "storage"
+   */
+  @JsonCreator
+  public StoragePlugins(@JsonProperty("storage") Map<String, StoragePluginConfig> storage){
+    this.storage = storage;
+  }
+
+  /**
+   * Manual round-trip check: loads {@code storage-plugins.json} from the classpath, serializes
+   * it, prints the JSON, deserializes that JSON again and prints the resulting object.
+   */
+  public static void main(String[] args) throws Exception{
+    DrillConfig config = DrillConfig.create();
+    // Same resource name the StoragePluginRegistry loads; "storage-engines.json" was renamed.
+    String data = Resources.toString(Resources.getResource("storage-plugins.json"), Charsets.UTF_8);
+    StoragePlugins se = config.getMapper().readValue(data, StoragePlugins.class);
+    // Serialize once into a buffer and print from it. Writing straight to System.out lets
+    // Jackson close that stream when its AUTO_CLOSE_TARGET default is in effect, which would
+    // swallow the println below.
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    config.getMapper().writeValue(os, se);
+    byte[] json = os.toByteArray();
+    System.out.println(new String(json, Charsets.UTF_8));
+    se = config.getMapper().readValue(new ByteArrayInputStream(json), StoragePlugins.class);
+    System.out.println(se);
+  }
+
+  /**
+   * @return the plugin name to configuration map exactly as supplied to the constructor
+   */
+  @JsonProperty("storage")
+  public Map<String, StoragePluginConfig> getStorage() {
+    return storage;
+  }
+
+  @Override
+  public String toString() {
+    // Print at most maxLen entries so a large configuration does not flood the log.
+    final int maxLen = 10;
+    return "StoragePlugins [storage=" + (storage != null ? toString(storage.entrySet(), maxLen) : null) + "]";
+  }
+
+  @Override
+  public Iterator<Entry<String, StoragePluginConfig>> iterator() {
+    return storage.entrySet().iterator();
+  }
+
+  /**
+   * Renders up to {@code maxLen} elements of {@code collection} as "[a, b, ...]".
+   */
+  private String toString(Collection<?> collection, int maxLen) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("[");
+    int i = 0;
+    for (Iterator<?> iterator = collection.iterator(); iterator.hasNext() && i < maxLen; i++) {
+      if (i > 0) {
+        builder.append(", ");
+      }
+      builder.append(iterator.next());
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+
+  /**
+   * Two instances are equal when their storage maps are equal; a null map only equals a null map.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof StoragePlugins)) {
+      return false;
+    }
+    return java.util.Objects.equals(storage, ((StoragePlugins) obj).getStorage());
+  }
+
+  /**
+   * Hash of the storage map, kept consistent with {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    return java.util.Objects.hashCode(storage);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 6400c75..411a76c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -95,6 +95,7 @@ public class Drillbit implements Closeable{
DrillbitEndpoint md = engine.start();
manager.start(md, cache, engine.getController(), engine.getDataConnectionCreator(), coord);
cache.run();
+ manager.getContext().getStorage().init();
handle = coord.register(md);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 2386915..7a88098 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -20,12 +20,14 @@ package org.apache.drill.exec.store;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import com.google.common.base.Preconditions;
import net.hydromatic.linq4j.expressions.DefaultExpression;
import net.hydromatic.linq4j.expressions.Expression;
import net.hydromatic.optiq.SchemaPlus;
@@ -37,7 +39,10 @@ import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.util.PathScanner;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.planner.logical.StorageEngines;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.JacksonDrillSerializable.StoragePluginsSerializable;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.rpc.user.DrillUser;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
@@ -53,8 +58,8 @@ import com.google.common.io.Resources;
public class StoragePluginRegistry implements Iterable<Map.Entry<String, StoragePlugin>>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistry.class);
- private Map<Object, Constructor<? extends StoragePlugin>> availableEngines = new HashMap<Object, Constructor<? extends StoragePlugin>>();
- private final ImmutableMap<String, StoragePlugin> engines;
+ private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>();
+ private ImmutableMap<String, StoragePlugin> plugins;
private DrillbitContext context;
private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
@@ -64,67 +69,93 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
public StoragePluginRegistry(DrillbitContext context) {
try{
this.context = context;
- init(context.getConfig());
- this.engines = ImmutableMap.copyOf(createEngines());
}catch(RuntimeException e){
- logger.error("Failure while loading storage engine registry.", e);
+ logger.error("Failure while loading storage plugin registry.", e);
throw new RuntimeException("Faiure while reading and loading storage plugin configuration.", e);
}
}
@SuppressWarnings("unchecked")
- public void init(DrillConfig config){
- Collection<Class<? extends StoragePlugin>> engines = PathScanner.scanForImplementations(StoragePlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
- logger.debug("Loading storage engines {}", engines);
- for(Class<? extends StoragePlugin> engine: engines){
+ public void init() throws DrillbitStartupException {
+ DrillConfig config = context.getConfig();
+ Collection<Class<? extends StoragePlugin>> plugins = PathScanner.scanForImplementations(StoragePlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+ logger.debug("Loading storage plugins {}", plugins);
+ for(Class<? extends StoragePlugin> plugin: plugins){
int i =0;
- for(Constructor<?> c : engine.getConstructors()){
+ for(Constructor<?> c : plugin.getConstructors()){
Class<?>[] params = c.getParameterTypes();
if(params.length != 3 || params[1] != DrillbitContext.class || !StoragePluginConfig.class.isAssignableFrom(params[0]) || params[2] != String.class){
- logger.info("Skipping StorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillbitContext, String)]", c, engine);
+ logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
continue;
}
- availableEngines.put(params[0], (Constructor<? extends StoragePlugin>) c);
+ availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
i++;
}
if(i == 0){
- logger.debug("Skipping registration of StorageEngine {} as it doesn't have a constructor with the parameters of (StorangeEngineConfig, Config)", engine.getCanonicalName());
+ logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters of (StorangePluginConfig, Config)", plugin.getCanonicalName());
}
}
+ this.plugins = ImmutableMap.copyOf(createPlugins());
}
-
- private Map<String, StoragePlugin> createEngines(){
- StorageEngines engines = null;
- Map<String, StoragePlugin> activeEngines = new HashMap<String, StoragePlugin>();
+
+ private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException {
+ /*
+ * Check if "storage-plugins.json" exists. Also check if "storage-plugins" object exists in Distributed Cache.
+ * If both exist, check that they are the same. If they differ, throw exception. If "storage-plugins.json" exists, but
+ * nothing found in cache, then add it to the cache. If neither are found, throw exception.
+ */
+ StoragePlugins plugins = null;
+ StoragePlugins cachedPlugins = null;
+ Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
try{
- String enginesData = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
- engines = context.getConfig().getMapper().readValue(enginesData, StorageEngines.class);
+ URL url = Resources.class.getClassLoader().getResource("storage-plugins.json");
+ if (url != null) {
+ String pluginsData = Resources.toString(url, Charsets.UTF_8);
+ plugins = context.getConfig().getMapper().readValue(pluginsData, StoragePlugins.class);
+ }
+ DistributedMap<StoragePluginsSerializable> map = context.getCache().getMap(StoragePluginsSerializable.class);
+ StoragePluginsSerializable cachedPluginsSerializable = map.get("storage-plugins");
+ if (cachedPluginsSerializable != null) {
+ cachedPlugins = cachedPluginsSerializable.getObj();
+ logger.debug("Found cached storage plugin config: {}", cachedPlugins);
+ } else {
+ Preconditions.checkNotNull(plugins,"No storage plugin configuration found");
+ logger.debug("caching storage plugin config {}", plugins);
+ map.put("storage-plugins", new StoragePluginsSerializable(context, plugins));
+ cachedPluginsSerializable = map.get("storage-plugins");
+ cachedPlugins = cachedPluginsSerializable.getObj();
+ }
+ if(!(plugins == null || cachedPlugins.equals(plugins))) {
+ logger.error("Storage plugin config mismatch. {}. {}", plugins, cachedPlugins);
+ throw new DrillbitStartupException("Storage plugin config mismatch");
+ }
+ logger.debug("using plugin config: {}", cachedPlugins);
}catch(IOException e){
- throw new IllegalStateException("Failure while reading storage engines data.", e);
+ throw new IllegalStateException("Failure while reading storage plugins data.", e);
}
-
- for(Map.Entry<String, StoragePluginConfig> config : engines){
+
+ for(Map.Entry<String, StoragePluginConfig> config : cachedPlugins){
try{
StoragePlugin plugin = create(config.getKey(), config.getValue());
- activeEngines.put(config.getKey(), plugin);
+ activePlugins.put(config.getKey(), plugin);
}catch(ExecutionSetupException e){
logger.error("Failure while setting up StoragePlugin with name: '{}'.", config.getKey(), e);
}
}
- activeEngines.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
-
- return activeEngines;
+ activePlugins.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
+
+ return activePlugins;
}
- public StoragePlugin getEngine(String registeredStorageEngineName) throws ExecutionSetupException {
- return engines.get(registeredStorageEngineName);
+ public StoragePlugin getPlugin(String registeredStoragePluginName) throws ExecutionSetupException {
+ return plugins.get(registeredStoragePluginName);
}
-
- public StoragePlugin getEngine(StoragePluginConfig config) throws ExecutionSetupException {
+
+ public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
if(config instanceof NamedStoragePluginConfig){
- return engines.get(((NamedStoragePluginConfig) config).name);
+ return plugins.get(((NamedStoragePluginConfig) config).name);
}else{
// TODO: for now, we'll throw away transient configs. we really ought to clean these up.
return create(null, config);
@@ -132,33 +163,33 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
}
public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException{
- StoragePlugin p = getEngine(storageConfig);
- if(!(p instanceof FileSystemPlugin)) throw new ExecutionSetupException(String.format("You tried to request a format plugin for a stroage engine that wasn't of type FileSystemPlugin. The actual type of plugin was %s.", p.getClass().getName()));
+ StoragePlugin p = getPlugin(storageConfig);
+ if(!(p instanceof FileSystemPlugin)) throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage plugin that wasn't of type FileSystemPlugin. The actual type of plugin was %s.", p.getClass().getName()));
FileSystemPlugin storage = (FileSystemPlugin) p;
return storage.getFormatPlugin(formatConfig);
}
- private StoragePlugin create(String name, StoragePluginConfig engineConfig) throws ExecutionSetupException {
- StoragePlugin engine = null;
- Constructor<? extends StoragePlugin> c = availableEngines.get(engineConfig.getClass());
+ private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
+ StoragePlugin plugin = null;
+ Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass());
if (c == null)
- throw new ExecutionSetupException(String.format("Failure finding StorageEngine constructor for config %s",
- engineConfig));
+ throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
+ pluginConfig));
try {
- engine = c.newInstance(engineConfig, context, name);
- return engine;
+ plugin = c.newInstance(pluginConfig, context, name);
+ return plugin;
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
if (t instanceof ExecutionSetupException)
throw ((ExecutionSetupException) t);
throw new ExecutionSetupException(String.format(
- "Failure setting up new storage engine configuration for config %s", engineConfig), t);
+ "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
}
}
@Override
public Iterator<Entry<String, StoragePlugin>> iterator() {
- return engines.entrySet().iterator();
+ return plugins.entrySet().iterator();
}
public DrillSchemaFactory getSchemaFactory(){
@@ -169,7 +200,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
@Override
public void registerSchemas(DrillUser user, SchemaPlus parent) {
- for(Map.Entry<String, StoragePlugin> e : engines.entrySet()){
+ for(Map.Entry<String, StoragePlugin> e : plugins.entrySet()){
e.getValue().registerSchemas(user, parent);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
index 50678a6..232ec07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -18,11 +18,13 @@
package org.apache.drill.exec.store.dfs;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -31,6 +33,8 @@ import com.beust.jcommander.internal.Lists;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
public class BasicFormatMatcher extends FormatMatcher{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class);
@@ -39,6 +43,8 @@ public class BasicFormatMatcher extends FormatMatcher{
private final MagicStringMatcher matcher;
protected final DrillFileSystem fs;
protected final FormatPlugin plugin;
+ protected final boolean compressible;
+ protected final CompressionCodecFactory codecFactory;
public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<Pattern> patterns, List<MagicString> magicStrings) {
super();
@@ -46,12 +52,21 @@ public class BasicFormatMatcher extends FormatMatcher{
this.matcher = new MagicStringMatcher(magicStrings);
this.fs = fs;
this.plugin = plugin;
+ this.compressible = false;
+ this.codecFactory = null;
}
- public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, String extension){
- this(plugin, fs, //
- Lists.newArrayList(Pattern.compile(".*\\." + extension)), //
- (List<MagicString>) Collections.EMPTY_LIST);
+ public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<String> extensions, boolean compressible){
+ List<Pattern> patterns = Lists.newArrayList();
+ for (String extension : extensions) {
+ patterns.add(Pattern.compile(".*\\." + extension));
+ }
+ this.patterns = patterns;
+ this.matcher = new MagicStringMatcher(new ArrayList<MagicString>());
+ this.fs = fs;
+ this.plugin = plugin;
+ this.compressible = compressible;
+ this.codecFactory = new CompressionCodecFactory(fs.getUnderlying().getConf());
}
@Override
@@ -62,14 +77,31 @@ public class BasicFormatMatcher extends FormatMatcher{
@Override
public FormatSelection isReadable(FileSelection selection) throws IOException {
if(isReadable(selection.getFirstPath(fs))){
- return new FormatSelection(plugin.getConfig(), selection);
+ if (plugin.getName() != null) {
+ NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
+ namedConfig.name = plugin.getName();
+ return new FormatSelection(namedConfig, selection);
+ } else {
+ return new FormatSelection(plugin.getConfig(), selection);
+ }
}
return null;
}
protected final boolean isReadable(FileStatus status) throws IOException {
+ CompressionCodec codec = null;
+ if (compressible) {
+ codec = codecFactory.getCodec(status.getPath());
+ }
+ String fileName;
+ if (codec != null) {
+ String path = status.getPath().toString();
+ fileName = path.substring(0, path.lastIndexOf('.'));
+ } else {
+ fileName = status.getPath().toString();
+ }
for(Pattern p : patterns){
- if(p.matcher(status.getPath().toString()).matches()){
+ if(p.matcher(fileName).matches()){
return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 455c4b2..e392fa5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -31,5 +31,16 @@ public class FileSystemConfig implements StoragePluginConfig{
public String connection;
public Map<String, String> workspaces;
public Map<String, FormatPluginConfig> formats;
-
+
+ /** Compares connection, workspaces and formats; a null field equals only a null field (no NPE when just this side is null). */
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof FileSystemConfig)) {
+ return false;
+ }
+ FileSystemConfig that = (FileSystemConfig) obj;
+ return (this.connection == null ? that.connection == null : this.connection.equals(that.connection)) &&
+ (this.workspaces == null ? that.workspaces == null : this.workspaces.equals(that.workspaces)) &&
+ (this.formats == null ? that.formats == null : this.formats.equals(that.formats));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index ebd8507..840a011 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.ClassPathFileSystem;
+import org.apache.drill.exec.store.LocalSyncableFileSystem;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
import org.apache.hadoop.conf.Configuration;
@@ -69,6 +70,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
Configuration fsConf = new Configuration();
fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.connection);
fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
+ fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
this.fs = FileSystemCreator.getFileSystem(context.getConfig(), fsConf);
this.formatsByName = FormatCreator.getFormatPlugins(context, fs, config);
List<FormatMatcher> matchers = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
index 7ce8c50..b40502f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
@@ -37,7 +37,7 @@ public class FormatCreator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
- static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class, FormatPlugin.class);
+ static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class, FormatPluginConfig.class);
static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class);
static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, DrillFileSystem fileSystem, FileSystemConfig storageConfig){
@@ -69,7 +69,8 @@ public class FormatCreator {
for(Constructor<?> c : pluginClass.getConstructors()){
try{
if(!FORMAT_BASED.check(c)) continue;
- constructors.put(pluginClass, c);
+ Class<? extends FormatPluginConfig> configClass = (Class<? extends FormatPluginConfig>) c.getParameterTypes()[4];
+ constructors.put(configClass, c);
}catch(Exception e){
logger.warn(String.format("Failure while trying instantiate FormatPlugin %s.", pluginClass.getName()), e);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
index 6b98ea2..173dfeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
@@ -17,12 +17,13 @@
*/
package org.apache.drill.exec.store.dfs;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.common.logical.FormatPluginConfig;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("named")
-public class NamedFormatPluginConfig implements FormatPluginConfig{
+public class NamedFormatPluginConfig implements FormatPluginConfig {
public String name;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 780ec14..9c1dc74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -40,6 +40,10 @@ import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import com.beust.jcommander.internal.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
@@ -51,20 +55,24 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
private final boolean blockSplittable;
private final DrillFileSystem fs;
private final StoragePluginConfig storageConfig;
- private final FormatPluginConfig formatConfig;
+ protected final FormatPluginConfig formatConfig;
private final String name;
+ protected final CompressionCodecFactory codecFactory;
+ private final boolean compressible;
protected EasyFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig,
- T formatConfig, boolean readable, boolean writable, boolean blockSplittable, String extension, String defaultName){
- this.matcher = new BasicFormatMatcher(this, fs, extension);
+ T formatConfig, boolean readable, boolean writable, boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName){
+ this.matcher = new BasicFormatMatcher(this, fs, extensions, compressible);
this.readable = readable;
this.writable = writable;
this.context = context;
this.blockSplittable = blockSplittable;
+ this.compressible = compressible;
this.fs = fs;
this.storageConfig = storageConfig;
this.formatConfig = formatConfig;
- this.name = name == null ? defaultName : name;
+ this.name = name == null ? defaultName : name;
+ this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getUnderlying().getConf()));
}
@Override
@@ -88,9 +96,13 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
*
* @return True if splittable.
*/
- public boolean isBlockSplittable(){
+ public boolean isBlockSplittable() {
return blockSplittable;
- };
+ }
+
+ public boolean isCompressible() {
+ return compressible;
+ }
public abstract RecordReader getRecordReader(FragmentContext context, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 6015865..fc2ae2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -32,8 +32,11 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
import org.apache.drill.exec.store.schedule.BlockMapBuilder;
@@ -70,11 +73,19 @@ public class EasyGroupScan extends AbstractGroupScan{
@JacksonInject StoragePluginRegistry engineRegistry, //
@JsonProperty("columns") List<SchemaPath> columns
) throws IOException, ExecutionSetupException {
-
+
this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
this.selection = new FileSelection(files, true);
- this.maxWidth = selection.getFileStatusList(formatPlugin.getFileSystem()).size();
+ try{
+ BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
+ this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
+ this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+ }catch(IOException e){
+ logger.warn("Failure determining endpoint affinity.", e);
+ this.endpointAffinities = Collections.emptyList();
+ }
+ maxWidth = chunks.size();
this.columns = columns;
}
@@ -84,9 +95,17 @@ public class EasyGroupScan extends AbstractGroupScan{
List<SchemaPath> columns
) throws IOException{
this.selection = selection;
- this.maxWidth = selection.getFileStatusList(formatPlugin.getFileSystem()).size();
this.formatPlugin = formatPlugin;
this.columns = columns;
+ try{
+ BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
+ this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
+ this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+ }catch(IOException e){
+ logger.warn("Failure determining endpoint affinity.", e);
+ this.endpointAffinities = Collections.emptyList();
+ }
+ maxWidth = chunks.size();
}
@Override
@@ -127,15 +146,10 @@ public class EasyGroupScan extends AbstractGroupScan{
@Override
public List<EndpointAffinity> getOperatorAffinity() {
+ assert chunks != null && chunks.size() > 0;
if (this.endpointAffinities == null) {
- try{
- BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
- this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
- this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
- }catch(IOException e){
- logger.warn("Failure determining endpoint affinity.", e);
- this.endpointAffinities = Collections.emptyList();
- }
+ logger.debug("chunks: {}", chunks.size());
+ this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
return this.endpointAffinities;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index 6631a6a..c01fb84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -25,8 +25,10 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.physical.base.AbstractSubScan;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
import com.fasterxml.jackson.annotation.JacksonInject;
@@ -52,7 +54,7 @@ public class EasySubScan extends AbstractSubScan{
@JacksonInject StoragePluginRegistry engineRegistry, //
@JsonProperty("columns") List<SchemaPath> columns //
) throws IOException, ExecutionSetupException {
-
+
this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
Preconditions.checkNotNull(this.formatPlugin);
this.files = files;
@@ -82,7 +84,13 @@ public class EasySubScan extends AbstractSubScan{
@JsonProperty("format")
public FormatPluginConfig getFormatConfig(){
- return formatPlugin.getConfig();
+ if (formatPlugin.getName() != null) {
+ NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
+ namedConfig.name = formatPlugin.getName();
+ return namedConfig;
+ } else {
+ return formatPlugin.getConfig();
+ }
}
@JsonProperty("columns")