You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/09/14 07:28:57 UTC

[10/15] drill git commit: DRILL-3458: Enhancing Avro file format by supporting nested data types like union, map

DRILL-3458: Enhancing Avro file format by supporting nested data types like union, map

Removing unused imports and fixing typos in the comments

Adding a sanity check to verify whether a union is an optional field or not, and adding negative test cases

Handling string and utf-8 datatypes properly

Removing bracket notation and documentation


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/fe07b6c4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/fe07b6c4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/fe07b6c4

Branch: refs/heads/master
Commit: fe07b6c47682b898b60a9d8221e10207240061ff
Parents: 8f4ca6e
Author: Kamesh <ka...@gmail.com>
Authored: Sat Jul 4 23:35:48 2015 +0530
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Sep 13 21:58:33 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/store/avro/AvroRecordReader.java |  65 +++-
 .../drill/exec/store/avro/MapOrListWriter.java  |   9 +
 .../drill/exec/store/avro/AvroFormatTest.java   |  77 ++++-
 .../drill/exec/store/avro/AvroTestUtil.java     | 306 +++++++++++++++++++
 4 files changed, 438 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/fe07b6c4/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 271c8e9..a09cd53 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -22,10 +22,13 @@ import io.netty.buffer.DrillBuf;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericContainer;
