You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this text.)
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/05/16 15:32:01 UTC
[gobblin] branch master updated: [GOBBLIN-1443] Make iceberg metadata root location include db name
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e4d6c1b [GOBBLIN-1443] Make iceberg metadata root location include db name
e4d6c1b is described below
commit e4d6c1b7542f3af5237447021ff14295d2c425ff
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Sun May 16 08:31:54 2021 -0700
[GOBBLIN-1443] Make iceberg metadata root location include db name
Closes #3279 from ZihanLi58/GOBBLIN-1443
---
.../org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java | 8 ++++++--
.../org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java | 4 ++--
.../apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java | 5 ++++-
3 files changed, 12 insertions(+), 5 deletions(-)
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index a5d0c72..0871fb7 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -96,7 +96,9 @@ public class GobblinMCEPublisher extends DataPublisher {
if (newFiles.isEmpty()) {
// There'll be only one dummy file here. This file is parsed for DB and table name calculation.
newFiles = computeDummyFile(state);
- this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.NONE);
+ if (!newFiles.isEmpty()) {
+ this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.NONE);
+ }
} else {
this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
}
@@ -150,7 +152,9 @@ public class GobblinMCEPublisher extends DataPublisher {
//
PriorityQueue<FileStatus> fileStatuses =
new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
- fileStatuses.add(fs.getFileStatus(path));
+ if (fs.exists(path)) {
+ fileStatuses.add(fs.getFileStatus(path));
+ }
// Only register files
while (!fileStatuses.isEmpty()) {
FileStatus fileStatus = fileStatuses.poll();
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 02fbb2e..75cbb07 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -120,7 +120,7 @@ import org.joda.time.format.PeriodFormatterBuilder;
public class IcebergMetadataWriter implements MetadataWriter {
public static final String USE_DATA_PATH_AS_TABLE_LOCATION = "use.data.path.as.table.location";
- public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata";
+ public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata/%s";
public static final String GMCE_HIGH_WATERMARK_KEY = "gmce.high.watermark.%s";
public static final String GMCE_LOW_WATERMARK_KEY = "gmce.low.watermark.%s";
private final static String EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "gobblin.iceberg.dataset.expire.snapshots.lookBackTime";
@@ -409,7 +409,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
Table icebergTable = null;
String tableLocation = null;
if (useDataLoacationAsTableLocation) {
- tableLocation = gmce.getDatasetIdentifier().getNativeName() + TABLE_LOCATION_SUFFIX;
+ tableLocation = gmce.getDatasetIdentifier().getNativeName() + String.format(TABLE_LOCATION_SUFFIX, table.getDbName());
//Set the path permission
Path tablePath = new Path(tableLocation);
WriterUtils.mkdirsWithRecursivePermission(tablePath.getFileSystem(conf), tablePath, permission);
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index cb1c32c..d6ec191 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -117,7 +117,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
.setDatasetIdentifier(DatasetIdentifier.newBuilder()
.setDataOrigin(DataOrigin.EI)
.setDataPlatformUrn("urn:li:dataPlatform:hdfs")
- .setNativeName("/data/tracking/testIcebergTable")
+ .setNativeName(new File(tmpDir, "data/tracking/testIcebergTable").getAbsolutePath())
.build())
.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "0-1000").build())
.setFlowId("testFlow")
@@ -138,6 +138,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
state.setProp("default.hive.registration.policy",
TestHiveRegistrationPolicyForIceberg.class.getName());
+ state.setProp("use.data.path.as.table.location", true);
gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
((IcebergMetadataWriter) gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
HiveMetastoreTest.catalog);
@@ -154,6 +155,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Assert.assertEquals(catalog.listTables(Namespace.of(dbName)).size(), 1);
Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
+ Assert.assertEquals(table.location(),
+ new File(tmpDir, "data/tracking/testIcebergTable/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "1000-2000").build());
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(