You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/04/15 22:34:22 UTC

[1/4] drill git commit: DRILL-1899: Consider read preference set by users

Repository: drill
Updated Branches:
  refs/heads/master 07cca5dba -> 033d0dfd6


DRILL-1899: Consider read preference set by users


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0f9887d9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0f9887d9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0f9887d9

Branch: refs/heads/master
Commit: 0f9887d91ee92f0c8f3197c55b95b694e7dcd9f4
Parents: 07cca5d
Author: B Anil Kumar <ak...@gmail.com>
Authored: Wed Apr 15 02:10:17 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed Apr 15 12:36:29 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/store/mongo/MongoGroupScan.java  | 69 +++++++++++++++-----
 .../exec/store/mongo/MongoRecordReader.java     |  1 -
 .../store/mongo/schema/MongoSchemaFactory.java  |  1 -
 3 files changed, 54 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0f9887d9/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index f6b449b..b086786 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -166,7 +166,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
     return databaseNames.contains(CONFIG);
   }
 
-  @SuppressWarnings("rawtypes")
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   private void init() throws IOException {
     MongoClient client = null;
     try {
@@ -178,9 +178,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
       chunksInverseMapping = Maps.newLinkedHashMap();
       if (isShardedCluster(client)) {
         DB db = client.getDB(CONFIG);
-        db.setReadPreference(ReadPreference.nearest());
         DBCollection chunksCollection = db.getCollectionFromString(CHUNKS);
-
         DBObject query = new BasicDBObject(1);
         query
             .put(
@@ -212,23 +210,29 @@ public class MongoGroupScan extends AbstractGroupScan implements
             String[] tagAndHost = StringUtils.split(hostEntry, '/');
             String[] hosts = tagAndHost.length > 1 ? StringUtils.split(
                 tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ',');
-            Set<ServerAddress> addressList = chunksMapping.get(chunkId);
+            List<String> chunkHosts = Arrays.asList(hosts);
+            //to get the address list from one of the shard nodes, need to get port.
+            MongoClient shardClient = new MongoClient(hosts[0]);
+            Set<ServerAddress> addressList = getPreferredHosts(shardClient, chunkHosts);
             if (addressList == null) {
               addressList = Sets.newHashSet();
-              chunksMapping.put(chunkId, addressList);
-            }
-            for (String host : hosts) {
-              addressList.add(new ServerAddress(host));
+              for (String host : chunkHosts) {
+                addressList.add(new ServerAddress(host));
+              }
             }
+            chunksMapping.put(chunkId, addressList);
             ServerAddress address = addressList.iterator().next();
-
             List<ChunkInfo> chunkList = chunksInverseMapping.get(address
                 .getHost());
             if (chunkList == null) {
               chunkList = Lists.newArrayList();
               chunksInverseMapping.put(address.getHost(), chunkList);
             }
-            ChunkInfo chunkInfo = new ChunkInfo(Arrays.asList(hosts), chunkId);
+            List<String> chunkHostsList = new ArrayList<String>();
+            for(ServerAddress serverAddr : addressList){
+              chunkHostsList.add(serverAddr.toString());
+            }
+            ChunkInfo chunkInfo = new ChunkInfo(chunkHostsList, chunkId);
             DBObject minObj = (BasicDBObject) chunkObj.get(MIN);
 
             Map<String, Object> minFilters = Maps.newHashMap();
@@ -261,10 +265,12 @@ public class MongoGroupScan extends AbstractGroupScan implements
         String chunkName = scanSpec.getDbName() + "."
             + scanSpec.getCollectionName();
         List<String> hosts = clientURI.getHosts();
-        Set<ServerAddress> addressList = Sets.newHashSet();
-
-        for (String host : hosts) {
-          addressList.add(new ServerAddress(host));
+        Set<ServerAddress> addressList = getPreferredHosts(client, hosts);
+        if (addressList == null) {
+          addressList = Sets.newHashSet();
+          for (String host : hosts) {
+            addressList.add(new ServerAddress(host));
+          }
         }
         chunksMapping.put(chunkName, addressList);
 
@@ -287,6 +293,40 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   }
 
+  /**
+   * Picks the hosts to read from for the client's read preference, using a single "isMaster" reply.
+   * Returns null when nothing can be resolved (no "primary"/"hosts" in the reply, or no secondary
+   * listed) so that callers fall back to their configured hosts. The hosts argument is not consulted.
+   */
+  @SuppressWarnings("unchecked")
+  private Set<ServerAddress> getPreferredHosts(MongoClient client,
+      List<String> hosts) throws UnknownHostException {
+    CommandResult isMaster = client.getDB(scanSpec.getDbName()).command("isMaster");
+    String primaryHost = isMaster.getString("primary");
+    List<String> members = (List<String>) isMaster.get("hosts");
+    List<String> preferred = members == null ? new ArrayList<String>() : new ArrayList<String>(members);
+    // Locale.ROOT: a Turkish default locale upper-cases 'i' differently and would miss the PRIMARY labels.
+    switch (client.getReadPreference().getName().toUpperCase(java.util.Locale.ROOT)) {
+    case "PRIMARY":
+    case "PRIMARYPREFERRED":
+      preferred = primaryHost == null ? new ArrayList<String>() : Arrays.asList(primaryHost);
+      break;
+    case "SECONDARY":
+    case "SECONDARYPREFERRED":
+      preferred.remove(primaryHost);
+      break;
+    case "NEAREST":
+      break;
+    default:
+      return null;
+    }
+    Set<ServerAddress> addressList = Sets.newHashSet();
+    for (String host : preferred) {
+      addressList.add(new ServerAddress(host));
+    }
+    return addressList.isEmpty() ? null : addressList;
+  }
+
   @Override
   public GroupScan clone(List<SchemaPath> columns) {
     MongoGroupScan clone = new MongoGroupScan(this);
@@ -433,7 +473,6 @@ public class MongoGroupScan extends AbstractGroupScan implements
       MongoClient client = MongoCnxnManager.getClient(addresses,
           clientURI.getOptions());
       DB db = client.getDB(scanSpec.getDbName());
-      db.setReadPreference(ReadPreference.nearest());
       DBCollection collection = db.getCollectionFromString(scanSpec
           .getCollectionName());
       CommandResult stats = collection.getStats();

http://git-wip-us.apache.org/repos/asf/drill/blob/0f9887d9/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 6eff72f..0b263df 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -136,7 +136,6 @@ public class MongoRecordReader extends AbstractRecordReader {
       }
       MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions);
       DB db = client.getDB(subScanSpec.getDbName());
-      db.setReadPreference(ReadPreference.nearest());
       collection = db.getCollection(subScanSpec.getCollectionName());
     } catch (UnknownHostException e) {
       throw new DrillRuntimeException(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/drill/blob/0f9887d9/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index a227c9a..3c70638 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -114,7 +114,6 @@ public class MongoSchemaFactory implements SchemaFactory {
     @Override
     public List<String> load(String dbName) throws Exception {
       DB db = MongoCnxnManager.getClient(addresses, options).getDB(dbName);
-      db.setReadPreference(ReadPreference.nearest());
       Set<String> collectionNames = db.getCollectionNames();
       return new ArrayList<>(collectionNames);
     }


[4/4] drill git commit: DRILL-2658: Add ilike and substring functions

Posted by sm...@apache.org.
DRILL-2658: Add ilike and substring functions


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/033d0dfd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/033d0dfd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/033d0dfd

Branch: refs/heads/master
Commit: 033d0dfd6b7a5fa0eeaf4b337955b3870c2d680a
Parents: bf3db31
Author: Steven Phillips <sp...@maprtech.com>
Authored: Mon Mar 23 18:17:26 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed Apr 15 12:44:34 2015 -0700

----------------------------------------------------------------------
 .../exec/expr/fn/impl/StringFunctions.java      | 103 +++++++++++++++++++
 .../exec/expr/fn/impl/TestStringFunctions.java  |  59 +++++++++++
 2 files changed, 162 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/033d0dfd/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
index a47bf87..49f581f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
 import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
 import org.apache.drill.exec.expr.holders.VarBinaryHolder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
@@ -90,6 +91,50 @@ public class StringFunctions{
     }
   }
 
+  @FunctionTemplate(name = "ilike", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+  public static class ILike implements DrillSimpleFunc{
+    // Case-insensitive LIKE: out.value is 1 when the whole input matches the constant SQL pattern, else 0.
+    @Param VarCharHolder input;
+    @Param(constant=true) VarCharHolder pattern;
+    @Output BitHolder out;
+    @Workspace java.util.regex.Matcher matcher;
+    // UNICODE_CASE widens CASE_INSENSITIVE, which on its own only folds US-ASCII letters.
+    public void setup() {
+      matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike( //
+          org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start,  pattern.end,  pattern.buffer)),
+          java.util.regex.Pattern.CASE_INSENSITIVE | java.util.regex.Pattern.UNICODE_CASE).matcher("");
+    }
+
+    public void eval() {
+      String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer);
+      matcher.reset(i);
+      out.value = matcher.matches()? 1:0;
+    }
+  }
+
+  @FunctionTemplate(name = "ilike", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+  public static class ILikeWithEscape implements DrillSimpleFunc{
+    // Case-insensitive LIKE with a caller-supplied escape string; out.value is 1 on a whole-input match, else 0.
+    @Param VarCharHolder input;
+    @Param(constant=true) VarCharHolder pattern;
+    @Param(constant=true) VarCharHolder escape;
+    @Output BitHolder out;
+    @Workspace java.util.regex.Matcher matcher;
+    // UNICODE_CASE widens CASE_INSENSITIVE, which on its own only folds US-ASCII letters.
+    public void setup() {
+      matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike( //
+          org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start,  pattern.end,  pattern.buffer),
+          org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(escape.start,  escape.end,  escape.buffer)),
+          java.util.regex.Pattern.CASE_INSENSITIVE | java.util.regex.Pattern.UNICODE_CASE).matcher("");
+    }
+
+    public void eval() {
+      String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer);
+      matcher.reset(i);
+      out.value = matcher.matches()? 1:0;
+    }
+  }
+
   @FunctionTemplate(names = {"similar", "similar_to"}, scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
   public static class Similar implements DrillSimpleFunc{
     @Param VarCharHolder input;
@@ -409,6 +454,64 @@ public class StringFunctions{
 
   }
 
+  @FunctionTemplate(names = {"substring", "substr" }, scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL)
+  public static class SubstringRegex implements DrillSimpleFunc{
+    // Regex substring: out is the first match of the constant pattern within input, or NULL when nothing matches.
+    @Param VarCharHolder input;
+    @Param(constant=true) VarCharHolder pattern;
+    @Output NullableVarCharHolder out;
+    @Workspace java.util.regex.Matcher matcher;
+
+    public void setup() {
+      matcher = java.util.regex.Pattern.compile(
+          org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start,  pattern.end,  pattern.buffer)).matcher("");
+    }
+
+    public void eval() {
+      matcher.reset(org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer));
+      if (matcher.find()) {
+        out.isSet = 1;
+        out.buffer = input.buffer;
+        out.start = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(input.buffer, input.start, input.end, matcher.start());
+        out.end = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(input.buffer, input.start, input.end, matcher.end());
+      } else {
+        out.isSet = 0; // explicit NULL on no match, as SubstringRegexNullable does
+      }
+    }
+  }
+
+  @FunctionTemplate(names = {"substring", "substr" }, scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL)
+  public static class SubstringRegexNullable implements DrillSimpleFunc{
+    // Nullable-input variant of the regex substring: a NULL input or a failed match both yield NULL.
+    @Param NullableVarCharHolder input;
+    @Param(constant=true) VarCharHolder pattern;
+    @Output NullableVarCharHolder out;
+    @Workspace java.util.regex.Matcher matcher;
+
+    public void setup() {
+      String regex = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start,  pattern.end,  pattern.buffer);
+      java.util.regex.Pattern compiled = java.util.regex.Pattern.compile(regex);
+      matcher = compiled.matcher("");
+    }
+
+    public void eval() {
+      // Start from NULL; only a successful match on a non-NULL input flips the result to set.
+      out.isSet = 0;
+      if (input.isSet != 0) {
+        String text = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer);
+        matcher.reset(text);
+        if (matcher.find()) {
+          // The match is returned as a slice of the input buffer rather than a copy.
+          out.buffer = input.buffer;
+          // Matcher offsets are character indexes; the helper presumably maps them to buffer offsets (confirm).
+          out.start = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(input.buffer, input.start, input.end, matcher.start());
+          out.end = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.getUTF8CharPosition(input.buffer, input.start, input.end, matcher.end());
+          out.isSet = 1;
+        }
+      }
+    }
+  }
+
   // Return first length characters in the string. When length is negative, return all but last |length| characters.
   // If length > total charcounts, return the whole string.
   // If length = 0, return empty

