You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/03/04 21:26:56 UTC
incubator-parquet-mr git commit: PARQUET-192: Fix map null encoding
Repository: incubator-parquet-mr
Updated Branches:
refs/heads/master f1b54876a -> c82f70376
PARQUET-192: Fix map null encoding
This depends on PARQUET-191 for the correct schema representation.
Author: Ryan Blue <bl...@apache.org>
Closes #127 from rdblue/PARQUET-192-fix-map-null-encoding and squashes the following commits:
fffde82 [Ryan Blue] PARQUET-192: Fix parquet-avro maps with null values.
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/c82f7037
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/c82f7037
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/c82f7037
Branch: refs/heads/master
Commit: c82f703768eb8a122546de23e412a037aa1770b2
Parents: f1b5487
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Mar 4 12:26:52 2015 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Mar 4 12:26:52 2015 -0800
----------------------------------------------------------------------
.../java/parquet/avro/AvroWriteSupport.java | 34 ++++++------
.../test/java/parquet/avro/TestReadWrite.java | 57 ++++++++++++++++++++
.../src/test/resources/map_with_nulls.avsc | 11 ++++
3 files changed, 86 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/c82f7037/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
index 9e6c8e9..529ca23 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
@@ -23,12 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import parquet.hadoop.api.WriteSupport;
@@ -46,6 +43,7 @@ import parquet.schema.Type;
public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
private static final String AVRO_SCHEMA = "parquet.avro.schema";
+ private static final Schema MAP_KEY_SCHEMA = Schema.create(Schema.Type.STRING);
private RecordConsumer recordConsumer;
private MessageType rootSchema;
@@ -132,28 +130,32 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
recordConsumer.endGroup();
}
- private <V> void writeMap(GroupType schema, Schema avroSchema,
+ private <V> void writeMap(GroupType schema, Schema avroSchema,
Map<CharSequence, V> map) {
GroupType innerGroup = schema.getType(0).asGroupType();
Type keyType = innerGroup.getType(0);
Type valueType = innerGroup.getType(1);
- Schema keySchema = Schema.create(Schema.Type.STRING);
recordConsumer.startGroup(); // group wrapper (original type MAP)
if (map.size() > 0) {
recordConsumer.startField("map", 0);
- recordConsumer.startGroup(); // "repeated" group wrapper
- recordConsumer.startField("key", 0);
- for (CharSequence key : map.keySet()) {
- writeValue(keyType, keySchema, key);
- }
- recordConsumer.endField("key", 0);
- recordConsumer.startField("value", 1);
- for (V value : map.values()) {
- writeValue(valueType, avroSchema.getValueType(), value);
+
+ for (Map.Entry<CharSequence, V> entry : map.entrySet()) {
+ recordConsumer.startGroup(); // "repeated" group wrapper
+ recordConsumer.startField("key", 0);
+ writeValue(keyType, MAP_KEY_SCHEMA, entry.getKey());
+ recordConsumer.endField("key", 0);
+ V value = entry.getValue();
+ if (value != null) {
+ recordConsumer.startField("value", 1);
+ writeValue(valueType, avroSchema.getValueType(), value);
+ recordConsumer.endField("value", 1);
+ } else if (!valueType.isRepetition(Type.Repetition.OPTIONAL)) {
+ throw new RuntimeException("Null map value for " + avroSchema.getName());
+ }
+ recordConsumer.endGroup();
}
- recordConsumer.endField("value", 1);
- recordConsumer.endGroup();
+
recordConsumer.endField("map", 0);
}
recordConsumer.endGroup();
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/c82f7037/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java
index 4cff940..7467378 100644
--- a/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java
@@ -20,6 +20,7 @@ package parquet.avro;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import java.io.File;
import java.nio.ByteBuffer;
@@ -103,6 +104,62 @@ public class TestReadWrite {
}
@Test
+ public void testMapWithNulls() throws Exception {
+ Schema schema = new Schema.Parser().parse(
+ Resources.getResource("map_with_nulls.avsc").openStream());
+
+ File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+ tmp.deleteOnExit();
+ tmp.delete();
+ Path file = new Path(tmp.getPath());
+
+ AvroParquetWriter<GenericRecord> writer =
+ new AvroParquetWriter<GenericRecord>(file, schema);
+
+ // Write a record with a null value
+ Map<String, Integer> map = new HashMap<String, Integer>();
+ map.put("thirty-four", 34);
+ map.put("eleventy-one", null);
+ map.put("one-hundred", 100);
+
+ GenericData.Record record = new GenericRecordBuilder(schema)
+ .set("mymap", map).build();
+ writer.write(record);
+ writer.close();
+
+ AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+ GenericRecord nextRecord = reader.read();
+
+ assertNotNull(nextRecord);
+ assertEquals(map, nextRecord.get("mymap"));
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testMapRequiredValueWithNull() throws Exception {
+ Schema schema = Schema.createRecord("record1", null, null, false);
+ schema.setFields(Lists.newArrayList(
+ new Schema.Field("mymap", Schema.createMap(Schema.create(Schema.Type.INT)), null, null)));
+
+ File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+ tmp.deleteOnExit();
+ tmp.delete();
+ Path file = new Path(tmp.getPath());
+
+ AvroParquetWriter<GenericRecord> writer =
+ new AvroParquetWriter<GenericRecord>(file, schema);
+
+ // Write a record with a null value
+ Map<String, Integer> map = new HashMap<String, Integer>();
+ map.put("thirty-four", 34);
+ map.put("eleventy-one", null);
+ map.put("one-hundred", 100);
+
+ GenericData.Record record = new GenericRecordBuilder(schema)
+ .set("mymap", map).build();
+ writer.write(record);
+ }
+
+ @Test
public void testMapWithUtf8Key() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("map.avsc").openStream());
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/c82f7037/parquet-avro/src/test/resources/map_with_nulls.avsc
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/resources/map_with_nulls.avsc b/parquet-avro/src/test/resources/map_with_nulls.avsc
new file mode 100644
index 0000000..f0b2831
--- /dev/null
+++ b/parquet-avro/src/test/resources/map_with_nulls.avsc
@@ -0,0 +1,11 @@
+{
+ "type": "record",
+ "name": "myrecord",
+ "fields": [ {
+ "name": "mymap",
+ "type": {
+ "type": "map",
+ "values": ["null", "int"]
+ }
+ } ]
+}