You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/07/18 19:05:00 UTC

[gobblin] branch master updated: [GOBBLIN-1668] Add audit counts for iceberg registration (#3527)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new da2241771 [GOBBLIN-1668] Add audit counts for iceberg registration (#3527)
da2241771 is described below

commit da224177174b456c9ec273a907ed344d370217da
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Mon Jul 18 12:04:55 2022 -0700

    [GOBBLIN-1668] Add audit counts for iceberg registration (#3527)
    
    * Add audit counts for iceberg registration
    
    * Address comments
    
    * Address comments
---
 .../apache/gobblin/iceberg/GobblinMCEProducer.java  | 13 ++++++++++---
 .../iceberg/publisher/GobblinMCEPublisher.java      |  5 ++++-
 .../iceberg/writer/IcebergMetadataWriter.java       | 21 +++++++++++++++++++++
 .../iceberg/publisher/GobblinMCEPublisherTest.java  | 12 ++++++------
 .../src/main/avro/GobblinMetadataChangeEvent.avsc   |  7 +++++++
 5 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index ec81f2d9c..b037cb635 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -82,6 +82,10 @@ public abstract class GobblinMCEProducer implements Closeable {
     this.metricContext = Instrumented.getMetricContext(state, this.getClass());
   }
 
+  public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
+      Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource) throws IOException {
+    sendGMCE(newFiles, oldFiles, oldFilePrefixes, offsetRange, operationType, schemaSource, null);
+  }
 
   /**
    * This method will use the files to compute the table name and dataset name, for each table it will generate one GMCE and send that to kafka so
@@ -90,12 +94,14 @@ public abstract class GobblinMCEProducer implements Closeable {
    * @param oldFiles the list of old file to be dropped
    * @param offsetRange offset range of the new data, can be null
    * @param operationType The opcode of gmce emitted by this method.
+   * @param serializedAuditCountMap Audit count map to be used by {@link org.apache.gobblin.iceberg.writer.IcebergMetadataWriter} to track iceberg
+   *                                registration counts
    * @throws IOException
    */
   public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
-      Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource) throws IOException {
+      Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource, String serializedAuditCountMap) throws IOException {
     GobblinMetadataChangeEvent gmce =
-        getGobblinMetadataChangeEvent(newFiles, oldFiles, oldFilePrefixes, offsetRange, operationType, schemaSource);
+        getGobblinMetadataChangeEvent(newFiles, oldFiles, oldFilePrefixes, offsetRange, operationType, schemaSource, serializedAuditCountMap);
     underlyingSendGMCE(gmce);
   }
 
@@ -166,7 +172,7 @@ public abstract class GobblinMCEProducer implements Closeable {
 
   public GobblinMetadataChangeEvent getGobblinMetadataChangeEvent(Map<Path, Metrics> newFiles, List<String> oldFiles,
       List<String> oldFilePrefixes, Map<String, String> offsetRange, OperationType operationType,
-      SchemaSource schemaSource) {
+      SchemaSource schemaSource, String serializedAuditCountMap) {
     if (!verifyInput(newFiles, oldFiles, oldFilePrefixes, operationType)) {
       return null;
     }
@@ -182,6 +188,7 @@ public abstract class GobblinMCEProducer implements Closeable {
       gmceBuilder.setOldFilePrefixes(oldFilePrefixes);
     }
     gmceBuilder.setOperationType(operationType);
+    gmceBuilder.setAuditCountMap(serializedAuditCountMap);
     return gmceBuilder.build();
   }
 
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index 3e58a53d1..53611f8c1 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -78,6 +78,8 @@ public class GobblinMCEPublisher extends DataPublisher {
   private static final PathFilter HIDDEN_FILES_FILTER = new HiddenFilter();
   private static final Metrics DUMMY_METRICS = new Metrics(100000000L, null, null, null, null);
 
+  public static final String SERIALIZED_AUDIT_COUNT_MAP_KEY = "serializedAuditCountMap";
+
   public GobblinMCEPublisher(State state) throws IOException {
 
     this(state, GobblinMCEProducer.getGobblinMCEProducer(state));
@@ -104,7 +106,8 @@ public class GobblinMCEPublisher extends DataPublisher {
           log.info("No dummy file created. Not sending GMCE");
         }
       } else {
-        this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
+        this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.add_files, SchemaSource.SCHEMAREGISTRY,
+            state.getProp(SERIALIZED_AUDIT_COUNT_MAP_KEY));
       }
     }
   }
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 9b0b18450..643d675de 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -145,6 +145,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
   private final static String DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "3d";
   private static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
   private static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