http://git-wip-us.apache.org/repos/asf/drill/blob/033d0dfd/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestStringFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestStringFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestStringFunctions.java
new file mode 100644
index 0000000..e1ef7c9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestStringFunctions.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.impl;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestStringFunctions extends BaseTestQuery {
+
+  @Test
+  public void testILike() throws Exception {
+    // A lower-case pattern must still select the upper-case nation names.
+    final String query = "select n_name from cp.`tpch/nation.parquet` where ilike(n_name, '%united%') = true";
+    testBuilder().sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("n_name")
+        .baselineValues("UNITED STATES")
+        .baselineValues("UNITED KINGDOM")
+        .build().run();
+  }
+
+  @Test
+  public void testILikeEscape() throws Exception {
+    // '#' is passed as the escape string, so the '_' after it is expected to match literally.
+    final String query = "select a from (select concat(r_name , '_region') a from cp.`tpch/region.parquet`) where ilike(a, 'asia#_region', '#') = true";
+    testBuilder().sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("a")
+        .baselineValues("ASIA_region")
+        .build().run();
+  }
+
+  @Test
+  public void testSubstr() throws Exception {
+    // The regex form of substr should return the matched text for both "UNITED ..." rows.
+    final String query = "select substr(n_name, 'UN.TE.') a from cp.`tpch/nation.parquet` where ilike(n_name, 'united%') = true";
+    testBuilder().sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("a")
+        .baselineValues("UNITED")
+        .baselineValues("UNITED")
+        .build().run();
+  }
+}


[2/4] drill git commit: DRILL-1512: Avro record reader

Posted by sm...@apache.org.
DRILL-1512: Avro record reader

Reader for Avro data files.

Supports:
- All primitive types
- Arrays
- Nested records
- Enums

Unimplemented:
- Endpoint affinity
- Recursive data types
- Complex types: Maps, Fixed, Unions


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/55a9a59d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/55a9a59d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/55a9a59d

Branch: refs/heads/master
Commit: 55a9a59d409ee93e42fcdbdc12afd8fb0caffe3f
Parents: 0f9887d
Author: Andrew Selden <an...@elasticsearch.com>
Authored: Fri Oct 10 16:41:59 2014 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed Apr 15 12:39:46 2015 -0700

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |  10 +
 .../drill/exec/store/avro/AvroFormatConfig.java |  39 +++
 .../drill/exec/store/avro/AvroFormatPlugin.java | 128 ++++++++
 .../drill/exec/store/avro/AvroGroupScan.java    | 208 ++++++++++++
 .../drill/exec/store/avro/AvroRecordReader.java | 324 +++++++++++++++++++
 .../exec/store/avro/AvroScanBatchCreator.java   |  52 +++
 .../drill/exec/store/avro/AvroSubScan.java      | 142 ++++++++
 .../drill/exec/store/avro/AvroTypeHelper.java   | 184 +++++++++++
 .../drill/exec/store/avro/MapOrListWriter.java  | 105 ++++++
 .../resources/bootstrap-storage-plugins.json    |   6 +
 .../drill/exec/store/avro/AvroFormatTest.java   | 129 ++++++++
 .../drill/exec/store/avro/AvroTestUtil.java     | 270 ++++++++++++++++
 .../resources/bootstrap-storage-plugins.json    |   3 +
 .../apache/drill/exec/proto/UserBitShared.java  |  16 +-
 .../exec/proto/beans/CoreOperatorType.java      |   4 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   1 +
 16 files changed, 1617 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index fdd24ef..f5313ca 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -353,6 +353,16 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.7.7</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <version>1.7.7</version>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
