You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/07/18 19:05:00 UTC
[gobblin] branch master updated: [GOBBLIN-1668] Add audit counts for iceberg registration (#3527)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new da2241771 [GOBBLIN-1668] Add audit counts for iceberg registration (#3527)
da2241771 is described below
commit da224177174b456c9ec273a907ed344d370217da
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Mon Jul 18 12:04:55 2022 -0700
[GOBBLIN-1668] Add audit counts for iceberg registration (#3527)
* Add audit counts for iceberg registration
* Address comments
* Address comments
---
.../apache/gobblin/iceberg/GobblinMCEProducer.java | 13 ++++++++++---
.../iceberg/publisher/GobblinMCEPublisher.java | 5 ++++-
.../iceberg/writer/IcebergMetadataWriter.java | 21 +++++++++++++++++++++
.../iceberg/publisher/GobblinMCEPublisherTest.java | 12 ++++++------
.../src/main/avro/GobblinMetadataChangeEvent.avsc | 7 +++++++
5 files changed, 48 insertions(+), 10 deletions(-)
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index ec81f2d9c..b037cb635 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -82,6 +82,10 @@ public abstract class GobblinMCEProducer implements Closeable {
this.metricContext = Instrumented.getMetricContext(state, this.getClass());
}
+ public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
+ Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource) throws IOException {
+ sendGMCE(newFiles, oldFiles, oldFilePrefixes, offsetRange, operationType, schemaSource, null);
+ }
/**
* This method will use the files to compute the table name and dataset name, for each table it will generate one GMCE and send that to kafka so
@@ -90,12 +94,14 @@ public abstract class GobblinMCEProducer implements Closeable {
* @param oldFiles the list of old file to be dropped
* @param offsetRange offset range of the new data, can be null
* @param operationType The opcode of gmce emitted by this method.
+ * @param serializedAuditCountMap Audit count map to be used by {@link org.apache.gobblin.iceberg.writer.IcebergMetadataWriter} to track iceberg
+ * registration counts
* @throws IOException
*/
public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
- Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource) throws IOException {
+ Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource, String serializedAuditCountMap) throws IOException {
GobblinMetadataChangeEvent gmce =
- getGobblinMetadataChangeEvent(newFiles, oldFiles, oldFilePrefixes, offsetRange, operationType, schemaSource);
+ getGobblinMetadataChangeEvent(newFiles, oldFiles, oldFilePrefixes, offsetRange, operationType, schemaSource, serializedAuditCountMap);
underlyingSendGMCE(gmce);
}
@@ -166,7 +172,7 @@ public abstract class GobblinMCEProducer implements Closeable {
public GobblinMetadataChangeEvent getGobblinMetadataChangeEvent(Map<Path, Metrics> newFiles, List<String> oldFiles,
List<String> oldFilePrefixes, Map<String, String> offsetRange, OperationType operationType,
- SchemaSource schemaSource) {
+ SchemaSource schemaSource, String serializedAuditCountMap) {
if (!verifyInput(newFiles, oldFiles, oldFilePrefixes, operationType)) {
return null;
}
@@ -182,6 +188,7 @@ public abstract class GobblinMCEProducer implements Closeable {
gmceBuilder.setOldFilePrefixes(oldFilePrefixes);
}
gmceBuilder.setOperationType(operationType);
+ gmceBuilder.setAuditCountMap(serializedAuditCountMap);
return gmceBuilder.build();
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index 3e58a53d1..53611f8c1 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -78,6 +78,8 @@ public class GobblinMCEPublisher extends DataPublisher {
private static final PathFilter HIDDEN_FILES_FILTER = new HiddenFilter();
private static final Metrics DUMMY_METRICS = new Metrics(100000000L, null, null, null, null);
+ public static final String SERIALIZED_AUDIT_COUNT_MAP_KEY = "serializedAuditCountMap";
+
public GobblinMCEPublisher(State state) throws IOException {
this(state, GobblinMCEProducer.getGobblinMCEProducer(state));
@@ -104,7 +106,8 @@ public class GobblinMCEPublisher extends DataPublisher {
log.info("No dummy file created. Not sending GMCE");
}
} else {
- this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
+ this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.add_files, SchemaSource.SCHEMAREGISTRY,
+ state.getProp(SERIALIZED_AUDIT_COUNT_MAP_KEY));
}
}
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 9b0b18450..643d675de 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -145,6 +145,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
private final static String DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "3d";
private static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
private static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
+ private static final String ICEBERG_REGISTRATION_AUDIT_COUNT_BLACKLIST = "iceberg.registration.audit.count.blacklist";
+ private static final String ICEBERG_REGISTRATION_AUDIT_COUNT_WHITELIST = "iceberg.registration.audit.count.whitelist";
private static final String ICEBERG_METADATA_FILE_PERMISSION = "iceberg.metadata.file.permission";
private static final String CREATE_TABLE_TIME = "iceberg.create.table.time";
private static final String SCHEMA_CREATION_TIME_KEY = "schema.creation.time";
@@ -174,6 +176,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
protected final MetricContext metricContext;
protected EventSubmitter eventSubmitter;
private final WhitelistBlacklist whitelistBlacklist;
+ private final WhitelistBlacklist auditWhitelistBlacklist;
private final Closer closer = Closer.create();
// Mapping between table-id and currently processed watermark
@@ -191,8 +194,10 @@ public class IcebergMetadataWriter implements MetadataWriter {
private final boolean useDataLocationAsTableLocation;
private final ParallelRunner parallelRunner;
private FsPermission permission;
+ protected State state;
public IcebergMetadataWriter(State state) throws IOException {
+ this.state = state;
this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
conf = HadoopUtils.getConfFromState(state);
initializeCatalog();
@@ -208,6 +213,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
new EventSubmitter.Builder(this.metricContext, MetadataWriterKeys.METRICS_NAMESPACE_ICEBERG_WRITER).build();
this.whitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_WHITELIST, ""),
state.getProp(ICEBERG_REGISTRATION_BLACKLIST, ""));
+ this.auditWhitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_AUDIT_COUNT_WHITELIST, ""),
+ state.getProp(ICEBERG_REGISTRATION_AUDIT_COUNT_BLACKLIST, ""));
// Use rw-lock to make it thread-safe when flush and write(which is essentially aggregate & reading metadata),
// are called in separate threads.
@@ -324,6 +331,10 @@ public class IcebergMetadataWriter implements MetadataWriter {
case add_files: {
updateTableProperty(tableSpec, tid);
addFiles(gmce, newSpecsMap, table, tableMetadata);
+ if (gmce.getAuditCountMap() != null && auditWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
+ tableSpec.getTable().getTableName())) {
+ tableMetadata.serializedAuditCountMaps.add(gmce.getAuditCountMap());
+ }
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
@@ -792,6 +803,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
String topic = props.get(TOPIC_NAME_KEY);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
+ sendAuditCounts(topic, tableMetadata.serializedAuditCountMaps);
if (tableMetadata.completenessEnabled) {
checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
}
@@ -1081,6 +1093,13 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
}
+ /**
+ * Method to send audit counts given a topic name and a list of serialized audit count maps. Called only when new files
+ * are added. Default is no-op, must be implemented in a subclass.
+ */
+ public void sendAuditCounts(String topicName, Collection<String> serializedAuditCountMaps) {
+ }
+
@Override
public void close() throws IOException {
this.closer.close();
@@ -1114,6 +1133,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
Optional<Long> lowWatermark = Optional.absent();
long completionWatermark = DEFAULT_COMPLETION_WATERMARK;
SortedSet<ZonedDateTime> datePartitions = new TreeSet<>(Collections.reverseOrder());
+ List<String> serializedAuditCountMaps = new ArrayList<>();
@Setter
String datasetName;
@@ -1175,6 +1195,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
this.lowestGMCEEmittedTime = Long.MAX_VALUE;
this.lowWatermark = Optional.of(lowWaterMark);
this.datePartitions.clear();
+ this.serializedAuditCountMaps.clear();
}
}
}
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
index 476ec36af..e4f034546 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
@@ -172,14 +172,14 @@ public class GobblinMCEPublisherTest {
GobblinMCEProducer producer = Mockito.mock(GobblinMCEProducer.class);
Mockito.doCallRealMethod()
.when(producer)
- .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any());
+ .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any(), any());
Mockito.doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
GobblinMetadataChangeEvent gmce =
producer.getGobblinMetadataChangeEvent((Map<Path, Metrics>) args[0], null, null,
- (Map<String, String>) args[1], OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
+ (Map<String, String>) args[1], OperationType.add_files, SchemaSource.SCHEMAREGISTRY, null);
Assert.assertEquals(gmce.getNewFiles().size(), 1);
FileSystem fs = FileSystem.get(new Configuration());
Assert.assertEquals(gmce.getNewFiles().get(0).getFilePath(),
@@ -201,14 +201,14 @@ public class GobblinMCEPublisherTest {
GobblinMCEProducer producer = Mockito.mock(GobblinMCEProducer.class);
Mockito.doCallRealMethod()
.when(producer)
- .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any());
+ .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any(), any());
Mockito.doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
GobblinMetadataChangeEvent gmce =
producer.getGobblinMetadataChangeEvent((Map<Path, Metrics>) args[0], null, null,
- (Map<String, String>) args[1], OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
+ (Map<String, String>) args[1], OperationType.add_files, SchemaSource.SCHEMAREGISTRY, null);
Assert.assertEquals(gmce.getNewFiles().size(), 1);
FileSystem fs = FileSystem.get(new Configuration());
Charset charset = Charset.forName("UTF-8");
@@ -236,14 +236,14 @@ public class GobblinMCEPublisherTest {
GobblinMCEProducer producer = Mockito.mock(GobblinMCEProducer.class);
Mockito.doCallRealMethod()
.when(producer)
- .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any());
+ .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any(), any());
Mockito.doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
GobblinMetadataChangeEvent gmce =
producer.getGobblinMetadataChangeEvent((Map<Path, Metrics>) args[0], null, null,
- (Map<String, String>) args[1], OperationType.change_property, SchemaSource.NONE);
+ (Map<String, String>) args[1], OperationType.change_property, SchemaSource.NONE, null);
Assert.assertEquals(gmce.getNewFiles().size(), 1);
Assert.assertNull(gmce.getOldFiles());
Assert.assertNull(gmce.getOldFilePrefixes());
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
index c5f80ea43..8de3faaf2 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
@@ -286,6 +286,13 @@
"default": null,
"doc": "Array of the metadata writers that are allowed to consume this GMCE. If this field is missing, all metadata writers are allowed.",
"optional": true
+ },
+ {
+ "name": "auditCountMap",
+ "type": ["null","string"],
+ "default": null,
+ "doc": "Serialized map of audit count",
+ "optional": true
}
]
}