Posted to commits@hive.apache.org by xu...@apache.org on 2017/11/30 03:29:30 UTC

hive git commit: HIVE-17972: Implement Parquet vectorization reader for Map type (Colin Ma, reviewed by Ferdinand Xu)

Repository: hive
Updated Branches:
  refs/heads/master b8aa16ff6 -> 2efb7d38a


HIVE-17972: Implement Parquet vectorization reader for Map type (Colin Ma, reviewed by Ferdinand Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2efb7d38
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2efb7d38
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2efb7d38

Branch: refs/heads/master
Commit: 2efb7d38af639ee77f817f2ebf772314309174be
Parents: b8aa16f
Author: Ferdinand Xu <ch...@intel.com>
Authored: Thu Nov 30 11:24:31 2017 +0800
Committer: Ferdinand Xu <ch...@intel.com>
Committed: Thu Nov 30 11:24:31 2017 +0800

----------------------------------------------------------------------
 .../vector/VectorizedMapColumnReader.java       |  69 ++++
 .../vector/VectorizedParquetRecordReader.java   |  12 +
 .../parquet/TestVectorizedMapColumnReader.java  | 327 +++++++++++++++++++
 .../parquet/VectorizedColumnReaderTestBase.java |  28 ++
 4 files changed, 436 insertions(+)
----------------------------------------------------------------------
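For background on the shape of the data involved: Parquet stores a Hive map column as a repeated group annotated with MAP_KEY_VALUE, containing a required key field and an optional value field, as the test schemas below show. A minimal sketch of inspecting such a schema with Parquet's MessageTypeParser (the group name map_int32 mirrors the test schema; the class and method here are illustrative only):

    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class MapSchemaSketch {
      public static void main(String[] args) {
        // A map column as Parquet stores it: a repeated key/value group.
        MessageType schema = MessageTypeParser.parseMessageType(
            "message hive_schema {\n"
            + "  repeated group map_int32 (MAP_KEY_VALUE) {\n"
            + "    required int32 key;\n"
            + "    optional int32 value;\n"
            + "  }\n"
            + "}");
        GroupType mapGroup = schema.getType("map_int32").asGroupType();
        // Field 0 is the key type and field 1 is the value type; the patch
        // reads each one as if it were a list column.
        System.out.println(mapGroup.getFields().get(0)); // required int32 key
        System.out.println(mapGroup.getFields().get(1)); // optional int32 value
      }
    }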


http://git-wip-us.apache.org/repos/asf/hive/blob/2efb7d38/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedMapColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedMapColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedMapColumnReader.java
new file mode 100644
index 0000000..6099a28
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedMapColumnReader.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.io.IOException;
+
+/**
+ * A column-level Parquet reader that reads a batch of records for a map column.
+ */
+public class VectorizedMapColumnReader implements VectorizedColumnReader {
+  private VectorizedListColumnReader keyColumnReader;
+  private VectorizedListColumnReader valueColumnReader;
+
+  public VectorizedMapColumnReader(VectorizedListColumnReader keyColumnReader,
+      VectorizedListColumnReader valueColumnReader) {
+    this.keyColumnReader = keyColumnReader;
+    this.valueColumnReader = valueColumnReader;
+  }
+
+  @Override
+  public void readBatch(int total, ColumnVector column, TypeInfo columnType) throws IOException {
+    MapColumnVector mapColumnVector = (MapColumnVector) column;
+    MapTypeInfo mapTypeInfo = (MapTypeInfo) columnType;
+    ListTypeInfo keyListTypeInfo = new ListTypeInfo();
+    keyListTypeInfo.setListElementTypeInfo(mapTypeInfo.getMapKeyTypeInfo());
+    ListTypeInfo valueListTypeInfo = new ListTypeInfo();
+    valueListTypeInfo.setListElementTypeInfo(mapTypeInfo.getMapValueTypeInfo());
+
+    // initialize two ListColumnVectors, one for the keys and one for the values
+    ListColumnVector keyListColumnVector = new ListColumnVector();
+    ListColumnVector valueListColumnVector = new ListColumnVector();
+    // read the keys and values
+    keyColumnReader.readBatch(total, keyListColumnVector, keyListTypeInfo);
+    valueColumnReader.readBatch(total, valueListColumnVector, valueListTypeInfo);
+
+    // populate the map vector's attributes from the key and value vectors
+    mapColumnVector.keys = keyListColumnVector.child;
+    mapColumnVector.values = valueListColumnVector.child;
+    mapColumnVector.isNull = keyListColumnVector.isNull;
+    mapColumnVector.offsets = keyListColumnVector.offsets;
+    mapColumnVector.lengths = keyListColumnVector.lengths;
+    mapColumnVector.childCount = keyListColumnVector.childCount;
+    mapColumnVector.isRepeating = keyListColumnVector.isRepeating
+        && valueListColumnVector.isRepeating;
+  }
+}

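Because readBatch only rewires references (keys, values, isNull, offsets and lengths all point into the two child list vectors), a consumer walks a MapColumnVector much like a ListColumnVector. A minimal sketch, assuming a map<int,int> column so that both children decode to LongColumnVector; printMapBatch is an illustrative name, not part of the patch:

    // Iterate the maps in one batch. Assumes map<int,int>, so both
    // children are LongColumnVectors.
    static void printMapBatch(MapColumnVector map, int batchSize) {
      LongColumnVector keys = (LongColumnVector) map.keys;
      LongColumnVector values = (LongColumnVector) map.values;
      for (int row = 0; row < batchSize; row++) {
        // When isRepeating is set, entry 0 describes every row in the batch.
        int r = map.isRepeating ? 0 : row;
        if (map.isNull[r]) {
          continue; // null map for this row
        }
        long start = map.offsets[r];
        long length = map.lengths[r];
        for (long j = start; j < start + length; j++) {
          System.out.println(keys.vector[(int) j] + " -> " + values.vector[(int) j]);
        }
      }
    }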
http://git-wip-us.apache.org/repos/asf/hive/blob/2efb7d38/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 941bd7d..4303ca9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -503,6 +503,18 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       return new VectorizedListColumnReader(descriptors.get(0),
           pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type);
     case MAP:
+      if (columnDescriptors == null || columnDescriptors.isEmpty()) {
+        throw new RuntimeException(
+            "Failed to find related Parquet column descriptor with type " + type);
+      }
+      List<Type> kvTypes = type.asGroupType().getFields();
+      VectorizedListColumnReader keyListColumnReader = new VectorizedListColumnReader(
+          descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion,
+          kvTypes.get(0));
+      VectorizedListColumnReader valueListColumnReader = new VectorizedListColumnReader(
+          descriptors.get(1), pages.getPageReader(descriptors.get(1)), skipTimestampConversion,
+          kvTypes.get(1));
+      return new VectorizedMapColumnReader(keyListColumnReader, valueListColumnReader);
     case UNION:
     default:
       throw new RuntimeException("Unsupported category " + typeInfo.getCategory().name());

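On the Hive side, the TypeInfo handed to readBatch comes from the table schema, and the map reader splits it into two single-element ListTypeInfos, one per child. A small sketch of obtaining the matching MapTypeInfo from a type string via TypeInfoUtils (a standard Hive serde API; the map<int,int> literal is just an example):

    import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    // The MapTypeInfo that readBatch() splits into key and value ListTypeInfos.
    MapTypeInfo mapTypeInfo = (MapTypeInfo)
        TypeInfoUtils.getTypeInfoFromTypeString("map<int,int>");
    System.out.println(mapTypeInfo.getMapKeyTypeInfo());   // int
    System.out.println(mapTypeInfo.getMapValueTypeInfo()); // int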
http://git-wip-us.apache.org/repos/asf/hive/blob/2efb7d38/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
new file mode 100644
index 0000000..c33e8ab
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.io.api.Binary;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBase {
+
+  protected static void writeMapData(ParquetWriter<Group> writer, boolean isDictionaryEncoding,
+    int elementNum) throws IOException {
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    int mapMaxSize = 4;
+    int mapElementIndex = 0;
+    for (int i = 0; i < elementNum; i++) {
+      boolean isNull = isNull(i);
+      Group group = f.newGroup();
+
+      int mapSize = i % mapMaxSize + 1;
+      if (!isNull) {
+        for (int j = 0; j < mapSize; j++) {
+          int intValForMap = getIntValue(isDictionaryEncoding, mapElementIndex);
+          long longValForMap = getLongValue(isDictionaryEncoding, mapElementIndex);
+          double doubleValForMap = getDoubleValue(isDictionaryEncoding, mapElementIndex);
+          float floatValForMap = getFloatValue(isDictionaryEncoding, mapElementIndex);
+          Binary binaryValForMap = getBinaryValue(isDictionaryEncoding, mapElementIndex);
+          HiveDecimal hd = getDecimal(isDictionaryEncoding, mapElementIndex).setScale(2);
+          HiveDecimalWritable hdw = new HiveDecimalWritable(hd);
+          Binary decimalValForMap = Binary.fromConstantByteArray(hdw.getInternalStorage());
+          group.addGroup("map_int32").append("key", intValForMap).append("value", intValForMap);
+          group.addGroup("map_int64").append("key", longValForMap).append("value", longValForMap);
+          group.addGroup("map_double").append("key", doubleValForMap)
+              .append("value", doubleValForMap);
+          group.addGroup("map_float").append("key", floatValForMap).append("value", floatValForMap);
+          group.addGroup("map_binary").append("key", binaryValForMap)
+              .append("value", binaryValForMap);
+          group.addGroup("map_decimal").append("key", decimalValForMap)
+              .append("value", decimalValForMap);
+          mapElementIndex++;
+        }
+      }
+      writer.write(group);
+    }
+    writer.close();
+  }
+
+  protected static void writeRepeateMapData(
+    ParquetWriter<Group> writer, int elementNum, boolean isNull) throws IOException {
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    int mapMaxSize = 4;
+    for (int i = 0; i < elementNum; i++) {
+      Group group = f.newGroup();
+      if (!isNull) {
+        for (int j = 0; j < mapMaxSize; j++) {
+          group.addGroup("map_int32_for_repeat_test").append("key", j).append("value", j);
+        }
+      }
+      writer.write(group);
+    }
+    writer.close();
+  }
+
+  @Test
+  public void testMapReadLessOneBatch() throws Exception {
+    boolean isDictionaryEncoding = false;
+    removeFile();
+    writeMapData(initWriterFromFile(), isDictionaryEncoding, 1023);
+    testMapReadAllType(isDictionaryEncoding, 1023);
+    removeFile();
+    isDictionaryEncoding = true;
+    writeMapData(initWriterFromFile(), isDictionaryEncoding, 1023);
+    testMapReadAllType(isDictionaryEncoding, 1023);
+    removeFile();
+  }
+
+  @Test
+  public void testMapReadEqualOneBatch() throws Exception {
+    boolean isDictionaryEncoding = false;
+    removeFile();
+    writeMapData(initWriterFromFile(), isDictionaryEncoding, 1024);
+    testMapReadAllType(isDictionaryEncoding, 1024);
+    removeFile();
+    isDictionaryEncoding = true;
+    writeMapData(initWriterFromFile(), isDictionaryEncoding, 1024);
+    testMapReadAllType(isDictionaryEncoding, 1024);
+    removeFile();
+  }
+
+  @Test
+  public void testMapReadMoreOneBatch() throws Exception {
+    boolean isDictionaryEncoding = false;
+    removeFile();
+    writeMapData(initWriterFromFile(), isDictionaryEncoding, 1025);
+    testMapReadAllType(isDictionaryEncoding, 1025);
+    removeFile();
+    isDictionaryEncoding = true;
+    writeMapData(initWriterFromFile(), isDictionaryEncoding, 1025);
+    testMapReadAllType(isDictionaryEncoding, 1025);
+    removeFile();
+  }
+
+  @Test
+  public void testRepeateMapRead() throws Exception {
+    removeFile();
+    writeRepeateMapData(initWriterFromFile(), 1023, false);
+    testRepeateMapRead(1023, false);
+    removeFile();
+    writeRepeateMapData(initWriterFromFile(), 1023, true);
+    testRepeateMapRead(1023, true);
+    removeFile();
+    writeRepeateMapData(initWriterFromFile(), 1024, false);
+    testRepeateMapRead(1024, false);
+    removeFile();
+    writeRepeateMapData(initWriterFromFile(), 1024, true);
+    testRepeateMapRead(1024, true);
+    removeFile();
+    writeRepeateMapData(initWriterFromFile(), 1025, false);
+    testRepeateMapRead(1025, false);
+    removeFile();
+    writeRepeateMapData(initWriterFromFile(), 1025, true);
+    testRepeateMapRead(1025, true);
+    removeFile();
+  }
+
+  private void testMapReadAllType(boolean isDictionaryEncoding, int elementNum) throws Exception {
+    testMapRead(isDictionaryEncoding, "int", elementNum);
+    testMapRead(isDictionaryEncoding, "long", elementNum);
+    testMapRead(isDictionaryEncoding, "double", elementNum);
+    testMapRead(isDictionaryEncoding, "float", elementNum);
+    testMapRead(isDictionaryEncoding, "binary", elementNum);
+    testMapRead(isDictionaryEncoding, "decimal", elementNum);
+  }
+
+  private void testMapRead(boolean isDictionaryEncoding, String type,
+      int elementNum) throws Exception {
+    Configuration conf = new Configuration();
+    setTypeConfiguration(type, conf);
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader = createTestParquetReader(getSchema(type), conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int row = 0;
+    int index = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        MapColumnVector mapVector = (MapColumnVector) previous.cols[0];
+
+        // isRepeating should be true exactly when the offsets array has length 1.
+        assertEquals((mapVector.offsets.length == 1), mapVector.isRepeating);
+
+        for (int i = 0; i < mapVector.offsets.length; i++) {
+          if (row == elementNum) {
+            assertEquals(i, mapVector.offsets.length - 1);
+            break;
+          }
+          long start = mapVector.offsets[i];
+          long length = mapVector.lengths[i];
+          boolean isNull = isNull(row);
+          if (isNull) {
+            assertTrue(mapVector.isNull[i]);
+          } else {
+            for (long j = 0; j < length; j++) {
+              assertValue(type, mapVector.keys, isDictionaryEncoding, index, (int) (start + j));
+              assertValue(type, mapVector.values, isDictionaryEncoding, index, (int) (start + j));
+              index++;
+            }
+          }
+          row++;
+        }
+      }
+      assertEquals("Did not exit at the expected position", elementNum, row);
+    } finally {
+      reader.close();
+    }
+  }
+
+  private void testRepeateMapRead(int elementNum, boolean isNull) throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS, "map_int32_for_repeat_test");
+    conf.set(IOConstants.COLUMNS_TYPES, "map<int,int>");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    String schema = "message hive_schema {\n"
+        + "  repeated group map_int32_for_repeat_test (MAP_KEY_VALUE) {\n"
+        + "    required int32 key;\n"
+        + "    optional int32 value;\n"
+        + "  }\n"
+        + "}\n";
+    VectorizedParquetRecordReader reader = createTestParquetReader(schema, conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int row = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        MapColumnVector mapVector = (MapColumnVector) previous.cols[0];
+
+        assertTrue(mapVector.isRepeating);
+        assertEquals(isNull, mapVector.isNull[0]);
+
+        for (int i = 0; i < mapVector.offsets.length; i++) {
+          if (row == elementNum) {
+            assertEquals(i, mapVector.offsets.length - 1);
+            break;
+          }
+          row++;
+        }
+      }
+      assertEquals("Did not exit at the expected position", elementNum, row);
+    } finally {
+      reader.close();
+    }
+  }
+
+  private void setTypeConfiguration(String type, Configuration conf) {
+    if ("int".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "map_int32");
+      conf.set(IOConstants.COLUMNS_TYPES, "map<int,int>");
+    } else if ("long".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "map_int64");
+      conf.set(IOConstants.COLUMNS_TYPES, "map<bigint,bigint>");
+    } else if ("double".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "map_double");
+      conf.set(IOConstants.COLUMNS_TYPES, "map<double,double>");
+    } else if ("float".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "map_float");
+      conf.set(IOConstants.COLUMNS_TYPES, "map<float,float>");
+    } else if ("binary".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "map_binary");
+      conf.set(IOConstants.COLUMNS_TYPES, "map<string,string>");
+    } else if ("decimal".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "map_decimal");
+      conf.set(IOConstants.COLUMNS_TYPES, "map<decimal(5,2),decimal(5,2)>");
+    }
+  }
+
+  private String getSchema(String type) {
+    String schemaFormat = "message hive_schema {\n"
+        + "  repeated group map_%s (MAP_KEY_VALUE) {\n"
+        + "    required %s key %s;\n"
+        + "    optional %s value %s;\n"
+        + "  }\n"
+        + "}\n";
+    switch (type) {
+      case "int":
+        return String.format(schemaFormat, "int32", "int32", "", "int32", "");
+      case "long":
+        return String.format(schemaFormat, "int64", "int64", "", "int64", "");
+      case "double":
+        return String.format(schemaFormat, "double", "double", "", "double", "");
+      case "float":
+        return String.format(schemaFormat, "float", "float", "", "float", "");
+      case "binary":
+        return String.format(schemaFormat, "binary", "binary", "", "binary", "");
+      case "decimal":
+        return String.format(schemaFormat, "decimal", "binary", "(DECIMAL(5,2))",
+            "binary", "(DECIMAL(5,2))");
+      default:
+        throw new RuntimeException("Unsupported type for TestVectorizedMapColumnReader!");
+    }
+  }
+
+  private void assertValue(String type, ColumnVector childVector, boolean isDictionaryEncoding,
+    int valueIndex, int position) {
+    if ("int".equals(type)) {
+      assertEquals(getIntValue(isDictionaryEncoding, valueIndex),
+          ((LongColumnVector)childVector).vector[position]);
+    } else if ("long".equals(type)) {
+      assertEquals(getLongValue(isDictionaryEncoding, valueIndex),
+          ((LongColumnVector)childVector).vector[position]);
+    } else if ("double".equals(type)) {
+      assertEquals(getDoubleValue(isDictionaryEncoding, valueIndex),
+          ((DoubleColumnVector)childVector).vector[position], 0);
+    } else if ("float".equals(type)) {
+      assertEquals(getFloatValue(isDictionaryEncoding, valueIndex),
+          ((DoubleColumnVector)childVector).vector[position], 0);
+    } else if ("binary".equals(type)) {
+      String actual = new String(ArrayUtils
+          .subarray(((BytesColumnVector)childVector).vector[position],
+              ((BytesColumnVector)childVector).start[position],
+              ((BytesColumnVector)childVector).start[position]
+                  + ((BytesColumnVector)childVector).length[position]));
+      assertEquals(getStr(isDictionaryEncoding, valueIndex), actual);
+    } else if ("decimal".equals(type)) {
+      assertEquals(getDecimal(isDictionaryEncoding, valueIndex),
+          ((DecimalColumnVector)childVector).vector[position].getHiveDecimal());
+    } else {
+      throw new RuntimeException("Unsupported type for TestVectorizedMapColumnReader!");
+    }
+  }
+}

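The element counts exercised above (1023, 1024, 1025) bracket the default vectorized batch size of 1024 rows, so each type is tested with a partially filled batch, an exactly full batch, and a spill into a second batch. A quick illustration of that boundary, using the real VectorizedRowBatch.DEFAULT_SIZE constant:

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    int batchSize = VectorizedRowBatch.DEFAULT_SIZE;   // 1024
    System.out.println(1023 < batchSize);              // true: one partial batch
    System.out.println(1024 == batchSize);             // true: exactly one full batch
    System.out.println(1025 - batchSize);              // 1: one row in a second batch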
http://git-wip-us.apache.org/repos/asf/hive/blob/2efb7d38/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
index 929e991..33c5c82 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
@@ -125,6 +125,34 @@ public class VectorizedColumnReaderTestBase {
       + "repeated binary list_binary_field;"
       + "repeated binary list_decimal_field (DECIMAL(5,2));"
       + "repeated int32 list_int32_field_for_repeat_test;"
+      + "repeated group map_int32 (MAP_KEY_VALUE) {\n"
+      + "  required int32 key;\n"
+      + "  optional int32 value;\n"
+      + "}\n"
+      + "repeated group map_int64 (MAP_KEY_VALUE) {\n"
+      + "  required int64 key;\n"
+      + "  optional int64 value;\n"
+      + "}\n"
+      + "repeated group map_double (MAP_KEY_VALUE) {\n"
+      + "  required double key;\n"
+      + "  optional double value;\n"
+      + "}\n"
+      + "repeated group map_float (MAP_KEY_VALUE) {\n"
+      + "  required float key;\n"
+      + "  optional float value;\n"
+      + "}\n"
+      + "repeated group map_binary (MAP_KEY_VALUE) {\n"
+      + "  required binary key;\n"
+      + "  optional binary value;\n"
+      + "}\n"
+      + "repeated group map_decimal (MAP_KEY_VALUE) {\n"
+      + "  required binary key (DECIMAL(5,2));\n"
+      + "  optional binary value (DECIMAL(5,2));\n"
+      + "}\n"
+      + "repeated group map_int32_for_repeat_test (MAP_KEY_VALUE) {\n"
+      + "  required int32 key;\n"
+      + "  optional int32 value;\n"
+      + "}\n"
       + "} ");
 
   protected static void removeFile() throws IOException {
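
Taken together, reading one of the new map columns follows the same pattern the tests use. A condensed sketch, assuming the test base class helpers (createTestParquetReader, getSchema) and a file already written by writeMapData:

    Configuration conf = new Configuration();
    conf.set(IOConstants.COLUMNS, "map_int32");
    conf.set(IOConstants.COLUMNS_TYPES, "map<int,int>");
    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
    VectorizedParquetRecordReader reader = createTestParquetReader(getSchema("int"), conf);
    VectorizedRowBatch batch = reader.createValue();
    try {
      while (reader.next(NullWritable.get(), batch)) {
        MapColumnVector map = (MapColumnVector) batch.cols[0];
        // map.keys and map.values now hold the decoded key and value children.
      }
    } finally {
      reader.close();
    }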