Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/02 16:23:05 UTC

[GitHub] [iceberg] marton-bod commented on a change in pull request #1854: Hive: Implement Deserializer for Hive writes

marton-bod commented on a change in pull request #1854:
URL: https://github.com/apache/iceberg/pull/1854#discussion_r534249250



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergWriteObjectInspector;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class Deserializer {
+  private FiledDeserializer mainDeserializer;
+
+  Deserializer(Schema schema, ObjectInspector fieldInspector) throws SerDeException {
+    this.mainDeserializer = deserializer(schema.asStruct(), fieldInspector);
+  }
+
+  Record deserialize(Object data) {
+    return (Record) mainDeserializer.value(data);
+  }
+
+  private interface FiledDeserializer {

Review comment:
       Did you mean `FieldDeserializer` here?

##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Arrays;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestDeserializer {
+  private static final Schema CUSTOMER_SCHEMA = new Schema(
+      optional(1, "customer_id", Types.LongType.get()),
+      optional(2, "first_name", Types.StringType.get())
+  );
+
+  private static final StandardStructObjectInspector CUSTOMER_OBJECT_INSPECTOR =
+      ObjectInspectorFactory.getStandardStructObjectInspector(
+          Arrays.asList("customer_id", "first_name"),
+          Arrays.asList(
+              PrimitiveObjectInspectorFactory.writableLongObjectInspector,
+              PrimitiveObjectInspectorFactory.writableStringObjectInspector
+          ));
+
+

Review comment:
       Can we add a test case where a `SerDeException` should be thrown?
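       A minimal sketch of such a test, assuming the `Deserializer` constructor surfaces the `SerDeException` for unsupported column types (e.g. a list); the fixture style just mirrors `CUSTOMER_OBJECT_INSPECTOR` and the names are mine, so adjust as needed:

           @Test
           public void testUnsupportedTypeThrowsSerDeException() {
             // A list column is not handled by the deserializer yet, so building a
             // Deserializer for it should fail with a SerDeException.
             Schema listSchema = new Schema(
                 optional(1, "tags", Types.ListType.ofOptional(2, Types.StringType.get())));

             StandardStructObjectInspector listInspector =
                 ObjectInspectorFactory.getStandardStructObjectInspector(
                     Arrays.asList("tags"),
                     Arrays.asList(
                         ObjectInspectorFactory.getStandardListObjectInspector(
                             PrimitiveObjectInspectorFactory.writableStringObjectInspector)));

             try {
               new Deserializer(listSchema, listInspector);
               Assert.fail("Expected a SerDeException for an unsupported (list) column type");
             } catch (SerDeException e) {
               // expected: list types are rejected by the deserializer
             }
           }

       (A plain try/catch is used so the test does not depend on `Assert.assertThrows` being available.)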

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergWriteObjectInspector;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class Deserializer {
+  private FiledDeserializer mainDeserializer;

Review comment:
       nit: unless we have other deserializers, I would just name this field `deserializer` or `fieldDeserializer`

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java
##########
@@ -22,26 +22,38 @@
 import java.sql.Timestamp;
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
+import java.time.ZoneId;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 public abstract class IcebergTimestampObjectInspector extends AbstractPrimitiveJavaObjectInspector
-                                                      implements TimestampObjectInspector {
+    implements TimestampObjectInspector, IcebergWriteObjectInspector {
 
   private static final IcebergTimestampObjectInspector INSTANCE_WITH_ZONE = new IcebergTimestampObjectInspector() {
     @Override
     LocalDateTime toLocalDateTime(Object o) {
       return ((OffsetDateTime) o).toLocalDateTime();
     }
+
+    @Override
+    public Object getIcebergObject(Object o) {

Review comment:
       Can't we return an `OffsetDateTime` object?
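       For the zoned variant, something along these lines might work (just a sketch: it assumes the Hive side hands us a `java.sql.Timestamp` here and that UTC is the right offset to attach, both of which are assumptions on my part):

           @Override
           public Object getIcebergObject(Object o) {
             if (o == null) {
               return null;
             }
             // Assumption: Hive passes the value as a java.sql.Timestamp; attach UTC so the
             // result is the OffsetDateTime that Iceberg expects for timestamptz columns.
             Timestamp ts = (Timestamp) o;
             return OffsetDateTime.ofInstant(ts.toInstant(), ZoneId.of("UTC"));
           }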

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergWriteObjectInspector;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class Deserializer {
+  private FiledDeserializer mainDeserializer;
+
+  Deserializer(Schema schema, ObjectInspector fieldInspector) throws SerDeException {
+    this.mainDeserializer = deserializer(schema.asStruct(), fieldInspector);
+  }
+
+  Record deserialize(Object data) {
+    return (Record) mainDeserializer.value(data);
+  }
+
+  private interface FiledDeserializer {
+    Object value(Object object);
+  }
+
+  private static FiledDeserializer deserializer(Type type, ObjectInspector fieldInspector) throws SerDeException {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        return o -> ((BooleanObjectInspector) fieldInspector).get(o);
+      case INTEGER:
+        return o -> ((IntObjectInspector) fieldInspector).get(o);
+      case LONG:
+        return o -> ((LongObjectInspector) fieldInspector).get(o);
+      case FLOAT:
+        return o -> ((FloatObjectInspector) fieldInspector).get(o);
+      case DOUBLE:
+        return o -> ((DoubleObjectInspector) fieldInspector).get(o);
+      case STRING:
+        return o -> ((StringObjectInspector) fieldInspector).getPrimitiveJavaObject(o);
+      case UUID:
+        // TODO: This will not work with Parquet. Parquet UUID expect byte[], others are expecting UUID

Review comment:
       Is there a way to handle the Parquet case? Just wondering what happens if someone tries to write UUID data with Parquet.
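       If Parquet really does want a 16-byte array (as the TODO above says), the conversion itself could look roughly like the helper below; how the deserializer would detect the Parquet case is the open question, since this helper and the decision point are hypothetical and not part of this PR:

           // Hypothetical helper: encode a UUID as the 16-byte array Parquet expects.
           // Requires java.nio.ByteBuffer; when to call it (detecting the Parquet
           // write path) still needs to be decided.
           private static byte[] uuidToBytes(UUID uuid) {
             ByteBuffer buffer = ByteBuffer.allocate(16);
             buffer.putLong(uuid.getMostSignificantBits());
             buffer.putLong(uuid.getLeastSignificantBits());
             return buffer.array();
           }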

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergWriteObjectInspector;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class Deserializer {
+  private FiledDeserializer mainDeserializer;
+
+  Deserializer(Schema schema, ObjectInspector fieldInspector) throws SerDeException {
+    this.mainDeserializer = deserializer(schema.asStruct(), fieldInspector);
+  }
+
+  Record deserialize(Object data) {
+    return (Record) mainDeserializer.value(data);
+  }
+
+  private interface FiledDeserializer {
+    Object value(Object object);
+  }
+
+  private static FiledDeserializer deserializer(Type type, ObjectInspector fieldInspector) throws SerDeException {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        return o -> ((BooleanObjectInspector) fieldInspector).get(o);
+      case INTEGER:
+        return o -> ((IntObjectInspector) fieldInspector).get(o);
+      case LONG:
+        return o -> ((LongObjectInspector) fieldInspector).get(o);
+      case FLOAT:
+        return o -> ((FloatObjectInspector) fieldInspector).get(o);
+      case DOUBLE:
+        return o -> ((DoubleObjectInspector) fieldInspector).get(o);
+      case STRING:
+        return o -> ((StringObjectInspector) fieldInspector).getPrimitiveJavaObject(o);
+      case UUID:
+        // TODO: This will not work with Parquet. Parquet UUID expect byte[], others are expecting UUID
+        return o -> UUID.fromString(((StringObjectInspector) fieldInspector).getPrimitiveJavaObject(o));
+      case DATE:
+      case TIMESTAMP:
+      case FIXED:
+      case BINARY:
+      case DECIMAL:
+        // Iceberg specific conversions
+        return o -> ((IcebergWriteObjectInspector) fieldInspector).getIcebergObject(o);
+      case STRUCT:
+        return new StructDeserializer((Types.StructType) type, (StructObjectInspector) fieldInspector);
+      case LIST:
+      case MAP:
+      case TIME:
+      default:
+        throw new SerDeException("Unsupported column type: " + type);
+    }
+  }
+
+  private static class StructDeserializer implements FiledDeserializer {
+    private final FiledDeserializer[] filedDeserializers;
+    private final StructObjectInspector fieldInspector;
+    private final Types.StructType type;
+
+    private StructDeserializer(Types.StructType type, StructObjectInspector fieldInspector) throws SerDeException {
+      List<? extends StructField> structFields = fieldInspector.getAllStructFieldRefs();
+      List<Types.NestedField> nestedFields = type.fields();
+      this.filedDeserializers = new FiledDeserializer[structFields.size()];
+      this.fieldInspector = fieldInspector;
+      this.type = type;
+
+      for (int i = 0; i < filedDeserializers.length; i++) {
+        filedDeserializers[i] =
+            deserializer(nestedFields.get(i).type(), structFields.get(i).getFieldObjectInspector());
+      }
+    }
+
+    @Override
+    public Record value(Object object) {
+      if (object == null) {
+        return null;
+      }
+
+      List<Object> data = fieldInspector.getStructFieldsDataAsList(object);
+      Record result = GenericRecord.create(type);
+
+      for (int i = 0; i < filedDeserializers.length; i++) {
+        Object fieldValue = data.get(i);
+        if (fieldValue != null) {

Review comment:
       If `fieldValue` is null, shouldn't we still set null explicitly in the result record?
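       E.g. something along these lines, so that null values are written into the record explicitly (just a sketch of what I mean):

           for (int i = 0; i < filedDeserializers.length; i++) {
             Object fieldValue = data.get(i);
             // Set the field even when the Hive value is null, so the resulting Record
             // carries an explicit null rather than leaving the position untouched.
             result.set(i, fieldValue == null ? null : filedDeserializers[i].value(fieldValue));
           }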




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org