You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2015/07/09 23:38:22 UTC

hive git commit: HIVE-11131: Get row information on DataWritableWriter once for better writing performance (Sergio Pena, reviewed by Ferdinand Xu, Dong Chen & Ryan Blue)

Repository: hive
Updated Branches:
  refs/heads/master a2dabcb8c -> 820748945


HIVE-11131: Get row information on DataWritableWriter once for better writing performance (Sergio Pena, reviewed by Ferdinand Xu, Dong Chen & Ryan Blue)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/82074894
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/82074894
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/82074894

Branch: refs/heads/master
Commit: 8207489451debe98051898df6b1ec0833fc80bb3
Parents: a2dabcb
Author: Sergio Pena <se...@cloudera.com>
Authored: Thu Jul 9 16:37:10 2015 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Thu Jul 9 16:37:10 2015 -0500

----------------------------------------------------------------------
 .../ql/io/parquet/write/DataWritableWriter.java | 638 +++++++++++++------
 1 file changed, 426 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/82074894/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
index c195c3e..493cd36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
@@ -20,8 +20,26 @@ import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
-import org.apache.hadoop.hive.serde2.objectinspector.*;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
@@ -36,87 +54,104 @@ import java.util.Map;
 
 /**
  *
- * DataWritableWriter is a writer that reads a ParquetWritable object and send the data to the Parquet
- * API with the expected schema. This class is only used through DataWritableWriteSupport class.
+ * DataWritableWriter sends a record to the Parquet API with the expected schema in order
+ * to be written to a file.
+ * This class is only used through DataWritableWriteSupport class.
  */
 public class DataWritableWriter {
   private static final Log LOG = LogFactory.getLog(DataWritableWriter.class);
-  private final RecordConsumer recordConsumer;
+  protected final RecordConsumer recordConsumer;
   private final GroupType schema;
 
+  /* This writer will be created when writing the first row in order to get
+  information about how to inspect the record data.  */
+  private DataWriter messageWriter;
+
   public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType schema) {
     this.recordConsumer = recordConsumer;
     this.schema = schema;
   }
 
   /**
-   * It writes all record values to the Parquet RecordConsumer.
-   * @param record Contains the record that are going to be written.
+   * It writes a record to Parquet.
+   * @param record Contains the record that is going to be written.
    */
   public void write(final ParquetHiveRecord record) {
     if (record != null) {
-      recordConsumer.startMessage();
-      try {
-        writeGroupFields(record.getObject(), record.getObjectInspector(), schema);
-      } catch (RuntimeException e) {
-        String errorMessage = "Parquet record is malformed: " + e.getMessage();
-        LOG.error(errorMessage, e);
-        throw new RuntimeException(errorMessage, e);
+      if (messageWriter == null) {
+        try {
+          messageWriter = createMessageWriter(record.getObjectInspector(), schema);
+        } catch (RuntimeException e) {
+          String errorMessage = "Parquet record is malformed: " + e.getMessage();
+          LOG.error(errorMessage, e);
+          throw new RuntimeException(errorMessage, e);
+        }
       }
-      recordConsumer.endMessage();
+
+      messageWriter.write(record.getObject());
     }
   }
 
-  /**
-   * It writes all the fields contained inside a group to the RecordConsumer.
-   * @param value The list of values contained in the group.
-   * @param inspector The object inspector used to get the correct value type.
-   * @param type Type that contains information about the group schema.
-   */
-  private void writeGroupFields(final Object value, final StructObjectInspector inspector, final GroupType type) {
-    if (value != null) {
-      List<? extends StructField> fields = inspector.getAllStructFieldRefs();
-      List<Object> fieldValuesList = inspector.getStructFieldsDataAsList(value);
-
-      for (int i = 0; i < type.getFieldCount(); i++) {
-        Type fieldType = type.getType(i);
-        String fieldName = fieldType.getName();
-        Object fieldValue = fieldValuesList.get(i);
-
-        if (fieldValue != null) {
-          ObjectInspector fieldInspector = fields.get(i).getFieldObjectInspector();
-          recordConsumer.startField(fieldName, i);
-          writeValue(fieldValue, fieldInspector, fieldType);
-          recordConsumer.endField(fieldName, i);
-        }
-      }
-    }
+  private MessageDataWriter createMessageWriter(StructObjectInspector inspector, GroupType schema) {
+    return new MessageDataWriter(inspector, schema);
   }
 
   /**
-   * It writes the field value to the Parquet RecordConsumer. It detects the field type, and calls
-   * the correct write function.
-   * @param value The writable object that contains the value.
+   * Creates a writer for the specific object inspector. The returned writer will be used
+   * to call the Parquet API for the specific data type.
    * @param inspector The object inspector used to get the correct value type.
    * @param type Type that contains information about the type schema.
+   * @return A DataWriter object used to call the Parquet API for the specific data type.
    */
-  private void writeValue(final Object value, final ObjectInspector inspector, final Type type) {
+  private DataWriter createWriter(ObjectInspector inspector, Type type) {
     if (type.isPrimitive()) {
       checkInspectorCategory(inspector, ObjectInspector.Category.PRIMITIVE);
-      writePrimitive(value, (PrimitiveObjectInspector)inspector);
+      PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector)inspector;
+      switch (primitiveInspector.getPrimitiveCategory()) {
+        case BOOLEAN:
+          return new BooleanDataWriter((BooleanObjectInspector)inspector);
+        case BYTE:
+          return new ByteDataWriter((ByteObjectInspector)inspector);
+        case SHORT:
+          return new ShortDataWriter((ShortObjectInspector)inspector);
+        case INT:
+          return new IntDataWriter((IntObjectInspector)inspector);
+        case LONG:
+          return new LongDataWriter((LongObjectInspector)inspector);
+        case FLOAT:
+          return new FloatDataWriter((FloatObjectInspector)inspector);
+        case DOUBLE:
+          return new DoubleDataWriter((DoubleObjectInspector)inspector);
+        case STRING:
+          return new StringDataWriter((StringObjectInspector)inspector);
+        case CHAR:
+          return new CharDataWriter((HiveCharObjectInspector)inspector);
+        case VARCHAR:
+          return new VarcharDataWriter((HiveVarcharObjectInspector)inspector);
+        case BINARY:
+          return new BinaryDataWriter((BinaryObjectInspector)inspector);
+        case TIMESTAMP:
+          return new TimestampDataWriter((TimestampObjectInspector)inspector);
+        case DECIMAL:
+          return new DecimalDataWriter((HiveDecimalObjectInspector)inspector);
+        case DATE:
+          return new DateDataWriter((DateObjectInspector)inspector);
+        default:
+          throw new IllegalArgumentException("Unsupported primitive data type: " + primitiveInspector.getPrimitiveCategory());
+      }
     } else {
       GroupType groupType = type.asGroupType();
       OriginalType originalType = type.getOriginalType();
 
       if (originalType != null && originalType.equals(OriginalType.LIST)) {
         checkInspectorCategory(inspector, ObjectInspector.Category.LIST);
-        writeArray(value, (ListObjectInspector)inspector, groupType);
+        return new ListDataWriter((ListObjectInspector)inspector, groupType);
       } else if (originalType != null && originalType.equals(OriginalType.MAP)) {
         checkInspectorCategory(inspector, ObjectInspector.Category.MAP);
-        writeMap(value, (MapObjectInspector)inspector, groupType);
+        return new MapDataWriter((MapObjectInspector)inspector, groupType);
       } else {
         checkInspectorCategory(inspector, ObjectInspector.Category.STRUCT);
-        writeGroup(value, (StructObjectInspector)inspector, groupType);
+        return new StructDataWriter((StructObjectInspector)inspector, groupType);
       }
     }
   }
@@ -134,203 +169,382 @@ public class DataWritableWriter {
     }
   }
 
-  /**
-   * It writes a group type and all its values to the Parquet RecordConsumer.
-   * This is used only for optional and required groups.
-   * @param value Object that contains the group values.
-   * @param inspector The object inspector used to get the correct value type.
-   * @param type Type that contains information about the group schema.
-   */
-  private void writeGroup(final Object value, final StructObjectInspector inspector, final GroupType type) {
-    recordConsumer.startGroup();
-    writeGroupFields(value, inspector, type);
-    recordConsumer.endGroup();
+  private interface DataWriter {
+    void write(Object value);
   }
 
-  /**
-   * It writes a list type and its array elements to the Parquet RecordConsumer.
-   * This is called when the original type (LIST) is detected by writeValue()/
-   * This function assumes the following schema:
-   *    optional group arrayCol (LIST) {
-   *      repeated group array {
-   *        optional TYPE array_element;
-   *      }
-   *    }
-   * @param value The object that contains the array values.
-   * @param inspector The object inspector used to get the correct value type.
-   * @param type Type that contains information about the group (LIST) schema.
-   */
-  private void writeArray(final Object value, final ListObjectInspector inspector, final GroupType type) {
-    // Get the internal array structure
-    GroupType repeatedType = type.getType(0).asGroupType();
+  private class GroupDataWriter implements DataWriter {
+    private StructObjectInspector inspector;
+    private List<? extends StructField> structFields;
+    private DataWriter[] structWriters;
 
-    recordConsumer.startGroup();
-    recordConsumer.startField(repeatedType.getName(), 0);
+    public GroupDataWriter(StructObjectInspector inspector, GroupType groupType) {
+      this.inspector = inspector;
 
-    List<?> arrayValues = inspector.getList(value);
-    ObjectInspector elementInspector = inspector.getListElementObjectInspector();
+      structFields = this.inspector.getAllStructFieldRefs();
+      structWriters = new DataWriter[structFields.size()];
 
-    Type elementType = repeatedType.getType(0);
-    String elementName = elementType.getName();
+      for (int i = 0; i < structFields.size(); i++) {
+        StructField field = structFields.get(i);
+        structWriters[i] = createWriter(field.getFieldObjectInspector(), groupType.getType(i));
+      }
+    }
 
-    for (Object element : arrayValues) {
-      recordConsumer.startGroup();
-      if (element != null) {
-        recordConsumer.startField(elementName, 0);
-        writeValue(element, elementInspector, elementType);
-        recordConsumer.endField(elementName, 0);
+    @Override
+    public void write(Object value) {
+      for (int i = 0; i < structFields.size(); i++) {
+        StructField field = structFields.get(i);
+        Object fieldValue = inspector.getStructFieldData(value, field);
+
+        if (fieldValue != null) {
+          String fieldName = field.getFieldName();
+          DataWriter writer = structWriters[i];
+
+          recordConsumer.startField(fieldName, i);
+          writer.write(fieldValue);
+          recordConsumer.endField(fieldName, i);
+        }
       }
-      recordConsumer.endGroup();
+    }
+  }
+
+  private class MessageDataWriter extends GroupDataWriter implements DataWriter {
+    public MessageDataWriter(StructObjectInspector inspector, GroupType groupType) {
+      super(inspector, groupType);
     }
 
-    recordConsumer.endField(repeatedType.getName(), 0);
-    recordConsumer.endGroup();
+    @Override
+    public void write(Object value) {
+      recordConsumer.startMessage();
+      if (value != null) {
+        super.write(value);
+      }
+      recordConsumer.endMessage();
+    }
   }
 
-  /**
-   * It writes a map type and its key-pair values to the Parquet RecordConsumer.
-   * This is called when the original type (MAP) is detected by writeValue().
-   * This function assumes the following schema:
-   *    optional group mapCol (MAP) {
-   *      repeated group map (MAP_KEY_VALUE) {
-   *        required TYPE key;
-   *        optional TYPE value;
-   *      }
-   *    }
-   * @param value The object that contains the map key-values.
-   * @param inspector The object inspector used to get the correct value type.
-   * @param type Type that contains information about the group (MAP) schema.
-   */
-  private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) {
-    // Get the internal map structure (MAP_KEY_VALUE)
-    GroupType repeatedType = type.getType(0).asGroupType();
+  private class StructDataWriter extends GroupDataWriter implements DataWriter {
+    public StructDataWriter(StructObjectInspector inspector, GroupType groupType) {
+      super(inspector, groupType);
+    }
+
+    @Override
+    public void write(Object value) {
+      recordConsumer.startGroup();
+      super.write(value);
+      recordConsumer.endGroup();
+    }
+  }
+
+  private class ListDataWriter implements DataWriter {
+    private ListObjectInspector inspector;
+    private String elementName;
+    private DataWriter elementWriter;
+    private String repeatedGroupName;
 
-    recordConsumer.startGroup();
-    recordConsumer.startField(repeatedType.getName(), 0);
+    public ListDataWriter(ListObjectInspector inspector, GroupType groupType) {
+      this.inspector = inspector;
+
+      // Get the internal array structure
+      GroupType repeatedType = groupType.getType(0).asGroupType();
+      this.repeatedGroupName = repeatedType.getName();
+
+      Type elementType = repeatedType.getType(0);
+      this.elementName = elementType.getName();
+
+      ObjectInspector elementInspector = this.inspector.getListElementObjectInspector();
+      this.elementWriter = createWriter(elementInspector, elementType);
+    }
 
-    Map<?, ?> mapValues = inspector.getMap(value);
+    @Override
+    public void write(Object value) {
+      recordConsumer.startGroup();
+      recordConsumer.startField(repeatedGroupName, 0);
+
+      int listLength = inspector.getListLength(value);
+      for (int i = 0; i < listLength; i++) {
+        Object element = inspector.getListElement(value, i);
+        recordConsumer.startGroup();
+        if (element != null) {
+          recordConsumer.startField(elementName, 0);
+          elementWriter.write(element);
+          recordConsumer.endField(elementName, 0);
+        }
+        recordConsumer.endGroup();
+      }
 
-    Type keyType = repeatedType.getType(0);
-    String keyName = keyType.getName();
-    ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
+      recordConsumer.endField(repeatedGroupName, 0);
+      recordConsumer.endGroup();
+    }
+  }
 
-    Type valuetype = repeatedType.getType(1);
-    String valueName = valuetype.getName();
-    ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
+  private class MapDataWriter implements DataWriter {
+    private MapObjectInspector inspector;
+    private String repeatedGroupName;
+    private String keyName, valueName;
+    private DataWriter keyWriter, valueWriter;
+
+    public MapDataWriter(MapObjectInspector inspector, GroupType groupType) {
+      this.inspector = inspector;
+
+      // Get the internal map structure (MAP_KEY_VALUE)
+      GroupType repeatedType = groupType.getType(0).asGroupType();
+      this.repeatedGroupName = repeatedType.getName();
+
+      // Get key element information
+      Type keyType = repeatedType.getType(0);
+      ObjectInspector keyInspector = this.inspector.getMapKeyObjectInspector();
+      this.keyName = keyType.getName();
+      this.keyWriter = createWriter(keyInspector, keyType);
+
+      // Get value element information
+      Type valuetype = repeatedType.getType(1);
+      ObjectInspector valueInspector = this.inspector.getMapValueObjectInspector();
+      this.valueName = valuetype.getName();
+      this.valueWriter = createWriter(valueInspector, valuetype);
+    }
 
-    for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) {
+    @Override
+    public void write(Object value) {
       recordConsumer.startGroup();
-      if (keyValue != null) {
-        // write key element
-        Object keyElement = keyValue.getKey();
-        recordConsumer.startField(keyName, 0);
-        writeValue(keyElement, keyInspector, keyType);
-        recordConsumer.endField(keyName, 0);
-
-        // write value element
-        Object valueElement = keyValue.getValue();
-        if (valueElement != null) {
-          recordConsumer.startField(valueName, 1);
-          writeValue(valueElement, valueInspector, valuetype);
-          recordConsumer.endField(valueName, 1);
+      recordConsumer.startField(repeatedGroupName, 0);
+
+      Map<?, ?> mapValues = inspector.getMap(value);
+      for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) {
+        recordConsumer.startGroup();
+        if (keyValue != null) {
+          // write key element
+          Object keyElement = keyValue.getKey();
+          recordConsumer.startField(keyName, 0);
+          keyWriter.write(keyElement);
+          recordConsumer.endField(keyName, 0);
+
+          // write value element
+          Object valueElement = keyValue.getValue();
+          if (valueElement != null) {
+            recordConsumer.startField(valueName, 1);
+            valueWriter.write(valueElement);
+            recordConsumer.endField(valueName, 1);
+          }
         }
+        recordConsumer.endGroup();
       }
+
+      recordConsumer.endField(repeatedGroupName, 0);
       recordConsumer.endGroup();
     }
+  }
+
+  private class BooleanDataWriter implements DataWriter {
+    private BooleanObjectInspector inspector;
+
+    public BooleanDataWriter(BooleanObjectInspector inspector) {
+      this.inspector = inspector;
+    }
 
-    recordConsumer.endField(repeatedType.getName(), 0);
-    recordConsumer.endGroup();
+    @Override
+    public void write(Object value) {
+      recordConsumer.addBoolean(inspector.get(value));
+    }
   }
 
-  /**
-   * It writes the primitive value to the Parquet RecordConsumer.
-   * @param value The object that contains the primitive value.
-   * @param inspector The object inspector used to get the correct value type.
-   */
-  private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) {
-    if (value == null) {
-      return;
-    }
-
-    switch (inspector.getPrimitiveCategory()) {
-      case VOID:
-        return;
-      case DOUBLE:
-        recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value));
-        break;
-      case BOOLEAN:
-        recordConsumer.addBoolean(((BooleanObjectInspector) inspector).get(value));
-        break;
-      case FLOAT:
-        recordConsumer.addFloat(((FloatObjectInspector) inspector).get(value));
-        break;
-      case BYTE:
-        recordConsumer.addInteger(((ByteObjectInspector) inspector).get(value));
-        break;
-      case INT:
-        recordConsumer.addInteger(((IntObjectInspector) inspector).get(value));
-        break;
-      case LONG:
-        recordConsumer.addLong(((LongObjectInspector) inspector).get(value));
-        break;
-      case SHORT:
-        recordConsumer.addInteger(((ShortObjectInspector) inspector).get(value));
-        break;
-      case STRING:
-        String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(value);
-        recordConsumer.addBinary(Binary.fromString(v));
-        break;
-      case CHAR:
-        String vChar = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(value).getStrippedValue();
-        recordConsumer.addBinary(Binary.fromString(vChar));
-        break;
-      case VARCHAR:
-        String vVarchar = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(value).getValue();
-        recordConsumer.addBinary(Binary.fromString(vVarchar));
-        break;
-      case BINARY:
-        byte[] vBinary = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value);
-        recordConsumer.addBinary(Binary.fromByteArray(vBinary));
-        break;
-      case TIMESTAMP:
-        Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value);
-        recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
-        break;
-      case DECIMAL:
-        HiveDecimal vDecimal = ((HiveDecimal)inspector.getPrimitiveJavaObject(value));
-        DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo();
-        recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo));
-        break;
-      case DATE:
-        Date vDate = ((DateObjectInspector) inspector).getPrimitiveJavaObject(value);
-        recordConsumer.addInteger(DateWritable.dateToDays(vDate));
-        break;
-      default:
-        throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory());
+  private class ByteDataWriter implements DataWriter {
+    private ByteObjectInspector inspector;
+
+    public ByteDataWriter(ByteObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      recordConsumer.addInteger(inspector.get(value));
+    }
+  }
+
+  private class ShortDataWriter implements DataWriter {
+    private ShortObjectInspector inspector;
+    public ShortDataWriter(ShortObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      recordConsumer.addInteger(inspector.get(value));
+    }
+  }
+
+  private class IntDataWriter implements DataWriter {
+    private IntObjectInspector inspector;
+
+    public IntDataWriter(IntObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      recordConsumer.addInteger(inspector.get(value));
+    }
+  }
+
+  private class LongDataWriter implements DataWriter {
+    private LongObjectInspector inspector;
+
+    public LongDataWriter(LongObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      recordConsumer.addLong(inspector.get(value));
+    }
+  }
+
+  private class FloatDataWriter implements DataWriter {
+    private FloatObjectInspector inspector;
+
+    public FloatDataWriter(FloatObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      recordConsumer.addFloat(inspector.get(value));
+    }
+  }
+
+  private class DoubleDataWriter implements DataWriter {
+    private DoubleObjectInspector inspector;
+
+    public DoubleDataWriter(DoubleObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      recordConsumer.addDouble(inspector.get(value));
+    }
+  }
+
+  private class StringDataWriter implements DataWriter {
+    private StringObjectInspector inspector;
+
+    public StringDataWriter(StringObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      String v = inspector.getPrimitiveJavaObject(value);
+      recordConsumer.addBinary(Binary.fromString(v));
     }
   }
 
