Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/08/24 23:49:00 UTC

[GitHub] [incubator-gobblin] autumnust opened a new pull request #3090: [GOBBLIN-1250] Open Sourcing ORC writer

autumnust opened a new pull request #3090:
URL: https://github.com/apache/incubator-gobblin/pull/3090


   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] https://issues.apache.org/jira/browse/GOBBLIN-1250
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   This PR open-sources our internally tested OrcWriter, along with a set of tests for it. A few things to note:
   - This writer supports writing Avro union types.
   - It adds a throughput optimization for wide, nested schemas: inside `write`, `ensureSize` is called in advance with a larger-than-needed size, so column vectors are resized less often.
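The resizing optimization in the last bullet can be illustrated with a minimal, self-contained sketch. It does not use Hive's `ColumnVector.ensureSize`. Instead, a hypothetical `OverAllocationSketch` class models the shared child buffer of a list column in plain Java and counts how often it must be reallocated when it grows to exactly the needed size versus growing ahead of demand. Doubling is an assumed growth policy for illustration, not necessarily what the PR does.

```java
import java.util.Arrays;

/**
 * Hypothetical model of a list column's child buffer (not Gobblin or Hive code).
 * Every appended list value needs room in one shared child array.
 */
class OverAllocationSketch {
  private long[] child = new long[4];
  private int used = 0;
  private final boolean overAllocate;
  int resizeCount = 0;

  OverAllocationSketch(boolean overAllocate) {
    this.overAllocate = overAllocate;
  }

  /** Grows the child array if the next `extra` elements would not fit. */
  private void ensureRoom(int extra) {
    int needed = used + extra;
    if (needed <= child.length) {
      return;
    }
    // Exact growth copies on almost every append; growing ahead of demand
    // (doubling here) lets most later appends fit without another copy.
    int newSize = overAllocate ? Math.max(needed, child.length * 2) : needed;
    child = Arrays.copyOf(child, newSize);
    resizeCount++;
  }

  /** Appends the elements of one list value to the child array. */
  void appendList(long[] values) {
    ensureRoom(values.length);
    System.arraycopy(values, 0, child, used, values.length);
    used += values.length;
  }

  public static void main(String[] args) {
    OverAllocationSketch exact = new OverAllocationSketch(false);
    OverAllocationSketch ahead = new OverAllocationSketch(true);
    for (int i = 0; i < 100; i++) {
      long[] row = {i, i + 1, i + 2};  // every row holds a 3-element list
      exact.appendList(row);
      ahead.appendList(row);
    }
    // prints "exact=99 ahead=7"
    System.out.println("exact=" + exact.resizeCount + " ahead=" + ahead.resizeCount);
  }
}
```

The trade-off is memory: a buffer grown ahead of demand may hold up to about twice what it currently needs, in exchange for far fewer copies on wide or deeply nested rows.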
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
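Rules 1, 2, 3 and 5 above, plus the JIRA reference, are mechanical enough to check in code. A minimal sketch with a hypothetical `CommitMessageCheck` class follows. The `[GOBBLIN-NNNN]` subject pattern is an assumption based on this PR's title, and rules 4 and 6 are left to human review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical checker for the mechanically checkable commit-message rules. */
class CommitMessageCheck {
  // Assumption: a JIRA reference looks like "[GOBBLIN-1250]" somewhere in the subject.
  private static final Pattern JIRA_KEY = Pattern.compile("\\[GOBBLIN-\\d+\\]");

  /** Returns a human-readable list of rule violations; empty means the message passes. */
  static List<String> violations(String message) {
    List<String> problems = new ArrayList<>();
    String[] lines = message.split("\n", -1);
    String subject = lines[0];
    if (!JIRA_KEY.matcher(subject).find()) {
      problems.add("subject does not reference a JIRA issue");
    }
    if (subject.length() > 50) {                       // rule 2
      problems.add("subject longer than 50 characters");
    }
    if (subject.endsWith(".")) {                       // rule 3
      problems.add("subject ends with a period");
    }
    if (lines.length > 1 && !lines[1].isEmpty()) {     // rule 1
      problems.add("subject not separated from body by a blank line");
    }
    for (int i = 2; i < lines.length; i++) {           // rule 5
      if (lines[i].length() > 72) {
        problems.add("body line " + (i + 1) + " longer than 72 characters");
      }
    }
    return problems;
  }
}
```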
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] autumnust commented on pull request #3090: [GOBBLIN-1250] Open Sourcing ORC writer

Posted by GitBox <gi...@apache.org>.
autumnust commented on pull request #3090:
URL: https://github.com/apache/incubator-gobblin/pull/3090#issuecomment-679423871


   @sv2000 @ZihanLi58 Please take a look, thanks!





[GitHub] [incubator-gobblin] asfgit closed pull request #3090: [GOBBLIN-1250] Open Sourcing ORC writer

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #3090:
URL: https://github.com/apache/incubator-gobblin/pull/3090


   





[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3090: [GOBBLIN-1250] Open Sourcing ORC writer

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3090:
URL: https://github.com/apache/incubator-gobblin/pull/3090#discussion_r485253564



##########
File path: gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+
+public class GobblinOrcWriterBuilder extends FsDataWriterBuilder<Schema, GenericRecord> {
+  public GobblinOrcWriterBuilder() {
+  }
+
+  @Override
+  public DataWriter<GenericRecord> build()
+      throws IOException {
+    Preconditions.checkNotNull(this.destination);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
+    Preconditions.checkNotNull(this.schema);
+
+    switch (this.destination.getType()) {
+      case HDFS:

Review comment:
       It should be mostly compatible. Users need to be careful with the staging directory: on `s3a`, a rename is not a metadata-only operation but triggers a data copy. It is therefore more efficient to set the writer's staging directory to a local destination and publish to `s3a` in the publisher.
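One way to follow this advice, sketched as Java job properties: the writer stages on the local file system and the publisher targets `s3a`. The key names (`writer.fs.uri`, `writer.staging.dir`, `writer.output.dir`, `writer.builder.class`, `data.publisher.fs.uri`, `data.publisher.final.dir`) are assumed to match Gobblin's `ConfigurationKeys` and should be checked against the Gobblin version in use. The class name, bucket name and paths are placeholders.

```java
import java.util.Properties;

/** Hypothetical job settings: stage locally, publish to s3a. */
class LocalStagingS3aPublishConfig {
  static Properties build(String jobName) {
    Properties props = new Properties();
    // The writer stages and writes on the local file system, where rename is cheap.
    props.setProperty("writer.fs.uri", "file:///");
    props.setProperty("writer.staging.dir", "/tmp/gobblin/" + jobName + "/task-staging");
    props.setProperty("writer.output.dir", "/tmp/gobblin/" + jobName + "/task-output");
    props.setProperty("writer.builder.class", "org.apache.gobblin.writer.GobblinOrcWriterBuilder");
    // The publisher writes the finished files to s3a once, at publish time.
    props.setProperty("data.publisher.fs.uri", "s3a://example-bucket");
    props.setProperty("data.publisher.final.dir", "/orc-output/" + jobName);
    return props;
  }
}
```

This presumably only helps when the local staging path is visible to whatever runs the publisher, for example a standalone or single-node deployment.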







[GitHub] [incubator-gobblin] ZihanLi58 commented on a change in pull request #3090: [GOBBLIN-1250] Open Sourcing ORC writer

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on a change in pull request #3090:
URL: https://github.com/apache/incubator-gobblin/pull/3090#discussion_r481371669



##########
File path: gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java
##########
@@ -0,0 +1,47 @@
+public class GobblinOrcWriterBuilder extends FsDataWriterBuilder<Schema, GenericRecord> {

Review comment:
       Can we add Javadoc for the class?

##########
File path: gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
##########
@@ -0,0 +1,412 @@
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+
+/**
+ * A direct copy of the class with the same name in Dali2 project, other than added

Review comment:
       Can we remove the reference to Dali2 and its future changes from the comment? That seems like internal information.







[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3090: [GOBBLIN-1250] Open Sourcing ORC writer

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3090:
URL: https://github.com/apache/incubator-gobblin/pull/3090#discussion_r481366460



##########
File path: gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
##########
@@ -0,0 +1,412 @@
+/**
+ * A direct copy of the class with the same name in Dali2 project, other than added
+ * UnionConverter implementation to support raw-ingestion for union usecase.
+ *
+ * Note that the implementation in Dali2 is likely to be changed to stay consistent with Iceberg's implementation for

Review comment:
       Same as above. Remove reference to Dali.
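The Javadoc quoted above says the class adds a UnionConverter for union ingestion. As a rough illustration of what writing a union column involves, here is a minimal plain-Java sketch; `UnionTagSketch` and its fields are hypothetical and are not Gobblin's UnionConverter. Each row records a tag naming the chosen branch, and the value is stored in that branch's child column, which loosely mirrors the `tags`/`fields` layout of Hive's `UnionColumnVector`. Branches are resolved by Java type here (`CharSequence` so that both `String` and Avro `Utf8` would match); real Avro code would resolve against the union schema, for which Avro offers `GenericData#resolveUnion`.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical column-wise model of union values: a tag per row plus one child column per branch. */
class UnionTagSketch {
  private final List<Class<?>> branches;
  final int[] tags;
  final Object[][] fields;  // fields[branch][row]

  UnionTagSketch(List<Class<?>> branches, int batchSize) {
    this.branches = branches;
    this.tags = new int[batchSize];
    this.fields = new Object[branches.size()][batchSize];
  }

  /** Returns the index of the first branch whose Java type accepts the value. */
  int resolveTag(Object value) {
    for (int i = 0; i < branches.size(); i++) {
      if (branches.get(i).isInstance(value)) {
        return i;
      }
    }
    throw new IllegalArgumentException("No union branch accepts " + value);
  }

  /** Records the branch tag for the row and stores the value in that branch's column. */
  void write(int row, Object value) {
    int tag = resolveTag(value);
    tags[row] = tag;
    fields[tag][row] = value;
  }

  public static void main(String[] args) {
    // Branch 0 is a string, branch 1 an int, matching the tags used in GenericRecordToOrcValueWriterTest.
    UnionTagSketch column = new UnionTagSketch(Arrays.<Class<?>>asList(CharSequence.class, Integer.class), 2);
    column.write(0, "urn:li:member:3");
    column.write(1, 2);
    System.out.println(Arrays.toString(column.tags));  // prints [0, 1]
  }
}
```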

##########
File path: gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
##########
@@ -0,0 +1,137 @@
+package org.apache.gobblin.writer;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.orc.TypeDescription;
+
+
+/**
+ * A utility class that provides a method to convert {@link Schema} into {@link TypeDescription}.
+ */
+public class AvroOrcSchemaConverter {

Review comment:
       It may be worth moving this class to the gobblin-utility module. Not perfect, but maybe inside AvroUtils? Alternatively, OrcUtils could be moved to gobblin-utility and this class could go inside OrcUtils.

##########
File path: gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
##########
@@ -0,0 +1,206 @@
+
+package org.apache.gobblin.writer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.OrcFile;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcUnion;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+
+import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.orc.mapred.OrcMapredRecordReader.nextValue;
+
+
+@Slf4j
+public class GenericRecordToOrcValueWriterTest {
+  @Test
+  public void testUnionRecordConversionWriter()
+      throws Exception {
+    Schema schema =
+        new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("union_test/schema.avsc"));
+
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    GenericRecordToOrcValueWriter valueWriter = new GenericRecordToOrcValueWriter(orcSchema, schema);
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch();
+
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), schema, "union_test/data.json");
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+
+    // Flush RowBatch to disk.
+    File tempFile = new File(Files.createTempDir(), "orc");
+    tempFile.deleteOnExit();
+    Path filePath = new Path(tempFile.getAbsolutePath());
+
+    OrcFile.WriterOptions options = OrcFile.writerOptions(new Properties(), new Configuration());
+    options.setSchema(orcSchema);
+    Writer orcFileWriter = OrcFile.createWriter(filePath, options);
+    orcFileWriter.addRowBatch(rowBatch);
+    orcFileWriter.close();
+
+    // Load it back and compare.
+    FileSystem fs = FileSystem.get(new Configuration());
+    List<Writable> orcRecords = deserializeOrcRecords(filePath, fs);
+
+    Assert.assertEquals(orcRecords.size(), 5);
+
+    // All records are known to be OrcStruct<OrcUnion>, so skip recursively converting each GenericRecord to an
+    // OrcStruct for a full comparison. That conversion is non-trivial, although it is possible and would be the
+    // ideal way to write this unit test.
+    List<OrcUnion> unionList = orcRecords.stream().map(this::getUnionFieldFromStruct).collect(Collectors.toList());
+
+    // Construct each expected OrcUnion and verify that it appears in unionList.
+    TypeDescription unionSchema = orcSchema.getChildren().get(0);
+    OrcUnion union_0 = new OrcUnion(unionSchema);
+    union_0.set((byte) 0, new Text("urn:li:member:3"));
+    Assert.assertTrue(unionList.contains(union_0));
+
+    OrcUnion union_1 = new OrcUnion(unionSchema);
+    union_1.set((byte) 0, new Text("urn:li:member:4"));
+    Assert.assertTrue(unionList.contains(union_1));
+
+    OrcUnion union_2 = new OrcUnion(unionSchema);
+    union_2.set((byte) 1, new IntWritable(2));
+    Assert.assertTrue(unionList.contains(union_2));
+
+    OrcUnion union_3 = new OrcUnion(unionSchema);
+    union_3.set((byte) 1, new IntWritable(1));
+    Assert.assertTrue(unionList.contains(union_3));
+
+    OrcUnion union_4 = new OrcUnion(unionSchema);
+    union_4.set((byte) 1, new IntWritable(3));
+    Assert.assertTrue(unionList.contains(union_4));
+  }
+
+  @Test
+  public void testListResize()
+      throws Exception {
+    Schema schema =
+        new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc"));
+
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    GenericRecordToOrcValueWriter valueWriter = new GenericRecordToOrcValueWriter(orcSchema, schema);
+    // Make the batch size very small so that the resize behavior is easily triggered.
+    // It still has to be larger than the number of records deserialized from data.json, since the batch is not reset here.
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
+
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), schema, "list_map_test/data.json");
+    Assert.assertEquals(recordList.size(), 6);
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+    // Check the resize count: resizing should happen exactly once each for the map and the list, so 2 in total.
+    Assert.assertEquals(valueWriter.resizeCount, 2);
+  }
+
+  /**
+   * Accesses "fields" via reflection to work around access modifiers.
+   */
+  private OrcUnion getUnionFieldFromStruct(Writable struct) {
+    try {
+      OrcStruct orcStruct = (OrcStruct) struct;
+      Field objectArr = OrcStruct.class.getDeclaredField("fields");
+      objectArr.setAccessible(true);
+      return (OrcUnion) ((Object[]) objectArr.get(orcStruct))[0];
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot access with reflection", e);
+    }
+  }
+
+  public static final List<GenericRecord> deserializeAvroRecords(Class clazz, Schema schema, String schemaPath)

Review comment:
       Looks like this method can be moved into a test utils class.

##########
File path: gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java
##########
@@ -0,0 +1,47 @@
+public class GobblinOrcWriterBuilder extends FsDataWriterBuilder<Schema, GenericRecord> {
+  public GobblinOrcWriterBuilder() {
+  }
+
+  @Override
+  public DataWriter<GenericRecord> build()
+      throws IOException {
+    Preconditions.checkNotNull(this.destination);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
+    Preconditions.checkNotNull(this.schema);
+
+    switch (this.destination.getType()) {
+      case HDFS:

Review comment:
       Just an FYI: there have been questions on Gitter about whether Gobblin supports writing ORC to S3. Which aspects of GobblinOrcWriter could be reused for future writes to S3?

##########
File path: gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
##########
@@ -0,0 +1,203 @@
+
+package org.apache.gobblin.writer;
+
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.orc.TypeDescription;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.gobblin.writer.AvroOrcSchemaConverter.sanitizeNullableSchema;
+
+
+public class AvroOrcSchemaConverterTest {
+  @Test
+  public void testUnionORCSchemaTranslation() throws Exception {
+    Schema avroUnion = SchemaBuilder.record("test")
+        .fields()
+        .name("test_union")
+        .type(SchemaBuilder.builder().unionOf().stringType().and().intType().and().nullType().endUnion())
+        .noDefault()
+        .endRecord();
+
+    TypeDescription unionSchema = TypeDescription.createUnion()
+        .addUnionChild(TypeDescription.createString())
+        .addUnionChild(TypeDescription.createInt());
+    TypeDescription recordSchemaWithUnion = TypeDescription.createStruct().addField("test_union", unionSchema);
+
+    // Verify the schema conversion for Union works
+    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(avroUnion), recordSchemaWithUnion);
+
+    //Create a nullable union field
+    Schema nullableAvroUnion = SchemaBuilder.record("test")
+        .fields()
+        .name("test_union")
+        .type(SchemaBuilder.builder().unionOf().stringType().and().nullType().endUnion())
+        .noDefault()
+        .endRecord();
+    //Assert that Orc schema has flattened the nullable union to the member's type
+    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(nullableAvroUnion),
+        TypeDescription.createStruct().addField("test_union", TypeDescription.createString()));
+
+    //Create a non-nullable union type
+    Schema nonNullableAvroUnion = SchemaBuilder.record("test")
+        .fields()
+        .name("test_union")
+        .type(SchemaBuilder.builder().unionOf().stringType().endUnion())
+        .noDefault()
+        .endRecord();
+    //Ensure that the union type is preserved
+    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(nonNullableAvroUnion), TypeDescription.createStruct()
+        .addField("test_union", TypeDescription.createUnion().addUnionChild(TypeDescription.createString())));
+  }
+
+  @Test
+  public void testTrivialAvroSchemaTranslation() throws Exception {
+
+    // Trivial cases
+    Schema avroSchema = SchemaBuilder.record("test")
+        .fields()
+        .name("string_type")
+        .type(SchemaBuilder.builder().stringType())
+        .noDefault()
+        .name("int_type")
+        .type(SchemaBuilder.builder().intType())
+        .noDefault()
+        .endRecord();
+
+    TypeDescription orcSchema = TypeDescription.createStruct()
+        .addField("string_type", TypeDescription.createString())
+        .addField("int_type", TypeDescription.createInt());
+
+    // Top-level record name will not be replicated in conversion result.
+    Assert.assertEquals(avroSchema.getFields(), getAvroSchema(orcSchema).getFields());
+  }
+
+  @Test
+  public void testUnionAvroSchemaTranslation() throws Exception {
+    Schema avroSchema = SchemaBuilder.record("test")
+        .fields()
+        .name("union_nested")
+        .type(SchemaBuilder.builder().unionOf().stringType().and().intType().endUnion())
+        .noDefault()
+        .endRecord();
+    TypeDescription orcSchema = TypeDescription.createStruct()
+        .addField("union_nested", TypeDescription.createUnion()
+            .addUnionChild(TypeDescription.createString())
+            .addUnionChild(TypeDescription.createInt()));
+
+    Assert.assertEquals(avroSchema.getFields(), getAvroSchema(orcSchema).getFields());
+  }
+
+  @Test
+  public void testSchemaSanitization() throws Exception {
+
+    // Two fields along with null
+    Schema avroSchema = SchemaBuilder.builder().unionOf().nullType().and().stringType().and().intType().endUnion();
+    Schema expectedSchema = SchemaBuilder.builder().unionOf().stringType().and().intType().endUnion();
+    Assert.assertEquals(sanitizeNullableSchema(avroSchema), expectedSchema);
+
+    // Only one field besides null
+    Schema avroSchema_1 = SchemaBuilder.builder()
+        .unionOf()
+        .nullType()
+        .and()
+        .record("test")
+        .fields()
+        .name("aaa")
+        .type(SchemaBuilder.builder().intType())
+        .noDefault()
+        .endRecord()
+        .endUnion();
+    expectedSchema = SchemaBuilder.builder()
+        .record("test")
+        .fields()
+        .name("aaa")
+        .type(SchemaBuilder.builder().intType())
+        .noDefault()
+        .endRecord();
+    Assert.assertEquals(sanitizeNullableSchema(avroSchema_1), expectedSchema);
+  }
+
+  public static Schema getAvroSchema(TypeDescription schema) {

Review comment:
       A candidate to move into a test utils class?
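The tests quoted above pin down how nullable unions are handled: `null` is dropped from the union, a union left with a single member after dropping `null` collapses to that member, and a union that never contained `null` stays a union even with one member. A minimal plain-Java model of those rules follows. `NullableUnionSketch` is hypothetical, types are plain name strings rather than Avro `Schema` or ORC `TypeDescription` objects, and member names are passed through unchanged, whereas a real converter also maps Avro names to ORC ones (for example Avro `long` to ORC `bigint`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the nullable-union rules asserted in AvroOrcSchemaConverterTest. */
class NullableUnionSketch {
  /** Drops "null" from a union's members, keeping the order of the rest. */
  static List<String> dropNull(List<String> members) {
    List<String> rest = new ArrayList<>();
    for (String m : members) {
      if (!"null".equals(m)) {
        rest.add(m);
      }
    }
    return rest;
  }

  /** ORC-style type string for a union field under the rules above. */
  static String orcType(List<String> members) {
    List<String> rest = dropNull(members);
    boolean hadNull = rest.size() < members.size();
    if (hadNull && rest.size() == 1) {
      // [string, null] -> string: every ORC column can already hold nulls.
      return rest.get(0);
    }
    // [string, int, null] -> uniontype<string,int>; a plain [string] union stays a union.
    return "uniontype<" + String.join(",", rest) + ">";
  }

  public static void main(String[] args) {
    System.out.println(orcType(Arrays.asList("string", "int", "null")));  // uniontype<string,int>
    System.out.println(orcType(Arrays.asList("string", "null")));         // string
    System.out.println(orcType(Arrays.asList("string")));                 // uniontype<string>
  }
}
```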

##########
File path: gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
##########
@@ -0,0 +1,206 @@
+@Slf4j
+public class GenericRecordToOrcValueWriterTest {
+  @Test
+  public void testUnionRecordConversionWriter()
+      throws Exception {
+    Schema schema =
+        new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("union_test/schema.avsc"));
+
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    GenericRecordToOrcValueWriter valueWriter = new GenericRecordToOrcValueWriter(orcSchema, schema);
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch();
+
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), schema, "union_test/data.json");
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+
+    // Flush the row batch to disk.
+    File tempFile = new File(Files.createTempDir(), "orc");
+    tempFile.deleteOnExit();
+    Path filePath = new Path(tempFile.getAbsolutePath());
+
+    OrcFile.WriterOptions options = OrcFile.writerOptions(new Properties(), new Configuration());
+    options.setSchema(orcSchema);
+    Writer orcFileWriter = OrcFile.createWriter(filePath, options);
+    orcFileWriter.addRowBatch(rowBatch);
+    orcFileWriter.close();
+
+    // Load it back and compare.
+    FileSystem fs = FileSystem.get(new Configuration());
+    List<Writable> orcRecords = deserializeOrcRecords(filePath, fs);
+
+    Assert.assertEquals(orcRecords.size(), 5);
+
+    // Since every record is known to be an OrcStruct<OrcUnion>, skip recursively converting each GenericRecord
+    // into an OrcStruct for a full comparison. That conversion is non-trivial, although in theory it would be
+    // the more thorough way to write this unit test.
+    List<OrcUnion> unionList = orcRecords.stream().map(this::getUnionFieldFromStruct).collect(Collectors.toList());
+
+    // Construct each expected OrcUnion and verify that it appears in unionList.
+    TypeDescription unionSchema = orcSchema.getChildren().get(0);
+    OrcUnion union_0 = new OrcUnion(unionSchema);
+    union_0.set((byte) 0, new Text("urn:li:member:3"));
+    Assert.assertTrue(unionList.contains(union_0));
+
+    OrcUnion union_1 = new OrcUnion(unionSchema);
+    union_1.set((byte) 0, new Text("urn:li:member:4"));
+    Assert.assertTrue(unionList.contains(union_1));
+
+    OrcUnion union_2 = new OrcUnion(unionSchema);
+    union_2.set((byte) 1, new IntWritable(2));
+    Assert.assertTrue(unionList.contains(union_2));
+
+    OrcUnion union_3 = new OrcUnion(unionSchema);
+    union_3.set((byte) 1, new IntWritable(1));
+    Assert.assertTrue(unionList.contains(union_3));
+
+    OrcUnion union_4 = new OrcUnion(unionSchema);
+    union_4.set((byte) 1, new IntWritable(3));
+    Assert.assertTrue(unionList.contains(union_4));
+  }
+
+  @Test
+  public void testListResize()
+      throws Exception {
+    Schema schema =
+        new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc"));
+
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    GenericRecordToOrcValueWriter valueWriter = new GenericRecordToOrcValueWriter(orcSchema, schema);
+    // Make the batch size very small so that the enlarge behavior is easily triggered.
+    // It still has to be at least the number of records deserialized from data.json, since the batch is never reset here.
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
+
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), schema, "list_map_test/data.json");
+    Assert.assertEquals(recordList.size(), 6);
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+    // Examine the resize count: resizing should happen only once each for the map and the list, so 2 in total.
+    Assert.assertEquals(valueWriter.resizeCount, 2);
+  }
+
+  /**
+   * Access the private "fields" array via reflection to work around access modifiers.
+   */
+  private OrcUnion getUnionFieldFromStruct(Writable struct) {
+    try {
+      OrcStruct orcStruct = (OrcStruct) struct;
+      Field objectArr = OrcStruct.class.getDeclaredField("fields");
+      objectArr.setAccessible(true);
+      return (OrcUnion) ((Object[]) objectArr.get(orcStruct))[0];
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot access with reflection", e);
+    }
+  }
+
+  public static final List<GenericRecord> deserializeAvroRecords(Class clazz, Schema schema, String schemaPath)
+      throws IOException {
+    List<GenericRecord> records = new ArrayList<>();
+
+    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+
+    InputStream dataInputStream = clazz.getClassLoader().getResourceAsStream(schemaPath);
+    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, dataInputStream);
+    GenericRecord recordContainer = reader.read(null, decoder);
+    try {
+      while (recordContainer != null) {
+        records.add(recordContainer);
+        recordContainer = reader.read(null, decoder);
+      }
+    } catch (IOException ioe) {
+      // The JSON decoder throws once the input is exhausted, which is what ends the loop above.
+      dataInputStream.close();
+    }
+    return records;
+  }
+
+  public static final List<Writable> deserializeOrcRecords(Path orcFilePath, FileSystem fs)

Review comment:
       Same comment as above.
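For reference, the deserializeAvroRecords helper quoted above ends its loop when the decoder throws at end of input, but its catch block swallows every IOException rather than only end of input, and the stream is never closed if the first read (outside the try) fails. Below is a stdlib-only sketch of the read-until-EOF idiom. It uses a DataInputStream of ints as a stand-in for the Avro JSON decoder, `ReadUntilEof` and `readAllInts` are hypothetical names, and it assumes that end of input surfaces as an EOFException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class ReadUntilEof {
  // Reads big-endian ints until the stream is exhausted.
  // Only EOFException is treated as the normal end of input; any other IOException propagates.
  static List<Integer> readAllInts(InputStream in) throws IOException {
    List<Integer> values = new ArrayList<>();
    try (DataInputStream data = new DataInputStream(in)) { // closed on every path
      while (true) {
        try {
          values.add(data.readInt());
        } catch (EOFException eof) {
          break;
        }
      }
    }
    return values;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (int value : new int[] {3, 4, 2}) {
        out.writeInt(value);
      }
    }
    System.out.println(readAllInts(new ByteArrayInputStream(bytes.toByteArray())));
  }
}
```

Running main prints `[3, 4, 2]`. Catching EOFException inside the loop keeps genuine read errors visible, and try-with-resources closes the stream whether the loop ends normally or with an error.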

##########
File path: gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+
+/**
+ * A direct copy of the class with the same name in the Dali2 project, except for an added

Review comment:
       Update Javadoc to remove references to Dali/Dali2.

##########
File path: gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
##########
@@ -0,0 +1,412 @@
+/**
+ * A direct copy of the class with the same name in the Dali2 project, except for an added
+ * UnionConverter implementation that supports raw ingestion of unions.
+ *
+ * Note that the Dali2 implementation is likely to change to stay consistent with Iceberg's implementation of the
+ * other converters. That change is unnecessary for Gobblin, which justifies keeping this separate copy.
+ */
+@Slf4j
+public class GenericRecordToOrcValueWriter implements OrcValueWriter<GenericRecord> {
+  private static final String ENABLE_SMART_ARRAY_ENLARGE = GobblinOrcWriter.ORC_WRITER_PREFIX + "enabledMulValueColumnVectorSmartSizing";
+  private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
+  private static final String ENLARGE_FACTOR_KEY = GobblinOrcWriter.ORC_WRITER_PREFIX + "enlargeFactor";
+  private static final int DEFAULT_ENLARGE_FACTOR = 3;
+
+  private boolean enabledSmartSizing;
+  private int enlargeFactor;
+
+  // A rough measure on how much resize is triggered, helping on debugging and testing.

Review comment:
       "on how much resize is triggered" -> "of how many times resize is triggered".

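The enlargeFactor and smart-sizing keys above, together with the PR description's note about calling `ensureSize` with a larger-than-needed size in advance, suggest an over-allocation policy for the child vectors of list and map columns. Below is a minimal sketch of such a policy, not the PR's code. `SmartSizingSketch` and `ensureCapacity` are hypothetical names, capacity is simulated with an int instead of a real Hive ColumnVector, and the growth rule `required * enlargeFactor` is an assumption. It also counts resizes, which is the quantity resizeCount (and testListResize) tracks.

```java
public class SmartSizingSketch {
  private final boolean smartSizing;
  private final int enlargeFactor;
  private int capacity;
  int resizeCount = 0; // number of times the simulated child buffer was grown

  SmartSizingSketch(int initialCapacity, boolean smartSizing, int enlargeFactor) {
    this.capacity = initialCapacity;
    this.smartSizing = smartSizing;
    this.enlargeFactor = enlargeFactor;
  }

  // Make room for `required` child elements. With smart sizing, over-allocate by
  // enlargeFactor so that subsequent rows are likely to fit without another resize.
  void ensureCapacity(int required) {
    if (required <= capacity) {
      return;
    }
    long target = smartSizing ? (long) required * enlargeFactor : required;
    capacity = (int) Math.min(target, Integer.MAX_VALUE); // guard against int overflow
    resizeCount++;
  }

  int capacity() {
    return capacity;
  }

  public static void main(String[] args) {
    // Cumulative number of child elements needed after each row is appended.
    int[] requiredAfterEachRow = {12, 20, 30};
    for (boolean smart : new boolean[] {false, true}) {
      SmartSizingSketch sizing = new SmartSizingSketch(10, smart, 3);
      for (int required : requiredAfterEachRow) {
        sizing.ensureCapacity(required);
      }
      System.out.println("smart=" + smart + " resizes=" + sizing.resizeCount + " capacity=" + sizing.capacity());
    }
  }
}
```

With these numbers, growing exactly to the requested size resizes three times (ending at capacity 30), while a factor of 3 resizes once (capacity 36). The trade-off is the extra memory held by the over-allocated buffer, which is presumably why smart sizing defaults to off.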
##########
File path: gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.state.ConstructState;
+
+
+/**
+ * A wrapper for ORC-core writer without dependency on Hive SerDe library.
+ */
+@Slf4j
+public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
+  static final String ORC_WRITER_PREFIX = "orcWriter.";
+  private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
+  private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+
+  /**
+   * Check comment of {@link #deepCleanRowBatch} for the usage of this configuration.
+   */
+  private static final String ORC_WRITER_DEEP_CLEAN_EVERY_BATCH = ORC_WRITER_PREFIX + "deepCleanBatch";
+
+  private final GenericRecordToOrcValueWriter valueWriter;
+  @VisibleForTesting
+  final VectorizedRowBatch rowBatch;
+  private final Writer orcFileWriter;
+
+  // the close method may be invoked multiple times, but the underlying writer only supports close being called once
+  private volatile boolean closed = false;
+  private final boolean deepCleanBatch;
+
+  private final int batchSize;
+  private final Schema avroSchema;
+
+  public GobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder, State properties)
+      throws IOException {
+    super(builder, properties);
+
+    log.info("Start to construct an ORC-native writer");
+
+    // Create the value writer, which is essentially a record-by-record converter that buffers rows into a batch.
+    this.avroSchema = builder.getSchema();
+    TypeDescription typeDescription = AvroOrcSchemaConverter.getOrcSchema(this.avroSchema);
+    this.valueWriter = new GenericRecordToOrcValueWriter(typeDescription, this.avroSchema, properties);
+    this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
+    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+    this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);
+

Review comment:
       Can you log the ORC writer configuration? It would be useful for debugging, particularly when topic-specific configurations are used.
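One way to address this request is to log every `orcWriter.`-prefixed property when the writer is constructed. Below is a minimal sketch, assuming the job properties are available as a java.util.Properties. `OrcWriterConfigLogging` and `orcWriterConfig` are hypothetical names, and `writer.output.format` is only an example of an unrelated key that gets filtered out.

```java
import java.util.Properties;
import java.util.SortedMap;
import java.util.TreeMap;

public class OrcWriterConfigLogging {
  static final String ORC_WRITER_PREFIX = "orcWriter.";

  // Collects every orcWriter.* property, sorted by key, so the effective
  // (possibly topic-specific) ORC writer settings can be logged in one line.
  static SortedMap<String, String> orcWriterConfig(Properties props) {
    SortedMap<String, String> config = new TreeMap<>();
    for (String key : props.stringPropertyNames()) {
      if (key.startsWith(ORC_WRITER_PREFIX)) {
        config.put(key, props.getProperty(key));
      }
    }
    return config;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("orcWriter.batchSize", "500");
    props.setProperty("orcWriter.deepCleanBatch", "true");
    props.setProperty("writer.output.format", "ORC");
    // In the writer's constructor this string would go to log.info(...) instead.
    System.out.println("ORC writer configuration: " + orcWriterConfig(props));
  }
}
```

This prints `ORC writer configuration: {orcWriter.batchSize=500, orcWriter.deepCleanBatch=true}`. Only explicitly set keys appear. To also capture defaults such as the 1000-row batch size, the constructor could log the resolved values (this.batchSize, this.deepCleanBatch) alongside this map.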







[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3090: [GOBBLIN-1250] Open Sourcing ORC writer

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3090:
URL: https://github.com/apache/incubator-gobblin/pull/3090#discussion_r485253677



##########
File path: gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.orc.TypeDescription;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.gobblin.writer.AvroOrcSchemaConverter.sanitizeNullableSchema;
+
+
+public class AvroOrcSchemaConverterTest {
+  @Test
+  public void testUnionORCSchemaTranslation() throws Exception {
+    Schema avroUnion = SchemaBuilder.record("test")
+        .fields()
+        .name("test_union")
+        .type(SchemaBuilder.builder().unionOf().stringType().and().intType().and().nullType().endUnion())
+        .noDefault()
+        .endRecord();
+
+    TypeDescription unionSchema = TypeDescription.createUnion()
+        .addUnionChild(TypeDescription.createString())
+        .addUnionChild(TypeDescription.createInt());
+    TypeDescription recordSchemaWithUnion = TypeDescription.createStruct().addField("test_union", unionSchema);
+
+    // Verify the schema conversion for Union works
+    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(avroUnion), recordSchemaWithUnion);
+
+    //Create a nullable union field
+    Schema nullableAvroUnion = SchemaBuilder.record("test")
+        .fields()
+        .name("test_union")
+        .type(SchemaBuilder.builder().unionOf().stringType().and().nullType().endUnion())
+        .noDefault()
+        .endRecord();
+    //Assert that Orc schema has flattened the nullable union to the member's type
+    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(nullableAvroUnion),
+        TypeDescription.createStruct().addField("test_union", TypeDescription.createString()));
+
+    // Create a non-nullable union type
+    Schema nonNullableAvroUnion = SchemaBuilder.record("test")
+        .fields()
+        .name("test_union")
+        .type(SchemaBuilder.builder().unionOf().stringType().endUnion())
+        .noDefault()
+        .endRecord();
+    //Ensure that the union type is preserved
+    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(nonNullableAvroUnion), TypeDescription.createStruct()
+        .addField("test_union", TypeDescription.createUnion().addUnionChild(TypeDescription.createString())));
+  }
+
+  @Test
+  public void testTrivialAvroSchemaTranslation() throws Exception {
+
+    // Trivial cases
+    Schema avroSchema = SchemaBuilder.record("test")
+        .fields()
+        .name("string_type")
+        .type(SchemaBuilder.builder().stringType())
+        .noDefault()
+        .name("int_type")
+        .type(SchemaBuilder.builder().intType())
+        .noDefault()
+        .endRecord();
+
+    TypeDescription orcSchema = TypeDescription.createStruct()
+        .addField("string_type", TypeDescription.createString())
+        .addField("int_type", TypeDescription.createInt());
+
+    // Top-level record name will not be replicated in conversion result.
+    Assert.assertEquals(avroSchema.getFields(), getAvroSchema(orcSchema).getFields());
+  }
+
+  @Test
+  public void testUnionAvroSchemaTranslation() throws Exception {
+    Schema avroSchema = SchemaBuilder.record("test")
+        .fields()
+        .name("union_nested")
+        .type(SchemaBuilder.builder().unionOf().stringType().and().intType().endUnion())
+        .noDefault()
+        .endRecord();
+    TypeDescription orcSchema = TypeDescription.createStruct()
+        .addField("union_nested", TypeDescription.createUnion()
+            .addUnionChild(TypeDescription.createString())
+            .addUnionChild(TypeDescription.createInt()));
+
+    Assert.assertEquals(avroSchema.getFields(), getAvroSchema(orcSchema).getFields());
+  }
+
+  @Test
+  public void testSchemaSanitization() throws Exception {
+
+    // Two non-null members along with null
+    Schema avroSchema = SchemaBuilder.builder().unionOf().nullType().and().stringType().and().intType().endUnion();
+    Schema expectedSchema = SchemaBuilder.builder().unionOf().stringType().and().intType().endUnion();
+    Assert.assertEquals(sanitizeNullableSchema(avroSchema), expectedSchema);
+
+    // Only one non-null member
+    Schema avroSchema_1 = SchemaBuilder.builder()
+        .unionOf()
+        .nullType()
+        .and()
+        .record("test")
+        .fields()
+        .name("aaa")
+        .type(SchemaBuilder.builder().intType())
+        .noDefault()
+        .endRecord()
+        .endUnion();
+    expectedSchema = SchemaBuilder.builder()
+        .record("test")
+        .fields()
+        .name("aaa")
+        .type(SchemaBuilder.builder().intType())
+        .noDefault()
+        .endRecord();
+    Assert.assertEquals(sanitizeNullableSchema(avroSchema_1), expectedSchema);
+  }
+
+  public static Schema getAvroSchema(TypeDescription schema) {

Review comment:
       The same argument as the first one ... 
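The tests quoted above (testUnionORCSchemaTranslation and testSchemaSanitization) pin down how unions containing null are translated. Null is dropped, a nullable union left with a single member collapses to that member, and a union that never contained null is kept as a union even with one member. Below is a toy model of those rules over branch type names. It does not use Avro's Schema API or AvroOrcSchemaConverter itself, `NullableUnionSketch` and `orcTypeFor` are hypothetical names, and only the cases exercised by those tests are modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullableUnionSketch {
  // Toy model: an Avro union is given as its branch type names, and the result is an
  // ORC-style type string. Only the cases exercised by the quoted tests are modeled.
  static String orcTypeFor(List<String> branches) {
    List<String> nonNull = new ArrayList<>();
    for (String branch : branches) {
      if (!"null".equals(branch)) {
        nonNull.add(branch);
      }
    }
    boolean nullable = nonNull.size() < branches.size();
    if (nullable && nonNull.size() == 1) {
      // [null, X] collapses to X.
      return nonNull.get(0);
    }
    // Otherwise keep a union of the non-null branches; this includes a single-branch
    // union that never contained null.
    return "uniontype<" + String.join(",", nonNull) + ">";
  }

  public static void main(String[] args) {
    System.out.println(orcTypeFor(Arrays.asList("string", "int", "null")));
    System.out.println(orcTypeFor(Arrays.asList("string", "null")));
    System.out.println(orcTypeFor(Arrays.asList("string")));
    System.out.println(orcTypeFor(Arrays.asList("null", "record")));
  }
}
```

The four calls print `uniontype<string,int>`, `string`, `uniontype<string>` and `record`, which matches the expectations asserted in the tests.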







[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3090: [GOBBLIN-1250] Open Sourcing ORC writer

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3090:
URL: https://github.com/apache/incubator-gobblin/pull/3090#discussion_r485249845



##########
File path: gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.orc.TypeDescription;
+
+
+/**
+ * A utility class that provides a method to convert {@link Schema} into {@link TypeDescription}.
+ */
+public class AvroOrcSchemaConverter {

Review comment:
       I would actually prefer to keep them in this module. It feels more natural to keep data-format-specific code like this in gobblin-modules, while gobblin-utility should focus on handling Gobblin constructs. Also, the ORC dependencies would get pretty messy if this moved to gobblin-utility: a big pain point is the mismatch between the Hive version used in Gobblin OSS and the one used by the ORC library.



