Posted to commits@hive.apache.org by br...@apache.org on 2014/11/19 20:06:17 UTC

svn commit: r1640615 - in /hive/trunk: data/files/ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ ql/src/test/org/apache/hadoop/h...

Author: brock
Date: Wed Nov 19 19:06:17 2014
New Revision: 1640615

URL: http://svn.apache.org/r1640615
Log:
HIVE-8359 - Maps containing null values are not correctly written in Parquet files (Sergio Peña via Brock)
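Background for the fix: Parquet's RecordConsumer API has no explicit "write null" call; a null optional field is expressed by not emitting the field at all. A minimal sketch of that convention (the two-column schema and field names here are illustrative, not part of this patch):

    // Writing {a: 1, b: null} against
    // "message m { optional int32 a; optional int32 b; }"
    recordConsumer.startMessage();
    recordConsumer.startField("a", 0);
    recordConsumer.addInteger(1);
    recordConsumer.endField("a", 0);
    // b is null: skip startField/endField for index 1 entirely
    recordConsumer.endMessage();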

Added:
    hive/trunk/data/files/parquet_array_null_element.txt
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java
    hive/trunk/ql/src/test/queries/clientpositive/parquet_array_null_element.q
    hive/trunk/ql/src/test/queries/clientpositive/parquet_map_null.q
    hive/trunk/ql/src/test/results/clientpositive/parquet_array_null_element.q.out
    hive/trunk/ql/src/test/results/clientpositive/parquet_map_null.q.out
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java

Added: hive/trunk/data/files/parquet_array_null_element.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/parquet_array_null_element.txt?rev=1640615&view=auto
==============================================================================
--- hive/trunk/data/files/parquet_array_null_element.txt (added)
+++ hive/trunk/data/files/parquet_array_null_element.txt Wed Nov 19 19:06:17 2014
@@ -0,0 +1,3 @@
+1|,7|CARRELAGE,MOQUETTE|key11:value11,key12:value12,key13:value13
+2|,|CAILLEBOTIS,|
+3|,42,||key11:value11,key12:,key13:

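For reference, these rows parse under the staging table's delimiters declared below (fields '|', collection items ',', map keys ':', NULL DEFINED AS '') into the values shown by the SELECT output later in this commit:

    1 -> lstint=[null,7]        lststr=["CARRELAGE","MOQUETTE"]  mp={"key11":"value11","key12":"value12","key13":"value13"}
    2 -> lstint=[null,null]     lststr=["CAILLEBOTIS",null]      mp=NULL
    3 -> lstint=[null,42,null]  lststr=NULL                      mp={"key11":"value11","key12":null,"key13":null}
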
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java?rev=1640615&r1=1640614&r2=1640615&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java Wed Nov 19 19:06:17 2014
@@ -54,6 +54,7 @@ public class ArrayWritableGroupConverter
     if (isMap) {
       mapPairContainer = new Writable[2];
     }
+    currentValue = null;
   }
 
   @Override

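The one-line reset above is the read-side half of the fix: converters are reused across map entries, so without clearing currentValue at the start of each key-value pair, a pair whose value is null would surface the previous pair's value. A self-contained toy model (not Hive code) of that failure mode:

    public class ConverterResetDemo {
      static class ToyValueConverter {
        private Object currentValue;
        void start() { currentValue = null; }          // the fix: clear per-entry state
        void addValue(Object v) { currentValue = v; }  // fires only when a value exists
        Object end() { return currentValue; }
      }

      public static void main(String[] args) {
        ToyValueConverter c = new ToyValueConverter();
        c.start(); c.addValue("v1");
        System.out.println(c.end());  // v1
        c.start();                    // second entry's value is null: addValue never fires
        System.out.println(c.end());  // null -- without the reset this would print v1
      }
    }
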
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1640615&r1=1640614&r2=1640615&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Wed Nov 19 19:06:17 2014
@@ -229,14 +229,11 @@ public class ParquetHiveSerDe extends Ab
     final List<Writable> array = new ArrayList<Writable>();
     if (sourceArray != null) {
       for (final Object curObj : sourceArray) {
-        final Writable newObj = createObject(curObj, subInspector);
-        if (newObj != null) {
-          array.add(newObj);
-        }
+        array.add(createObject(curObj, subInspector));
       }
     }
     if (array.size() > 0) {
-      final ArrayWritable subArray = new ArrayWritable(array.get(0).getClass(),
+      final ArrayWritable subArray = new ArrayWritable(Writable.class,
           array.toArray(new Writable[array.size()]));
       return new ArrayWritable(Writable.class, new Writable[] {subArray});
     } else {

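The second hunk above goes hand in hand with the first: once null elements are kept in the list, array.get(0) may itself be null, so deriving the ArrayWritable element class from the first element would throw. A standalone illustration using only the Hadoop Writable types (class and variable names are mine):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Writable;

    public class NullElementDemo {
      public static void main(String[] args) {
        List<Writable> array = new ArrayList<Writable>();
        array.add(null);                // null elements are now preserved
        array.add(new IntWritable(7));
        // Pre-patch: array.get(0).getClass() -> NullPointerException here.
        // Writable.class works for any element mix, including nulls.
        ArrayWritable subArray = new ArrayWritable(Writable.class,
            array.toArray(new Writable[array.size()]));
        System.out.println(subArray.get().length);  // 2
      }
    }
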
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java?rev=1640615&r1=1640614&r2=1640615&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java Wed Nov 19 19:06:17 2014
@@ -15,6 +15,8 @@ package org.apache.hadoop.hive.ql.io.par
 
 import java.sql.Timestamp;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
@@ -30,10 +32,10 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
-import parquet.io.ParquetEncodingException;
 import parquet.io.api.Binary;
 import parquet.io.api.RecordConsumer;
 import parquet.schema.GroupType;
+import parquet.schema.OriginalType;
 import parquet.schema.Type;
 
 /**
@@ -41,10 +43,10 @@ import parquet.schema.Type;
  * DataWritableWriter is a writer,
  * that will read an ArrayWritable and give the data to parquet
  * with the expected schema
- *
+ * This is a helper class used by the DataWritableWriteSupport class.
  */
 public class DataWritableWriter {
-
+  private static final Log LOG = LogFactory.getLog(DataWritableWriter.class);
   private final RecordConsumer recordConsumer;
   private final GroupType schema;
 
@@ -53,85 +55,156 @@ public class DataWritableWriter {
     this.schema = schema;
   }
 
-  public void write(final ArrayWritable arr) {
-    if (arr == null) {
-      return;
+  /**
+   * It writes all record values to the Parquet RecordConsumer.
+   * @param record Contains the record of values that are going to be written
+   */
+  public void write(final ArrayWritable record) {
+    if (record != null) {
+      recordConsumer.startMessage();
+      try {
+        writeGroupFields(record, schema);
+      } catch (RuntimeException e) {
+        String errorMessage = "Parquet record is malformed: " + e.getMessage();
+        LOG.error(errorMessage, e);
+        throw new RuntimeException(errorMessage, e);
+      }
+      recordConsumer.endMessage();
     }
-    recordConsumer.startMessage();
-    writeData(arr, schema);
-    recordConsumer.endMessage();
   }
 
-  private void writeData(final ArrayWritable arr, final GroupType type) {
-    if (arr == null) {
-      return;
+  /**
+   * It writes all the fields contained inside a group to the RecordConsumer.
+   * @param value The list of values contained in the group.
+   * @param type Type that contains information about the group schema.
+   */
+  public void writeGroupFields(final ArrayWritable value, final GroupType type) {
+    if (value != null) {
+      for (int i = 0; i < type.getFieldCount(); i++) {
+        Type fieldType = type.getType(i);
+        String fieldName = fieldType.getName();
+        Writable fieldValue = value.get()[i];
+
+        // Parquet does not write null elements
+        if (fieldValue != null) {
+          recordConsumer.startField(fieldName, i);
+          writeValue(fieldValue, fieldType);
+          recordConsumer.endField(fieldName, i);
+        }
+      }
     }
-    final int fieldCount = type.getFieldCount();
-    Writable[] values = arr.get();
-    for (int field = 0; field < fieldCount; ++field) {
-      final Type fieldType = type.getType(field);
-      final String fieldName = fieldType.getName();
-      final Writable value = values[field];
-      if (value == null) {
-        continue;
+  }
+
+  /**
+   * It writes the field value to the Parquet RecordConsumer. It detects the field type and calls
+   * the matching write function.
+   * @param value The writable object that contains the value.
+   * @param type Type that contains information about the type schema.
+   */
+  private void writeValue(final Writable value, final Type type) {
+    if (type.isPrimitive()) {
+      writePrimitive(value);
+    } else if (value instanceof ArrayWritable) {
+      GroupType groupType = type.asGroupType();
+      OriginalType originalType = type.getOriginalType();
+
+      if (originalType != null && originalType.equals(OriginalType.LIST)) {
+        writeArray((ArrayWritable)value, groupType);
+      } else if (originalType != null && originalType.equals(OriginalType.MAP)) {
+        writeMap((ArrayWritable)value, groupType);
+      } else {
+        writeGroup((ArrayWritable) value, groupType);
       }
+    } else {
+      throw new RuntimeException("Field value is not an ArrayWritable object: " + type);
+    }
+  }
 
-      recordConsumer.startField(fieldName, field);
+  /**
+   * It writes a group type and all its values to the Parquet RecordConsumer.
+   * This is used only for optional and required groups.
+   * @param value ArrayWritable object that contains the group values
+   * @param type Type that contains information about the group schema
+   */
+  private void writeGroup(final ArrayWritable value, final GroupType type) {
+    recordConsumer.startGroup();
+    writeGroupFields(value, type);
+    recordConsumer.endGroup();
+  }
 
-      if (fieldType.isPrimitive()) {
-        writePrimitive(value);
-      } else {
-        recordConsumer.startGroup();
-        if (value instanceof ArrayWritable) {
-          if (fieldType.asGroupType().getRepetition().equals(Type.Repetition.REPEATED)) {
-            writeArray((ArrayWritable) value, fieldType.asGroupType());
-          } else {
-            writeData((ArrayWritable) value, fieldType.asGroupType());
-          }
-        } else if (value != null) {
-          throw new ParquetEncodingException("This should be an ArrayWritable or MapWritable: " + value);
+  /**
+   * It writes a map type and its key-value pairs to the Parquet RecordConsumer.
+   * This is called when the original type (MAP) is detected by writeValue()
+   * @param value The list of map values that contains the repeated MAP_KEY_VALUE group type
+   * @param type Type that contains information about the group schema
+   */
+  private void writeMap(final ArrayWritable value, final GroupType type) {
+    GroupType repeatedType = type.getType(0).asGroupType();
+    ArrayWritable repeatedValue = (ArrayWritable)value.get()[0];
+
+    recordConsumer.startGroup();
+    recordConsumer.startField(repeatedType.getName(), 0);
+
+    Writable[] mapValues = repeatedValue.get();
+    for (int record = 0; record < mapValues.length; record++) {
+      Writable keyValuePair = mapValues[record];
+      if (keyValuePair != null) {
+        // Hive wraps each map key-value pair into an ArrayWritable
+        if (keyValuePair instanceof ArrayWritable) {
+          writeGroup((ArrayWritable) keyValuePair, repeatedType);
+        } else {
+          throw new RuntimeException("Map key-value pair is not an ArrayWritable object on record " + record);
         }
-
-        recordConsumer.endGroup();
+      } else {
+        throw new RuntimeException("Map key-value pair is null on record " + record);
       }
-
-      recordConsumer.endField(fieldName, field);
     }
+
+    recordConsumer.endField(repeatedType.getName(), 0);
+    recordConsumer.endGroup();
   }
 
+  /**
+   * It writes a list type and its array elements to the Parquet RecordConsumer.
+   * This is called when the original type (LIST) is detected by writeValue()
+   * @param array The list of array values that contains the repeated array group type
+   * @param type Type that contains information about the group schema
+   */
   private void writeArray(final ArrayWritable array, final GroupType type) {
-    if (array == null) {
-      return;
-    }
-    final Writable[] subValues = array.get();
-    final int fieldCount = type.getFieldCount();
-    for (int field = 0; field < fieldCount; ++field) {
-      final Type subType = type.getType(field);
-      recordConsumer.startField(subType.getName(), field);
-      for (int i = 0; i < subValues.length; ++i) {
-        final Writable subValue = subValues[i];
-        if (subValue != null) {
-          if (subType.isPrimitive()) {
-            if (subValue instanceof ArrayWritable) {
-              writePrimitive(((ArrayWritable) subValue).get()[field]);// 0 ?
-            } else {
-              writePrimitive(subValue);
-            }
-          } else {
-            if (!(subValue instanceof ArrayWritable)) {
-              throw new RuntimeException("This should be a ArrayWritable: " + subValue);
-            } else {
-              recordConsumer.startGroup();
-              writeData((ArrayWritable) subValue, subType.asGroupType());
-              recordConsumer.endGroup();
-            }
-          }
+    GroupType repeatedType = type.getType(0).asGroupType();
+    ArrayWritable repeatedValue = (ArrayWritable)array.get()[0];
+
+    recordConsumer.startGroup();
+    recordConsumer.startField(repeatedType.getName(), 0);
+
+    Writable[] arrayValues = repeatedValue.get();
+    for (int record = 0; record < arrayValues.length; record++) {
+      recordConsumer.startGroup();
+
+      // Null values must be wrapped into startGroup/endGroup
+      Writable element = arrayValues[record];
+      if (element != null) {
+        for (int i = 0; i < repeatedType.getFieldCount(); i++) {
+          Type fieldType = repeatedType.getType(i);
+          String fieldName = fieldType.getName();
+
+          recordConsumer.startField(fieldName, i);
+          writeValue(element, fieldType);
+          recordConsumer.endField(fieldName, i);
         }
       }
-      recordConsumer.endField(subType.getName(), field);
+
+      recordConsumer.endGroup();
     }
+
+    recordConsumer.endField(repeatedType.getName(), 0);
+    recordConsumer.endGroup();
   }
 
+  /**
+   * It writes the primitive value to the Parquet RecordConsumer.
+   * @param value The writable object that contains the primitive value.
+   */
   private void writePrimitive(final Writable value) {
     if (value == null) {
       return;

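Net effect of the rewrite: a null array element still occupies its repeated group, just with no inner field emitted, and a null map value is simply not emitted inside its key-value group (writeGroupFields skips null fields). An illustrative RecordConsumer call sequence for an int array [1, null] under Hive's standard LIST layout, matching what TestDataWritableWriter below verifies:

    recordConsumer.startField("arrayCol", 0);
    recordConsumer.startGroup();
    recordConsumer.startField("bag", 0);
    recordConsumer.startGroup();                    // element 1
    recordConsumer.startField("array_element", 0);
    recordConsumer.addInteger(1);
    recordConsumer.endField("array_element", 0);
    recordConsumer.endGroup();
    recordConsumer.startGroup();                    // null element: empty group,
    recordConsumer.endGroup();                      // no array_element emitted
    recordConsumer.endField("bag", 0);
    recordConsumer.endGroup();
    recordConsumer.endField("arrayCol", 0);
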
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java?rev=1640615&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java Wed Nov 19 19:06:17 2014
@@ -0,0 +1,518 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.io.*;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import java.io.UnsupportedEncodingException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class TestDataWritableWriter {
+  @Mock private RecordConsumer mockRecordConsumer;
+  private InOrder inOrder;
+
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+    inOrder = inOrder(mockRecordConsumer);
+  }
+
+  private void startMessage() {
+    inOrder.verify(mockRecordConsumer).startMessage();
+  }
+
+  private void endMessage() {
+    inOrder.verify(mockRecordConsumer).endMessage();
+    verifyNoMoreInteractions(mockRecordConsumer);
+  }
+
+  private void startField(String name, int index) {
+    inOrder.verify(mockRecordConsumer).startField(name, index);
+  }
+
+  private void endField(String name, int index) {
+    inOrder.verify(mockRecordConsumer).endField(name, index);
+  }
+
+  private void addInteger(int value) {
+    inOrder.verify(mockRecordConsumer).addInteger(value);
+  }
+
+  private void addFloat(float value) {
+    inOrder.verify(mockRecordConsumer).addFloat(value);
+  }
+
+  private void addDouble(double value) {
+    inOrder.verify(mockRecordConsumer).addDouble(value);
+  }
+
+  private void addBoolean(boolean value) {
+    inOrder.verify(mockRecordConsumer).addBoolean(value);
+  }
+
+  private void addString(String value) {
+    inOrder.verify(mockRecordConsumer).addBinary(Binary.fromString(value));
+  }
+
+  private void startGroup() {
+    inOrder.verify(mockRecordConsumer).startGroup();
+  }
+
+  private void endGroup() {
+    inOrder.verify(mockRecordConsumer).endGroup();
+  }
+
+  private Writable createNull() { return null; }
+
+  private IntWritable createInt(int value) {
+    return new IntWritable(value);
+  }
+
+  private FloatWritable createFloat(float value) {
+    return new FloatWritable(value);
+  }
+
+  private DoubleWritable createDouble(double value) {
+    return new DoubleWritable(value);
+  }
+
+  private BooleanWritable createBoolean(boolean value) {
+    return new BooleanWritable(value);
+  }
+
+  private BytesWritable createString(String value) throws UnsupportedEncodingException {
+    return new BytesWritable(value.getBytes("UTF-8"));
+  }
+
+  private ArrayWritable createGroup(Writable...values) {
+    return new ArrayWritable(Writable.class, values);
+  }
+
+  private ArrayWritable createArray(Writable...values) {
+    return new ArrayWritable(Writable.class, createGroup(values).get());
+  }
+
+  private void writeParquetRecord(String schemaStr, ArrayWritable record) {
+    MessageType schema = MessageTypeParser.parseMessageType(schemaStr);
+    DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, schema);
+    hiveParquetWriter.write(record);
+  }
+
+  @Test
+  public void testSimpleType() throws Exception {
+    String schemaStr = "message hive_schema {\n"
+        + "  optional int32 int;\n"
+        + "  optional double double;\n"
+        + "  optional boolean boolean;\n"
+        + "  optional float float;\n"
+        + "  optional binary string;\n"
+        + "}\n";
+
+    ArrayWritable hiveRecord = createGroup(
+        createInt(1),
+        createDouble(1.0),
+        createBoolean(true),
+        createFloat(1.0f),
+        createString("one")
+    );
+
+    // Write record to Parquet format
+    writeParquetRecord(schemaStr, hiveRecord);
+
+    // Verify record was written correctly to Parquet
+    startMessage();
+      startField("int", 0);
+        addInteger(1);
+      endField("int", 0);
+      startField("double", 1);
+        addDouble(1.0);
+      endField("double", 1);
+      startField("boolean", 2);
+        addBoolean(true);
+      endField("boolean", 2);
+      startField("float", 3);
+        addFloat(1.0f);
+      endField("float", 3);
+      startField("string", 4);
+        addString("one");
+      endField("string", 4);
+    endMessage();
+  }
+
+  @Test
+  public void testStructType() throws Exception {
+    String schemaStr = "message hive_schema {\n"
+        + "  optional group structCol {\n"
+        + "    optional int32 a;\n"
+        + "    optional double b;\n"
+        + "    optional boolean c;\n"
+        + "  }\n"
+        + "}\n";
+
+    ArrayWritable hiveRecord = createGroup(
+        createGroup(
+            createInt(1),
+            createDouble(1.0),
+            createBoolean(true)
+        )
+    );
+
+    // Write record to Parquet format
+    writeParquetRecord(schemaStr, hiveRecord);
+
+    // Verify record was written correctly to Parquet
+    startMessage();
+      startField("structCol", 0);
+        startGroup();
+          startField("a", 0);
+            addInteger(1);
+          endField("a", 0);
+          startField("b", 1);
+            addDouble(1.0);
+          endField("b", 1);
+          startField("c", 2);
+            addBoolean(true);
+          endField("c", 2);
+        endGroup();
+      endField("structCol", 0);
+    endMessage();
+  }
+
+  @Test
+  public void testArrayType() throws Exception {
+    String schemaStr = "message hive_schema {\n"
+        + "  optional group arrayCol (LIST) {\n"
+        + "    repeated group bag {\n"
+        + "      optional int32 array_element;\n"
+        + "    }\n"
+        + "  }\n"
+        + "}\n";
+
+    ArrayWritable hiveRecord = createGroup(
+        createGroup(
+            createArray(
+                createInt(1),
+                createNull(),
+                createInt(2)
+            )
+        )
+    );
+
+    // Write record to Parquet format
+    writeParquetRecord(schemaStr, hiveRecord);
+
+    // Verify record was written correctly to Parquet
+    startMessage();
+      startField("arrayCol", 0);
+        startGroup();
+          startField("bag", 0);
+            startGroup();
+              startField("array_element", 0);
+                addInteger(1);
+              endField("array_element", 0);
+            endGroup();
+            startGroup();
+            endGroup();
+            startGroup();
+            startField("array_element", 0);
+              addInteger(2);
+            endField("array_element", 0);
+            endGroup();
+          endField("bag", 0);
+        endGroup();
+      endField("arrayCol", 0);
+    endMessage();
+  }
+
+  @Test
+  public void testMapType() throws Exception {
+    String schemaStr = "message hive_schema {\n"
+        + "  optional group mapCol (MAP) {\n"
+        + "    repeated group map (MAP_KEY_VALUE) {\n"
+        + "      required binary key;\n"
+        + "      optional int32 value;\n"
+        + "    }\n"
+        + "  }\n"
+        + "}\n";
+
+    ArrayWritable hiveRecord = createGroup(
+        createGroup(
+            createArray(
+                createArray(
+                    createString("key1"),
+                    createInt(1)
+                ),
+                createArray(
+                    createString("key2"),
+                    createInt(2)
+                ),
+                createArray(
+                    createString("key3"),
+                    createNull()
+                )
+            )
+        )
+    );
+
+    // Write record to Parquet format
+    writeParquetRecord(schemaStr, hiveRecord);
+
+    // Verify record was written correctly to Parquet
+    startMessage();
+      startField("mapCol", 0);
+        startGroup();
+          startField("map", 0);
+            startGroup();
+              startField("key", 0);
+                addString("key1");
+              endField("key", 0);
+              startField("value", 1);
+                addInteger(1);
+              endField("value", 1);
+            endGroup();
+            startGroup();
+              startField("key", 0);
+                addString("key2");
+              endField("key", 0);
+              startField("value", 1);
+                addInteger(2);
+              endField("value", 1);
+            endGroup();
+            startGroup();
+              startField("key", 0);
+                addString("key3");
+              endField("key", 0);
+            endGroup();
+          endField("map", 0);
+        endGroup();
+      endField("mapCol", 0);
+    endMessage();
+  }
+
+  @Test
+  public void testArrayOfArrays() throws Exception {
+    String schemaStr = "message hive_schema {\n"
+        + "  optional group array_of_arrays (LIST) {\n"
+        + "    repeated group array {\n"
+        + "      required group element (LIST) {\n"
+        + "        repeated group array {\n"
+        + "          required int32 element;\n"
+        + "        }\n"
+        + "      }\n"
+        + "    }\n"
+        + "  }\n"
+        + "}\n";
+
+    ArrayWritable hiveRecord = createGroup(
+        createGroup(
+            createArray(
+                createGroup(
+                    createArray(
+                        createInt(1),
+                        createInt(2)
+                    )
+                )
+            )
+        )
+    );
+
+    // Write record to Parquet format
+    writeParquetRecord(schemaStr, hiveRecord);
+
+    // Verify record was written correctly to Parquet
+    startMessage();
+      startField("array_of_arrays", 0);
+        startGroup();
+          startField("array", 0);
+            startGroup();
+              startField("element", 0);
+                startGroup();
+                  startField("array", 0);
+                    startGroup();
+                      startField("element", 0);
+                        addInteger(1);
+                      endField("element", 0);
+                    endGroup();
+                    startGroup();
+                      startField("element", 0);
+                        addInteger(2);
+                      endField("element", 0);
+                    endGroup();
+                  endField("array", 0);
+                endGroup();
+              endField("element", 0);
+            endGroup();
+          endField("array", 0);
+        endGroup();
+      endField("array_of_arrays", 0);
+    endMessage();
+  }
+
+  @Test
+  public void testGroupFieldIsNotArrayWritable() throws Exception {
+    String schemaStr = "message hive_schema {\n"
+        + "  optional group a {\n"
+        + "    optional int32 b;\n"
+        + "  }\n"
+        + "}\n";
+
+    ArrayWritable hiveRecord = createGroup(
+          createInt(1)
+    );
+
+    try {
+      // Write record to Parquet format
+      writeParquetRecord(schemaStr, hiveRecord);
+      fail();
+    } catch (RuntimeException e) {
+      assertEquals("Parquet record is malformed: Field value is not an ArrayWritable object: " +
+          "optional group a {\n  optional int32 b;\n}", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testArrayGroupElementIsNotArrayWritable() throws Exception {
+    String schemaStr = "message hive_schema {\n"
+        + "  optional group array_of_arrays (LIST) {\n"
+        + "    repeated group array {\n"
+        + "      required group element (LIST) {\n"
+        + "        required int32 element;\n"
+        + "      }\n"
+        + "    }\n"
+        + "  }\n"
+        + "}\n";
+
+    ArrayWritable hiveRecord = createGroup(
+        createGroup(
+            createArray(
+                createInt(1)
+            )
+        )
+    );
+
+    try {
+      // Write record to Parquet format
+      writeParquetRecord(schemaStr, hiveRecord);
+      fail();
+    } catch (RuntimeException e) {
+      assertEquals("Parquet record is malformed: Field value is not an ArrayWritable object: " +
+          "required group element (LIST) {\n  required int32 element;\n}", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testMapElementIsNotArrayWritable() throws Exception {
+    String schemaStr = "message hive_schema {\n"
+        + "  optional group mapCol (MAP) {\n"
+        + "    repeated group map (MAP_KEY_VALUE) {\n"
+        + "      required binary key;\n"
+        + "      optional group value {\n"
+        + "        required int32 value;"
+        + "      }\n"
+        + "    }\n"
+        + "  }\n"
+        + "}\n";
+
+    ArrayWritable hiveRecord = createGroup(
+        createGroup(
+            createArray(
+                createGroup(
+                    createString("key1"),
+                    createInt(1)
+                )
+            )
+        )
+    );
+
+    try {
+      // Write record to Parquet format
+      writeParquetRecord(schemaStr, hiveRecord);
+      fail();
+    } catch (RuntimeException e) {
+      assertEquals(
+          "Parquet record is malformed: Field value is not an ArrayWritable object: " +
+              "optional group value {\n  required int32 value;\n}", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testMapKeyValueIsNotArrayWritable() throws Exception {
+    String schemaStr = "message hive_schema {\n"
+        + "  optional group mapCol (MAP) {\n"
+        + "    repeated group map (MAP_KEY_VALUE) {\n"
+        + "      required binary key;\n"
+        + "      optional int32 value;\n"
+        + "    }\n"
+        + "  }\n"
+        + "}\n";
+
+    ArrayWritable hiveRecord = createGroup(
+        createGroup(
+            createArray(
+                createString("key1"),
+                createInt(1)
+            )
+        )
+    );
+
+    try {
+      // Write record to Parquet format
+      writeParquetRecord(schemaStr, hiveRecord);
+      fail();
+    } catch (RuntimeException e) {
+      assertEquals("Parquet record is malformed: Map key-value pair is not an ArrayWritable object on record 0", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testMapKeyValueIsNull() throws Exception {
+    String schemaStr = "message hive_schema {\n"
+        + "  optional group mapCol (MAP) {\n"
+        + "    repeated group map (MAP_KEY_VALUE) {\n"
+        + "      required binary key;\n"
+        + "      optional int32 value;\n"
+        + "    }\n"
+        + "  }\n"
+        + "}\n";
+
+    ArrayWritable hiveRecord = createGroup(
+        createGroup(
+            createArray(
+                createNull()
+            )
+        )
+    );
+
+    try {
+      // Write record to Parquet format
+      writeParquetRecord(schemaStr, hiveRecord);
+      fail();
+    } catch (RuntimeException e) {
+      assertEquals("Parquet record is malformed: Map key-value pair is null on record 0", e.getMessage());
+    }
+  }
+}

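A note on the verification style in the test above: the expected RecordConsumer call sequence is replayed through a Mockito InOrder, so both the calls and their order are asserted. The pattern in isolation (a minimal sketch, not part of this patch):

    import static org.mockito.Mockito.*;

    import org.mockito.InOrder;
    import parquet.io.api.RecordConsumer;

    public class InOrderSketch {
      public static void main(String[] args) {
        RecordConsumer rc = mock(RecordConsumer.class);
        rc.startMessage();  // exercise the mock
        rc.endMessage();

        InOrder inOrder = inOrder(rc);
        inOrder.verify(rc).startMessage();  // fails if calls are out of order
        inOrder.verify(rc).endMessage();
        verifyNoMoreInteractions(rc);       // fails on any unexpected extra call
      }
    }
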
Added: hive/trunk/ql/src/test/queries/clientpositive/parquet_array_null_element.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/parquet_array_null_element.q?rev=1640615&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/parquet_array_null_element.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/parquet_array_null_element.q Wed Nov 19 19:06:17 2014
@@ -0,0 +1,33 @@
+DROP TABLE parquet_array_null_element_staging;
+DROP TABLE parquet_array_null_element;
+
+CREATE TABLE parquet_array_null_element_staging (
+    id int,
+    lstint ARRAY<INT>,
+    lststr ARRAY<STRING>,
+    mp  MAP<STRING,STRING>
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':'
+NULL DEFINED AS '';
+
+CREATE TABLE parquet_array_null_element (
+    id int,
+    lstint ARRAY<INT>,
+    lststr ARRAY<STRING>,
+    mp  MAP<STRING,STRING>
+) STORED AS PARQUET;
+
+DESCRIBE FORMATTED parquet_array_null_element;
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_array_null_element.txt' OVERWRITE INTO TABLE parquet_array_null_element_staging;
+
+SELECT * FROM parquet_array_null_element_staging;
+
+INSERT OVERWRITE TABLE parquet_array_null_element SELECT * FROM parquet_array_null_element_staging;
+
+SELECT lstint from parquet_array_null_element;
+SELECT lststr from parquet_array_null_element;
+SELECT mp from parquet_array_null_element;
+SELECT * FROM parquet_array_null_element;

Added: hive/trunk/ql/src/test/queries/clientpositive/parquet_map_null.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/parquet_map_null.q?rev=1640615&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/parquet_map_null.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/parquet_map_null.q Wed Nov 19 19:06:17 2014
@@ -0,0 +1,13 @@
+-- This test attempts to write a parquet table from an avro table that contains null map values
+
+DROP TABLE IF EXISTS avro_table;
+DROP TABLE IF EXISTS parquet_table;
+
+CREATE TABLE avro_table (avreau_col_1 map<string,string>) STORED AS AVRO;
+LOAD DATA LOCAL INPATH '../../data/files/map_null_val.avro' OVERWRITE INTO TABLE avro_table;
+
+CREATE TABLE parquet_table STORED AS PARQUET AS SELECT * FROM avro_table;
+SELECT * FROM parquet_table;
+
+DROP TABLE avro_table;
+DROP TABLE parquet_table;
\ No newline at end of file

Added: hive/trunk/ql/src/test/results/clientpositive/parquet_array_null_element.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/parquet_array_null_element.q.out?rev=1640615&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/parquet_array_null_element.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/parquet_array_null_element.q.out Wed Nov 19 19:06:17 2014
@@ -0,0 +1,160 @@
+PREHOOK: query: DROP TABLE parquet_array_null_element_staging
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE parquet_array_null_element_staging
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE parquet_array_null_element
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE parquet_array_null_element
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE parquet_array_null_element_staging (
+    id int,
+    lstint ARRAY<INT>,
+    lststr ARRAY<STRING>,
+    mp  MAP<STRING,STRING>
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':'
+NULL DEFINED AS ''
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_array_null_element_staging
+POSTHOOK: query: CREATE TABLE parquet_array_null_element_staging (
+    id int,
+    lstint ARRAY<INT>,
+    lststr ARRAY<STRING>,
+    mp  MAP<STRING,STRING>
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':'
+NULL DEFINED AS ''
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_array_null_element_staging
+PREHOOK: query: CREATE TABLE parquet_array_null_element (
+    id int,
+    lstint ARRAY<INT>,
+    lststr ARRAY<STRING>,
+    mp  MAP<STRING,STRING>
+) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_array_null_element
+POSTHOOK: query: CREATE TABLE parquet_array_null_element (
+    id int,
+    lstint ARRAY<INT>,
+    lststr ARRAY<STRING>,
+    mp  MAP<STRING,STRING>
+) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_array_null_element
+PREHOOK: query: DESCRIBE FORMATTED parquet_array_null_element
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@parquet_array_null_element
+POSTHOOK: query: DESCRIBE FORMATTED parquet_array_null_element
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@parquet_array_null_element
+# col_name            	data_type           	comment             
+	 	 
+id                  	int                 	                    
+lstint              	array<int>          	                    
+lststr              	array<string>       	                    
+mp                  	map<string,string>  	                    
+	 	 
+# Detailed Table Information	 	 
+Database:           	default             	 
+#### A masked pattern was here ####
+Protect Mode:       	None                	 
+Retention:          	0                   	 
+#### A masked pattern was here ####
+Table Type:         	MANAGED_TABLE       	 
+Table Parameters:	 	 
+#### A masked pattern was here ####
+	 	 
+# Storage Information	 	 
+SerDe Library:      	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe	 
+InputFormat:        	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat	 
+OutputFormat:       	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat	 
+Compressed:         	No                  	 
+Num Buckets:        	-1                  	 
+Bucket Columns:     	[]                  	 
+Sort Columns:       	[]                  	 
+Storage Desc Params:	 	 
+	serialization.format	1                   
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_array_null_element.txt' OVERWRITE INTO TABLE parquet_array_null_element_staging
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@parquet_array_null_element_staging
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_array_null_element.txt' OVERWRITE INTO TABLE parquet_array_null_element_staging
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@parquet_array_null_element_staging
+PREHOOK: query: SELECT * FROM parquet_array_null_element_staging
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_array_null_element_staging
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM parquet_array_null_element_staging
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_array_null_element_staging
+#### A masked pattern was here ####
+1	[null,7]	["CARRELAGE","MOQUETTE"]	{"key11":"value11","key12":"value12","key13":"value13"}
+2	[null,null]	["CAILLEBOTIS",null]	NULL
+3	[null,42,null]	NULL	{"key11":"value11","key12":null,"key13":null}
+PREHOOK: query: INSERT OVERWRITE TABLE parquet_array_null_element SELECT * FROM parquet_array_null_element_staging
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_array_null_element_staging
+PREHOOK: Output: default@parquet_array_null_element
+POSTHOOK: query: INSERT OVERWRITE TABLE parquet_array_null_element SELECT * FROM parquet_array_null_element_staging
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_array_null_element_staging
+POSTHOOK: Output: default@parquet_array_null_element
+POSTHOOK: Lineage: parquet_array_null_element.id SIMPLE [(parquet_array_null_element_staging)parquet_array_null_element_staging.FieldSchema(name:id, type:int, comment:null), ]
+POSTHOOK: Lineage: parquet_array_null_element.lstint SIMPLE [(parquet_array_null_element_staging)parquet_array_null_element_staging.FieldSchema(name:lstint, type:array<int>, comment:null), ]
+POSTHOOK: Lineage: parquet_array_null_element.lststr SIMPLE [(parquet_array_null_element_staging)parquet_array_null_element_staging.FieldSchema(name:lststr, type:array<string>, comment:null), ]
+POSTHOOK: Lineage: parquet_array_null_element.mp SIMPLE [(parquet_array_null_element_staging)parquet_array_null_element_staging.FieldSchema(name:mp, type:map<string,string>, comment:null), ]
+PREHOOK: query: SELECT lstint from parquet_array_null_element
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_array_null_element
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT lstint from parquet_array_null_element
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_array_null_element
+#### A masked pattern was here ####
+[null,7]
+[null,null]
+[null,42,null]
+PREHOOK: query: SELECT lststr from parquet_array_null_element
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_array_null_element
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT lststr from parquet_array_null_element
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_array_null_element
+#### A masked pattern was here ####
+["CARRELAGE","MOQUETTE"]
+["CAILLEBOTIS",null]
+NULL
+PREHOOK: query: SELECT mp from parquet_array_null_element
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_array_null_element
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT mp from parquet_array_null_element
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_array_null_element
+#### A masked pattern was here ####
+{"key12":"value12","key11":"value11","key13":"value13"}
+NULL
+{"key12":null,"key11":"value11","key13":null}
+PREHOOK: query: SELECT * FROM parquet_array_null_element
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_array_null_element
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM parquet_array_null_element
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_array_null_element
+#### A masked pattern was here ####
+1	[null,7]	["CARRELAGE","MOQUETTE"]	{"key12":"value12","key11":"value11","key13":"value13"}
+2	[null,null]	["CAILLEBOTIS",null]	NULL
+3	[null,42,null]	NULL	{"key12":null,"key11":"value11","key13":null}

Added: hive/trunk/ql/src/test/results/clientpositive/parquet_map_null.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/parquet_map_null.q.out?rev=1640615&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/parquet_map_null.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/parquet_map_null.q.out Wed Nov 19 19:06:17 2014
@@ -0,0 +1,67 @@
+PREHOOK: query: -- This test attempts to write a parquet table from an avro table that contains null map values
+
+DROP TABLE IF EXISTS avro_table
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- This test attempts to write a parquet table from an avro table that contains null map values
+
+DROP TABLE IF EXISTS avro_table
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS parquet_table
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS parquet_table
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE avro_table (avreau_col_1 map<string,string>) STORED AS AVRO
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@avro_table
+POSTHOOK: query: CREATE TABLE avro_table (avreau_col_1 map<string,string>) STORED AS AVRO
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@avro_table
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/map_null_val.avro' OVERWRITE INTO TABLE avro_table
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@avro_table
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/map_null_val.avro' OVERWRITE INTO TABLE avro_table
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@avro_table
+PREHOOK: query: CREATE TABLE parquet_table STORED AS PARQUET AS SELECT * FROM avro_table
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@avro_table
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_table
+POSTHOOK: query: CREATE TABLE parquet_table STORED AS PARQUET AS SELECT * FROM avro_table
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@avro_table
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_table
+PREHOOK: query: SELECT * FROM parquet_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM parquet_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_table
+#### A masked pattern was here ####
+{"key3":"val3","key4":null}
+{"key3":"val3","key4":null}
+{"key1":null,"key2":"val2"}
+{"key3":"val3","key4":null}
+{"key3":"val3","key4":null}
+PREHOOK: query: DROP TABLE avro_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@avro_table
+PREHOOK: Output: default@avro_table
+POSTHOOK: query: DROP TABLE avro_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@avro_table
+POSTHOOK: Output: default@avro_table
+PREHOOK: query: DROP TABLE parquet_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@parquet_table
+PREHOOK: Output: default@parquet_table
+POSTHOOK: query: DROP TABLE parquet_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@parquet_table
+POSTHOOK: Output: default@parquet_table