You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/01/21 01:59:28 UTC

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3184: Gobblin 1320 add iceberg writer

autumnust commented on a change in pull request #3184:
URL: https://github.com/apache/incubator-gobblin/pull/3184#discussion_r561455188



##########
File path: gobblin-modules/gobblin-iceberg/src/main/java/org.apache.gobblin/writer/IcebergUtil.java
##########
@@ -0,0 +1,47 @@
+package org.apache.gobblin.writer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hadoop.HadoopTables;
+
+import java.util.Map;
+
+
+@Slf4j
+public class IcebergUtil {
+
+  public static Table createTable(String path, Map<String, String> properties, boolean partitioned, Schema schema) {
+    PartitionSpec spec;
+    if (partitioned) {
+      spec = PartitionSpec.builderFor(schema).identity("data").build();

Review comment:
       Shall we use a more specific identity value here?

##########
File path: gobblin-modules/gobblin-iceberg/src/main/java/org.apache.gobblin/writer/IcebergTaskWriterFactory.java
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.writer;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.*;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import java.util.Map;
+
+/**
+ * Factory to create {@link TaskWriter} that is.
+ * responsible for writing generic data
+ * into Iceberg table.
+ *
+ */
+public class IcebergTaskWriterFactory {
+
+    private final Schema schema;
+    private final PartitionSpec partitionSpec;
+    private final LocationProvider locations;
+    private final FileIO io;
+    private final EncryptionManager encryptionManager;
+    private final long targetFileSizeBytes;
+    private final FileFormat format;
+    private final IcebergFileAppenderFactory appenderFactory;
+
+    private transient OutputFileFactory outputFileFactory;
+
+    public IcebergTaskWriterFactory(Schema schema,
+                                    PartitionSpec partitionSpec,
+                                    LocationProvider locations,
+                                    FileIO io,
+                                    EncryptionManager encryptionManager,
+                                    long targetFileSizeBytes,
+                                    FileFormat format,
+                                    Map<String, String> tableProperties) {
+        this.schema = schema;
+        this.partitionSpec = partitionSpec;
+        this.locations = locations;
+        this.io = io;
+        this.encryptionManager = encryptionManager;
+        this.targetFileSizeBytes = targetFileSizeBytes;
+        this.format = format;
+        this.appenderFactory = new IcebergFileAppenderFactory(schema);
+    }
+
+    public void initialize(int taskId, int attemptId) {
+        this.outputFileFactory = new OutputFileFactory(partitionSpec, format, locations, io, encryptionManager, taskId, attemptId);
+    }
+
+    public TaskWriter create() {
+        Preconditions.checkNotNull(outputFileFactory,
+                "The outputFileFactory shouldn't be null if we have invoked the initialize().");
+
+        if (partitionSpec.fields().isEmpty()) {

Review comment:
       So why do we need this if-branch?

##########
File path: gobblin-modules/gobblin-iceberg/src/main/java/org.apache.gobblin/writer/IcebergFileAppenderFactory.java
##########
@@ -0,0 +1,84 @@
+package org.apache.gobblin.writer;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+
+
+/**
+ * Factory to create a new {@link FileAppender} that
+ * write generic data. The created {@link FileAppender}
+ * will be wrapped in {@link TaskWriter}
+ */
+public class IcebergFileAppenderFactory implements FileAppenderFactory {

Review comment:
       Shall we rename it to `GobblinFileAppender`, to follow the naming of those implementations in the Iceberg codebase?

##########
File path: gobblin-modules/gobblin-iceberg/src/test/java/org/apache/gobblin/writer/IcebergTestUtil.java
##########
@@ -0,0 +1,130 @@
+package org.apache.gobblin.writer;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.*;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.testng.Assert;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
+
+public class IcebergTestUtil {
+
+  public static final Schema SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "data", Types.StringType.get())
+  );
+
+  public static final Record RECORD = GenericRecord.create(SCHEMA);
+
+  public static Table createTable(String path, Map<String, String> properties, boolean partitioned) {
+    PartitionSpec spec;
+    if (partitioned) {
+      spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    } else {
+      spec = PartitionSpec.unpartitioned();
+    }
+    return new HadoopTables().create(SCHEMA, spec, properties, path);
+  }
+
+  public static Record createRecord(Integer id, String data) {
+    Record record = RECORD.copy();
+    record.setField("id", id);
+    record.setField("data", data);
+    return record;
+  }
+
+  public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
+    table.refresh();
+    try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
+      Assert.assertEquals(
+          Sets.newHashSet(expected), Sets.newHashSet(iterable));
+    }
+  }
+
+  public static void assertTableRecords(String tablePath, List<Record> expected) throws IOException {
+    Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
+    assertTableRecords(new HadoopTables().load(tablePath), expected);
+  }
+
+  public static org.apache.avro.Schema generateAvroSchema() {
+    org.apache.avro.Schema avroSchema = (new org.apache.avro.Schema.Parser()).parse("{\n" +
+        "  \"namespace\": \"com.linkedin.orc\",\n" +
+        "  \"type\": \"record\",\n" +
+        "  \"name\": \"IcebergTest\",\n" +
+        "  \"fields\": [\n" +
+        "    {\n" +
+        "      \"name\": \"id\",\n" +
+        "      \"type\": \"int\"\n" +
+        "    },\n" +
+        "    {\n" +
+        "      \"name\": \"data\",\n" +
+        "      \"type\": \"string\"\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}\n");
+    return avroSchema;
+  }
+
+  public static List<org.apache.avro.generic.GenericRecord> genericAvroRecords() throws IOException {
+    List<org.apache.avro.generic.GenericRecord> list = new ArrayList<>();
+    org.apache.avro.Schema avroSchema = generateAvroSchema();
+
+    GenericRecordBuilder builder_0 = new GenericRecordBuilder(avroSchema);
+    builder_0.set("id", 1);
+    builder_0.set("data", "alice");
+    list.add(builder_0.build());
+
+    GenericRecordBuilder builder_1 = new GenericRecordBuilder(avroSchema);
+    builder_1.set("id", 2);
+    builder_1.set("data", "bob");
+    list.add(builder_1.build());
+    return list;
+  }
+
+
+
+  public static List<org.apache.avro.generic.GenericRecord> deserializeAvroRecords(org.apache.avro.Schema schema, String schemaPath)

Review comment:
       Where is this method being used? In fact, I thought we would need to deal directly with Avro files for deserialization?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org