You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/04/15 22:34:22 UTC
[1/4] drill git commit: DRILL-1899: Consider read preference set by
users
Repository: drill
Updated Branches:
refs/heads/master 07cca5dba -> 033d0dfd6
DRILL-1899: Consider read preference set by users
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0f9887d9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0f9887d9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0f9887d9
Branch: refs/heads/master
Commit: 0f9887d91ee92f0c8f3197c55b95b694e7dcd9f4
Parents: 07cca5d
Author: B Anil Kumar <ak...@gmail.com>
Authored: Wed Apr 15 02:10:17 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed Apr 15 12:36:29 2015 -0700
----------------------------------------------------------------------
.../drill/exec/store/mongo/MongoGroupScan.java | 69 +++++++++++++++-----
.../exec/store/mongo/MongoRecordReader.java | 1 -
.../store/mongo/schema/MongoSchemaFactory.java | 1 -
3 files changed, 54 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0f9887d9/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index f6b449b..b086786 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -166,7 +166,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
return databaseNames.contains(CONFIG);
}
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings({ "rawtypes", "unchecked" })
private void init() throws IOException {
MongoClient client = null;
try {
@@ -178,9 +178,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
chunksInverseMapping = Maps.newLinkedHashMap();
if (isShardedCluster(client)) {
DB db = client.getDB(CONFIG);
- db.setReadPreference(ReadPreference.nearest());
DBCollection chunksCollection = db.getCollectionFromString(CHUNKS);
-
DBObject query = new BasicDBObject(1);
query
.put(
@@ -212,23 +210,29 @@ public class MongoGroupScan extends AbstractGroupScan implements
String[] tagAndHost = StringUtils.split(hostEntry, '/');
String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ',');
- Set<ServerAddress> addressList = chunksMapping.get(chunkId);
+ List<String> chunkHosts = Arrays.asList(hosts);
+ //to get the address list from one of the shard nodes, need to get port.
+ MongoClient shardClient = new MongoClient(hosts[0]);
+ Set<ServerAddress> addressList = getPreferredHosts(shardClient, chunkHosts);
if (addressList == null) {
addressList = Sets.newHashSet();
- chunksMapping.put(chunkId, addressList);
- }
- for (String host : hosts) {
- addressList.add(new ServerAddress(host));
+ for (String host : chunkHosts) {
+ addressList.add(new ServerAddress(host));
+ }
}
+ chunksMapping.put(chunkId, addressList);
ServerAddress address = addressList.iterator().next();
-
List<ChunkInfo> chunkList = chunksInverseMapping.get(address
.getHost());
if (chunkList == null) {
chunkList = Lists.newArrayList();
chunksInverseMapping.put(address.getHost(), chunkList);
}
- ChunkInfo chunkInfo = new ChunkInfo(Arrays.asList(hosts), chunkId);
+ List<String> chunkHostsList = new ArrayList<String>();
+ for(ServerAddress serverAddr : addressList){
+ chunkHostsList.add(serverAddr.toString());
+ }
+ ChunkInfo chunkInfo = new ChunkInfo(chunkHostsList, chunkId);
DBObject minObj = (BasicDBObject) chunkObj.get(MIN);
Map<String, Object> minFilters = Maps.newHashMap();
@@ -261,10 +265,12 @@ public class MongoGroupScan extends AbstractGroupScan implements
String chunkName = scanSpec.getDbName() + "."
+ scanSpec.getCollectionName();
List<String> hosts = clientURI.getHosts();
- Set<ServerAddress> addressList = Sets.newHashSet();
-
- for (String host : hosts) {
- addressList.add(new ServerAddress(host));
+ Set<ServerAddress> addressList = getPreferredHosts(client, hosts);
+ if (addressList == null) {
+ addressList = Sets.newHashSet();
+ for (String host : hosts) {
+ addressList.add(new ServerAddress(host));
+ }
}
chunksMapping.put(chunkName, addressList);
@@ -287,6 +293,40 @@ public class MongoGroupScan extends AbstractGroupScan implements
}
+  /**
+   * Resolves the server addresses to use for a scan, based on the read
+   * preference configured on {@code client} and the replica-set topology
+   * reported by the server's {@code isMaster} command.
+   *
+   * @param client connected client whose read preference is honored
+   * @param hosts  configured hosts; not consulted here, the caller uses them
+   *               as the fallback when this method returns {@code null}
+   * @return the preferred addresses, or {@code null} when the read preference
+   *         is not recognized or the server reports no usable topology; it
+   *         never returns an empty set
+   * @throws UnknownHostException if a reported host name cannot be resolved
+   */
+  @SuppressWarnings("unchecked")
+  private Set<ServerAddress> getPreferredHosts(MongoClient client,
+      List<String> hosts) throws UnknownHostException {
+    DB db = client.getDB(scanSpec.getDbName());
+    // Ask once: "primary" and "hosts" then come from the same reply and the
+    // extra round trips of the per-branch calls are avoided.
+    CommandResult isMaster = db.command("isMaster");
+    String primaryHost = isMaster.getString("primary");
+    List<String> replicaHosts = (List<String>) isMaster.get("hosts");
+
+    Set<ServerAddress> addressList = Sets.newHashSet();
+    // Locale.ROOT keeps the name match independent of the JVM default locale
+    // (e.g. Turkish upper-cases 'i' to a dotted capital).
+    String preference = client.getReadPreference().getName()
+        .toUpperCase(java.util.Locale.ROOT);
+    switch (preference) {
+    case "PRIMARY":
+    case "PRIMARYPREFERRED":
+      if (primaryHost == null) {
+        // No primary reported (standalone server, or none elected): let the
+        // caller fall back instead of building an address from a null name.
+        return null;
+      }
+      addressList.add(new ServerAddress(primaryHost));
+      return addressList;
+    case "SECONDARY":
+    case "SECONDARYPREFERRED":
+      if (replicaHosts == null) {
+        return null;
+      }
+      for (String host : replicaHosts) {
+        // Skip the primary without mutating the list owned by the reply.
+        if (!host.equals(primaryHost)) {
+          addressList.add(new ServerAddress(host));
+        }
+      }
+      // A set with no secondaries would make the caller's iterator().next()
+      // fail; report "no preference" so it uses the configured hosts.
+      return addressList.isEmpty() ? null : addressList;
+    case "NEAREST":
+      if (replicaHosts == null) {
+        return null;
+      }
+      for (String host : replicaHosts) {
+        addressList.add(new ServerAddress(host));
+      }
+      return addressList.isEmpty() ? null : addressList;
+    default:
+      return null;
+    }
+  }
+
@Override
public GroupScan clone(List<SchemaPath> columns) {
MongoGroupScan clone = new MongoGroupScan(this);
@@ -433,7 +473,6 @@ public class MongoGroupScan extends AbstractGroupScan implements
MongoClient client = MongoCnxnManager.getClient(addresses,
clientURI.getOptions());
DB db = client.getDB(scanSpec.getDbName());
- db.setReadPreference(ReadPreference.nearest());
DBCollection collection = db.getCollectionFromString(scanSpec
.getCollectionName());
CommandResult stats = collection.getStats();
http://git-wip-us.apache.org/repos/asf/drill/blob/0f9887d9/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 6eff72f..0b263df 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -136,7 +136,6 @@ public class MongoRecordReader extends AbstractRecordReader {
}
MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions);
DB db = client.getDB(subScanSpec.getDbName());
- db.setReadPreference(ReadPreference.nearest());
collection = db.getCollection(subScanSpec.getCollectionName());
} catch (UnknownHostException e) {
throw new DrillRuntimeException(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/drill/blob/0f9887d9/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index a227c9a..3c70638 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -114,7 +114,6 @@ public class MongoSchemaFactory implements SchemaFactory {
@Override
public List<String> load(String dbName) throws Exception {
DB db = MongoCnxnManager.getClient(addresses, options).getDB(dbName);
- db.setReadPreference(ReadPreference.nearest());
Set<String> collectionNames = db.getCollectionNames();
return new ArrayList<>(collectionNames);
}
[4/4] drill git commit: DRILL-2658: Add ilike and substring functions
Posted by sm...@apache.org.
DRILL-2658: Add ilike and substring functions
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/033d0dfd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/033d0dfd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/033d0dfd
Branch: refs/heads/master
Commit: 033d0dfd6b7a5fa0eeaf4b337955b3870c2d680a
Parents: bf3db31
Author: Steven Phillips <sp...@maprtech.com>
Authored: Mon Mar 23 18:17:26 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed Apr 15 12:44:34 2015 -0700
----------------------------------------------------------------------
.../exec/expr/fn/impl/StringFunctions.java | 103 +++++++++++++++++++
.../exec/expr/fn/impl/TestStringFunctions.java | 59 +++++++++++
2 files changed, 162 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/033d0dfd/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
index a47bf87..49f581f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.BigIntHolder;
import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
import org.apache.drill.exec.expr.holders.VarBinaryHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
@@ -90,6 +91,50 @@ public class StringFunctions{
}
}
+  /**
+   * {@code ilike(input, pattern)}: matches {@code input} against a SQL LIKE
+   * pattern, ignoring case. The pattern is a constant, so it is translated to
+   * a regular expression and compiled once in {@link #setup()}.
+   */
+  @FunctionTemplate(name = "ilike", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+  public static class ILike implements DrillSimpleFunc {
+
+    @Param VarCharHolder input;
+    @Param(constant = true) VarCharHolder pattern;
+    @Output BitHolder out;
+    @Workspace java.util.regex.Matcher matcher;
+
+    public void setup() {
+      String likePattern = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer);
+      String regex = org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike(likePattern);
+      // Start with an empty subject; eval() re-targets the matcher per row.
+      matcher = java.util.regex.Pattern.compile(regex, java.util.regex.Pattern.CASE_INSENSITIVE).matcher("");
+    }
+
+    public void eval() {
+      String text = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer);
+      matcher.reset(text);
+      // The whole value must match, not just a substring of it.
+      if (matcher.matches()) {
+        out.value = 1;
+      } else {
+        out.value = 0;
+      }
+    }
+  }
+
+  /**
+   * {@code ilike(input, pattern, escape)}: case-insensitive SQL LIKE match
+   * with a caller-supplied escape string. Pattern and escape are constants,
+   * so the regular expression is built and compiled once in {@link #setup()}.
+   */
+  @FunctionTemplate(name = "ilike", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+  public static class ILikeWithEscape implements DrillSimpleFunc {
+
+    @Param VarCharHolder input;
+    @Param(constant = true) VarCharHolder pattern;
+    @Param(constant = true) VarCharHolder escape;
+    @Output BitHolder out;
+    @Workspace java.util.regex.Matcher matcher;
+
+    public void setup() {
+      String likePattern = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer);
+      String escapeText = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(escape.start, escape.end, escape.buffer);
+      String regex = org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike(likePattern, escapeText);
+      // Start with an empty subject; eval() re-targets the matcher per row.
+      matcher = java.util.regex.Pattern.compile(regex, java.util.regex.Pattern.CASE_INSENSITIVE).matcher("");
+    }
+
+    public void eval() {
+      String text = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer);
+      matcher.reset(text);
+      // The whole value must match, not just a substring of it.
+      if (matcher.matches()) {
+        out.value = 1;
+      } else {
+        out.value = 0;
+      }
+    }
+  }
+
@FunctionTemplate(names = {"similar", "similar_to"}, scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
public static class Similar implements DrillSimpleFunc{
@Param VarCharHolder input;
@@ -409,6 +454,64 @@ public class StringFunctions{
}
+  /**
+   * {@code substring(input, pattern)} / {@code substr(input, pattern)} for a
+   * required (non-nullable) input: returns the first part of {@code input}
+   * that matches the regular expression {@code pattern}, or NULL when nothing
+   * matches. The result shares the input buffer; no bytes are copied.
+   */
+  @FunctionTemplate(names = {"substring", "substr" }, scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL)
+  public static class SubstringRegex implements DrillSimpleFunc {
+
+    @Param VarCharHolder input;
+    @Param(constant = true) VarCharHolder pattern;
+    @Output NullableVarCharHolder out;
+    @Workspace java.util.regex.Matcher matcher;
+
+    public void setup() {
+      // The pattern is a constant: compile once and reuse the matcher per row.
+      matcher = java.util.regex.Pattern.compile(
+          org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer))
+          .matcher("");
+    }
+
+    public void eval() {
+      String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer);
+      matcher.reset(i);
+      if (matcher.find()) {
+        out.isSet = 1;
+        out.buffer = input.buffer;
+        // NOTE(review): getUTF8CharPosition presumably maps a character offset
+        // to a byte position inside the buffer - confirm against its source.
+        out.start = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(input.buffer, input.start, input.end, matcher.start());
+        out.end = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(input.buffer, input.start, input.end, matcher.end());
+      } else {
+        // Mark the result NULL explicitly, as the nullable-input variant
+        // does, rather than relying on the holder's initial state.
+        out.isSet = 0;
+      }
+    }
+  }
+
+  /**
+   * {@code substring(input, pattern)} / {@code substr(input, pattern)} for a
+   * nullable input: returns the first part of {@code input} that matches the
+   * regular expression {@code pattern}. The result is NULL when the input is
+   * NULL or when nothing matches. The result shares the input buffer.
+   */
+  @FunctionTemplate(names = {"substring", "substr" }, scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL)
+  public static class SubstringRegexNullable implements DrillSimpleFunc {
+
+    @Param NullableVarCharHolder input;
+    @Param(constant = true) VarCharHolder pattern;
+    @Output NullableVarCharHolder out;
+    @Workspace java.util.regex.Matcher matcher;
+
+    public void setup() {
+      String regex = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer);
+      // The pattern is a constant: compile once and reuse the matcher per row.
+      matcher = java.util.regex.Pattern.compile(regex).matcher("");
+    }
+
+    public void eval() {
+      // NULL unless a non-NULL input actually contains a match.
+      out.isSet = 0;
+      if (input.isSet != 0) {
+        String text = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer);
+        matcher.reset(text);
+        if (matcher.find()) {
+          out.isSet = 1;
+          out.buffer = input.buffer;
+          out.start = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(input.buffer, input.start, input.end, matcher.start());
+          out.end = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(input.buffer, input.start, input.end, matcher.end());
+        }
+      }
+    }
+  }
+
// Return first length characters in the string. When length is negative, return all but last |length| characters.
// If length > total charcounts, return the whole string.
// If length = 0, return empty
http://git-wip-us.apache.org/repos/asf/drill/blob/033d0dfd/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestStringFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestStringFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestStringFunctions.java
new file mode 100644
index 0000000..e1ef7c9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestStringFunctions.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.impl;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+/**
+ * Query-level tests for the {@code ilike} and regex {@code substr} string
+ * functions, run against the TPC-H sample tables on the classpath.
+ */
+public class TestStringFunctions extends BaseTestQuery {
+
+  /** A lower-case pattern must match the upper-case nation names. */
+  @Test
+  public void testILike() throws Exception {
+    final String query =
+        "select n_name from cp.`tpch/nation.parquet` where ilike(n_name, '%united%') = true";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("n_name")
+        .baselineValues("UNITED STATES")
+        .baselineValues("UNITED KINGDOM")
+        .build()
+        .run();
+  }
+
+  /** With '#' as the escape, "#_" must match a literal underscore. */
+  @Test
+  public void testILikeEscape() throws Exception {
+    final String query =
+        "select a from (select concat(r_name , '_region') a from cp.`tpch/region.parquet`) where ilike(a, 'asia#_region', '#') = true";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("a")
+        .baselineValues("ASIA_region")
+        .build()
+        .run();
+  }
+
+  /** Regex substr must return only the matched part of each nation name. */
+  @Test
+  public void testSubstr() throws Exception {
+    final String query =
+        "select substr(n_name, 'UN.TE.') a from cp.`tpch/nation.parquet` where ilike(n_name, 'united%') = true";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("a")
+        .baselineValues("UNITED")
+        .baselineValues("UNITED")
+        .build()
+        .run();
+  }
+}
[2/4] drill git commit: DRILL-1512: Avro record reader
Posted by sm...@apache.org.
DRILL-1512: Avro record reader
Reader for Avro data files.
Supports:
- All primitive types
- Arrays
- Nested records
- Enums
Unimplemented:
- Endpoint affinity
- Recursive data types
- Complex types: Maps, Fixed, Unions
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/55a9a59d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/55a9a59d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/55a9a59d
Branch: refs/heads/master
Commit: 55a9a59d409ee93e42fcdbdc12afd8fb0caffe3f
Parents: 0f9887d
Author: Andrew Selden <an...@elasticsearch.com>
Authored: Fri Oct 10 16:41:59 2014 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed Apr 15 12:39:46 2015 -0700
----------------------------------------------------------------------
exec/java-exec/pom.xml | 10 +
.../drill/exec/store/avro/AvroFormatConfig.java | 39 +++
.../drill/exec/store/avro/AvroFormatPlugin.java | 128 ++++++++
.../drill/exec/store/avro/AvroGroupScan.java | 208 ++++++++++++
.../drill/exec/store/avro/AvroRecordReader.java | 324 +++++++++++++++++++
.../exec/store/avro/AvroScanBatchCreator.java | 52 +++
.../drill/exec/store/avro/AvroSubScan.java | 142 ++++++++
.../drill/exec/store/avro/AvroTypeHelper.java | 184 +++++++++++
.../drill/exec/store/avro/MapOrListWriter.java | 105 ++++++
.../resources/bootstrap-storage-plugins.json | 6 +
.../drill/exec/store/avro/AvroFormatTest.java | 129 ++++++++
.../drill/exec/store/avro/AvroTestUtil.java | 270 ++++++++++++++++
.../resources/bootstrap-storage-plugins.json | 3 +
.../apache/drill/exec/proto/UserBitShared.java | 16 +-
.../exec/proto/beans/CoreOperatorType.java | 4 +-
protocol/src/main/protobuf/UserBitShared.proto | 1 +
16 files changed, 1617 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index fdd24ef..f5313ca 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -353,6 +353,16 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <version>1.7.7</version>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
new file mode 100644
index 0000000..4a2cfb9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+/**
+ * Format plugin config for Avro data files.
+ *
+ * <p>The config has no settings, so any two instances are interchangeable:
+ * {@link #equals(Object)} accepts every {@code AvroFormatConfig} and
+ * {@link #hashCode()} returns the same value for all of them.
+ */
+@JsonTypeName("avro")
+public class AvroFormatConfig implements FormatPluginConfig {
+
+  // Every instance is equal to every other, so they must share one hash.
+  // The value itself is arbitrary.
+  private static final int SHARED_HASH = 101;
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof AvroFormatConfig;
+  }
+
+  @Override
+  public int hashCode() {
+    return SHARED_HASH;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
new file mode 100644
index 0000000..4fe1f71
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Format plugin for Avro data files.
+ *
+ * <p>Read-only: it matches files with the {@code avro} extension and creates
+ * {@link AvroGroupScan}s for them. Writing is not supported.
+ */
+public class AvroFormatPlugin implements FormatPlugin {
+
+  private final String name;
+  private final DrillbitContext context;
+  private final DrillFileSystem fs;
+  private final StoragePluginConfig storagePluginConfig;
+  private final AvroFormatConfig formatConfig;
+  private final BasicFormatMatcher matcher;
+
+  /** Creates the plugin with a default {@link AvroFormatConfig}. */
+  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
+                          StoragePluginConfig storagePluginConfig) {
+    this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
+  }
+
+  /** Creates the plugin with an explicit format config. */
+  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
+                          StoragePluginConfig storagePluginConfig, AvroFormatConfig formatConfig) {
+    this.name = name;
+    this.context = context;
+    this.fs = fs;
+    this.storagePluginConfig = storagePluginConfig;
+    this.formatConfig = formatConfig;
+
+    // TODO: confirm what the last ("compressible") argument should be for Avro.
+    final List<String> extensions = Lists.newArrayList("avro");
+    this.matcher = new BasicFormatMatcher(this, fs, extensions, false);
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public DrillbitContext getContext() {
+    return context;
+  }
+
+  @Override
+  public DrillFileSystem getFileSystem() {
+    return fs;
+  }
+
+  @Override
+  public StoragePluginConfig getStorageConfig() {
+    return storagePluginConfig;
+  }
+
+  @Override
+  public AvroFormatConfig getConfig() {
+    return formatConfig;
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return matcher;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  @Override
+  public AbstractWriter getWriter(final PhysicalOperator child, final String location) throws IOException {
+    throw new UnsupportedOperationException("Unimplemented");
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(final FileSelection selection) throws IOException {
+    return newGroupScan(selection, null);
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(final FileSelection selection, final List<SchemaPath> columns) throws IOException {
+    return newGroupScan(selection, columns);
+  }
+
+  @Override
+  public Set<StoragePluginOptimizerRule> getOptimizerRules() {
+    return ImmutableSet.of();
+  }
+
+  /** Builds a scan over the selected files; {@code columns} may be null. */
+  private AvroGroupScan newGroupScan(final FileSelection selection, final List<SchemaPath> columns) throws IOException {
+    return new AvroGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
new file mode 100644
index 0000000..fcc1f94
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+//import org.apache.drill.exec.store.avro.AvroSubScan.AvroSubScanSpec;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Group scan implementation for Avro data files.
+ *
+ * <p>Still a minimal implementation: it runs with a parallelization width of
+ * one, ignores endpoint assignments, and {@link #getSpecificScan(int)} reads
+ * only the first selected file.
+ */
+@JsonTypeName("avro-scan")
+public class AvroGroupScan extends AbstractGroupScan {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroGroupScan.class);
+
+  private final AvroFormatPlugin formatPlugin;
+  private final AvroFormatConfig formatConfig;
+  private final List<SchemaPath> columns;
+  private final FileSystem fs;
+  private final List<ReadEntryWithPath> entries;
+  private final String selectionRoot;
+
+  // TODO: never assigned until affinity is implemented; an endpoint-to-spec
+  // mapping (Map<Integer, List<AvroSubScanSpec>>) is expected to join it.
+  private List<EndpointAffinity> endpointAffinities;
+
+  /**
+   * Deserialization constructor used by Jackson.
+   *
+   * @param formatConfig may be null, in which case a default
+   *                     {@link AvroFormatConfig} is used
+   */
+  @JsonCreator
+  public AvroGroupScan(@JsonProperty("entries") final List<ReadEntryWithPath> entries,
+                       @JsonProperty("storage") final StoragePluginConfig storageConfig,
+                       @JsonProperty("format") final FormatPluginConfig formatConfig,
+                       @JacksonInject final StoragePluginRegistry engineRegistry,
+                       @JsonProperty("columns") final List<SchemaPath> columns,
+                       @JsonProperty("selectionRoot") final String selectionRoot) throws ExecutionSetupException {
+
+    this.columns = columns;
+    final AvroFormatConfig afc =
+        (formatConfig == null) ? new AvroFormatConfig() : (AvroFormatConfig) formatConfig;
+    Preconditions.checkNotNull(storageConfig);
+    this.formatPlugin = (AvroFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, afc);
+    Preconditions.checkNotNull(this.formatPlugin);
+    this.fs = formatPlugin.getFileSystem().getUnderlying();
+    this.formatConfig = formatPlugin.getConfig();
+    this.entries = entries;
+    this.selectionRoot = selectionRoot;
+  }
+
+  /** Planning-time constructor: one read entry is created per selected file. */
+  public AvroGroupScan(final List<FileStatus> files, final AvroFormatPlugin formatPlugin,
+                       final String selectionRoot, final List<SchemaPath> columns) throws IOException {
+
+    this.formatPlugin = formatPlugin;
+    this.columns = columns;
+    this.formatConfig = formatPlugin.getConfig();
+    this.fs = formatPlugin.getFileSystem().getUnderlying();
+    this.selectionRoot = selectionRoot;
+
+    this.entries = Lists.newArrayList();
+    // Named "file" so it no longer shadows the "fs" field.
+    for (final FileStatus file : files) {
+      entries.add(new ReadEntryWithPath(file.getPath().toString()));
+    }
+  }
+
+  /** Copy constructor; a null {@code columns} keeps the source scan's columns. */
+  private AvroGroupScan(final AvroGroupScan that, final List<SchemaPath> columns) {
+    this.columns = (columns == null) ? that.columns : columns;
+    this.entries = that.entries;
+    this.formatConfig = that.formatConfig;
+    this.formatPlugin = that.formatPlugin;
+    this.fs = that.fs;
+    this.selectionRoot = that.selectionRoot;
+
+    // TODO: once affinity is implemented, also copy endpointAffinities,
+    // mappings, rowCount, rowGroupInfos and columnValueCounts.
+  }
+
+  @JsonProperty("format")
+  public AvroFormatConfig getFormatConfig() {
+    return this.formatConfig;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getEngineConfig() {
+    return this.formatPlugin.getStorageConfig();
+  }
+
+  @Override
+  public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+    // TODO: unimplemented; the endpoints are ignored.
+    logger.warn("AvroGroupScan.applyAssignments() is not implemented");
+  }
+
+  /**
+   * Creates the sub scan for a minor fragment.
+   *
+   * <p>Temporary behavior: the fragment id is ignored and only the first
+   * entry is scanned. Revisit once affinity and endpoints are worked out.
+   *
+   * @throws ExecutionSetupException if the scan has no entries to read
+   */
+  @Override
+  public AvroSubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
+    // Fail with a descriptive setup error instead of a bare NPE or
+    // IndexOutOfBoundsException from entries.get(0).
+    if (entries == null || entries.isEmpty()) {
+      throw new ExecutionSetupException(
+          "AvroGroupScan has no files to read [selectionRoot=" + selectionRoot + "]");
+    }
+
+    final AvroSubScan sub = new AvroSubScan(formatPlugin, columns, selectionRoot);
+    sub.setEntry(entries.get(0));
+    sub.setFileSystem(fs);
+
+    return sub;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    // TODO: fixed at 1 until sub-scan specs are implemented.
+    return 1;
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    // TODO: 0 is passed as the second argument because the row count is not
+    // known up front; confirm that is the right value to report.
+    return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, 0, 1, 1);
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    // TODO: unimplemented; the field is never set, so this is empty for now.
+    if (endpointAffinities != null) {
+      return endpointAffinities;
+    }
+    return Collections.emptyList();
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new AvroGroupScan(this, null);
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public String toString() {
+    return "AvroGroupScan [entries=" + entries +
+        ", selectionRoot=" + selectionRoot +
+        ", columns=" + columns + "]";
+  }
+
+  @Override
+  public GroupScan clone(final List<SchemaPath> columns) {
+    return new AvroGroupScan(this, columns);
+  }
+
+  @JsonIgnore
+  public boolean canPushdownProjects(final List<SchemaPath> columns) {
+    return true;
+  }
+
+  public List<ReadEntryWithPath> getEntries() {
+    return entries;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
new file mode 100644
index 0000000..3b7697d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.google.common.base.Stopwatch;
+
+import io.netty.buffer.DrillBuf;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.Utf8;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A RecordReader implementation for Avro data files.
+ *
+ * <p>Records are read through Avro's generic API and copied into Drill vectors via a
+ * {@link VectorContainerWriter}. Each call to {@link #next()} emits at most one batch of
+ * records, whose size is bounded by the batch size given at construction time.
+ *
+ * @see RecordReader
+ */
+public class AvroRecordReader extends AbstractRecordReader {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroRecordReader.class);
+
+  private static final int DEFAULT_BATCH_SIZE = 1000;
+
+  private final Path hadoop;
+  private final FileSystem fileSystem;
+  private final int batchSize;
+
+  // Scratch buffer used to hand variable-width values to the vector writers.
+  private DrillBuf buffer;
+  private VectorContainerWriter writer;
+
+  private DataFileReader<GenericContainer> reader = null;
+  private OperatorContext operatorContext;
+
+  /**
+   * Creates a reader that uses the default batch size.
+   *
+   * @param fragmentContext  context supplying the managed scratch buffer
+   * @param inputPath        path of the Avro file to read
+   * @param fileSystem       file system whose configuration is used to open the file; may be null
+   * @param projectedColumns columns selected by the query
+   */
+  public AvroRecordReader(final FragmentContext fragmentContext,
+                          final String inputPath,
+                          final FileSystem fileSystem,
+                          final List<SchemaPath> projectedColumns) {
+    this(fragmentContext, inputPath, fileSystem, projectedColumns, DEFAULT_BATCH_SIZE);
+  }
+
+  /**
+   * Creates a reader with an explicit batch size.
+   *
+   * @param fragmentContext  context supplying the managed scratch buffer
+   * @param inputPath        path of the Avro file to read
+   * @param fileSystem       file system whose configuration is used to open the file; may be null
+   * @param projectedColumns columns selected by the query
+   * @param defaultBatchSize maximum number of records per batch; non-positive values fall back
+   *                         to the built-in default so existing callers keep working
+   */
+  public AvroRecordReader(final FragmentContext fragmentContext,
+                          final String inputPath,
+                          final FileSystem fileSystem,
+                          List<SchemaPath> projectedColumns, final int defaultBatchSize) {
+
+    this.hadoop = new Path(inputPath);
+    this.fileSystem = fileSystem;
+    this.batchSize = (defaultBatchSize > 0) ? defaultBatchSize : DEFAULT_BATCH_SIZE;
+    this.buffer = fragmentContext.getManagedBuffer();
+
+    setColumns(projectedColumns);
+  }
+
+  /**
+   * Opens the Avro data file and prepares the vector writer.
+   *
+   * @param output mutator the vector writer is bound to
+   * @throws ExecutionSetupException if the file cannot be opened
+   */
+  @Override
+  public void setup(final OutputMutator output) throws ExecutionSetupException {
+
+    writer = new VectorContainerWriter(output);
+
+    try {
+      reader = new DataFileReader<>(new FsInput(hadoop, resolveConfiguration()),
+          new GenericDatumReader<GenericContainer>());
+    } catch (IOException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  /**
+   * Picks the Hadoop configuration used to open the file: the one carried by the file system
+   * passed to the constructor when available, otherwise a fresh default configuration.
+   */
+  private Configuration resolveConfiguration() {
+    final Configuration fsConf = (fileSystem == null) ? null : fileSystem.getConf();
+    return (fsConf != null) ? fsConf : new Configuration();
+  }
+
+  @Override
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /**
+   * Reads the next batch of records.
+   *
+   * @return the number of records written, or 0 once the file is exhausted
+   * @throws IllegalStateException if {@link #setup(OutputMutator)} has not opened the file
+   * @throws DrillRuntimeException if reading a record fails with an I/O error
+   */
+  @Override
+  public int next() {
+    final Stopwatch watch = new Stopwatch().start();
+
+    if (reader == null) {
+      throw new IllegalStateException("Avro reader is not open.");
+    }
+    if (!reader.hasNext()) {
+      return 0;
+    }
+
+    int recordCount = 0;
+    writer.allocate();
+    writer.reset();
+
+    try {
+      // The container is handed back to Avro on every iteration so it can be reused.
+      GenericContainer container = null;
+      while (recordCount < batchSize && reader.hasNext()) {
+        writer.setPosition(recordCount);
+        container = reader.next(container);
+        processRecord(container, container.getSchema());
+        recordCount++;
+      }
+
+      writer.setValueCount(recordCount);
+
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+
+    logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
+    return recordCount;
+  }
+
+  /**
+   * Writes one top-level Avro datum, which must be a record.
+   *
+   * @throws DrillRuntimeException if the root schema is not a record
+   */
+  private void processRecord(final GenericContainer container, final Schema schema) {
+
+    final Schema.Type type = schema.getType();
+
+    switch (type) {
+      case RECORD:
+        process(container, schema, null, new MapOrListWriter(writer.rootAsMap()));
+        break;
+      default:
+        throw new DrillRuntimeException("Root object must be record type. Found: " + type);
+    }
+  }
+
+  /**
+   * Recursively writes a value of the given schema.
+   *
+   * @param value     Avro value to write
+   * @param schema    schema describing {@code value}
+   * @param fieldName name of the field holding the value; null only for the root record
+   * @param writer    destination map or list writer
+   * @throws UnsupportedOperationException for FIXED, UNION and MAP schemas
+   */
+  private void process(final Object value, final Schema schema, final String fieldName, final MapOrListWriter writer) {
+
+    writer.start();
+    final Schema.Type type = schema.getType();
+
+    switch (type) {
+      case RECORD:
+        for (final Schema.Field field : schema.getFields()) {
+
+          // Nested records get their own child map; every other field is written in place.
+          MapOrListWriter _writer = writer;
+          if (field.schema().getType() == Schema.Type.RECORD) {
+            _writer = writer.map(field.name());
+          }
+
+          process(((GenericRecord) value).get(field.name()), field.schema(), field.name(), _writer);
+        }
+        break;
+      case ARRAY:
+        assert fieldName != null;
+        final GenericArray<?> array = (GenericArray<?>) value;
+        for (final Object o : array) {
+          process(o, array.getSchema().getElementType(), fieldName, writer.list(fieldName));
+        }
+        break;
+      case FIXED:
+      case UNION:
+      case MAP:
+        throw new UnsupportedOperationException("Unimplemented type: " + type.toString());
+      case ENUM:  // Enum symbols are strings
+      case NULL:  // Treat null type as a primitive
+      default:
+        assert fieldName != null;
+
+        // Projection is only checked for values written into a map.
+        if (writer.isMapWriter()) {
+          SchemaPath path;
+          if (writer.map.getField().getPath().getRootSegment().getPath().equals("")) {
+            path = new SchemaPath(new PathSegment.NameSegment(fieldName));
+          } else {
+            path = writer.map.getField().getPath().getChild(fieldName);
+          }
+
+          if (!selected(path)) {
+            break;
+          }
+        }
+
+        processPrimitive(value, schema.getType(), fieldName, writer);
+        break;
+    }
+
+    writer.end();
+  }
+
+  /**
+   * Writes a single primitive (or enum) value through the matching typed writer.
+   *
+   * @throws DrillRuntimeException if the Avro type has no handler
+   */
+  private void processPrimitive(final Object value, final Schema.Type type, final String fieldName,
+                                final MapOrListWriter writer) {
+
+    switch (type) {
+      case STRING: {
+        final byte[] bytes;
+        final int length;
+        if (value instanceof Utf8) {
+          // The backing array of a reused Utf8 may be longer than its logical byte length,
+          // so the length is tracked separately and only that many bytes are copied.
+          final Utf8 utf8 = (Utf8) value;
+          bytes = utf8.getBytes();
+          length = utf8.getByteLength();
+        } else {
+          // Schemas may ask Avro to materialize strings as java.lang.String.
+          bytes = utf8Bytes(value.toString(), "string", fieldName);
+          length = bytes.length;
+        }
+        writeVarChar(bytes, length, fieldName, writer);
+        break;
+      }
+      case INT:
+        final IntHolder ih = new IntHolder();
+        ih.value = (Integer) value;
+        writer.integer(fieldName).write(ih);
+        break;
+      case LONG:
+        final BigIntHolder bh = new BigIntHolder();
+        bh.value = (Long) value;
+        writer.bigInt(fieldName).write(bh);
+        break;
+      case FLOAT:
+        final Float4Holder fh = new Float4Holder();
+        fh.value = (Float) value;
+        writer.float4(fieldName).write(fh);
+        break;
+      case DOUBLE:
+        final Float8Holder f8h = new Float8Holder();
+        f8h.value = (Double) value;
+        writer.float8(fieldName).write(f8h);
+        break;
+      case BOOLEAN:
+        final BitHolder bit = new BitHolder();
+        bit.value = (Boolean) value ? 1 : 0;
+        writer.bit(fieldName).write(bit);
+        break;
+      case BYTES: {
+        // Copy only the readable region [position, limit). ByteBuffer.array() would expose
+        // the whole backing array and is unavailable for read-only or direct buffers.
+        // A duplicate is used so the source buffer's position is left untouched.
+        final ByteBuffer buf = (ByteBuffer) value;
+        final int length = buf.remaining();
+        ensure(length);
+        buffer.setBytes(0, buf.duplicate());
+        final VarBinaryHolder vb = new VarBinaryHolder();
+        vb.buffer = buffer;
+        vb.start = 0;
+        vb.end = length;
+        writer.binary(fieldName).write(vb);
+        break;
+      }
+      case NULL:
+        // Nothing to do for null type
+        break;
+      case ENUM: {
+        final byte[] b = utf8Bytes(value.toString(), "enum", fieldName);
+        writeVarChar(b, b.length, fieldName, writer);
+        break;
+      }
+      default:
+        throw new DrillRuntimeException("Unhandled Avro type: " + type.toString());
+    }
+  }
+
+  /** Copies the first {@code length} bytes into the scratch buffer and writes them as a VARCHAR. */
+  private void writeVarChar(final byte[] bytes, final int length, final String fieldName,
+                            final MapOrListWriter writer) {
+    final VarCharHolder holder = new VarCharHolder();
+    ensure(length);
+    buffer.setBytes(0, bytes, 0, length);
+    holder.buffer = buffer;
+    holder.start = 0;
+    holder.end = length;
+    writer.varChar(fieldName).write(holder);
+  }
+
+  /** Encodes {@code text} as UTF-8; {@code kind} and {@code fieldName} only feed the error message. */
+  private static byte[] utf8Bytes(final String text, final String kind, final String fieldName) {
+    try {
+      return text.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new DrillRuntimeException("Unable to read " + kind + " value for field: " + fieldName, e);
+    }
+  }
+
+  /** Returns true when the query is a star query or one of the projected columns contains {@code field}. */
+  private boolean selected(SchemaPath field) {
+    if (isStarQuery()) {
+      return true;
+    }
+    for (final SchemaPath sp : getColumns()) {
+      if (sp.contains(field)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Makes sure the scratch buffer can hold {@code length} bytes, replacing it if it had to be reallocated. */
+  private void ensure(final int length) {
+    buffer = buffer.reallocIfNeeded(length);
+  }
+
+  /**
+   * Closes the Avro reader if it is open. A failure to close is logged rather than rethrown,
+   * and the reader reference is cleared either way.
+   */
+  @Override
+  public void cleanup() {
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        logger.warn("Error closing Avro reader", e);
+      } finally {
+        reader = null;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
new file mode 100644
index 0000000..42c8e99
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import java.util.List;
+
+/**
+ * Batch creator for Avro scans: wraps a single {@link AvroRecordReader} in a {@link ScanBatch}.
+ */
+public class AvroScanBatchCreator implements BatchCreator<AvroSubScan> {
+
+  /**
+   * Builds the scan batch for one Avro sub-scan.
+   *
+   * @param context  fragment context handed to the reader and the batch
+   * @param subScan  sub-scan supplying the columns, entry path and file system
+   * @param children child batches; must be empty
+   * @return a {@link ScanBatch} iterating over one Avro record reader
+   * @throws IllegalArgumentException if {@code children} is not empty
+   */
+  @Override
+  public RecordBatch getBatch(final FragmentContext context, final AvroSubScan subScan,
+                              final List<RecordBatch> children) throws ExecutionSetupException {
+
+    Preconditions.checkArgument(children.isEmpty());
+
+    final List<SchemaPath> projected = subScan.getColumns();
+    final List<RecordReader> recordReaders = Lists.newArrayList();
+
+    final RecordReader avroReader = new AvroRecordReader(
+        context, subScan.getEntry().getPath(), subScan.getFileSystem(), projected);
+    recordReaders.add(avroReader);
+
+    return new ScanBatch(subScan, context, recordReaders.iterator());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
new file mode 100644
index 0000000..0a579aa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.google.common.collect.Iterators;
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Contains the information needed to read a single Avro file: the format plugin, the projected
+ * columns, the selection root, and (set after construction) the read entry and file system.
+ */
+@JsonTypeName("avro-sub-scan")
+public class AvroSubScan extends AbstractBase implements SubScan {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroSubScan.class);
+
+  private final AvroFormatPlugin formatPlugin;
+  private final AvroFormatConfig formatConfig;
+  private final List<SchemaPath> columns;
+  private final String selectionRoot;
+
+  // Not part of the JSON form; populated through the setters below.
+  private ReadEntryWithPath entry;
+  private FileSystem fs;
+
+  /**
+   * JSON creator: resolves the format plugin from the registry.
+   *
+   * @param registry      plugin registry used to look up the format plugin
+   * @param storageConfig storage plugin configuration; must not be null
+   * @param formatConfig  format configuration; a default {@link AvroFormatConfig} is used when null
+   * @param columns       projected columns
+   * @param selectionRoot selection root of the scan
+   * @throws ExecutionSetupException if the format plugin cannot be obtained
+   */
+  @JsonCreator
+  public AvroSubScan(@JacksonInject final StoragePluginRegistry registry,
+                     @JsonProperty("storage") final StoragePluginConfig storageConfig,
+                     @JsonProperty("format") final FormatPluginConfig formatConfig,
+                     @JsonProperty("columns") final List<SchemaPath> columns,
+                     @JsonProperty("selectionRoot") final String selectionRoot) throws ExecutionSetupException {
+    this((AvroFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
+        formatConfig == null ? new AvroFormatConfig() : formatConfig), columns, selectionRoot);
+  }
+
+  /**
+   * Creates a sub-scan for an already resolved format plugin.
+   *
+   * @param formatPlugin  Avro format plugin; must not be null
+   * @param columns       projected columns
+   * @param selectionRoot selection root of the scan
+   */
+  public AvroSubScan(final AvroFormatPlugin formatPlugin, final List<SchemaPath> columns,
+                     final String selectionRoot) {
+    this.formatPlugin = Preconditions.checkNotNull(formatPlugin);
+    this.formatConfig = formatPlugin.getConfig();
+    this.columns = columns;
+    this.selectionRoot = selectionRoot;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getEngineConfig() {
+    return formatPlugin.getStorageConfig();
+  }
+
+  @JsonProperty("format")
+  public AvroFormatConfig getFormatConfig() {
+    return formatConfig;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(final PhysicalVisitor<T, X, E> physicalVisitor, final X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  /**
+   * Creates a copy of this sub-scan. A sub-scan is a leaf operator, so the child list must be
+   * empty. The read entry and file system are carried over to the copy; without them the copy
+   * would have nothing to read.
+   *
+   * @param children child operators; must be empty
+   * @return a new sub-scan with the same plugin, columns, selection root, entry and file system
+   * @throws IllegalArgumentException if {@code children} is not empty
+   */
+  @Override
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    final AvroSubScan copy = new AvroSubScan(formatPlugin, columns, selectionRoot);
+    copy.setEntry(entry);
+    copy.setFileSystem(fs);
+    return copy;
+  }
+
+  @Override
+  public int getOperatorType() {
+    return UserBitShared.CoreOperatorType.AVRO_ROW_GROUP_SCAN_VALUE;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  @JsonIgnore
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  /*
+  public static class AvroSubScanSpec {
+
+  }
+  */
+
+  /** XXX - temp hacks **/
+
+  @JsonIgnore
+  public void setEntry(ReadEntryWithPath entry) {
+    this.entry = entry;
+  }
+
+  @JsonIgnore
+  public ReadEntryWithPath getEntry() {
+    return entry;
+  }
+
+  @JsonIgnore
+  public void setFileSystem(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @JsonIgnore
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
new file mode 100644
index 0000000..aeb659f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.Types;
+
+/**
+ * Utility class mapping Avro primitive types to Drill major types.
+ */
+public final class AvroTypeHelper {
+
+  // XXX - Decide what to do about Avro's NULL type; no major types are defined for it yet.
+
+  public static final MajorType MAJOR_TYPE_BOOL_OPTIONAL = Types.optional(MinorType.UINT1);
+  public static final MajorType MAJOR_TYPE_BOOL_REQUIRED = Types.required(MinorType.UINT1);
+  public static final MajorType MAJOR_TYPE_BOOL_REPEATED = Types.repeated(MinorType.UINT1);
+  public static final MajorType MAJOR_TYPE_INT_OPTIONAL = Types.optional(MinorType.INT);
+  public static final MajorType MAJOR_TYPE_INT_REQUIRED = Types.required(MinorType.INT);
+  public static final MajorType MAJOR_TYPE_INT_REPEATED = Types.repeated(MinorType.INT);
+  public static final MajorType MAJOR_TYPE_BIGINT_OPTIONAL = Types.optional(MinorType.BIGINT);
+  public static final MajorType MAJOR_TYPE_BIGINT_REQUIRED = Types.required(MinorType.BIGINT);
+  public static final MajorType MAJOR_TYPE_BIGINT_REPEATED = Types.repeated(MinorType.BIGINT);
+  public static final MajorType MAJOR_TYPE_FLOAT4_OPTIONAL = Types.optional(MinorType.FLOAT4);
+  public static final MajorType MAJOR_TYPE_FLOAT4_REQUIRED = Types.required(MinorType.FLOAT4);
+  public static final MajorType MAJOR_TYPE_FLOAT4_REPEATED = Types.repeated(MinorType.FLOAT4);
+  public static final MajorType MAJOR_TYPE_FLOAT8_OPTIONAL = Types.optional(MinorType.FLOAT8);
+  public static final MajorType MAJOR_TYPE_FLOAT8_REQUIRED = Types.required(MinorType.FLOAT8);
+  public static final MajorType MAJOR_TYPE_FLOAT8_REPEATED = Types.repeated(MinorType.FLOAT8);
+  public static final MajorType MAJOR_TYPE_VARBINARY_OPTIONAL = Types.optional(MinorType.VARBINARY);
+  public static final MajorType MAJOR_TYPE_VARBINARY_REQUIRED = Types.required(MinorType.VARBINARY);
+  public static final MajorType MAJOR_TYPE_VARBINARY_REPEATED = Types.repeated(MinorType.VARBINARY);
+  public static final MajorType MAJOR_TYPE_VARCHAR_OPTIONAL = Types.optional(MinorType.VARCHAR);
+  public static final MajorType MAJOR_TYPE_VARCHAR_REQUIRED = Types.required(MinorType.VARCHAR);
+  public static final MajorType MAJOR_TYPE_VARCHAR_REPEATED = Types.repeated(MinorType.VARCHAR);
+
+  private static final String UNSUPPORTED = "Unsupported type: %s [%s]";
+
+  private AvroTypeHelper() { }
+
+  /**
+   * Looks up the Drill major type for the schema type of an Avro field.
+   *
+   * @param field Avro field
+   * @param mode Data mode
+   * @return Drill major type
+   * @throws UnsupportedOperationException if the type/mode combination has no mapping
+   */
+  public static MajorType getFieldMajorType(final Field field, final DataMode mode) {
+    final Type avroType = field.schema().getType();
+    return getFieldMajorType(avroType, mode);
+  }
+
+  /**
+   * Looks up the Drill major type for an Avro type and data mode.
+   *
+   * @param type Avro type
+   * @param mode Data mode
+   * @return Drill major type
+   * @throws UnsupportedOperationException if the type/mode combination has no mapping
+   */
+  public static MajorType getFieldMajorType(final Type type, final DataMode mode) {
+
+    switch (type) {
+      case MAP:
+      case RECORD:
+      case ENUM:
+      case UNION:
+        throw new UnsupportedOperationException("Complex types are unimplemented");
+      case BOOLEAN:
+        return forMode(type, mode,
+            MAJOR_TYPE_BOOL_OPTIONAL, MAJOR_TYPE_BOOL_REQUIRED, MAJOR_TYPE_BOOL_REPEATED);
+      case INT:
+        return forMode(type, mode,
+            MAJOR_TYPE_INT_OPTIONAL, MAJOR_TYPE_INT_REQUIRED, MAJOR_TYPE_INT_REPEATED);
+      case LONG:
+        return forMode(type, mode,
+            MAJOR_TYPE_BIGINT_OPTIONAL, MAJOR_TYPE_BIGINT_REQUIRED, MAJOR_TYPE_BIGINT_REPEATED);
+      case FLOAT:
+        return forMode(type, mode,
+            MAJOR_TYPE_FLOAT4_OPTIONAL, MAJOR_TYPE_FLOAT4_REQUIRED, MAJOR_TYPE_FLOAT4_REPEATED);
+      case DOUBLE:
+        return forMode(type, mode,
+            MAJOR_TYPE_FLOAT8_OPTIONAL, MAJOR_TYPE_FLOAT8_REQUIRED, MAJOR_TYPE_FLOAT8_REPEATED);
+      case BYTES:
+        return forMode(type, mode,
+            MAJOR_TYPE_VARBINARY_OPTIONAL, MAJOR_TYPE_VARBINARY_REQUIRED, MAJOR_TYPE_VARBINARY_REPEATED);
+      case STRING:
+        return forMode(type, mode,
+            MAJOR_TYPE_VARCHAR_OPTIONAL, MAJOR_TYPE_VARCHAR_REQUIRED, MAJOR_TYPE_VARCHAR_REPEATED);
+      case NULL:
+      case ARRAY:
+      default:
+        // NULL, ARRAY and any remaining type have no mapping.
+        throw unsupported(type, mode);
+    }
+  }
+
+  /** Selects the constant matching {@code mode}; any other mode is reported as unsupported. */
+  private static MajorType forMode(final Type type, final DataMode mode, final MajorType optional,
+                                   final MajorType required, final MajorType repeated) {
+    switch (mode) {
+      case OPTIONAL:
+        return optional;
+      case REQUIRED:
+        return required;
+      case REPEATED:
+        return repeated;
+      default:
+        throw unsupported(type, mode);
+    }
+  }
+
+  /** Builds the exception raised for a type/mode combination without a mapping. */
+  private static UnsupportedOperationException unsupported(final Type type, final DataMode mode) {
+    return new UnsupportedOperationException(String.format(UNSUPPORTED, type.getName(), mode.name()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
new file mode 100644
index 0000000..d2a1031
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
+import org.apache.drill.exec.vector.complex.writer.BitWriter;
+import org.apache.drill.exec.vector.complex.writer.Float4Writer;
+import org.apache.drill.exec.vector.complex.writer.Float8Writer;
+import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+
+/**
+ * Wraps either a map writer or a list writer (exactly one of the two is set, depending on the
+ * constructor used) and forwards calls to whichever is present.
+ * Perhaps this is a tragic misuse of polymorphism?
+ */
+public class MapOrListWriter {
+
+  final BaseWriter.MapWriter map;
+  final BaseWriter.ListWriter list;
+
+  /** Wraps a map writer; the list side stays null. */
+  MapOrListWriter(final BaseWriter.MapWriter writer) {
+    this.list = null;
+    this.map = writer;
+  }
+
+  /** Wraps a list writer; the map side stays null. */
+  MapOrListWriter(final BaseWriter.ListWriter writer) {
+    this.list = writer;
+    this.map = null;
+  }
+
+  /** Forwards {@code start()} to the wrapped writer. */
+  void start() {
+    if (map == null) {
+      list.start();
+      return;
+    }
+    map.start();
+  }
+
+  /** Forwards {@code end()} to the wrapped writer. */
+  void end() {
+    if (map == null) {
+      list.end();
+      return;
+    }
+    map.end();
+  }
+
+  /** Returns a wrapper around the named child map; only valid on a map-backed instance. */
+  MapOrListWriter map(final String name) {
+    assert map != null;
+    final BaseWriter.MapWriter child = map.map(name);
+    return new MapOrListWriter(child);
+  }
+
+  /** Returns a wrapper around the named child list; only valid on a map-backed instance. */
+  MapOrListWriter list(final String name) {
+    assert map != null;
+    final BaseWriter.ListWriter child = map.list(name);
+    return new MapOrListWriter(child);
+  }
+
+  /** Tells whether this instance wraps a map writer. */
+  boolean isMapWriter() {
+    return map != null;
+  }
+
+  // Typed accessors: a map-backed instance looks the writer up by name,
+  // a list-backed instance ignores the name.
+
+  VarCharWriter varChar(final String name) {
+    if (map != null) {
+      return map.varChar(name);
+    }
+    return list.varChar();
+  }
+
+  IntWriter integer(final String name) {
+    if (map != null) {
+      return map.integer(name);
+    }
+    return list.integer();
+  }
+
+  BigIntWriter bigInt(final String name) {
+    if (map != null) {
+      return map.bigInt(name);
+    }
+    return list.bigInt();
+  }
+
+  Float4Writer float4(final String name) {
+    if (map != null) {
+      return map.float4(name);
+    }
+    return list.float4();
+  }
+
+  Float8Writer float8(final String name) {
+    if (map != null) {
+      return map.float8(name);
+    }
+    return list.float8();
+  }
+
+  BitWriter bit(final String name) {
+    if (map != null) {
+      return map.bit(name);
+    }
+    return list.bit();
+  }
+
+  VarBinaryWriter binary(final String name) {
+    if (map != null) {
+      return map.varBinary(name);
+    }
+    return list.varBinary();
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 6bf1872..3253e80 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -34,6 +34,9 @@
},
"json" : {
type: "json"
+ },
+ "avro" : {
+ type: "avro"
}
}
},
@@ -57,6 +60,9 @@
},
"parquet" : {
type: "parquet"
+ },
+ "avro" : {
+ type: "avro"
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
new file mode 100644
index 0000000..2d2522b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.drill.BaseTestQuery;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for Avro record reader. Each test generates an Avro file and runs a query
+ * against it; the tests only check that the query executes.
+ */
+public class AvroFormatTest extends BaseTestQuery {
+
+  // XXX
+  // 1. Need to test nested field names with same name as top-level names for conflict.
+  // 2. Avro supports linked lists, right? Can we test this?
+  // 3. Avro supports recursive types? Can we test this?
+  // 4. Test queries with not all columns projected.
+
+  private static final String SELECT_STAR = "select *";
+
+  /** Runs {@code <selectClause> from dfs_test.`<file>`}. */
+  private void runQuery(final String selectClause, final String file) throws Exception {
+    test(selectClause + " from dfs_test.`" + file + "`");
+  }
+
+  @Test
+  public void testSimplePrimitiveSchema_NoNullValues() throws Exception {
+    final String avroFile = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    runQuery("select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null", avroFile);
+  }
+
+  @Test
+  public void testSimplePrimitiveSchema_StarQuery() throws Exception {
+    final String avroFile = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    runQuery(SELECT_STAR, avroFile);
+  }
+
+  @Test
+  public void testSimplePrimitiveSchema_SelectColumnSubset() throws Exception {
+    final String avroFile = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    runQuery("select h_boolean, e_double", avroFile);
+  }
+
+  @Test
+  public void testSimpleArraySchema_NoNullValues() throws Exception {
+    final String avroFile = AvroTestUtil.generateSimpleArraySchema_NoNullValues();
+    runQuery("select a_string, c_string_array[0], e_float_array[2]", avroFile);
+  }
+
+  @Test
+  public void testSimpleArraySchema_StarQuery() throws Exception {
+    final String avroFile = AvroTestUtil.generateSimpleArraySchema_NoNullValues();
+    runQuery(SELECT_STAR, avroFile);
+  }
+
+  @Test
+  public void testSimpleNestedSchema_NoNullValues() throws Exception {
+    final String avroFile = AvroTestUtil.generateSimpleNestedSchema_NoNullValues();
+    runQuery("select a_string, b_int, c_record['nested_1_string'], c_record['nested_1_int']", avroFile);
+  }
+
+  @Test
+  public void testSimpleNestedSchema_StarQuery() throws Exception {
+    final String avroFile = AvroTestUtil.generateSimpleNestedSchema_NoNullValues();
+    runQuery(SELECT_STAR, avroFile);
+  }
+
+  @Test
+  public void testDoubleNestedSchema_NoNullValues() throws Exception {
+    final String avroFile = AvroTestUtil.generateDoubleNestedSchema_NoNullValues();
+    final String selectClause =
+        "select a_string, b_int, c_record['nested_1_string'], c_record['nested_1_int'], " +
+        "c_record['nested_1_record']['double_nested_1_string'], " +
+        "c_record['nested_1_record']['double_nested_1_int']";
+    runQuery(selectClause, avroFile);
+  }
+
+  @Test
+  public void testDoubleNestedSchema_StarQuery() throws Exception {
+    final String avroFile = AvroTestUtil.generateDoubleNestedSchema_NoNullValues();
+    runQuery(SELECT_STAR, avroFile);
+  }
+
+  @Test
+  public void testSimpleEnumSchema_NoNullValues() throws Exception {
+    final String avroFile = AvroTestUtil.generateSimpleEnumSchema_NoNullValues();
+    runQuery("select a_string, b_enum", avroFile);
+  }
+
+  @Test
+  public void testSimpleEnumSchema_StarQuery() throws Exception {
+    final String avroFile = AvroTestUtil.generateSimpleEnumSchema_NoNullValues();
+    runQuery(SELECT_STAR, avroFile);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
new file mode 100644
index 0000000..419c054
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities for generating Avro test data.
+ */
+public class AvroTestUtil {
+
+ public static final int RECORD_COUNT = 10;
+
+ /**
+  * Writes {@link #RECORD_COUNT} records to a temporary Avro file whose schema has one field
+  * per primitive type used here: string, int, long, float, double, bytes, null and boolean.
+  *
+  * @return absolute path of the generated file; the file is registered for deletion on JVM exit
+  * @throws Exception if the temporary file cannot be created or a record cannot be written
+  */
+ public static String generateSimplePrimitiveSchema_NoNullValues() throws Exception {
+
+   final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+       .namespace("org.apache.drill.exec.store.avro")
+       .fields()
+       .name("a_string").type().stringType().noDefault()
+       .name("b_int").type().intType().noDefault()
+       .name("c_long").type().longType().noDefault()
+       .name("d_float").type().floatType().noDefault()
+       .name("e_double").type().doubleType().noDefault()
+       .name("f_bytes").type().bytesType().noDefault()
+       .name("g_null").type().nullType().noDefault()
+       .name("h_boolean").type().booleanType().noDefault()
+       .endRecord();
+
+   final File file = File.createTempFile("avro-primitive-test", ".avro");
+   file.deleteOnExit();
+
+   // The same one-byte buffer instance is stored in every record's f_bytes field.
+   final ByteBuffer bb = ByteBuffer.allocate(1);
+   bb.put(0, (byte) 1);
+
+   // try-with-resources closes the writer on every path and, unlike close() in a finally
+   // block, does not let a failure in close() hide the exception that aborted the write.
+   try (DataFileWriter<GenericRecord> writer =
+       new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))) {
+     writer.create(schema, file);
+
+     for (int i = 0; i < RECORD_COUNT; i++) {
+       final GenericRecord record = new GenericData.Record(schema);
+       record.put("a_string", "a_" + i);
+       record.put("b_int", i);
+       record.put("c_long", (long) i);
+       record.put("d_float", (float) i);
+       record.put("e_double", (double) i);
+       record.put("f_bytes", bb);
+       record.put("g_null", null);
+       record.put("h_boolean", (i % 2 == 0));
+       writer.append(record);
+     }
+   }
+
+   return file.getAbsolutePath();
+ }
+
+ /**
+  * Writes {@link #RECORD_COUNT} records with a string field and an enum field to a temporary
+  * Avro file. The enum symbol for record {@code i} is picked round-robin from the four
+  * declared symbols.
+  *
+  * @return absolute path of the generated file; the file is registered for deletion on JVM exit
+  * @throws Exception if the temporary file cannot be created or a record cannot be written
+  */
+ public static String generateSimpleEnumSchema_NoNullValues() throws Exception {
+
+   final String[] symbols = { "E_SYM_A", "E_SYM_B", "E_SYM_C", "E_SYM_D" };
+
+   final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+       .namespace("org.apache.drill.exec.store.avro")
+       .fields()
+       .name("a_string").type().stringType().noDefault()
+       .name("b_enum").type().enumeration("my_enum").symbols(symbols).noDefault()
+       .endRecord();
+
+   // Prefix names the enum fixture; it previously reused the primitive generator's prefix.
+   final File file = File.createTempFile("avro-enum-test", ".avro");
+   file.deleteOnExit();
+
+   final Schema enumSchema = schema.getField("b_enum").schema();
+
+   // try-with-resources closes the writer on every path without masking a write failure.
+   try (DataFileWriter<GenericRecord> writer =
+       new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))) {
+     writer.create(schema, file);
+
+     for (int i = 0; i < RECORD_COUNT; i++) {
+       final GenericRecord record = new GenericData.Record(schema);
+       record.put("a_string", "a_" + i);
+       // i is never negative, so a plain modulo is enough to cycle through the symbols.
+       final GenericData.EnumSymbol symbol =
+           new GenericData.EnumSymbol(enumSchema, symbols[i % symbols.length]);
+       record.put("b_enum", symbol);
+       writer.append(record);
+     }
+   }
+
+   return file.getAbsolutePath();
+ }
+
+ /**
+  * Writes {@link #RECORD_COUNT} records to a temporary Avro file. Each record has a string
+  * field, an int field and three array fields (string, int and float items), and every array
+  * holds {@link #RECORD_COUNT} elements.
+  *
+  * @return absolute path of the generated file; the file is registered for deletion on JVM exit
+  * @throws Exception if the temporary file cannot be created or a record cannot be written
+  */
+ public static String generateSimpleArraySchema_NoNullValues() throws Exception {
+
+   final File file = File.createTempFile("avro-array-test", ".avro");
+   file.deleteOnExit();
+
+   final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+       .namespace("org.apache.drill.exec.store.avro")
+       .fields()
+       .name("a_string").type().stringType().noDefault()
+       .name("b_int").type().intType().noDefault()
+       .name("c_string_array").type().array().items().stringType().noDefault()
+       .name("d_int_array").type().array().items().intType().noDefault()
+       .name("e_float_array").type().array().items().floatType().noDefault()
+       .endRecord();
+
+   // The per-field array schemas do not change between records, so look them up once.
+   final Schema stringArraySchema = schema.getField("c_string_array").schema();
+   final Schema intArraySchema = schema.getField("d_int_array").schema();
+   final Schema floatArraySchema = schema.getField("e_float_array").schema();
+
+   // try-with-resources closes the writer on every path without masking a write failure.
+   try (DataFileWriter<GenericRecord> writer =
+       new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))) {
+     writer.create(schema, file);
+
+     for (int i = 0; i < RECORD_COUNT; i++) {
+       final GenericRecord record = new GenericData.Record(schema);
+       record.put("a_string", "a_" + i);
+       record.put("b_int", i);
+
+       // Each array is declared with its real element type; the original reused one raw
+       // variable typed as Array<String> for the int and float arrays as well.
+       final GenericArray<String> strings = new GenericData.Array<String>(RECORD_COUNT, stringArraySchema);
+       final GenericArray<Integer> ints = new GenericData.Array<Integer>(RECORD_COUNT, intArraySchema);
+       final GenericArray<Float> floats = new GenericData.Array<Float>(RECORD_COUNT, floatArraySchema);
+       for (int j = 0; j < RECORD_COUNT; j++) {
+         strings.add("c_string_array_" + i + "_" + j);
+         ints.add(i * j);
+         floats.add((float) (i * j));
+       }
+       record.put("c_string_array", strings);
+       record.put("d_int_array", ints);
+       record.put("e_float_array", floats);
+
+       writer.append(record);
+     }
+   }
+
+   return file.getAbsolutePath();
+ }
+
+ /**
+  * Writes {@link #RECORD_COUNT} records to a temporary Avro file. Each record has a string
+  * field, an int field and a nested record field ({@code c_record}) that itself holds a
+  * string and an int.
+  *
+  * @return absolute path of the generated file; the file is registered for deletion on JVM exit
+  * @throws Exception if the temporary file cannot be created or a record cannot be written
+  */
+ public static String generateSimpleNestedSchema_NoNullValues() throws Exception {
+
+   final File file = File.createTempFile("avro-nested-test", ".avro");
+   file.deleteOnExit();
+
+   final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+       .namespace("org.apache.drill.exec.store.avro")
+       .fields()
+       .name("a_string").type().stringType().noDefault()
+       .name("b_int").type().intType().noDefault()
+       .name("c_record").type().record("my_record_1")
+           .namespace("foo.blah.org")
+           .fields()
+           .name("nested_1_string").type().stringType().noDefault()
+           .name("nested_1_int").type().intType().noDefault()
+           .endRecord()
+       .noDefault()
+       .endRecord();
+
+   final Schema nestedSchema = schema.getField("c_record").schema();
+
+   // create() now runs inside the guarded region: previously it was called before the try,
+   // so a failure while opening the file left the writer unclosed.
+   try (DataFileWriter<GenericRecord> writer =
+       new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))) {
+     writer.create(schema, file);
+
+     for (int i = 0; i < RECORD_COUNT; i++) {
+       final GenericRecord record = new GenericData.Record(schema);
+       record.put("a_string", "a_" + i);
+       record.put("b_int", i);
+
+       final GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+       nestedRecord.put("nested_1_string", "nested_1_string_" + i);
+       nestedRecord.put("nested_1_int", i * i);
+
+       record.put("c_record", nestedRecord);
+       writer.append(record);
+     }
+   }
+
+   return file.getAbsolutePath();
+ }
+
+ /**
+  * Writes {@link #RECORD_COUNT} records to a temporary Avro file with two levels of record
+  * nesting: the top-level row holds {@code c_record}, which in turn holds
+  * {@code nested_1_record}. Every level carries one string and one int field.
+  *
+  * @return absolute path of the generated file; the file is registered for deletion on JVM exit
+  * @throws Exception if the temporary file cannot be created or a record cannot be written
+  */
+ public static String generateDoubleNestedSchema_NoNullValues() throws Exception {
+
+   final File file = File.createTempFile("avro-double-nested-test", ".avro");
+   file.deleteOnExit();
+
+   final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+       .namespace("org.apache.drill.exec.store.avro")
+       .fields()
+       .name("a_string").type().stringType().noDefault()
+       .name("b_int").type().intType().noDefault()
+       .name("c_record").type().record("my_record_1")
+           .namespace("foo.blah.org")
+           .fields()
+           .name("nested_1_string").type().stringType().noDefault()
+           .name("nested_1_int").type().intType().noDefault()
+           .name("nested_1_record").type().record("my_double_nested_record_1")
+               .namespace("foo.blah.org.rot")
+               .fields()
+               .name("double_nested_1_string").type().stringType().noDefault()
+               .name("double_nested_1_int").type().intType().noDefault()
+               .endRecord()
+           .noDefault()
+           .endRecord()
+       .noDefault()
+       .endRecord();
+
+   final Schema nestedSchema = schema.getField("c_record").schema();
+   final Schema doubleNestedSchema = nestedSchema.getField("nested_1_record").schema();
+
+   // create() now runs inside the guarded region: previously it was called before the try,
+   // so a failure while opening the file left the writer unclosed.
+   try (DataFileWriter<GenericRecord> writer =
+       new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))) {
+     writer.create(schema, file);
+
+     for (int i = 0; i < RECORD_COUNT; i++) {
+       final GenericRecord record = new GenericData.Record(schema);
+       record.put("a_string", "a_" + i);
+       record.put("b_int", i);
+
+       final GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+       nestedRecord.put("nested_1_string", "nested_1_string_" + i);
+       nestedRecord.put("nested_1_int", i * i);
+
+       final GenericRecord doubleNestedRecord = new GenericData.Record(doubleNestedSchema);
+       doubleNestedRecord.put("double_nested_1_string", "double_nested_1_string_" + i + "_" + i);
+       doubleNestedRecord.put("double_nested_1_int", i * i * i);
+
+       // Assemble innermost-first so each parent receives a fully populated child.
+       nestedRecord.put("nested_1_record", doubleNestedRecord);
+       record.put("c_record", nestedRecord);
+
+       writer.append(record);
+     }
+   }
+
+   return file.getAbsolutePath();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index e9772cf..d4d81f6 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -38,6 +38,9 @@
"txt" : {
type : "text",
extensions: [ "txt" ]
+ },
+ "avro" : {
+ type: "avro"
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 4b4e558..9a095aa 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -474,6 +474,10 @@ public final class UserBitShared {
* <code>NESTED_LOOP_JOIN = 35;</code>
*/
NESTED_LOOP_JOIN(35, 35),
+ /**
+ * <code>AVRO_SUB_SCAN = 36;</code>
+ */
+ AVRO_SUB_SCAN(36, 36),
;
/**
@@ -620,6 +624,10 @@ public final class UserBitShared {
* <code>NESTED_LOOP_JOIN = 35;</code>
*/
public static final int NESTED_LOOP_JOIN_VALUE = 35;
+ /**
+ * <code>AVRO_SUB_SCAN = 36;</code>
+ */
+ public static final int AVRO_SUB_SCAN_VALUE = 36;
public final int getNumber() { return value; }
@@ -662,6 +670,7 @@ public final class UserBitShared {
case 33: return HBASE_SUB_SCAN;
case 34: return WINDOW;
case 35: return NESTED_LOOP_JOIN;
+ case 36: return AVRO_SUB_SCAN;
default: return null;
}
}
@@ -19856,7 +19865,7 @@ public final class UserBitShared {
"yType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020" +
"\003*k\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAIT" +
"ING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHE" +
- "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\312\005\n\020CoreO" +
+ "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\335\005\n\020CoreO" +
"peratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADC" +
"AST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGA" +
"TE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025H",
@@ -19874,8 +19883,9 @@ public final class UserBitShared {
"AN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_S" +
"UB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUC" +
"ER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WIN" +
- "DOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#B.\n\033org.apach" +
- "e.drill.exec.protoB\rUserBitSharedH\001"
+ "DOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_" +
+ "SCAN\020$B.\n\033org.apache.drill.exec.protoB\rU" +
+ "serBitSharedH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index a37209d..b21d3ae 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -57,7 +57,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
PRODUCER_CONSUMER(32),
HBASE_SUB_SCAN(33),
WINDOW(34),
- NESTED_LOOP_JOIN(35);
+ NESTED_LOOP_JOIN(35),
+ AVRO_SUB_SCAN(36);
public final int number;
@@ -111,6 +112,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
case 33: return HBASE_SUB_SCAN;
case 34: return WINDOW;
case 35: return NESTED_LOOP_JOIN;
+ case 36: return AVRO_SUB_SCAN;
default: return null;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 0f86958..10c2790 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -243,4 +243,5 @@ enum CoreOperatorType {
HBASE_SUB_SCAN = 33;
WINDOW = 34;
NESTED_LOOP_JOIN = 35;
+ AVRO_SUB_SCAN = 36;
}
[3/4] drill git commit: DRILL-1512: Refactor AvroFormatPlugin
Posted by sm...@apache.org.
DRILL-1512: Refactor AvroFormatPlugin
Extend EasyFormatPlugin and remove AvroGroupScan and AvroSubScan
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bf3db318
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bf3db318
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bf3db318
Branch: refs/heads/master
Commit: bf3db3186ce633ac4a494bad888ae54542175ca4
Parents: 55a9a59
Author: Steven Phillips <sm...@apache.org>
Authored: Wed Apr 15 03:26:04 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed Apr 15 12:39:46 2015 -0700
----------------------------------------------------------------------
.../drill/exec/store/avro/AvroFormatPlugin.java | 92 ++------
.../drill/exec/store/avro/AvroGroupScan.java | 208 -------------------
.../drill/exec/store/avro/AvroRecordReader.java | 4 +-
.../exec/store/avro/AvroScanBatchCreator.java | 52 -----
.../drill/exec/store/avro/AvroSubScan.java | 142 -------------
5 files changed, 24 insertions(+), 474 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/bf3db318/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 4fe1f71..2f487d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -18,111 +18,61 @@
package org.apache.drill.exec.store.avro;
import com.google.common.collect.Lists;
-import com.google.common.collect.ImmutableSet;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.AbstractWriter;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FormatMatcher;
-import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
import java.io.IOException;
import java.util.List;
-import java.util.Set;
/**
* Format plugin for Avro data files.
*/
-public class AvroFormatPlugin implements FormatPlugin {
-
- private final String name;
- private final DrillbitContext context;
- private final DrillFileSystem fs;
- private final StoragePluginConfig storagePluginConfig;
- private final AvroFormatConfig formatConfig;
- private final BasicFormatMatcher matcher;
+public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
StoragePluginConfig storagePluginConfig) {
this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
}
- public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
- StoragePluginConfig storagePluginConfig, AvroFormatConfig formatConfig) {
- this.name = name;
- this.context = context;
- this.fs = fs;
- this.storagePluginConfig = storagePluginConfig;
- this.formatConfig = formatConfig;
-
- // XXX - What does 'compressible' mean in this context?
- this.matcher = new BasicFormatMatcher(this, fs, Lists.newArrayList("avro"), false);
+ public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
+ super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
}
@Override
- public boolean supportsRead() {
+ public boolean supportsPushDown() {
return true;
}
@Override
- public boolean supportsWrite() {
- return false;
- }
-
- @Override
- public FormatMatcher getMatcher() {
- return matcher;
- }
-
- @Override
- public AbstractWriter getWriter(final PhysicalOperator child, final String location) throws IOException {
- throw new UnsupportedOperationException("Unimplemented");
- }
-
- @Override
- public AbstractGroupScan getGroupScan(final FileSelection selection) throws IOException {
- return new AvroGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
- }
-
- @Override
- public AbstractGroupScan getGroupScan(final FileSelection selection, final List<SchemaPath> columns) throws IOException {
- return new AvroGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
+ public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException {
+ return new AvroRecordReader(context, fileWork.getPath(), dfs, columns);
}
@Override
- public Set<StoragePluginOptimizerRule> getOptimizerRules() {
- return ImmutableSet.of();
+ public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+ throw new UnsupportedOperationException("unimplemented");
}
@Override
- public AvroFormatConfig getConfig() {
- return formatConfig;
+ public int getReaderOperatorType() {
+ return CoreOperatorType.AVRO_SUB_SCAN_VALUE;
}
@Override
- public StoragePluginConfig getStorageConfig() {
- return storagePluginConfig;
+ public int getWriterOperatorType() {
+ throw new UnsupportedOperationException("unimplemented");
}
- @Override
- public DrillFileSystem getFileSystem() {
- return fs;
- }
- @Override
- public DrillbitContext getContext() {
- return context;
- }
-
- @Override
- public String getName() {
- return name;
- }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bf3db318/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
deleted file mode 100644
index fcc1f94..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.ScanStats;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
-//import org.apache.drill.exec.store.avro.AvroSubScan.AvroSubScanSpec;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Group scan implementation for Avro data files.
- */
-@JsonTypeName("avro-scan")
-public class AvroGroupScan extends AbstractGroupScan {
-
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroGroupScan.class);
-
- private final AvroFormatPlugin formatPlugin;
- private final AvroFormatConfig formatConfig;
- private final List<SchemaPath> columns;
- private final FileSystem fs;
- private final List<ReadEntryWithPath> entries;
- private final String selectionRoot;
-
- //private Map<Integer, List<AvroSubScanSpec>> endpointMappings;
-
- private List<EndpointAffinity> endpointAffinities;
-
- @JsonCreator
- public AvroGroupScan(@JsonProperty("entries") final List<ReadEntryWithPath> entries,
- @JsonProperty("storage") final StoragePluginConfig storageConfig,
- @JsonProperty("format") final FormatPluginConfig formatConfig,
- @JacksonInject final StoragePluginRegistry engineRegistry,
- @JsonProperty("columns") final List<SchemaPath> columns,
- @JsonProperty("selectionRoot") final String selectionRoot) throws ExecutionSetupException {
-
- this.columns = columns;
- final AvroFormatConfig afc;
- if (formatConfig == null) {
- afc = new AvroFormatConfig();
- } else {
- afc = (AvroFormatConfig) formatConfig;
- }
- Preconditions.checkNotNull(storageConfig);
- Preconditions.checkNotNull(afc);
- this.formatPlugin = (AvroFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, afc);
- Preconditions.checkNotNull(this.formatPlugin);
- this.fs = formatPlugin.getFileSystem().getUnderlying();
- this.formatConfig = formatPlugin.getConfig();
- this.entries = entries;
- this.selectionRoot = selectionRoot;
- }
-
- public AvroGroupScan(final List<FileStatus> files, final AvroFormatPlugin formatPlugin,
- final String selectionRoot, final List<SchemaPath> columns) throws IOException {
-
- this.formatPlugin = formatPlugin;
- this.columns = columns;
- this.formatConfig = formatPlugin.getConfig();
- this.fs = formatPlugin.getFileSystem().getUnderlying();
- this.selectionRoot = selectionRoot;
-
- this.entries = Lists.newArrayList();
- for (final FileStatus fs : files) {
- entries.add(new ReadEntryWithPath(fs.getPath().toString()));
- }
- }
-
- @JsonProperty("format")
- public AvroFormatConfig getFormatConfig() {
- return this.formatConfig;
- }
-
- @JsonProperty("storage")
- public StoragePluginConfig getEngineConfig() {
- return this.formatPlugin.getStorageConfig();
- }
-
- private AvroGroupScan(final AvroGroupScan that, final List<SchemaPath> columns) {
- this.columns = (columns == null) ? that.columns : columns;
- this.entries = that.entries;
- this.formatConfig = that.formatConfig;
- this.formatPlugin = that.formatPlugin;
- this.fs = that.fs;
- this.selectionRoot = that.selectionRoot;
-
- // XXX - DON'T FORGET TO ADD THESE AFTER WE'VE IMPLEMENTED AFFINITY
- //this.endpointAffinities = that.endpointAffinities;
- //this.mappings = that.mappings;
- //this.rowCount = that.rowCount;
- //this.rowGroupInfos = that.rowGroupInfos;
- //this.columnValueCounts = that.columnValueCounts;
- }
-
- @Override
- public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
- // XXX - Unimplemented
- logger.warn("AvroGroupScan.applyAssignments() is not implemented");
- }
-
- @Override
- public AvroSubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
-
- final AvroSubScan sub = new AvroSubScan(formatPlugin, columns, selectionRoot);
-
- // XXX - This is a temporary hack just to get something working. Need to revisit sub-scan specs
- // once we work out affinity and endpoints.
- sub.setEntry(entries.get(0));
- sub.setFileSystem(fs);
-
- return sub;
- }
-
- @Override
- public int getMaxParallelizationWidth() {
- // XXX - Finish
- return 1;
- }
-
- @Override
- public ScanStats getScanStats() {
- // XXX - Is 0 the correct value for second arg? What if I don't know the row count a priori?
- return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, 0, 1, 1);
- }
-
- @Override
- public List<EndpointAffinity> getOperatorAffinity() {
- // XXX - Unimplemented
- if (endpointAffinities != null) {
- return endpointAffinities;
- }
- return Collections.emptyList();
- }
-
- @Override
- @JsonIgnore
- public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
- Preconditions.checkArgument(children.isEmpty());
- return new AvroGroupScan(this, null);
- }
-
- @Override
- public String getDigest() {
- return toString();
- }
-
- @Override
- public String toString() {
- return "AvroGroupScan [entries=" + entries +
- ", selectionRoot=" + selectionRoot +
- ", columns=" + columns + "]";
- }
-
- @Override
- public GroupScan clone(final List<SchemaPath> columns) {
- return new AvroGroupScan(this, columns);
- }
-
- @JsonIgnore
- public boolean canPushdownProjects(final List<SchemaPath> columns) {
- return true;
- }
-
- public List<ReadEntryWithPath> getEntries() {
- return entries;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/bf3db318/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 3b7697d..489a989 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -73,6 +73,7 @@ public class AvroRecordReader extends AbstractRecordReader {
private DataFileReader<GenericContainer> reader = null;
private OperatorContext operatorContext;
+ private FileSystem fs;
private static final int DEFAULT_BATCH_SIZE = 1000;
@@ -91,6 +92,7 @@ public class AvroRecordReader extends AbstractRecordReader {
hadoop = new Path(inputPath);
buffer = fragmentContext.getManagedBuffer();
+ this.fs = fileSystem;
setColumns(projectedColumns);
}
@@ -101,7 +103,7 @@ public class AvroRecordReader extends AbstractRecordReader {
writer = new VectorContainerWriter(output);
try {
- reader = new DataFileReader<>(new FsInput(hadoop, new Configuration()), new GenericDatumReader<GenericContainer>());
+ reader = new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>());
} catch (IOException e) {
throw new ExecutionSetupException(e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bf3db318/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
deleted file mode 100644
index 42c8e99..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-
-import java.util.List;
-
-/**
- * Batch creator for Avro scans.
- */
-public class AvroScanBatchCreator implements BatchCreator<AvroSubScan> {
-
-
- @Override
- public RecordBatch getBatch(final FragmentContext context, final AvroSubScan subScan,
- final List<RecordBatch> children) throws ExecutionSetupException {
-
- Preconditions.checkArgument(children.isEmpty());
- List<SchemaPath> columns = subScan.getColumns();
- List<RecordReader> readers = Lists.newArrayList();
-
- readers.add(new AvroRecordReader(context, subScan.getEntry().getPath(), subScan.getFileSystem(), columns));
-
- return new ScanBatch(subScan, context, readers.iterator());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/bf3db318/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
deleted file mode 100644
index 0a579aa..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractBase;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-
-import com.google.common.collect.Iterators;
-import com.google.common.base.Preconditions;
-import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
-import org.apache.hadoop.fs.FileSystem;
-
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Contains information for reading a single Avro row group from HDFS.
- */
-@JsonTypeName("avro-sub-scan")
-public class AvroSubScan extends AbstractBase implements SubScan {
-
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroSubScan.class);
-
- private final AvroFormatPlugin formatPlugin;
- private final AvroFormatConfig formatConfig;
- private final List<SchemaPath> columns;
- private final String selectionRoot;
-
- private ReadEntryWithPath entry;
- private FileSystem fs;
-
- @JsonCreator
- public AvroSubScan(@JacksonInject final StoragePluginRegistry registry,
- @JsonProperty("storage") final StoragePluginConfig storageConfig,
- @JsonProperty("format") final FormatPluginConfig formatConfig,
- @JsonProperty("columns") final List<SchemaPath> columns,
- @JsonProperty("selectionRoot") final String selectionRoot) throws ExecutionSetupException {
- this((AvroFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
- formatConfig == null ? new AvroFormatConfig() : formatConfig), columns, selectionRoot);
- }
-
- public AvroSubScan(final AvroFormatPlugin formatPlugin, final List<SchemaPath> columns,
- final String selectionRoot) {
- this.formatPlugin = Preconditions.checkNotNull(formatPlugin);
- this.formatConfig = formatPlugin.getConfig();
- this.columns = columns;
- this.selectionRoot = selectionRoot;
- }
-
- @JsonProperty("storage")
- public StoragePluginConfig getEngineConfig() {
- return formatPlugin.getStorageConfig();
- }
-
- @JsonProperty("format")
- public AvroFormatConfig getFormatConfig() {
- return formatConfig;
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(final PhysicalVisitor<T, X, E> physicalVisitor, final X value) throws E {
- return physicalVisitor.visitSubScan(this, value);
- }
-
- @Override
- public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
- Preconditions.checkArgument(children.isEmpty());
- return new AvroSubScan(formatPlugin, columns, selectionRoot);
- }
-
- @Override
- public int getOperatorType() {
- return UserBitShared.CoreOperatorType.AVRO_ROW_GROUP_SCAN_VALUE;
- }
-
- @Override
- public Iterator<PhysicalOperator> iterator() {
- return Iterators.emptyIterator();
- }
-
- @JsonIgnore
- public List<SchemaPath> getColumns() {
- return columns;
- }
-
- /*
- public static class AvroSubScanSpec {
-
- }
- */
-
- /** XXX - temp hacks **/
-
- @JsonIgnore
- public void setEntry(ReadEntryWithPath entry) {
- this.entry = entry;
- }
-
- @JsonIgnore
- public ReadEntryWithPath getEntry() {
- return entry;
- }
-
- @JsonIgnore
- public void setFileSystem(FileSystem fs) {
- this.fs = fs;
- }
-
- @JsonIgnore
- public FileSystem getFileSystem() {
- return fs;
- }
-}