-  private Binary decimalToBinary(final HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) {
-    int prec = decimalTypeInfo.precision();
-    int scale = decimalTypeInfo.scale();
-    byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray();
+  private class CharDataWriter implements DataWriter {
+    private HiveCharObjectInspector inspector;
 
-    // Estimated number of bytes needed.
-    int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1];
-    if (precToBytes == decimalBytes.length) {
-      // No padding needed.
-      return Binary.fromByteArray(decimalBytes);
+    public CharDataWriter(HiveCharObjectInspector inspector) {
+      this.inspector = inspector;
     }
 
-    byte[] tgt = new byte[precToBytes];
+    @Override
+    public void write(Object value) {
+      String v = inspector.getPrimitiveJavaObject(value).getStrippedValue();
+      recordConsumer.addBinary(Binary.fromString(v));
+    }
+  }
+
+  private class VarcharDataWriter implements DataWriter {
+    private HiveVarcharObjectInspector inspector;
+
+    public VarcharDataWriter(HiveVarcharObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      String v = inspector.getPrimitiveJavaObject(value).getValue();
+      recordConsumer.addBinary(Binary.fromString(v));
+    }
+  }
+
+  private class BinaryDataWriter implements DataWriter {
+    private BinaryObjectInspector inspector;
+
+    public BinaryDataWriter(BinaryObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      byte[] vBinary = inspector.getPrimitiveJavaObject(value);
+      recordConsumer.addBinary(Binary.fromByteArray(vBinary));
+    }
+  }
+
+  private class TimestampDataWriter implements DataWriter {
+    private TimestampObjectInspector inspector;
+
+    public TimestampDataWriter(TimestampObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      Timestamp ts = inspector.getPrimitiveJavaObject(value);
+      recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
+    }
+  }
+
+  private class DecimalDataWriter implements DataWriter {
+    private HiveDecimalObjectInspector inspector;
+
+    public DecimalDataWriter(HiveDecimalObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      HiveDecimal vDecimal = inspector.getPrimitiveJavaObject(value);
+      DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo();
+      recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo));
+    }
+
+    private Binary decimalToBinary(final HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) {
+      int prec = decimalTypeInfo.precision();
+      int scale = decimalTypeInfo.scale();
+      byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray();
+
+      // Estimated number of bytes needed.
+      int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1];
+      if (precToBytes == decimalBytes.length) {
+        // No padding needed.
+        return Binary.fromByteArray(decimalBytes);
+      }
+
+      byte[] tgt = new byte[precToBytes];
       if (hiveDecimal.signum() == -1) {
-      // For negative number, initializing bits to 1
-      for (int i = 0; i < precToBytes; i++) {
-        tgt[i] |= 0xFF;
+        // For negative number, initializing bits to 1
+        for (int i = 0; i < precToBytes; i++) {
+          tgt[i] |= 0xFF;
+        }
       }
+
+      System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones.
+      return Binary.fromByteArray(tgt);
     }
+  }
 
-    System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones.
-    return Binary.fromByteArray(tgt);
+  private class DateDataWriter implements DataWriter {
+    private DateObjectInspector inspector;
+
+    public DateDataWriter(DateObjectInspector inspector) {
+      this.inspector = inspector;
+    }
+
+    @Override
+    public void write(Object value) {
+      Date vDate = inspector.getPrimitiveJavaObject(value);
+      recordConsumer.addInteger(DateWritable.dateToDays(vDate));
+    }
   }
-}
+}
\ No newline at end of file