You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this copy.)
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/03/04 21:26:56 UTC

incubator-parquet-mr git commit: PARQUET-192: Fix map null encoding

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master f1b54876a -> c82f70376


PARQUET-192: Fix map null encoding

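This depends on PARQUET-191 for the correct schema representation.

Previously, writeMap opened a single repeated group and wrote every map key, then every map value, into it. It now writes one repeated key/value group per map entry. When a value is null, the value field is omitted if the Parquet value type is optional; otherwise a RuntimeException is thrown.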

Author: Ryan Blue <bl...@apache.org>

Closes #127 from rdblue/PARQUET-192-fix-map-null-encoding and squashes the following commits:

fffde82 [Ryan Blue] PARQUET-192: Fix parquet-avro maps with null values.


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/c82f7037
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/c82f7037
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/c82f7037

Branch: refs/heads/master
Commit: c82f703768eb8a122546de23e412a037aa1770b2
Parents: f1b5487
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Mar 4 12:26:52 2015 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Mar 4 12:26:52 2015 -0800

----------------------------------------------------------------------
 .../java/parquet/avro/AvroWriteSupport.java     | 34 ++++++------
 .../test/java/parquet/avro/TestReadWrite.java   | 57 ++++++++++++++++++++
 .../src/test/resources/map_with_nulls.avsc      | 11 ++++
 3 files changed, 86 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/c82f7037/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
index 9e6c8e9..529ca23 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
@@ -23,12 +23,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import parquet.hadoop.api.WriteSupport;
@@ -46,6 +43,7 @@ import parquet.schema.Type;
 public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
 
   private static final String AVRO_SCHEMA = "parquet.avro.schema";
+  private static final Schema MAP_KEY_SCHEMA = Schema.create(Schema.Type.STRING);
 
   private RecordConsumer recordConsumer;
   private MessageType rootSchema;
@@ -132,28 +130,32 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
     recordConsumer.endGroup();
   }
 
-  private <V> void writeMap(GroupType schema, Schema avroSchema, 
+  private <V> void writeMap(GroupType schema, Schema avroSchema,
                             Map<CharSequence, V> map) {
     GroupType innerGroup = schema.getType(0).asGroupType();
     Type keyType = innerGroup.getType(0);
     Type valueType = innerGroup.getType(1);
-    Schema keySchema = Schema.create(Schema.Type.STRING);
 
     recordConsumer.startGroup(); // group wrapper (original type MAP)
     if (map.size() > 0) {
       recordConsumer.startField("map", 0);
-      recordConsumer.startGroup(); // "repeated" group wrapper
-      recordConsumer.startField("key", 0);
-      for (CharSequence key : map.keySet()) {
-        writeValue(keyType, keySchema, key);
-      }
-      recordConsumer.endField("key", 0);
-      recordConsumer.startField("value", 1);
-      for (V value : map.values()) {
-        writeValue(valueType, avroSchema.getValueType(), value);
+
+      for (Map.Entry<CharSequence, V> entry : map.entrySet()) {
+        recordConsumer.startGroup(); // "repeated" group wrapper
+        recordConsumer.startField("key", 0);
+        writeValue(keyType, MAP_KEY_SCHEMA, entry.getKey());
+        recordConsumer.endField("key", 0);
+        V value = entry.getValue();
+        if (value != null) {
+          recordConsumer.startField("value", 1);
+          writeValue(valueType, avroSchema.getValueType(), value);
+          recordConsumer.endField("value", 1);
+        } else if (!valueType.isRepetition(Type.Repetition.OPTIONAL)) {
+          throw new RuntimeException("Null map value for " + avroSchema.getName());
+        }
+        recordConsumer.endGroup();
       }
-      recordConsumer.endField("value", 1);
-      recordConsumer.endGroup();
+
       recordConsumer.endField("map", 0);
     }
     recordConsumer.endGroup();

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/c82f7037/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java
index 4cff940..7467378 100644
--- a/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java
@@ -20,6 +20,7 @@ package parquet.avro;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
 import java.io.File;
 import java.nio.ByteBuffer;
@@ -103,6 +104,62 @@ public class TestReadWrite {
   }
 
   @Test
+  public void testMapWithNulls() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("map_with_nulls.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with a null value
+    Map<String, Integer> map = new HashMap<String, Integer>();
+    map.put("thirty-four", 34);
+    map.put("eleventy-one", null);
+    map.put("one-hundred", 100);
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", map).build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(map, nextRecord.get("mymap"));
+  }
+
+  @Test(expected=RuntimeException.class)
+  public void testMapRequiredValueWithNull() throws Exception {
+    Schema schema = Schema.createRecord("record1", null, null, false);
+    schema.setFields(Lists.newArrayList(
+        new Schema.Field("mymap", Schema.createMap(Schema.create(Schema.Type.INT)), null, null)));
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with a null value
+    Map<String, Integer> map = new HashMap<String, Integer>();
+    map.put("thirty-four", 34);
+    map.put("eleventy-one", null);
+    map.put("one-hundred", 100);
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", map).build();
+    writer.write(record);
+  }
+
+  @Test
   public void testMapWithUtf8Key() throws Exception {
     Schema schema = new Schema.Parser().parse(
         Resources.getResource("map.avsc").openStream());

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/c82f7037/parquet-avro/src/test/resources/map_with_nulls.avsc
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/resources/map_with_nulls.avsc b/parquet-avro/src/test/resources/map_with_nulls.avsc
new file mode 100644
index 0000000..f0b2831
--- /dev/null
+++ b/parquet-avro/src/test/resources/map_with_nulls.avsc
@@ -0,0 +1,11 @@
+{
+  "type": "record",
+  "name": "myrecord",
+  "fields": [ {
+    "name": "mymap",
+    "type": {
+      "type": "map",
+      "values": ["null", "int"]
+    }
+  } ]
+}