You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2022/06/07 21:48:31 UTC
[gobblin] branch master updated: [GOBBLIN-1657] Update completion watermark on change_property in IcebergMetadataWriter (#3517)
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ef18c482b [GOBBLIN-1657] Update completion watermark on change_property in IcebergMetadataWriter (#3517)
ef18c482b is described below
commit ef18c482b241378f8912fa1944914167a61f147c
Author: vbohra <vb...@linkedin.com>
AuthorDate: Tue Jun 7 14:48:27 2022 -0700
[GOBBLIN-1657] Update completion watermark on change_property in IcebergMetadataWriter (#3517)
* [GOBBLIN-1655] Update completion watermark for quiet tables during iceberg registration
* [GOBBLIN-1657] Update completion watermark on change_property GMCE
* Added test case to check watermark update on change_property
Co-authored-by: Vikram Bohra <vb...@vbohra-mn1.linkedin.biz>
---
.../verifier/KafkaAuditCountVerifier.java | 7 ++
.../iceberg/publisher/GobblinMCEPublisher.java | 3 +
.../iceberg/writer/IcebergMetadataWriter.java | 86 ++++++++++++++--------
.../iceberg/writer/IcebergMetadataWriterTest.java | 41 ++++++++++-
4 files changed, 103 insertions(+), 34 deletions(-)
diff --git a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index 04d49a1db..d4495f9eb 100644
--- a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -45,6 +45,8 @@ public class KafkaAuditCountVerifier {
public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX + "reference.tiers";
public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
private static final double DEFAULT_THRESHOLD = 0.999;
+ public static final String COMPLETE_ON_NO_COUNTS = COMPLETENESS_PREFIX + "complete.on.no.counts";
+ private final boolean returnCompleteOnNoCounts;
private final AuditCountClient auditCountClient;
private final String srcTier;
@@ -67,6 +69,7 @@ public class KafkaAuditCountVerifier {
state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
this.srcTier = state.getProp(SOURCE_TIER);
this.refTiers = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
+ this.returnCompleteOnNoCounts = state.getPropAsBoolean(COMPLETE_ON_NO_COUNTS, false);
}
/**
@@ -119,6 +122,10 @@ public class KafkaAuditCountVerifier {
Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+ if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
+ log.info(String.format("Found empty counts map for %s, returning complete", datasetName));
+ return 1.0;
+ }
double percent = -1;
if (!countsByTier.containsKey(this.srcTier)) {
throw new IOException(String.format("Source tier %s audit count cannot be retrieved for dataset %s between %s and %s", this.srcTier, datasetName, beginInMillis, endInMillis));
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index 8b76843af..3e58a53d1 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -98,7 +98,10 @@ public class GobblinMCEPublisher extends DataPublisher {
// There'll be only one dummy file here. This file is parsed for DB and table name calculation.
newFiles = computeDummyFile(state);
if (!newFiles.isEmpty()) {
+ log.info("Dummy file: " + newFiles.keySet().iterator().next());
this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.NONE);
+ } else {
+ log.info("No dummy file created. Not sending GMCE");
}
} else {
this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index ebc633e2c..9b0b18450 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -312,6 +312,11 @@ public class IcebergMetadataWriter implements MetadataWriter {
return;
}
}
+ if(tableMetadata.completenessEnabled) {
+ tableMetadata.completionWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
+ String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
+ }
+
computeCandidateSchema(gmce, tid, tableSpec);
tableMetadata.ensureTxnInit();
tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
@@ -322,12 +327,6 @@ public class IcebergMetadataWriter implements MetadataWriter {
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
- //compute topic name
- if(!tableMetadata.newProperties.get().containsKey(TOPIC_NAME_KEY) &&
- tableMetadata.dataOffsetRange.isPresent() && !tableMetadata.dataOffsetRange.get().isEmpty()) {
- String topicPartition = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
- tableMetadata.newProperties.get().put(TOPIC_NAME_KEY, topicPartition.substring(0, topicPartition.lastIndexOf("-")));
- }
break;
}
case rewrite_files: {
@@ -411,6 +410,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(tableSpec.getTable());
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
tableMetadata.newProperties = Optional.of(IcebergUtils.getTableProperties(table));
+ String nativeName = tableMetadata.datasetName;
+ String topic = nativeName.substring(nativeName.lastIndexOf("/") + 1);
+ tableMetadata.newProperties.get().put(TOPIC_NAME_KEY, topic);
}
/**
@@ -692,8 +694,6 @@ public class IcebergMetadataWriter implements MetadataWriter {
StructLike partition = getIcebergPartitionVal(hiveSpecs, file.getFilePath(), partitionSpec);
if(tableMetadata.newPartitionColumnEnabled && gmce.getOperationType() == OperationType.add_files) {
- tableMetadata.prevCompletenessWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
- String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
// Assumes first partition value to be partitioned by date
// TODO Find better way to determine a partition value
String datepartition = partition.get(0, null);
@@ -722,8 +722,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
private StructLike addLatePartitionValueToIcebergTable(Table table, TableMetadata tableMetadata, HivePartition hivePartition, String datepartition) {
table = addPartitionToIcebergTable(table, newPartitionColumn, newPartitionColumnType);
PartitionSpec partitionSpec = table.spec();
- long prevCompletenessWatermark = tableMetadata.prevCompletenessWatermark;
- int late = !tableMetadata.completenessEnabled ? 0 : isLate(datepartition, prevCompletenessWatermark);
+ int late = !tableMetadata.completenessEnabled ? 0 : isLate(datepartition, tableMetadata.completionWatermark);
List<String> partitionValues = new ArrayList<>(hivePartition.getValues());
partitionValues.add(String.valueOf(late));
return IcebergUtils.getPartition(partitionSpec.partitionType(), partitionValues);
@@ -790,28 +789,33 @@ public class IcebergMetadataWriter implements MetadataWriter {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+ String topic = props.get(TOPIC_NAME_KEY);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
if (tableMetadata.completenessEnabled) {
- String topicName = props.get(TOPIC_NAME_KEY);
- if(topicName == null) {
- log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s.%s",
- TOPIC_NAME_KEY, dbName, tableName));
- } else {
- long newCompletenessWatermark =
- computeCompletenessWatermark(topicName, tableMetadata.datePartitions, tableMetadata.prevCompletenessWatermark);
- if(newCompletenessWatermark > tableMetadata.prevCompletenessWatermark) {
- log.info(String.format("Updating %s for %s.%s to %s", COMPLETION_WATERMARK_KEY, dbName, tableName, newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
- tableMetadata.newCompletenessWatermark = newCompletenessWatermark;
- }
- }
+ checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
tableMetadata.deleteFiles.get().commit();
}
+ // Check and update completion watermark when there are no files to be registered, typically for quiet topics
+ // The logic is to check the next window from previous completion watermark and update the watermark if there are no audit counts
+ if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
+ && tableMetadata.completenessEnabled) {
+ if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
+ log.info(String.format("Checking kafka audit for %s on change_property ", topic));
+ SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
+ ZonedDateTime prevWatermarkDT =
+ Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
+ timestamps.add(TimeIterator.inc(prevWatermarkDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
+ checkAndUpdateCompletenessWatermark(tableMetadata, topic, timestamps, props);
+ } else {
+ log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
+ tableMetadata.completionWatermark, topic));
+ }
+ }
+
//Set high waterMark
Long highWatermark = tableCurrentWatermarkMap.get(tid);
props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
@@ -835,7 +839,6 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
//Update schema(commit)
updateSchema(tableMetadata, props, topicName);
-
//Update properties
UpdateProperties updateProperties = transaction.updateProperties();
props.forEach(updateProperties::set);
@@ -850,7 +853,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
//Reset the table metadata for next accumulation period
- tableMetadata.reset(currentProps, highWatermark, tableMetadata.newCompletenessWatermark);
+ tableMetadata.reset(currentProps, highWatermark);
log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString()));
} else {
log.info("There's no transaction initiated for the table {}", tid.toString());
@@ -869,6 +872,30 @@ public class IcebergMetadataWriter implements MetadataWriter {
this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
}
+ /**
+ * Update TableMetadata with the new completion watermark upon a successful audit check.
+ * If {@code topic} is null, an error is logged and the audit check is skipped entirely,
+ * leaving both {@code props} and the table's completion watermark untouched.
+ * @param tableMetadata metadata of table; its completionWatermark is advanced when the audit check succeeds
+ * @param topic topic name used for the audit check; may be null when TOPIC_NAME_KEY is not set on the table
+ * @param timestamps sorted set of timestamps to check audit counts for (datePartitions is kept in reverse
+ * order; the change_property path passes a single timestamp)
+ * @param props table properties map; receives the new watermark and its timezone when the watermark advances
+ */
+ private void checkAndUpdateCompletenessWatermark(TableMetadata tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
+ Map<String, String> props) {
+ if (topic == null) {
+ log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s",
+ TOPIC_NAME_KEY, tableMetadata.table.get().name()));
+ // Skip the audit check, as the log message above states; never pass a null topic on to
+ // computeCompletenessWatermark or risk updating the watermark without a real audit
+ return;
+ }
+ long newCompletenessWatermark =
+ computeCompletenessWatermark(topic, timestamps, tableMetadata.completionWatermark);
+ // Only ever move the completion watermark forward
+ if (newCompletenessWatermark > tableMetadata.completionWatermark) {
+ log.info(String.format("Updating %s for %s to %s", COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
+ newCompletenessWatermark));
+ props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
+ props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
+ tableMetadata.completionWatermark = newCompletenessWatermark;
+ }
+ }
+
/**
* NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit counts match
* for that window (aka its is set to the beginning of next window)
@@ -1085,8 +1112,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
Optional<String> lastSchemaVersion = Optional.absent();
Optional<Long> lowWatermark = Optional.absent();
- long prevCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
- long newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
+ long completionWatermark = DEFAULT_COMPLETION_WATERMARK;
SortedSet<ZonedDateTime> datePartitions = new TreeSet<>(Collections.reverseOrder());
@Setter
@@ -1131,7 +1157,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
}
- void reset(Map<String, String> props, Long lowWaterMark, long newCompletionWatermark) {
+ void reset(Map<String, String> props, Long lowWaterMark) {
this.lastProperties = Optional.of(props);
this.lastSchemaVersion = Optional.of(props.get(SCHEMA_CREATION_TIME_KEY));
this.transaction = Optional.absent();
@@ -1148,8 +1174,6 @@ public class IcebergMetadataWriter implements MetadataWriter {
this.newProperties = Optional.absent();
this.lowestGMCEEmittedTime = Long.MAX_VALUE;
this.lowWatermark = Optional.of(lowWaterMark);
- this.prevCompletenessWatermark = newCompletionWatermark;
- this.newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
this.datePartitions.clear();
}
}
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index da564fa3d..5e2e3d00d 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -423,12 +423,12 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
// Test when completeness watermark = -1 bootstrap case
KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
- Mockito.when(verifier.isComplete("testTopic", timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
+ Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
//completeness watermark = "2020-09-16-10"
- Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
+ Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testIcebergTable");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), "America/Los_Angeles");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis));
@@ -475,7 +475,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(60L))));
- Mockito.when(verifier.isComplete("testTopic", timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
+ Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis1));
@@ -486,6 +486,41 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
}
+ /**
+ * Verifies that a change_property GMCE (no files appended) advances the completion watermark
+ * for a quiet table: the writer audit-checks the window right after the previous watermark and,
+ * when the mocked verifier reports it complete, moves the watermark to the end of that window.
+ * Relies on table state (completion watermark, offset range) left behind by
+ * testWriteAddFileGMCECompleteness and on the shared gmce / writer fixtures.
+ */
+ @Test(dependsOnMethods={"testWriteAddFileGMCECompleteness"}, groups={"icebergMetadataWriterTest"})
+ public void testChangePropertyGMCECompleteness() throws IOException {
+
+ // Start from the completion watermark committed by the previous test; the expected new
+ // watermark is one window later (assumes the writer's audit check granularity is hourly)
+ Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ long watermark = Long.parseLong(table.properties().get(COMPLETION_WATERMARK_KEY));
+ long expectedWatermark = watermark + TimeUnit.HOURS.toMillis(1);
+ // Reuse the shared gmce fixture as a change_property event with a new offset range for testTopic-1
+ File hourlyFile2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/11/data.avro");
+ gmce.setOldFilePrefixes(null);
+ gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
+ .setFilePath(hourlyFile2.toString())
+ .setFileFormat("avro")
+ .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
+ .build()));
+ gmce.setOperationType(OperationType.change_property);
+ gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "6000-7000").build());
+ GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+ gobblinMCEWriterWithCompletness.writeEnvelope(new RecordEnvelope<>(genericGmce,
+ new KafkaStreamingExtractor.KafkaWatermark(
+ new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(65L))));
+
+ // Only the window [watermark, expectedWatermark] is stubbed as complete; unstubbed calls
+ // on the mock return Mockito's default (false)
+ KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
+ Mockito.when(verifier.isComplete("testIcebergTable", watermark, expectedWatermark)).thenReturn(true);
+ ((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
+ gobblinMCEWriterWithCompletness.flush();
+
+ // Offsets are merged with the earlier range (presumably 0-6000 from previous tests), and the
+ // topic name property is the table/dataset name rather than the Kafka topic
+ table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-7000");
+ Assert.assertEquals(table.spec().fields().get(1).name(), "late");
+ Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testIcebergTable");
+ Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), "America/Los_Angeles");
+ Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(expectedWatermark));
+
+ }
+
private String writeRecord(File file) throws IOException {
GenericData.Record record = new GenericData.Record(avroDataSchema);
record.put("id", 1L);