+  private static final String ICEBERG_REGISTRATION_AUDIT_COUNT_BLACKLIST = "iceberg.registration.audit.count.blacklist";
+  private static final String ICEBERG_REGISTRATION_AUDIT_COUNT_WHITELIST = "iceberg.registration.audit.count.whitelist";
   private static final String ICEBERG_METADATA_FILE_PERMISSION = "iceberg.metadata.file.permission";
   private static final String CREATE_TABLE_TIME = "iceberg.create.table.time";
   private static final String SCHEMA_CREATION_TIME_KEY = "schema.creation.time";
@@ -174,6 +176,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
   protected final MetricContext metricContext;
   protected EventSubmitter eventSubmitter;
   private final WhitelistBlacklist whitelistBlacklist;
+  private final WhitelistBlacklist auditWhitelistBlacklist;
   private final Closer closer = Closer.create();
 
   // Mapping between table-id and currently processed watermark
@@ -191,8 +194,10 @@ public class IcebergMetadataWriter implements MetadataWriter {
   private final boolean useDataLocationAsTableLocation;
   private final ParallelRunner parallelRunner;
   private FsPermission permission;
+  protected State state;
 
   public IcebergMetadataWriter(State state) throws IOException {
+    this.state = state;
     this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
     conf = HadoopUtils.getConfFromState(state);
     initializeCatalog();
@@ -208,6 +213,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
         new EventSubmitter.Builder(this.metricContext, MetadataWriterKeys.METRICS_NAMESPACE_ICEBERG_WRITER).build();
     this.whitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_WHITELIST, ""),
         state.getProp(ICEBERG_REGISTRATION_BLACKLIST, ""));
+    this.auditWhitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_AUDIT_COUNT_WHITELIST, ""),
+        state.getProp(ICEBERG_REGISTRATION_AUDIT_COUNT_BLACKLIST, ""));
 
     // Use rw-lock to make it thread-safe when flush and write(which is essentially aggregate & reading metadata),
     // are called in separate threads.