new file mode 100644
index 0000000..4a2cfb9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+/**
+ * Format plugin config for Avro data files. It declares no options, so all instances are interchangeable.
+ */
+@JsonTypeName("avro")
+public class AvroFormatConfig implements FormatPluginConfig {
+
+  @Override
+  public int hashCode() {
+    return 101; // arbitrary constant: equals() treats every instance as equal, so all must share one hash
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof AvroFormatConfig;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
new file mode 100644
index 0000000..4fe1f71
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Read-only format plugin for Avro data files: reading is supported, writing is not.
+ */
+public class AvroFormatPlugin implements FormatPlugin {
+
+  private final String name;
+  private final DrillbitContext context;
+  private final DrillFileSystem fs;
+  private final StoragePluginConfig storagePluginConfig;
+  private final AvroFormatConfig formatConfig;
+  private final BasicFormatMatcher matcher;
+
+  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
+                          StoragePluginConfig storagePluginConfig) {
+    this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
+  }
+
+  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
+                          StoragePluginConfig storagePluginConfig, AvroFormatConfig formatConfig) {
+    this.name = name;
+    this.context = context;
+    this.fs = fs;
+    this.storagePluginConfig = storagePluginConfig;
+    this.formatConfig = formatConfig;
+
+    // XXX - the trailing 'false' is presumably the matcher's 'compressible' flag; its meaning here is unconfirmed.
+    this.matcher = new BasicFormatMatcher(this, fs, Lists.newArrayList("avro"), false);
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return false; // consistent with getWriter(), which always throws
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return matcher;
+  }
+
+  @Override
+  public AbstractWriter getWriter(final PhysicalOperator child, final String location) throws IOException {
+    throw new UnsupportedOperationException("Unimplemented");
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(final FileSelection selection) throws IOException {
+    return new AvroGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null); // null: no column list given
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(final FileSelection selection, final List<SchemaPath> columns) throws IOException {
+    return new AvroGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
+  }
+
+  @Override
+  public Set<StoragePluginOptimizerRule> getOptimizerRules() {
+    return ImmutableSet.of(); // this plugin contributes no optimizer rules
+  }
+
+  @Override
+  public AvroFormatConfig getConfig() {
+    return formatConfig;
+  }
+
+  @Override
+  public StoragePluginConfig getStorageConfig() {
+    return storagePluginConfig;
+  }
+
+  @Override
+  public DrillFileSystem getFileSystem() {
+    return fs;
+  }
+
+  @Override
+  public DrillbitContext getContext() {
+    return context;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
new file mode 100644
index 0000000..fcc1f94
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+//import org.apache.drill.exec.store.avro.AvroSubScan.AvroSubScanSpec;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Group scan implementation for Avro data files.
+ */
+@JsonTypeName("avro-scan")
+public class AvroGroupScan extends AbstractGroupScan {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroGroupScan.class);
+
+  private final AvroFormatPlugin formatPlugin;
+  private final AvroFormatConfig formatConfig;
+  private final List<SchemaPath> columns;
+  private final FileSystem fs;
+  private final List<ReadEntryWithPath> entries;
+  private final String selectionRoot;
+
+  //private Map<Integer, List<AvroSubScanSpec>> endpointMappings;
+
+  private List<EndpointAffinity> endpointAffinities;
+
+  @JsonCreator
+  public AvroGroupScan(@JsonProperty("entries") final List<ReadEntryWithPath> entries,
+                       @JsonProperty("storage") final StoragePluginConfig storageConfig,
+                       @JsonProperty("format") final FormatPluginConfig formatConfig,
+                       @JacksonInject final StoragePluginRegistry engineRegistry,
+                       @JsonProperty("columns") final List<SchemaPath> columns,
+                       @JsonProperty("selectionRoot") final String selectionRoot) throws ExecutionSetupException {
+
+    this.columns = columns;
+    final AvroFormatConfig afc;
+    if (formatConfig == null) {
+      afc = new AvroFormatConfig();
+    } else {
+      afc = (AvroFormatConfig) formatConfig;
+    }
+    Preconditions.checkNotNull(storageConfig);
+    Preconditions.checkNotNull(afc);
+    this.formatPlugin = (AvroFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, afc);
+    Preconditions.checkNotNull(this.formatPlugin);
+    this.fs = formatPlugin.getFileSystem().getUnderlying();
+    this.formatConfig = formatPlugin.getConfig();
+    this.entries = entries;
+    this.selectionRoot = selectionRoot;
+  }
+
+  public AvroGroupScan(final List<FileStatus> files, final AvroFormatPlugin formatPlugin,
+                       final String selectionRoot, final List<SchemaPath> columns) throws IOException {
+
+    this.formatPlugin = formatPlugin;
+    this.columns = columns;
+    this.formatConfig = formatPlugin.getConfig();
+    this.fs = formatPlugin.getFileSystem().getUnderlying();
+    this.selectionRoot = selectionRoot;
+
+    this.entries = Lists.newArrayList();
+    for (final FileStatus fs : files) {
+      entries.add(new ReadEntryWithPath(fs.getPath().toString()));
+    }
+  }
+
+  @JsonProperty("format")
+  public AvroFormatConfig getFormatConfig() {
+    return this.formatConfig;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getEngineConfig() {
+    return this.formatPlugin.getStorageConfig();
+  }
+
+  private AvroGroupScan(final AvroGroupScan that, final List<SchemaPath> columns) {
+    this.columns = (columns == null) ? that.columns : columns;
+    this.entries = that.entries;
+    this.formatConfig = that.formatConfig;
+    this.formatPlugin = that.formatPlugin;
+    this.fs = that.fs;
+    this.selectionRoot = that.selectionRoot;
+
+    // XXX - DON'T FORGET TO ADD THESE AFTER WE'VE IMPLEMENTED AFFINITY
+    //this.endpointAffinities = that.endpointAffinities;
+    //this.mappings = that.mappings;
+    //this.rowCount = that.rowCount;
+    //this.rowGroupInfos = that.rowGroupInfos;
+    //this.columnValueCounts = that.columnValueCounts;
+  }
+
+  @Override
+  public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+    // XXX - Unimplemented
+    logger.warn("AvroGroupScan.applyAssignments() is not implemented");
+  }
+
+  @Override
+  public AvroSubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
+
+    final AvroSubScan sub = new AvroSubScan(formatPlugin, columns, selectionRoot);
+
+    // XXX - This is a temporary hack just to get something working. Need to revisit sub-scan specs
+    //       once we work out affinity and endpoints.
+    sub.setEntry(entries.get(0));
+    sub.setFileSystem(fs);
+
+    return sub;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    // XXX - Finish
+    return 1;
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    // XXX - Is 0 the correct value for second arg? What if I don't know the row count a priori?
+    return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, 0, 1, 1);
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    // XXX - Unimplemented
+    if (endpointAffinities != null) {
+      return endpointAffinities;
+    }
+    return Collections.emptyList();
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new AvroGroupScan(this, null);
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public String toString() {
+    return "AvroGroupScan [entries=" + entries +
+            ", selectionRoot=" + selectionRoot +
+            ", columns=" + columns + "]";
+  }
+
+  @Override
+  public GroupScan clone(final List<SchemaPath> columns) {
+    return new AvroGroupScan(this, columns);
+  }
+
+  @JsonIgnore
+  public boolean canPushdownProjects(final List<SchemaPath> columns) {
+    return true;
+  }
+
+  public List<ReadEntryWithPath> getEntries() {
+    return entries;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
new file mode 100644
index 0000000..3b7697d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.google.common.base.Stopwatch;
+
+import io.netty.buffer.DrillBuf;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.Utf8;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A RecordReader implementation for Avro data files.
+ *
+ * @see RecordReader
+ */
+public class AvroRecordReader extends AbstractRecordReader {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroRecordReader.class);
+
+  private final Path hadoop;
+  private DrillBuf buffer;
+  private VectorContainerWriter writer;
+
+  private DataFileReader<GenericContainer> reader = null;
+  private OperatorContext operatorContext;
+
+  private static final int DEFAULT_BATCH_SIZE = 1000;
+
+
+  public AvroRecordReader(final FragmentContext fragmentContext,
+                          final String inputPath,
+                          final FileSystem fileSystem,
+                          final List<SchemaPath> projectedColumns) {
+    this(fragmentContext, inputPath, fileSystem, projectedColumns, DEFAULT_BATCH_SIZE);
+  }
+
+  public AvroRecordReader(final FragmentContext fragmentContext,
+                          final String inputPath,
+                          final FileSystem fileSystem,
+                          List<SchemaPath> projectedColumns, final int defaultBatchSize) {
+
+    hadoop = new Path(inputPath);
+    buffer = fragmentContext.getManagedBuffer();
+
+    setColumns(projectedColumns);
+  }
+
+  @Override
+  public void setup(final OutputMutator output) throws ExecutionSetupException {
+
+    writer = new VectorContainerWriter(output);
+
+    try {
+      reader = new DataFileReader<>(new FsInput(hadoop, new Configuration()), new GenericDatumReader<GenericContainer>());
+    } catch (IOException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  @Override
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public int next() {
+    final Stopwatch watch = new Stopwatch().start();
+
+    if (reader == null) {
+      throw new IllegalStateException("Avro reader is not open.");
+    }
+    if (!reader.hasNext()) {
+      return 0;
+    }
+
+    int recordCount = 0;
+    writer.allocate();
+    writer.reset();
+
+    try {
+
+      // XXX - Implement batch size
+
+      for (GenericContainer container = null; reader.hasNext(); recordCount++) {
+        writer.setPosition(recordCount);
+        container = reader.next(container);
+        processRecord(container, container.getSchema());
+      }
+
+      writer.setValueCount(recordCount);
+
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+
+    logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
+    return recordCount;
+  }
+
+  private void processRecord(final GenericContainer container, final Schema schema) {
+
+    final Schema.Type type = schema.getType();
+
+    switch (type) {
+      case RECORD:
+        process(container, schema, null, new MapOrListWriter(writer.rootAsMap()));
+        break;
+      default:
+        throw new DrillRuntimeException("Root object must be record type. Found: " + type);
+    }
+  }
+
+  private void process(final Object value, final Schema schema, final String fieldName, final MapOrListWriter writer) {
+
+    writer.start();
+    final Schema.Type type = schema.getType();
+
+    switch (type) {
+      case RECORD:
+        for (final Schema.Field field : schema.getFields()) {
+
+          MapOrListWriter _writer = writer;
+          if (field.schema().getType() == Schema.Type.RECORD) {
+            _writer = writer.map(field.name());
+          }
+
+          process(((GenericRecord) value).get(field.name()), field.schema(), field.name(), _writer);
+        }
+        break;
+      case ARRAY:
+        assert fieldName != null;
+        final GenericArray array = (GenericArray) value;
+        for (final Object o : array) {
+          process(o, array.getSchema().getElementType(), fieldName, writer.list(fieldName));
+        }
+        break;
+      case FIXED:
+      case UNION:
+      case MAP:
+        throw new UnsupportedOperationException("Unimplemented type: " + type.toString());
+      case ENUM:  // Enum symbols are strings
+      case NULL:  // Treat null type as a primitive
+      default:
+        assert fieldName != null;
+
+        if (writer.isMapWriter()) {
+          SchemaPath path;
+          if (writer.map.getField().getPath().getRootSegment().getPath().equals("")) {
+            path = new SchemaPath(new PathSegment.NameSegment(fieldName));
+          } else {
+            path = writer.map.getField().getPath().getChild(fieldName);
+          }
+
+          if (!selected(path)) {
+            break;
+          }
+        }
+
+        processPrimitive(value, schema.getType(), fieldName, writer);
+        break;
+    }
+
+    writer.end();
+  }
+
+  private void processPrimitive(final Object value, final Schema.Type type, final String fieldName,
+                                final MapOrListWriter writer) {
+
+    switch (type) {
+      case STRING:
+        final Utf8 utf8 = (Utf8) value;
+        final int length = utf8.length();
+        final VarCharHolder vh = new VarCharHolder();
+        ensure(length);
+        buffer.setBytes(0, utf8.getBytes());
+        vh.buffer = buffer;
+        vh.start = 0;
+        vh.end = length;
+        writer.varChar(fieldName).write(vh);
+        break;
+      case INT:
+        final IntHolder ih = new IntHolder();
+        ih.value = (Integer) value;
+        writer.integer(fieldName).write(ih);
+        break;
+      case LONG:
+        final BigIntHolder bh = new BigIntHolder();
+        bh.value = (Long) value;
+        writer.bigInt(fieldName).write(bh);
+        break;
+      case FLOAT:
+        final Float4Holder fh = new Float4Holder();
+        fh.value = (Float) value;
+        writer.float4(fieldName).write(fh);
+        break;
+      case DOUBLE:
+        final Float8Holder f8h = new Float8Holder();
+        f8h.value = (Double) value;
+        writer.float8(fieldName).write(f8h);
+        break;
+      case BOOLEAN:
+        final BitHolder bit = new BitHolder();
+        bit.value = (Boolean) value ? 1 : 0;
+        writer.bit(fieldName).write(bit);
+        break;
+      case BYTES:
+        // XXX - Not sure if this is correct. Nothing prints from sqlline for byte fields.
+        final VarBinaryHolder vb = new VarBinaryHolder();
+        final ByteBuffer buf = (ByteBuffer) value;
+        final byte[] bytes = buf.array();
+        ensure(bytes.length);
+        buffer.setBytes(0, bytes);
+        vb.buffer = buffer;
+        vb.start = 0;
+        vb.end = bytes.length;
+        writer.binary(fieldName).write(vb);
+        break;
+      case NULL:
+        // Nothing to do for null type
+        break;
+      case ENUM:
+        final String symbol = value.toString();
+        final byte[] b;
+        try {
+          b = symbol.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+          throw new DrillRuntimeException("Unable to read enum value for field: " + fieldName, e);
+        }
+        final VarCharHolder vch = new VarCharHolder();
+        ensure(b.length);
+        buffer.setBytes(0, b);
+        vch.buffer = buffer;
+        vch.start = 0;
+        vch.end = b.length;
+        writer.varChar(fieldName).write(vch);
+        break;
+      default:
+        throw new DrillRuntimeException("Unhandled Avro type: " + type.toString());
+    }
+  }
+
+  private boolean selected(SchemaPath field) {
+    if (isStarQuery()) {
+      return true;
+    }
+    for (final SchemaPath sp : getColumns()) {
+      if (sp.contains(field)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void ensure(final int length) {
+    buffer = buffer.reallocIfNeeded(length);
+  }
+
+  @Override
+  public void cleanup() {
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        logger.warn("Error closing Avro reader", e);
+      } finally {
+        reader = null;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
new file mode 100644
index 0000000..42c8e99
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import java.util.List;
+
+/**
+ * Batch creator for Avro scans.
+ */
+public class AvroScanBatchCreator implements BatchCreator<AvroSubScan> {
+
+  /**
+   * Creates a scan batch with a single {@link AvroRecordReader} for the entry of the sub-scan.
+   *
+   * @param context  the fragment context handed to the reader and the scan batch
+   * @param subScan  the sub-scan providing the entry, file system and projected columns
+   * @param children must be empty
+   * @return a {@link ScanBatch} over one record reader
+   * @throws ExecutionSetupException if the sub-scan carries no read entry
+   * @throws IllegalArgumentException if {@code children} is not empty
+   */
+  @Override
+  public RecordBatch getBatch(final FragmentContext context, final AvroSubScan subScan,
+                              final List<RecordBatch> children) throws ExecutionSetupException {
+
+    Preconditions.checkArgument(children.isEmpty());
+
+    // The entry is set through a setter after construction and may therefore be missing;
+    // report that clearly instead of failing with a NullPointerException.
+    if (subScan.getEntry() == null) {
+      throw new ExecutionSetupException("Avro sub-scan has no read entry; nothing to scan.");
+    }
+
+    final String inputPath = subScan.getEntry().getPath();
+    final List<SchemaPath> columns = subScan.getColumns();
+    final List<RecordReader> readers = Lists.newArrayList();
+
+    readers.add(new AvroRecordReader(context, inputPath, subScan.getFileSystem(), columns));
+
+    return new ScanBatch(subScan, context, readers.iterator());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
new file mode 100644
index 0000000..0a579aa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.google.common.collect.Iterators;
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Contains the information needed to read a single Avro file: the owning format plugin, the
+ * projected columns, the selection root, and (set after construction) the read entry and file
+ * system.
+ */
+@JsonTypeName("avro-sub-scan")
+public class AvroSubScan extends AbstractBase implements SubScan {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroSubScan.class);
+
+  private final AvroFormatPlugin formatPlugin;
+  private final AvroFormatConfig formatConfig;
+  private final List<SchemaPath> columns;
+  private final String selectionRoot;
+
+  // Temporary state supplied through setters; both are excluded from JSON (see below).
+  private ReadEntryWithPath entry;
+  private FileSystem fs;
+
+  /**
+   * Creates a sub-scan from its JSON representation.
+   *
+   * @param registry      registry used to look up the {@link AvroFormatPlugin}
+   * @param storageConfig storage plugin configuration; must not be {@code null}
+   * @param formatConfig  format configuration; a default {@link AvroFormatConfig} is used when {@code null}
+   * @param columns       the projected columns
+   * @param selectionRoot root of the file selection
+   * @throws ExecutionSetupException declared by the format plugin lookup
+   */
+  @JsonCreator
+  public AvroSubScan(@JacksonInject final StoragePluginRegistry registry,
+                     @JsonProperty("storage") final StoragePluginConfig storageConfig,
+                     @JsonProperty("format") final FormatPluginConfig formatConfig,
+                     @JsonProperty("columns") final List<SchemaPath> columns,
+                     @JsonProperty("selectionRoot") final String selectionRoot) throws ExecutionSetupException {
+    this((AvroFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
+            formatConfig == null ? new AvroFormatConfig() : formatConfig), columns, selectionRoot);
+  }
+
+  /**
+   * Creates a sub-scan for the given plugin.
+   *
+   * @param formatPlugin  the owning format plugin; must not be {@code null}
+   * @param columns       the projected columns
+   * @param selectionRoot root of the file selection
+   */
+  public AvroSubScan(final AvroFormatPlugin formatPlugin, final List<SchemaPath> columns,
+                     final String selectionRoot) {
+    this.formatPlugin = Preconditions.checkNotNull(formatPlugin);
+    this.formatConfig = formatPlugin.getConfig();
+    this.columns = columns;
+    this.selectionRoot = selectionRoot;
+  }
+
+  /** @return the storage configuration of the owning plugin, serialized as {@code "storage"} */
+  @JsonProperty("storage")
+  public StoragePluginConfig getEngineConfig() {
+    return formatPlugin.getStorageConfig();
+  }
+
+  /** @return the format configuration, serialized as {@code "format"} */
+  @JsonProperty("format")
+  public AvroFormatConfig getFormatConfig() {
+    return formatConfig;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(final PhysicalVisitor<T, X, E> physicalVisitor, final X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  /**
+   * Returns a copy of this sub-scan; a sub-scan has no children.
+   *
+   * <p>The copy carries over the read entry and file system, which are set through setters
+   * rather than the constructor.
+   *
+   * @param children must be empty
+   * @throws IllegalArgumentException if {@code children} is not empty
+   */
+  @Override
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    final AvroSubScan copy = new AvroSubScan(formatPlugin, columns, selectionRoot);
+    copy.setEntry(entry);
+    copy.setFileSystem(fs);
+    return copy;
+  }
+
+  @Override
+  public int getOperatorType() {
+    return UserBitShared.CoreOperatorType.AVRO_ROW_GROUP_SCAN_VALUE;
+  }
+
+  /** @return an empty iterator; a sub-scan has no child operators */
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  /** @return the projected columns; excluded from JSON */
+  @JsonIgnore
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  /*
+  public static class AvroSubScanSpec {
+
+  }
+  */
+
+  /** XXX - temp hacks **/
+  // NOTE(review): entry and fs are @JsonIgnore'd, so they are presumably missing from any
+  // instance rebuilt from JSON -- confirm before relying on them after deserialization.
+
+  /** Sets the entry (file) this sub-scan reads. */
+  @JsonIgnore
+  public void setEntry(ReadEntryWithPath entry) {
+    this.entry = entry;
+  }
+
+  /** @return the entry this sub-scan reads, or {@code null} if none was set */
+  @JsonIgnore
+  public ReadEntryWithPath getEntry() {
+    return entry;
+  }
+
+  /** Sets the file system used to read the entry. */
+  @JsonIgnore
+  public void setFileSystem(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  /** @return the file system used to read the entry, or {@code null} if none was set */
+  @JsonIgnore
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
new file mode 100644
index 0000000..aeb659f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.Types;
+
+/**
+ * Utility class that maps Avro schema types onto Drill major types.
+ */
+public final class AvroTypeHelper {
+
+  // XXX - Decide what to do about Avro's NULL type
+  /*
+  public static final MajorType MAJOR_TYPE_NULL_OPTIONAL      = Types.optional(MinorType.NULL);
+  public static final MajorType MAJOR_TYPE_NULL_REQUIRED      = Types.required(MinorType.NULL);
+  public static final MajorType MAJOR_TYPE_NULL_REPEATED      = Types.repeated(MinorType.NULL);
+  */
+  // NOTE(review): Avro BOOLEAN is mapped to UINT1 below, while MapOrListWriter in this
+  // package exposes a bit() writer - confirm which minor type booleans should use.
+  public static final MajorType MAJOR_TYPE_BOOL_OPTIONAL      = Types.optional(MinorType.UINT1);
+  public static final MajorType MAJOR_TYPE_BOOL_REQUIRED      = Types.required(MinorType.UINT1);
+  public static final MajorType MAJOR_TYPE_BOOL_REPEATED      = Types.repeated(MinorType.UINT1);
+  public static final MajorType MAJOR_TYPE_INT_OPTIONAL       = Types.optional(MinorType.INT);
+  public static final MajorType MAJOR_TYPE_INT_REQUIRED       = Types.required(MinorType.INT);
+  public static final MajorType MAJOR_TYPE_INT_REPEATED       = Types.repeated(MinorType.INT);
+  public static final MajorType MAJOR_TYPE_BIGINT_OPTIONAL    = Types.optional(MinorType.BIGINT);
+  public static final MajorType MAJOR_TYPE_BIGINT_REQUIRED    = Types.required(MinorType.BIGINT);
+  public static final MajorType MAJOR_TYPE_BIGINT_REPEATED    = Types.repeated(MinorType.BIGINT);
+  public static final MajorType MAJOR_TYPE_FLOAT4_OPTIONAL    = Types.optional(MinorType.FLOAT4);
+  public static final MajorType MAJOR_TYPE_FLOAT4_REQUIRED    = Types.required(MinorType.FLOAT4);
+  public static final MajorType MAJOR_TYPE_FLOAT4_REPEATED    = Types.repeated(MinorType.FLOAT4);
+  public static final MajorType MAJOR_TYPE_FLOAT8_OPTIONAL    = Types.optional(MinorType.FLOAT8);
+  public static final MajorType MAJOR_TYPE_FLOAT8_REQUIRED    = Types.required(MinorType.FLOAT8);
+  public static final MajorType MAJOR_TYPE_FLOAT8_REPEATED    = Types.repeated(MinorType.FLOAT8);
+  public static final MajorType MAJOR_TYPE_VARBINARY_OPTIONAL = Types.optional(MinorType.VARBINARY);
+  public static final MajorType MAJOR_TYPE_VARBINARY_REQUIRED = Types.required(MinorType.VARBINARY);
+  public static final MajorType MAJOR_TYPE_VARBINARY_REPEATED = Types.repeated(MinorType.VARBINARY);
+  public static final MajorType MAJOR_TYPE_VARCHAR_OPTIONAL   = Types.optional(MinorType.VARCHAR);
+  public static final MajorType MAJOR_TYPE_VARCHAR_REQUIRED   = Types.required(MinorType.VARCHAR);
+  public static final MajorType MAJOR_TYPE_VARCHAR_REPEATED   = Types.repeated(MinorType.VARCHAR);
+
+
+  private static final String UNSUPPORTED = "Unsupported type: %s [%s]";
+
+  private AvroTypeHelper() { }
+
+  /**
+   * Returns the Drill major type corresponding to the schema type of the given
+   * Avro field.
+   *
+   * @param field Avro field whose schema type is mapped
+   * @param mode  Data mode of the resulting Drill type
+   * @return      Drill major type
+   * @throws UnsupportedOperationException if the type/mode combination has no mapping
+   */
+  public static MajorType getFieldMajorType(final Field field, final DataMode mode) {
+    return getFieldMajorType(field.schema().getType(), mode);
+  }
+
+  /**
+   * Maintains a mapping between Avro types and Drill types. Given an Avro data
+   * type, this method will return the corresponding Drill field major type.
+   *
+   * @param type Avro type
+   * @param mode Data mode
+   * @return     Drill major type
+   * @throws UnsupportedOperationException if the type is a complex type (MAP, RECORD,
+   *         ENUM, UNION), or if the type/mode combination has no mapping (this
+   *         currently includes NULL and ARRAY)
+   */
+  public static MajorType getFieldMajorType(final Type type, final DataMode mode) {
+
+    switch (type) {
+      case MAP:
+      case RECORD:
+      case ENUM:
+      case UNION:
+        throw new UnsupportedOperationException("Complex types are unimplemented");
+      case BOOLEAN:
+        return selectByMode(type, mode,
+            MAJOR_TYPE_BOOL_OPTIONAL, MAJOR_TYPE_BOOL_REQUIRED, MAJOR_TYPE_BOOL_REPEATED);
+      case INT:
+        return selectByMode(type, mode,
+            MAJOR_TYPE_INT_OPTIONAL, MAJOR_TYPE_INT_REQUIRED, MAJOR_TYPE_INT_REPEATED);
+      case LONG:
+        return selectByMode(type, mode,
+            MAJOR_TYPE_BIGINT_OPTIONAL, MAJOR_TYPE_BIGINT_REQUIRED, MAJOR_TYPE_BIGINT_REPEATED);
+      case FLOAT:
+        return selectByMode(type, mode,
+            MAJOR_TYPE_FLOAT4_OPTIONAL, MAJOR_TYPE_FLOAT4_REQUIRED, MAJOR_TYPE_FLOAT4_REPEATED);
+      case DOUBLE:
+        return selectByMode(type, mode,
+            MAJOR_TYPE_FLOAT8_OPTIONAL, MAJOR_TYPE_FLOAT8_REQUIRED, MAJOR_TYPE_FLOAT8_REPEATED);
+      case BYTES:
+        return selectByMode(type, mode,
+            MAJOR_TYPE_VARBINARY_OPTIONAL, MAJOR_TYPE_VARBINARY_REQUIRED, MAJOR_TYPE_VARBINARY_REPEATED);
+      case STRING:
+        return selectByMode(type, mode,
+            MAJOR_TYPE_VARCHAR_OPTIONAL, MAJOR_TYPE_VARCHAR_REQUIRED, MAJOR_TYPE_VARCHAR_REPEATED);
+      case NULL:  // XXX - no mapping until the NULL constants above are decided on
+      case ARRAY: // no mapping defined for arrays
+      default:
+        throw unsupported(type, mode);
+    }
+  }
+
+  /** Picks the constant matching {@code mode}; throws if the mode has no constant. */
+  private static MajorType selectByMode(final Type type, final DataMode mode, final MajorType optional,
+                                        final MajorType required, final MajorType repeated) {
+    switch (mode) {
+      case OPTIONAL:
+        return optional;
+      case REQUIRED:
+        return required;
+      case REPEATED:
+        return repeated;
+      default:
+        throw unsupported(type, mode);
+    }
+  }
+
+  /** Builds the exception reported for a type/mode combination without a mapping. */
+  private static UnsupportedOperationException unsupported(final Type type, final DataMode mode) {
+    return new UnsupportedOperationException(String.format(UNSUPPORTED, type.getName(), mode.name()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
new file mode 100644
index 0000000..d2a1031
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
+import org.apache.drill.exec.vector.complex.writer.BitWriter;
+import org.apache.drill.exec.vector.complex.writer.Float4Writer;
+import org.apache.drill.exec.vector.complex.writer.Float8Writer;
+import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+
+/**
+ * Adapter that wraps either a map writer or a list writer, chosen by the
+ * constructor used, and forwards each call to whichever one is present.
+ * Exactly one of {@link #map} and {@link #list} is set by each constructor;
+ * the {@code name} argument of the scalar accessors is only used when the
+ * wrapped writer is a map writer.
+ */
+public class MapOrListWriter {
+
+  final BaseWriter.MapWriter map;
+  final BaseWriter.ListWriter list;
+
+  /** Wraps a map writer; {@link #list} is left null. */
+  MapOrListWriter(final BaseWriter.MapWriter writer) {
+    this.list = null;
+    this.map = writer;
+  }
+
+  /** Wraps a list writer; {@link #map} is left null. */
+  MapOrListWriter(final BaseWriter.ListWriter writer) {
+    this.list = writer;
+    this.map = null;
+  }
+
+  /** Calls {@code start()} on the wrapped writer. */
+  void start() {
+    if (map == null) {
+      list.start();
+      return;
+    }
+    map.start();
+  }
+
+  /** Calls {@code end()} on the wrapped writer. */
+  void end() {
+    if (map == null) {
+      list.end();
+      return;
+    }
+    map.end();
+  }
+
+  /** Returns a wrapper around the named child map writer; only valid on a map writer. */
+  MapOrListWriter map(final String name) {
+    assert map != null;
+    return new MapOrListWriter(map.map(name));
+  }
+
+  /** Returns a wrapper around the named child list writer; only valid on a map writer. */
+  MapOrListWriter list(final String name) {
+    assert map != null;
+    return new MapOrListWriter(map.list(name));
+  }
+
+  /** Tells whether this instance wraps a map writer. */
+  boolean isMapWriter() {
+    return map != null;
+  }
+
+  VarCharWriter varChar(final String name) {
+    if (map == null) {
+      return list.varChar();
+    }
+    return map.varChar(name);
+  }
+
+  IntWriter integer(final String name) {
+    if (map == null) {
+      return list.integer();
+    }
+    return map.integer(name);
+  }
+
+  BigIntWriter bigInt(final String name) {
+    if (map == null) {
+      return list.bigInt();
+    }
+    return map.bigInt(name);
+  }
+
+  Float4Writer float4(final String name) {
+    if (map == null) {
+      return list.float4();
+    }
+    return map.float4(name);
+  }
+
+  Float8Writer float8(final String name) {
+    if (map == null) {
+      return list.float8();
+    }
+    return map.float8(name);
+  }
+
+  BitWriter bit(final String name) {
+    if (map == null) {
+      return list.bit();
+    }
+    return map.bit(name);
+  }
+
+  VarBinaryWriter binary(final String name) {
+    if (map == null) {
+      return list.varBinary();
+    }
+    return map.varBinary(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 6bf1872..3253e80 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -34,6 +34,9 @@
         },
         "json" : {
           type: "json"
+        },
+        "avro" : {
+          type: "avro"
         }
       }
     },
@@ -57,6 +60,9 @@
         },
         "parquet" : {
           type: "parquet"
+        },
+        "avro" : {
+          type: "avro"
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
new file mode 100644
index 0000000..2d2522b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.drill.BaseTestQuery;
+
+import org.junit.Test;
+
+/**
+ * Tests for the Avro record reader. Each test generates an Avro file through
+ * {@link AvroTestUtil} and passes a query over it to {@code test(...)}; no
+ * result values are asserted in this class.
+ */
+public class AvroFormatTest extends BaseTestQuery {
+
+  // XXX
+  //      1. Need to test nested field names with same name as top-level names for conflict.
+  //      2. Avro supports linked lists, right? Can we test this?
+  //      3. Avro supports recursive types? Can we test this?
+  //      4. Test queries with not all columns projected.
+
+  /** Builds the query text "select COLUMNS from dfs_test.`FILE`". */
+  private static String selectFrom(final String columns, final String file) {
+    return "select " + columns + " from dfs_test.`" + file + "`";
+  }
+
+  @Test
+  public void testSimplePrimitiveSchema_NoNullValues() throws Exception {
+
+    final String path = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    test(selectFrom("a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null", path));
+  }
+
+  @Test
+  public void testSimplePrimitiveSchema_StarQuery() throws Exception {
+
+    final String path = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    test(selectFrom("*", path));
+  }
+
+  @Test
+  public void testSimplePrimitiveSchema_SelectColumnSubset() throws Exception {
+
+    final String path = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    test(selectFrom("h_boolean, e_double", path));
+  }
+
+  @Test
+  public void testSimpleArraySchema_NoNullValues() throws Exception {
+
+    final String path = AvroTestUtil.generateSimpleArraySchema_NoNullValues();
+    test(selectFrom("a_string, c_string_array[0], e_float_array[2]", path));
+  }
+
+  @Test
+  public void testSimpleArraySchema_StarQuery() throws Exception {
+
+    final String path = AvroTestUtil.generateSimpleArraySchema_NoNullValues();
+    test(selectFrom("*", path));
+  }
+
+  @Test
+  public void testSimpleNestedSchema_NoNullValues() throws Exception {
+
+    final String path = AvroTestUtil.generateSimpleNestedSchema_NoNullValues();
+    test(selectFrom("a_string, b_int, c_record['nested_1_string'], c_record['nested_1_int']", path));
+  }
+
+  @Test
+  public void testSimpleNestedSchema_StarQuery() throws Exception {
+
+    final String path = AvroTestUtil.generateSimpleNestedSchema_NoNullValues();
+    test(selectFrom("*", path));
+  }
+
+  @Test
+  public void testDoubleNestedSchema_NoNullValues() throws Exception {
+
+    final String path = AvroTestUtil.generateDoubleNestedSchema_NoNullValues();
+    final String columns = "a_string, b_int, c_record['nested_1_string'], c_record['nested_1_int'], " +
+            "c_record['nested_1_record']['double_nested_1_string'], " +
+            "c_record['nested_1_record']['double_nested_1_int']";
+    test(selectFrom(columns, path));
+  }
+
+  @Test
+  public void testDoubleNestedSchema_StarQuery() throws Exception {
+
+    final String path = AvroTestUtil.generateDoubleNestedSchema_NoNullValues();
+    test(selectFrom("*", path));
+  }
+
+  @Test
+  public void testSimpleEnumSchema_NoNullValues() throws Exception {
+
+    final String path = AvroTestUtil.generateSimpleEnumSchema_NoNullValues();
+    test(selectFrom("a_string, b_enum", path));
+  }
+
+  @Test
+  public void testSimpleEnumSchema_StarQuery() throws Exception {
+
+    final String path = AvroTestUtil.generateSimpleEnumSchema_NoNullValues();
+    test(selectFrom("*", path));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
new file mode 100644
index 0000000..419c054
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities for generating Avro test data.
+ *
+ * Every generator writes {@link #RECORD_COUNT} records to a temporary
+ * {@code .avro} file, registers that file for deletion on JVM exit and returns
+ * its absolute path.
+ */
+public class AvroTestUtil {
+
+  public static final int RECORD_COUNT = 10;
+
+  /** Creates a not-yet-opened data file writer for generic records of the given schema. */
+  private static DataFileWriter<GenericRecord> newWriter(final Schema schema) {
+    return new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
+  }
+
+  /**
+   * Writes records with one field per primitive Avro type (string, int, long,
+   * float, double, bytes, null, boolean).
+   *
+   * @return absolute path of the generated file
+   */
+  public static String generateSimplePrimitiveSchema_NoNullValues() throws Exception {
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_long").type().longType().noDefault()
+            .name("d_float").type().floatType().noDefault()
+            .name("e_double").type().doubleType().noDefault()
+            .name("f_bytes").type().bytesType().noDefault()
+            .name("g_null").type().nullType().noDefault()
+            .name("h_boolean").type().booleanType().noDefault()
+            .endRecord();
+
+    final File file = File.createTempFile("avro-primitive-test", ".avro");
+    file.deleteOnExit();
+
+    final DataFileWriter<GenericRecord> writer = newWriter(schema);
+    try {
+      writer.create(schema, file);
+
+      // Single-byte buffer shared by all records; the absolute put leaves position at 0.
+      final ByteBuffer bb = ByteBuffer.allocate(1);
+      bb.put(0, (byte) 1);
+
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+        record.put("c_long", (long) i);
+        record.put("d_float", (float) i);
+        record.put("e_double", (double) i);
+        record.put("f_bytes", bb);
+        record.put("g_null", null);
+        record.put("h_boolean", (i % 2 == 0));
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
+  /**
+   * Writes records with a string field and an enum field whose value cycles
+   * through the four declared symbols.
+   *
+   * @return absolute path of the generated file
+   */
+  public static String generateSimpleEnumSchema_NoNullValues() throws Exception {
+
+    final String[] symbols = { "E_SYM_A", "E_SYM_B", "E_SYM_C", "E_SYM_D" };
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_enum").type().enumeration("my_enum").symbols(symbols).noDefault()
+            .endRecord();
+
+    final File file = File.createTempFile("avro-primitive-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema enumSchema = schema.getField("b_enum").schema();
+
+    final DataFileWriter<GenericRecord> writer = newWriter(schema);
+
+    try {
+      writer.create(schema, file);
+
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        // i is never negative, so a plain modulo cycles through the symbols.
+        final GenericData.EnumSymbol symbol =
+                new GenericData.EnumSymbol(enumSchema, symbols[i % symbols.length]);
+        record.put("b_enum", symbol);
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
+  /**
+   * Writes records with two scalar fields and three array fields (string, int
+   * and float items), each array holding {@link #RECORD_COUNT} elements.
+   *
+   * @return absolute path of the generated file
+   */
+  public static String generateSimpleArraySchema_NoNullValues() throws Exception {
+
+    final File file = File.createTempFile("avro-array-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_string_array").type().array().items().stringType().noDefault()
+            .name("d_int_array").type().array().items().intType().noDefault()
+            .name("e_float_array").type().array().items().floatType().noDefault()
+            .endRecord();
+
+    final DataFileWriter<GenericRecord> writer = newWriter(schema);
+    try {
+      writer.create(schema, file);
+
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        final GenericArray<String> strings =
+                new GenericData.Array<String>(RECORD_COUNT, schema.getField("c_string_array").schema());
+        for (int j = 0; j < RECORD_COUNT; j++) {
+          strings.add(j, "c_string_array_" + i + "_" + j);
+        }
+        record.put("c_string_array", strings);
+
+        final GenericArray<Integer> ints =
+                new GenericData.Array<Integer>(RECORD_COUNT, schema.getField("d_int_array").schema());
+        for (int j = 0; j < RECORD_COUNT; j++) {
+          ints.add(j, i * j);
+        }
+        record.put("d_int_array", ints);
+
+        final GenericArray<Float> floats =
+                new GenericData.Array<Float>(RECORD_COUNT, schema.getField("e_float_array").schema());
+        for (int j = 0; j < RECORD_COUNT; j++) {
+          floats.add(j, (float) (i * j));
+        }
+        record.put("e_float_array", floats);
+
+        writer.append(record);
+      }
+
+    } finally {
+      writer.close();
+    }
+    return file.getAbsolutePath();
+  }
+
+  /**
+   * Writes records with two scalar fields and one nested record field
+   * ({@code c_record}).
+   *
+   * @return absolute path of the generated file
+   */
+  public static String generateSimpleNestedSchema_NoNullValues() throws Exception {
+
+    final File file = File.createTempFile("avro-nested-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_record").type().record("my_record_1")
+              .namespace("foo.blah.org")
+              .fields()
+              .name("nested_1_string").type().stringType().noDefault()
+              .name("nested_1_int").type().intType().noDefault()
+              .endRecord()
+            .noDefault()
+            .endRecord();
+
+    final Schema nestedSchema = schema.getField("c_record").schema();
+
+    final DataFileWriter<GenericRecord> writer = newWriter(schema);
+
+    try {
+      // Opened inside the try so the writer is closed even if create() fails.
+      writer.create(schema, file);
+
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        final GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+        nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
+        nestedRecord.put("nested_1_int", i * i);
+
+        record.put("c_record", nestedRecord);
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
+  /**
+   * Writes records with two scalar fields and a nested record field that itself
+   * contains a further nested record ({@code c_record.nested_1_record}).
+   *
+   * @return absolute path of the generated file
+   */
+  public static String generateDoubleNestedSchema_NoNullValues() throws Exception {
+
+    final File file = File.createTempFile("avro-double-nested-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_record").type().record("my_record_1")
+              .namespace("foo.blah.org")
+              .fields()
+              .name("nested_1_string").type().stringType().noDefault()
+              .name("nested_1_int").type().intType().noDefault()
+              .name("nested_1_record").type().record("my_double_nested_record_1")
+                .namespace("foo.blah.org.rot")
+                .fields()
+                .name("double_nested_1_string").type().stringType().noDefault()
+                .name("double_nested_1_int").type().intType().noDefault()
+                .endRecord()
+                .noDefault()
+              .endRecord()
+              .noDefault()
+            .endRecord();
+
+    final Schema nestedSchema = schema.getField("c_record").schema();
+    final Schema doubleNestedSchema = nestedSchema.getField("nested_1_record").schema();
+
+    final DataFileWriter<GenericRecord> writer = newWriter(schema);
+
+    try {
+      // Opened inside the try so the writer is closed even if create() fails.
+      writer.create(schema, file);
+
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        final GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+        nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
+        nestedRecord.put("nested_1_int", i * i);
+
+        final GenericRecord doubleNestedRecord = new GenericData.Record(doubleNestedSchema);
+        doubleNestedRecord.put("double_nested_1_string", "double_nested_1_string_" + i + "_" + i);
+        doubleNestedRecord.put("double_nested_1_int", i * i * i);
+
+        nestedRecord.put("nested_1_record", doubleNestedRecord);
+        record.put("c_record", nestedRecord);
+
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index e9772cf..d4d81f6 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -38,6 +38,9 @@
         "txt" : {
           type : "text",
           extensions: [ "txt" ]
+        },
+        "avro" : {
+          type: "avro"
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 4b4e558..9a095aa 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -474,6 +474,10 @@ public final class UserBitShared {
      * <code>NESTED_LOOP_JOIN = 35;</code>
      */
     NESTED_LOOP_JOIN(35, 35),
+    /**
+     * <code>AVRO_SUB_SCAN = 36;</code>
+     */
+    AVRO_SUB_SCAN(36, 36),
     ;
 
     /**
@@ -620,6 +624,10 @@ public final class UserBitShared {
      * <code>NESTED_LOOP_JOIN = 35;</code>
      */
     public static final int NESTED_LOOP_JOIN_VALUE = 35;
+    /**
+     * <code>AVRO_SUB_SCAN = 36;</code>
+     */
+    public static final int AVRO_SUB_SCAN_VALUE = 36;
 
 
     public final int getNumber() { return value; }
@@ -662,6 +670,7 @@ public final class UserBitShared {
         case 33: return HBASE_SUB_SCAN;
         case 34: return WINDOW;
         case 35: return NESTED_LOOP_JOIN;
+        case 36: return AVRO_SUB_SCAN;
         default: return null;
       }
     }
@@ -19856,7 +19865,7 @@ public final class UserBitShared {
       "yType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020" +
       "\003*k\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAIT" +
       "ING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHE" +
-      "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\312\005\n\020CoreO" +
+      "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\335\005\n\020CoreO" +
       "peratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADC" +
       "AST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGA" +
       "TE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025H",
@@ -19874,8 +19883,9 @@ public final class UserBitShared {
       "AN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_S" +
       "UB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUC" +
       "ER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WIN" +
-      "DOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#B.\n\033org.apach" +
-      "e.drill.exec.protoB\rUserBitSharedH\001"
+      "DOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_" +
+      "SCAN\020$B.\n\033org.apache.drill.exec.protoB\rU" +
+      "serBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index a37209d..b21d3ae 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -57,7 +57,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     PRODUCER_CONSUMER(32),
     HBASE_SUB_SCAN(33),
     WINDOW(34),
-    NESTED_LOOP_JOIN(35);
+    NESTED_LOOP_JOIN(35),
+    AVRO_SUB_SCAN(36);
     
     public final int number;
     
@@ -111,6 +112,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 33: return HBASE_SUB_SCAN;
             case 34: return WINDOW;
             case 35: return NESTED_LOOP_JOIN;
+            case 36: return AVRO_SUB_SCAN;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 0f86958..10c2790 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -243,4 +243,5 @@ enum CoreOperatorType {
   HBASE_SUB_SCAN = 33;
   WINDOW = 34;
   NESTED_LOOP_JOIN = 35;
+  AVRO_SUB_SCAN = 36;
 }


[3/4] drill git commit: DRILL-1512: Refactor AvroFormatPlugin

Posted by sm...@apache.org.
DRILL-1512: Refactor AvroFormatPlugin

Extend EasyFormatPlugin and remove AvroGroupScan and AvroSubScan


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bf3db318
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bf3db318
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bf3db318

Branch: refs/heads/master
Commit: bf3db3186ce633ac4a494bad888ae54542175ca4
Parents: 55a9a59
Author: Steven Phillips <sm...@apache.org>
Authored: Wed Apr 15 03:26:04 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed Apr 15 12:39:46 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/store/avro/AvroFormatPlugin.java |  92 ++------
 .../drill/exec/store/avro/AvroGroupScan.java    | 208 -------------------
 .../drill/exec/store/avro/AvroRecordReader.java |   4 +-
 .../exec/store/avro/AvroScanBatchCreator.java   |  52 -----
 .../drill/exec/store/avro/AvroSubScan.java      | 142 -------------
 5 files changed, 24 insertions(+), 474 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/bf3db318/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 4fe1f71..2f487d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -18,111 +18,61 @@
 package org.apache.drill.exec.store.avro;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.ImmutableSet;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.AbstractWriter;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FormatMatcher;
-import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 
 /**
  * Format plugin for Avro data files.
  */
-public class AvroFormatPlugin implements FormatPlugin {
-
-  private final String name;
-  private final DrillbitContext context;
-  private final DrillFileSystem fs;
-  private final StoragePluginConfig storagePluginConfig;
-  private final AvroFormatConfig formatConfig;
-  private final BasicFormatMatcher matcher;
+public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
 
   public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
                           StoragePluginConfig storagePluginConfig) {
     this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
   }
 
-  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
-                          StoragePluginConfig storagePluginConfig, AvroFormatConfig formatConfig) {
-    this.name = name;
-    this.context = context;
-    this.fs = fs;
-    this.storagePluginConfig = storagePluginConfig;
-    this.formatConfig = formatConfig;
-
-    // XXX - What does 'compressible' mean in this context?
-    this.matcher = new BasicFormatMatcher(this, fs, Lists.newArrayList("avro"), false);
+  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
+    super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
   }
 
   @Override
-  public boolean supportsRead() {
+  public boolean supportsPushDown() {
     return true;
   }
 
   @Override
-  public boolean supportsWrite() {
-    return false;
-  }
-
-  @Override
-  public FormatMatcher getMatcher() {
-    return matcher;
-  }
-
-  @Override
-  public AbstractWriter getWriter(final PhysicalOperator child, final String location) throws IOException {
-    throw new UnsupportedOperationException("Unimplemented");
-  }
-
-  @Override
-  public AbstractGroupScan getGroupScan(final FileSelection selection) throws IOException {
-    return new AvroGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
-  }
-
-  @Override
-  public AbstractGroupScan getGroupScan(final FileSelection selection, final List<SchemaPath> columns) throws IOException {
-    return new AvroGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
+  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException {
+    return new AvroRecordReader(context, fileWork.getPath(), dfs, columns);
   }
 
   @Override
-  public Set<StoragePluginOptimizerRule> getOptimizerRules() {
-    return ImmutableSet.of();
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+    throw new UnsupportedOperationException("unimplemented");
   }
 
   @Override
-  public AvroFormatConfig getConfig() {
-    return formatConfig;
+  public int getReaderOperatorType() {
+    return CoreOperatorType.AVRO_SUB_SCAN_VALUE;
   }
 
   @Override
-  public StoragePluginConfig getStorageConfig() {
-    return storagePluginConfig;
+  public int getWriterOperatorType() {
+    throw new UnsupportedOperationException("unimplemented");
   }
 
-  @Override
-  public DrillFileSystem getFileSystem() {
-    return fs;
-  }
 
-  @Override
-  public DrillbitContext getContext() {
-    return context;
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bf3db318/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
deleted file mode 100644
index fcc1f94..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.ScanStats;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
-//import org.apache.drill.exec.store.avro.AvroSubScan.AvroSubScanSpec;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Group scan implementation for Avro data files.
- */
-@JsonTypeName("avro-scan")
-public class AvroGroupScan extends AbstractGroupScan {
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroGroupScan.class);
-
-  private final AvroFormatPlugin formatPlugin;
-  private final AvroFormatConfig formatConfig;
-  private final List<SchemaPath> columns;
-  private final FileSystem fs;
-  private final List<ReadEntryWithPath> entries;
-  private final String selectionRoot;
-
-  //private Map<Integer, List<AvroSubScanSpec>> endpointMappings;
-
-  private List<EndpointAffinity> endpointAffinities;
-
-  @JsonCreator
-  public AvroGroupScan(@JsonProperty("entries") final List<ReadEntryWithPath> entries,
-                       @JsonProperty("storage") final StoragePluginConfig storageConfig,
-                       @JsonProperty("format") final FormatPluginConfig formatConfig,
-                       @JacksonInject final StoragePluginRegistry engineRegistry,
-                       @JsonProperty("columns") final List<SchemaPath> columns,
-                       @JsonProperty("selectionRoot") final String selectionRoot) throws ExecutionSetupException {
-
-    this.columns = columns;
-    final AvroFormatConfig afc;
-    if (formatConfig == null) {
-      afc = new AvroFormatConfig();
-    } else {
-      afc = (AvroFormatConfig) formatConfig;
-    }
-    Preconditions.checkNotNull(storageConfig);
-    Preconditions.checkNotNull(afc);
-    this.formatPlugin = (AvroFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, afc);
-    Preconditions.checkNotNull(this.formatPlugin);
-    this.fs = formatPlugin.getFileSystem().getUnderlying();
-    this.formatConfig = formatPlugin.getConfig();
-    this.entries = entries;
-    this.selectionRoot = selectionRoot;
-  }
-
-  public AvroGroupScan(final List<FileStatus> files, final AvroFormatPlugin formatPlugin,
-                       final String selectionRoot, final List<SchemaPath> columns) throws IOException {
-
-    this.formatPlugin = formatPlugin;
-    this.columns = columns;
-    this.formatConfig = formatPlugin.getConfig();
-    this.fs = formatPlugin.getFileSystem().getUnderlying();
-    this.selectionRoot = selectionRoot;
-
-    this.entries = Lists.newArrayList();
-    for (final FileStatus fs : files) {
-      entries.add(new ReadEntryWithPath(fs.getPath().toString()));
-    }
-  }
-
-  @JsonProperty("format")
-  public AvroFormatConfig getFormatConfig() {
-    return this.formatConfig;
-  }
-
-  @JsonProperty("storage")
-  public StoragePluginConfig getEngineConfig() {
-    return this.formatPlugin.getStorageConfig();
-  }
-
-  private AvroGroupScan(final AvroGroupScan that, final List<SchemaPath> columns) {
-    this.columns = (columns == null) ? that.columns : columns;
-    this.entries = that.entries;
-    this.formatConfig = that.formatConfig;
-    this.formatPlugin = that.formatPlugin;
-    this.fs = that.fs;
-    this.selectionRoot = that.selectionRoot;
-
-    // XXX - DON'T FORGET TO ADD THESE AFTER WE'VE IMPLEMENTED AFFINITY
-    //this.endpointAffinities = that.endpointAffinities;
-    //this.mappings = that.mappings;
-    //this.rowCount = that.rowCount;
-    //this.rowGroupInfos = that.rowGroupInfos;
-    //this.columnValueCounts = that.columnValueCounts;
-  }
-
-  @Override
-  public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
-    // XXX - Unimplemented
-    logger.warn("AvroGroupScan.applyAssignments() is not implemented");
-  }
-
-  @Override
-  public AvroSubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
-
-    final AvroSubScan sub = new AvroSubScan(formatPlugin, columns, selectionRoot);
-
-    // XXX - This is a temporary hack just to get something working. Need to revisit sub-scan specs
-    //       once we work out affinity and endpoints.
-    sub.setEntry(entries.get(0));
-    sub.setFileSystem(fs);
-
-    return sub;
-  }
-
-  @Override
-  public int getMaxParallelizationWidth() {
-    // XXX - Finish
-    return 1;
-  }
-
-  @Override
-  public ScanStats getScanStats() {
-    // XXX - Is 0 the correct value for second arg? What if I don't know the row count a priori?
-    return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, 0, 1, 1);
-  }
-
-  @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    // XXX - Unimplemented
-    if (endpointAffinities != null) {
-      return endpointAffinities;
-    }
-    return Collections.emptyList();
-  }
-
-  @Override
-  @JsonIgnore
-  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
-    Preconditions.checkArgument(children.isEmpty());
-    return new AvroGroupScan(this, null);
-  }
-
-  @Override
-  public String getDigest() {
-    return toString();
-  }
-
-  @Override
-  public String toString() {
-    return "AvroGroupScan [entries=" + entries +
-            ", selectionRoot=" + selectionRoot +
-            ", columns=" + columns + "]";
-  }
-
-  @Override
-  public GroupScan clone(final List<SchemaPath> columns) {
-    return new AvroGroupScan(this, columns);
-  }
-
-  @JsonIgnore
-  public boolean canPushdownProjects(final List<SchemaPath> columns) {
-    return true;
-  }
-
-  public List<ReadEntryWithPath> getEntries() {
-    return entries;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/bf3db318/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 3b7697d..489a989 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -73,6 +73,7 @@ public class AvroRecordReader extends AbstractRecordReader {
 
   private DataFileReader<GenericContainer> reader = null;
   private OperatorContext operatorContext;
+  private FileSystem fs;
 
   private static final int DEFAULT_BATCH_SIZE = 1000;
 
@@ -91,6 +92,7 @@ public class AvroRecordReader extends AbstractRecordReader {
 
     hadoop = new Path(inputPath);
     buffer = fragmentContext.getManagedBuffer();
+    this.fs = fileSystem;
 
     setColumns(projectedColumns);
   }
@@ -101,7 +103,7 @@ public class AvroRecordReader extends AbstractRecordReader {
     writer = new VectorContainerWriter(output);
 
     try {
-      reader = new DataFileReader<>(new FsInput(hadoop, new Configuration()), new GenericDatumReader<GenericContainer>());
+      reader = new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>());
     } catch (IOException e) {
       throw new ExecutionSetupException(e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/bf3db318/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
deleted file mode 100644
index 42c8e99..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-
-import java.util.List;
-
-/**
- * Batch creator for Avro scans.
- */
-public class AvroScanBatchCreator implements BatchCreator<AvroSubScan> {
-
-
-  @Override
-  public RecordBatch getBatch(final FragmentContext context, final AvroSubScan subScan,
-                              final List<RecordBatch> children) throws ExecutionSetupException {
-
-    Preconditions.checkArgument(children.isEmpty());
-    List<SchemaPath> columns = subScan.getColumns();
-    List<RecordReader> readers = Lists.newArrayList();
-
-    readers.add(new AvroRecordReader(context, subScan.getEntry().getPath(), subScan.getFileSystem(), columns));
-
-    return new ScanBatch(subScan, context, readers.iterator());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/bf3db318/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
deleted file mode 100644
index 0a579aa..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractBase;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-
-import com.google.common.collect.Iterators;
-import com.google.common.base.Preconditions;
-import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
-import org.apache.hadoop.fs.FileSystem;
-
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Contains information for reading a single Avro row group from HDFS.
- */
-@JsonTypeName("avro-sub-scan")
-public class AvroSubScan extends AbstractBase implements SubScan {
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroSubScan.class);
-
-  private final AvroFormatPlugin formatPlugin;
-  private final AvroFormatConfig formatConfig;
-  private final List<SchemaPath> columns;
-  private final String selectionRoot;
-
-  private ReadEntryWithPath entry;
-  private FileSystem fs;
-
-  @JsonCreator
-  public AvroSubScan(@JacksonInject final StoragePluginRegistry registry,
-                     @JsonProperty("storage") final StoragePluginConfig storageConfig,
-                     @JsonProperty("format") final FormatPluginConfig formatConfig,
-                     @JsonProperty("columns") final List<SchemaPath> columns,
-                     @JsonProperty("selectionRoot") final String selectionRoot) throws ExecutionSetupException {
-    this((AvroFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
-            formatConfig == null ? new AvroFormatConfig() : formatConfig), columns, selectionRoot);
-  }
-
-  public AvroSubScan(final AvroFormatPlugin formatPlugin, final List<SchemaPath> columns,
-                     final String selectionRoot) {
-    this.formatPlugin = Preconditions.checkNotNull(formatPlugin);
-    this.formatConfig = formatPlugin.getConfig();
-    this.columns = columns;
-    this.selectionRoot = selectionRoot;
-  }
-
-  @JsonProperty("storage")
-  public StoragePluginConfig getEngineConfig() {
-    return formatPlugin.getStorageConfig();
-  }
-
-  @JsonProperty("format")
-  public AvroFormatConfig getFormatConfig() {
-    return formatConfig;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(final PhysicalVisitor<T, X, E> physicalVisitor, final X value) throws E {
-    return physicalVisitor.visitSubScan(this, value);
-  }
-
-  @Override
-  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
-    Preconditions.checkArgument(children.isEmpty());
-    return new AvroSubScan(formatPlugin, columns, selectionRoot);
-  }
-
-  @Override
-  public int getOperatorType() {
-    return UserBitShared.CoreOperatorType.AVRO_ROW_GROUP_SCAN_VALUE;
-  }
-
-  @Override
-  public Iterator<PhysicalOperator> iterator() {
-    return Iterators.emptyIterator();
-  }
-
-  @JsonIgnore
-  public List<SchemaPath> getColumns() {
-    return columns;
-  }
-
-  /*
-  public static class AvroSubScanSpec {
-
-  }
-  */
-
-  /** XXX - temp hacks **/
-
-  @JsonIgnore
-  public void setEntry(ReadEntryWithPath entry) {
-    this.entry = entry;
-  }
-
-  @JsonIgnore
-  public ReadEntryWithPath getEntry() {
-    return entry;
-  }
-
-  @JsonIgnore
-  public void setFileSystem(FileSystem fs) {
-    this.fs = fs;
-  }
-
-  @JsonIgnore
-  public FileSystem getFileSystem() {
-    return fs;
-  }
-}