You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/05/16 15:32:01 UTC

[gobblin] branch master updated: [GOBBLIN-1443] Make iceberg metadata root location include db name

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e4d6c1b  [GOBBLIN-1443] Make iceberg metadata root location include db name
e4d6c1b is described below

commit e4d6c1b7542f3af5237447021ff14295d2c425ff
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Sun May 16 08:31:54 2021 -0700

    [GOBBLIN-1443] Make iceberg metadata root location include db name
    
    Closes #3279 from ZihanLi58/GOBBLIN-1443
---
 .../org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java | 8 ++++++--
 .../org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java  | 4 ++--
 .../apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java  | 5 ++++-
 3 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index a5d0c72..0871fb7 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -96,7 +96,9 @@ public class GobblinMCEPublisher extends DataPublisher {
       if (newFiles.isEmpty()) {
         // There'll be only one dummy file here. This file is parsed for DB and table name calculation.
         newFiles = computeDummyFile(state);
-        this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.NONE);
+        if (!newFiles.isEmpty()) {
+          this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.NONE);
+        }
       } else {
         this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
       }
@@ -150,7 +152,9 @@ public class GobblinMCEPublisher extends DataPublisher {
       //
       PriorityQueue<FileStatus> fileStatuses =
           new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
-      fileStatuses.add(fs.getFileStatus(path));
+      if (fs.exists(path)) {
+        fileStatuses.add(fs.getFileStatus(path));
+      }
       // Only register files
       while (!fileStatuses.isEmpty()) {
         FileStatus fileStatus = fileStatuses.poll();
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 02fbb2e..75cbb07 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -120,7 +120,7 @@ import org.joda.time.format.PeriodFormatterBuilder;
 public class IcebergMetadataWriter implements MetadataWriter {
 
   public static final String USE_DATA_PATH_AS_TABLE_LOCATION = "use.data.path.as.table.location";
-  public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata";
+  public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata/%s";
   public static final String GMCE_HIGH_WATERMARK_KEY = "gmce.high.watermark.%s";
   public static final String GMCE_LOW_WATERMARK_KEY = "gmce.low.watermark.%s";
   private final static String EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "gobblin.iceberg.dataset.expire.snapshots.lookBackTime";
@@ -409,7 +409,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
     Table icebergTable = null;
     String tableLocation = null;
     if (useDataLoacationAsTableLocation) {
-      tableLocation = gmce.getDatasetIdentifier().getNativeName() + TABLE_LOCATION_SUFFIX;
+      tableLocation = gmce.getDatasetIdentifier().getNativeName() + String.format(TABLE_LOCATION_SUFFIX, table.getDbName());
       //Set the path permission
       Path tablePath = new Path(tableLocation);
       WriterUtils.mkdirsWithRecursivePermission(tablePath.getFileSystem(conf), tablePath, permission);
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index cb1c32c..d6ec191 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -117,7 +117,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
         .setDatasetIdentifier(DatasetIdentifier.newBuilder()
             .setDataOrigin(DataOrigin.EI)
             .setDataPlatformUrn("urn:li:dataPlatform:hdfs")
-            .setNativeName("/data/tracking/testIcebergTable")
+            .setNativeName(new File(tmpDir, "data/tracking/testIcebergTable").getAbsolutePath())
             .build())
         .setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "0-1000").build())
         .setFlowId("testFlow")
@@ -138,6 +138,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
         KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
     state.setProp("default.hive.registration.policy",
         TestHiveRegistrationPolicyForIceberg.class.getName());
+    state.setProp("use.data.path.as.table.location", true);
     gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
     ((IcebergMetadataWriter) gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
         HiveMetastoreTest.catalog);
@@ -154,6 +155,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertEquals(catalog.listTables(Namespace.of(dbName)).size(), 1);
     Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
+    Assert.assertEquals(table.location(),
+        new File(tmpDir, "data/tracking/testIcebergTable/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
     gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "1000-2000").build());
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
         new KafkaStreamingExtractor.KafkaWatermark(