@@ -324,6 +331,10 @@ public class IcebergMetadataWriter implements MetadataWriter {
       case add_files: {
         updateTableProperty(tableSpec, tid);
         addFiles(gmce, newSpecsMap, table, tableMetadata);
+        if (gmce.getAuditCountMap() != null && auditWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
+            tableSpec.getTable().getTableName())) {
+          tableMetadata.serializedAuditCountMaps.add(gmce.getAuditCountMap());
+        }
         if (gmce.getTopicPartitionOffsetsRange() != null) {
           mergeOffsets(gmce, tid);
         }
@@ -792,6 +803,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
         String topic = props.get(TOPIC_NAME_KEY);
         if (tableMetadata.appendFiles.isPresent()) {
           tableMetadata.appendFiles.get().commit();
+          sendAuditCounts(topic, tableMetadata.serializedAuditCountMaps);
           if (tableMetadata.completenessEnabled) {
             checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
           }
@@ -1081,6 +1093,13 @@ public class IcebergMetadataWriter implements MetadataWriter {
     }
   }
 
+  /**
+   * Method to send audit counts given a topic name and a list of serialized audit count maps. Called only when new files
+   * are added. Default is no-op, must be implemented in a subclass.
+   */
+  public void sendAuditCounts(String topicName, Collection<String> serializedAuditCountMaps) {
+  }
+
   @Override
   public void close() throws IOException {
     this.closer.close();
@@ -1114,6 +1133,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
     Optional<Long> lowWatermark = Optional.absent();
     long completionWatermark = DEFAULT_COMPLETION_WATERMARK;
     SortedSet<ZonedDateTime> datePartitions = new TreeSet<>(Collections.reverseOrder());
+    List<String> serializedAuditCountMaps = new ArrayList<>();
 
     @Setter
     String datasetName;
@@ -1175,6 +1195,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
       this.lowestGMCEEmittedTime = Long.MAX_VALUE;
       this.lowWatermark = Optional.of(lowWaterMark);
       this.datePartitions.clear();
+      this.serializedAuditCountMaps.clear();
     }
   }
 }
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
index 476ec36af..e4f034546 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
@@ -172,14 +172,14 @@ public class GobblinMCEPublisherTest {
     GobblinMCEProducer producer = Mockito.mock(GobblinMCEProducer.class);
     Mockito.doCallRealMethod()
         .when(producer)
-        .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any());
+        .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any(), any());
     Mockito.doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocation) throws Throwable {
         Object[] args = invocation.getArguments();
         GobblinMetadataChangeEvent gmce =
             producer.getGobblinMetadataChangeEvent((Map<Path, Metrics>) args[0], null, null,
-                (Map<String, String>) args[1], OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
+                (Map<String, String>) args[1], OperationType.add_files, SchemaSource.SCHEMAREGISTRY, null);
         Assert.assertEquals(gmce.getNewFiles().size(), 1);
         FileSystem fs = FileSystem.get(new Configuration());
         Assert.assertEquals(gmce.getNewFiles().get(0).getFilePath(),
@@ -201,14 +201,14 @@ public class GobblinMCEPublisherTest {
     GobblinMCEProducer producer = Mockito.mock(GobblinMCEProducer.class);
     Mockito.doCallRealMethod()
         .when(producer)
-        .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any());
+        .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any(), any());
     Mockito.doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocation) throws Throwable {
         Object[] args = invocation.getArguments();
         GobblinMetadataChangeEvent gmce =
             producer.getGobblinMetadataChangeEvent((Map<Path, Metrics>) args[0], null, null,
-                (Map<String, String>) args[1], OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
+                (Map<String, String>) args[1], OperationType.add_files, SchemaSource.SCHEMAREGISTRY, null);
         Assert.assertEquals(gmce.getNewFiles().size(), 1);
         FileSystem fs = FileSystem.get(new Configuration());
         Charset charset = Charset.forName("UTF-8");
@@ -236,14 +236,14 @@ public class GobblinMCEPublisherTest {
     GobblinMCEProducer producer = Mockito.mock(GobblinMCEProducer.class);
     Mockito.doCallRealMethod()
         .when(producer)
-        .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any());
+        .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), anyMap(), any(), any(), any());
     Mockito.doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocation) throws Throwable {
         Object[] args = invocation.getArguments();
         GobblinMetadataChangeEvent gmce =
             producer.getGobblinMetadataChangeEvent((Map<Path, Metrics>) args[0], null, null,
-                (Map<String, String>) args[1], OperationType.change_property, SchemaSource.NONE);
+                (Map<String, String>) args[1], OperationType.change_property, SchemaSource.NONE, null);
         Assert.assertEquals(gmce.getNewFiles().size(), 1);
         Assert.assertNull(gmce.getOldFiles());
         Assert.assertNull(gmce.getOldFilePrefixes());
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
index c5f80ea43..8de3faaf2 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
@@ -286,6 +286,13 @@
         "default": null,
         "doc": "Array of the metadata writers that are allowed to consume this GMCE. If this field is missing, all metadata writers are allowed.",
         "optional": true
+      },
+      {
+        "name": "auditCountMap",
+        "type": ["null","string"],
+        "default": null,
+        "doc": "Serialized map of audit count",
+        "optional": true
       }
     ]
   }