@@ -53,6 +56,7 @@ import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Stopwatch;
 
 /**
@@ -164,17 +168,22 @@ public class AvroRecordReader extends AbstractRecordReader {
     }
   }
 
-  private void process(final Object value, final Schema schema, final String fieldName, final MapOrListWriter writer) {
-
-    writer.start();
+  private void process(final Object value, final Schema schema, final String fieldName, MapOrListWriter writer) {
+    if (value == null) {
+      return;
+    }
     final Schema.Type type = schema.getType();
 
     switch (type) {
       case RECORD:
-        for (final Schema.Field field : schema.getFields()) {
+        // list field of MapOrListWriter will be non null when we want to store array of maps/records.
+        MapOrListWriter _writer = writer;
 
-          MapOrListWriter _writer = writer;
-          if (field.schema().getType() == Schema.Type.RECORD) {
+        for (final Schema.Field field : schema.getFields()) {
+          if (field.schema().getType() == Schema.Type.RECORD ||
+              (field.schema().getType() == Schema.Type.UNION &&
+              field.schema().getTypes().get(0).getType() == Schema.Type.NULL &&
+              field.schema().getTypes().get(1).getType() == Schema.Type.RECORD)) {
             _writer = writer.map(field.name());
           }
 
@@ -184,13 +193,38 @@ public class AvroRecordReader extends AbstractRecordReader {
       case ARRAY:
         assert fieldName != null;
         final GenericArray array = (GenericArray) value;
+        Schema elementSchema = array.getSchema().getElementType();
+        Type elementType = elementSchema.getType();
+        if (elementType == Schema.Type.RECORD || elementType == Schema.Type.MAP){
+          writer = writer.list(fieldName).listoftmap(fieldName);
+        } else {
+          writer = writer.list(fieldName);
+        }
+        writer.start();
         for (final Object o : array) {
-          process(o, array.getSchema().getElementType(), fieldName, writer.list(fieldName));
+          process(o, elementSchema, fieldName, writer);
         }
+        writer.end();
         break;
-      case FIXED:
       case UNION:
+        // currently supporting only nullable union (optional fields) like ["null", "some-type"].
+        if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
+          throw new UnsupportedOperationException("Avro union type must be of the format : [\"null\", \"some-type\"]");
+        }
+        process(value, schema.getTypes().get(1), fieldName, writer);
+        break;
       case MAP:
+        @SuppressWarnings("unchecked")
+        final HashMap<Object, Object> map = (HashMap<Object, Object>) value;
+        Schema valueSchema = schema.getValueType();
+        writer = writer.map(fieldName);
+        writer.start();
+        for (Entry<Object, Object> entry : map.entrySet()) {
+          process(entry.getValue(), valueSchema, entry.getKey().toString(), writer);
+        }
+        writer.end();
+        break;
+      case FIXED:
         throw new UnsupportedOperationException("Unimplemented type: " + type.toString());
       case ENUM:  // Enum symbols are strings
       case NULL:  // Treat null type as a primitive
@@ -214,19 +248,26 @@ public class AvroRecordReader extends AbstractRecordReader {
         break;
     }
 
-    writer.end();
   }
 
   private void processPrimitive(final Object value, final Schema.Type type, final String fieldName,
                                 final MapOrListWriter writer) {
+    if (value == null) {
+      return;
+    }
 
     switch (type) {
       case STRING:
-        final Utf8 utf8 = (Utf8) value;
-        final int length = utf8.length();
+        byte[] binary = null;
+        if (value instanceof Utf8) {
+          binary = ((Utf8) value).getBytes();
+        } else {
+          binary = value.toString().getBytes(Charsets.UTF_8);
+        }
+        final int length = binary.length;
         final VarCharHolder vh = new VarCharHolder();
         ensure(length);
-        buffer.setBytes(0, utf8.getBytes());
+        buffer.setBytes(0, binary);
         vh.buffer = buffer;
         vh.start = 0;
         vh.end = length;

http://git-wip-us.apache.org/repos/asf/drill/blob/fe07b6c4/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
index d2a1031..1a94452 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
@@ -66,6 +66,11 @@ public class MapOrListWriter {
     return new MapOrListWriter(map.map(name));
   }
 
+  MapOrListWriter listoftmap(final String name) {
+    assert list != null;
+    return new MapOrListWriter(list.map());
+  }
+
   MapOrListWriter list(final String name) {
     assert map != null;
     return new MapOrListWriter(map.list(name));
@@ -75,6 +80,10 @@ public class MapOrListWriter {
     return map != null;
   }
 
+  boolean isListWriter() {
+    return list != null;
+  }
+
   VarCharWriter varChar(final String name) {
     return (map != null) ? map.varChar(name) : list.varChar();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/fe07b6c4/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
index 2d2522b..bcbc707 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
@@ -18,7 +18,8 @@
 package org.apache.drill.exec.store.avro;
 
 import org.apache.drill.BaseTestQuery;
-
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -79,8 +80,8 @@ public class AvroFormatTest extends BaseTestQuery {
   public void testSimpleNestedSchema_NoNullValues() throws Exception {
 
     final String file = AvroTestUtil.generateSimpleNestedSchema_NoNullValues();
-    final String sql = "select a_string, b_int, c_record['nested_1_string'], c_record['nested_1_int'] " +
-            "from dfs_test.`" + file + "`";
+    final String sql = "select a_string, b_int, t.c_record.nested_1_string, t.c_record.nested_1_int " +
+            "from dfs_test.`" + file + "` t";
     test(sql);
   }
 
@@ -96,10 +97,10 @@ public class AvroFormatTest extends BaseTestQuery {
   public void testDoubleNestedSchema_NoNullValues() throws Exception {
 
     final String file = AvroTestUtil.generateDoubleNestedSchema_NoNullValues();
-    final String sql = "select a_string, b_int, c_record['nested_1_string'], c_record['nested_1_int'], " +
-            "c_record['nested_1_record']['double_nested_1_string'], " +
-            "c_record['nested_1_record']['double_nested_1_int'] " +
-            "from dfs_test.`" + file + "`";
+    final String sql = "select a_string, b_int, t.c_record.nested_1_string, t.c_record.nested_1_int, " +
+            "t.c_record.nested_1_record.double_nested_1_string, " +
+            "t.c_record.nested_1_record.double_nested_1_int " +
+            "from dfs_test.`" + file + "` t";
     test(sql);
   }
 
@@ -126,4 +127,66 @@ public class AvroFormatTest extends BaseTestQuery {
     final String sql = "select * from dfs_test.`" + file + "`";
     test(sql);
   }
+
+  @Test
+  public void testSimpleUnionSchema_StarQuery() throws Exception {
+
+    final String file = AvroTestUtil.generateUnionSchema_WithNullValues();
+    final String sql = "select * from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testShouldFailSimpleUnionNonNullSchema_StarQuery() throws Exception {
+
+    final String file = AvroTestUtil.generateUnionSchema_WithNonNullValues();
+    final String sql = "select * from dfs_test.`" + file + "`";
+    try {
+      test(sql);
+      Assert.fail("Test should fail as union is only supported for optional fields");
+    } catch(UserRemoteException e) {
+      String message = e.getMessage();
+      Assert.assertTrue(message.contains("Avro union type must be of the format : [\"null\", \"some-type\"]"));
+    }
+  }
+
+  @Test
+  public void testNestedUnionSchema_withNullValues() throws Exception {
+
+    final String file = AvroTestUtil.generateUnionNestedSchema_withNullValues();
+    final String sql = "select t.c_record.nested_1_string,t.c_record.nested_1_int from dfs_test.`" + file + "` t";
+    test(sql);
+  }
+
+  @Test
+  public void testNestedUnionArraySchema_withNullValues() throws Exception {
+
+    final String file = AvroTestUtil.generateUnionNestedArraySchema_withNullValues();
+    final String sql = "select t.c_array[0].nested_1_string,t.c_array[0].nested_1_int from dfs_test.`" + file + "` t";
+    test(sql);
+  }
+
+  @Test
+  public void testMapSchema_withNullValues() throws Exception {
+
+    final String file = AvroTestUtil.generateMapSchema_withNullValues();
+    final String sql = "select c_map['key1'],c_map['key2'] from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testMapSchemaComplex_withNullValues() throws Exception {
+
+    final String file = AvroTestUtil.generateMapSchemaComplex_withNullValues();
+    final String sql = "select d_map['key1'],d_map['key2'] from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testStringAndUtf8Data() throws Exception {
+
+    final String file = AvroTestUtil.generateStringAndUtf8Data();
+    final String sql = "select * from dfs_test.`" + file + "`";
+    test(sql);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fe07b6c4/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
index 419c054..d847b1a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.avro;
 
 import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericArray;
@@ -27,6 +28,8 @@ import org.apache.avro.generic.GenericRecord;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Utilities for generating Avro test data.
@@ -79,6 +82,98 @@ public class AvroTestUtil {
     return file.getAbsolutePath();
   }
 
+  public static String generateUnionSchema_WithNullValues() throws Exception {
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_long").type().longType().noDefault()
+            .name("d_float").type().floatType().noDefault()
+            .name("e_double").type().doubleType().noDefault()
+            .name("f_bytes").type().bytesType().noDefault()
+            .name("g_null").type().nullType().noDefault()
+            .name("h_boolean").type().booleanType().noDefault()
+            .name("i_union").type().optional().doubleType()
+            .endRecord();
+
+    final File file = File.createTempFile("avro-primitive-test", ".avro");
+    file.deleteOnExit();
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    try {
+      writer.create(schema, file);
+
+      ByteBuffer bb = ByteBuffer.allocate(1);
+      bb.put(0, (byte) 1);
+
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+        record.put("c_long", (long) i);
+        record.put("d_float", (float) i);
+        record.put("e_double", (double) i);
+        record.put("f_bytes", bb);
+        record.put("g_null", null);
+        record.put("h_boolean", (i % 2 == 0));
+        record.put("i_union", (i % 2 == 0 ? (double) i : null));
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
+  public static String generateUnionSchema_WithNonNullValues() throws Exception {
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_long").type().longType().noDefault()
+            .name("d_float").type().floatType().noDefault()
+            .name("e_double").type().doubleType().noDefault()
+            .name("f_bytes").type().bytesType().noDefault()
+            .name("g_null").type().nullType().noDefault()
+            .name("h_boolean").type().booleanType().noDefault()
+            .name("i_union").type().unionOf().doubleType().and().longType().endUnion().noDefault()
+            .endRecord();
+
+    final File file = File.createTempFile("avro-primitive-test", ".avro");
+    file.deleteOnExit();
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    try {
+      writer.create(schema, file);
+
+      ByteBuffer bb = ByteBuffer.allocate(1);
+      bb.put(0, (byte) 1);
+
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+        record.put("c_long", (long) i);
+        record.put("d_float", (float) i);
+        record.put("e_double", (double) i);
+        record.put("f_bytes", bb);
+        record.put("g_null", null);
+        record.put("h_boolean", (i % 2 == 0));
+        record.put("i_union", (i % 2 == 0 ? (double) i : (long) i));
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
   public static String generateSimpleEnumSchema_NoNullValues() throws Exception {
 
     final String[] symbols = { "E_SYM_A", "E_SYM_B", "E_SYM_C", "E_SYM_D" };
@@ -210,6 +305,185 @@ public class AvroTestUtil {
     return file.getAbsolutePath();
   }
 
+  public static String generateUnionNestedArraySchema_withNullValues() throws Exception {
+
+    final File file = File.createTempFile("avro-nested-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_array").type().optional().array().items().record("my_record_1")
+            .namespace("foo.blah.org").fields()
+            .name("nested_1_string").type().optional().stringType()
+            .name("nested_1_int").type().optional().intType()
+            .endRecord()
+            .endRecord();
+
+    final Schema nestedSchema = schema.getField("c_array").schema();
+    final Schema arraySchema = nestedSchema.getTypes().get(1);
+    final Schema itemSchema = arraySchema.getElementType();
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    writer.create(schema, file);
+
+    try {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        if (i % 2 == 0) {
+          GenericArray array = new GenericData.Array<String>(1, arraySchema);
+          final GenericRecord nestedRecord = new GenericData.Record(itemSchema);
+          nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
+          nestedRecord.put("nested_1_int", i * i);
+          array.add(nestedRecord);
+          record.put("c_array", array);
+        }
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
+  public static String generateMapSchema_withNullValues() throws Exception {
+
+    final File file = File.createTempFile("avro-nested-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_map").type().optional().map().values(Schema.create(Type.STRING))
+            .endRecord();
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    writer.create(schema, file);
+
+    try {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        if (i % 2 == 0) {
+          Map<String, String> strMap = new HashMap<>();
+          strMap.put("key1", "nested_1_string_" +  i);
+          strMap.put("key2", "nested_1_string_" +  (i + 1 ));
+          record.put("c_map", strMap);
+        }
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
+  public static String generateMapSchemaComplex_withNullValues() throws Exception {
+
+    final File file = File.createTempFile("avro-nested-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_map").type().optional().map().values(Schema.create(Type.STRING))
+            .name("d_map").type().optional().map().values(Schema.createArray(Schema.create(Type.DOUBLE)))
+            .endRecord();
+
+    final Schema arrayMapSchema = schema.getField("d_map").schema();
+    final Schema arrayItemSchema = arrayMapSchema.getTypes().get(1).getValueType();
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    writer.create(schema, file);
+
+    try {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        if (i % 2 == 0) {
+          Map<String, String> c_map = new HashMap<>();
+          c_map.put("key1", "nested_1_string_" +  i);
+          c_map.put("key2", "nested_1_string_" +  (i + 1 ));
+          record.put("c_map", c_map);
+        } else {
+          Map<String, GenericArray> d_map = new HashMap<>();
+          GenericArray array = new GenericData.Array<String>(RECORD_COUNT, arrayItemSchema);
+          for (int j = 0; j < RECORD_COUNT; j++) {
+            array.add((double)j);
+          }
+          d_map.put("key1", array);
+          d_map.put("key2", array);
+
+          record.put("d_map", d_map);
+        }
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
+  public static String generateUnionNestedSchema_withNullValues() throws Exception {
+
+    final File file = File.createTempFile("avro-nested-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_record").type().optional().record("my_record_1")
+            .namespace("foo.blah.org").fields()
+            .name("nested_1_string").type().optional().stringType()
+            .name("nested_1_int").type().optional().intType()
+            .endRecord()
+            .endRecord();
+
+    final Schema nestedSchema = schema.getField("c_record").schema();
+    final Schema optionalSchema = nestedSchema.getTypes().get(1);
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    writer.create(schema, file);
+
+    try {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        if (i % 2 == 0) {
+          final GenericRecord nestedRecord = new GenericData.Record(optionalSchema);
+          nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
+          nestedRecord.put("nested_1_int", i * i);
+          record.put("c_record", nestedRecord);
+        }
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
   public static String generateDoubleNestedSchema_NoNullValues() throws Exception {
 
     final File file = File.createTempFile("avro-double-nested-test", ".avro");
@@ -267,4 +541,36 @@ public class AvroTestUtil {
 
     return file.getAbsolutePath();
   }
+
+  public static String generateStringAndUtf8Data() throws Exception {
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringBuilder().prop("avro.java.string", "String").endString().noDefault()
+            .name("b_utf8").type().stringType().noDefault()
+            .endRecord();
+
+    final File file = File.createTempFile("avro-primitive-test", ".avro");
+    file.deleteOnExit();
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    try {
+      writer.create(schema, file);
+
+      ByteBuffer bb = ByteBuffer.allocate(1);
+      bb.put(0, (byte) 1);
+
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_utf8", "b_" + i);
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
 }