You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/07/19 22:39:10 UTC

[gobblin] branch master updated: [GOBBLIN-1490] Make metadata pipeline to support consume GMCE emitted from different cluster (#3331)

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 08db23e  [GOBBLIN-1490] Make metadata pipeline to support consume GMCE emitted from different cluster (#3331)
08db23e is described below

commit 08db23e15ab73654142998ecabb01bbb51f42a61
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Mon Jul 19 15:38:59 2021 -0700

    [GOBBLIN-1490] Make metadata pipeline to support consume GMCE emitted from different cluster (#3331)
    
    * [GOBBLIN-1490] Make metadata pipeline to support consume GMCE emitted from different cluster
    
    * add unit test
    
    * address comments
---
 .../java/org/apache/gobblin/iceberg/GobblinMCEProducer.java    |  3 ++-
 .../org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java    |  8 ++++++--
 .../gobblin/iceberg/writer/IcebergMetadataWriterTest.java      | 10 ++++++++++
 3 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index 130ee04..4a7f566 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -66,6 +66,7 @@ import static org.apache.gobblin.iceberg.writer.GobblinMCEWriter.HIVE_PARTITION_
 public abstract class GobblinMCEProducer implements Closeable {
 
   public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
+  public static final String GMCE_CLUSTER_NAME = "GobblinMCE.cluster.name";
   public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
   private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
   private static final String DATASET_ORIGIN_KEY = "dataset.origin";
@@ -112,7 +113,7 @@ public abstract class GobblinMCEProducer implements Closeable {
         .setDataOrigin(DataOrigin.valueOf(origin))
         .setNativeName(state.getProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR))
         .build());
-    gmceBuilder.setCluster(ClustersNames.getInstance().getClusterName());
+    gmceBuilder.setCluster(state.getProp(GMCE_CLUSTER_NAME, ClustersNames.getInstance().getClusterName()));
     //retention job does not have job.id
     gmceBuilder.setFlowId(
         state.getProp(AbstractJob.JOB_ID, new Configuration().get(ConfigurationKeys.AZKABAN_FLOW_ID)));
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 2d8a340..ebbf74b 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -81,6 +82,7 @@ import org.apache.gobblin.writer.DataWriterBuilder;
 public class GobblinMCEWriter implements DataWriter<GenericRecord> {
   public static final String DEFAULT_HIVE_REGISTRATION_POLICY_KEY = "default.hive.registration.policy";
   public static final String FORCE_HIVE_DATABASE_NAME = "force.hive.database.name";
+  public static final String ACCEPTED_CLUSTER_NAMES = "accepted.cluster.names";
   public static final String METADATA_REGISTRATION_THREADS = "metadata.registration.threads";
   public static final String METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS = "metadata.parallel.runner.timeout.mills";
   public static final String HIVE_PARTITION_NAME = "hive.partition.name";
@@ -91,6 +93,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
   List<MetadataWriter> metadataWriters;
   Map<String, OperationType> tableOperationTypeMap;
   Map<String, OperationType> datasetOperationTypeMap;
+  Set<String> acceptedClusters;
   protected State state;
   private final ParallelRunner parallelRunner;
   private int parallelRunnerTimeoutMills;
@@ -103,6 +106,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
     newSpecsMaps = new HashMap<>();
     oldSpecsMaps = new HashMap<>();
     metadataWriters = new ArrayList<>();
+    acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES, ClustersNames.getInstance().getClusterName());
     state = properties;
     for (String className : state.getPropAsList(GMCE_METADATA_WRITER_CLASSES, IcebergMetadataWriter.class.getName())) {
       metadataWriters.add(closer.register(GobblinConstructorUtils.invokeConstructor(MetadataWriter.class, className, state)));
@@ -183,8 +187,8 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
   @Override
   public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) throws IOException {
     GenericRecord genericRecord = recordEnvelope.getRecord();
-    //filter out the events that not for this cluster
-    if (!genericRecord.get("cluster").equals(ClustersNames.getInstance().getClusterName())) {
+    //filter out the events that are not emitted by accepted clusters
+    if (!acceptedClusters.contains(genericRecord.get("cluster"))) {
       return;
     }
     // Use schema from record to avoid issue when schema evolution
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 79cfa5d..066bcc28 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -85,6 +85,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
   private String dbName = "hivedb";
 
   private GobblinMCEWriter gobblinMCEWriter;
+  private GobblinMCEWriter gobblinMCEWriterWithAcceptClusters;
 
   GobblinMetadataChangeEvent gmce;
   static File tmpDir;
@@ -96,6 +97,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
   @AfterClass
   public void clean() throws Exception {
     gobblinMCEWriter.close();
+    gobblinMCEWriterWithAcceptClusters.close();
     FileUtils.forceDeleteOnExit(tmpDir);
   }
   @BeforeClass
@@ -142,6 +144,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
         TestHiveRegistrationPolicyForIceberg.class.getName());
     state.setProp("use.data.path.as.table.location", true);
     gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
+    state.setProp(GobblinMCEWriter.ACCEPTED_CLUSTER_NAMES, "randomCluster");
+    gobblinMCEWriterWithAcceptClusters = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
     ((IcebergMetadataWriter) gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
         HiveMetastoreTest.catalog);
     _avroPartitionSchema =
@@ -150,6 +154,12 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
 
   @Test ( priority = 0 )
   public void testWriteAddFileGMCE() throws IOException {
+    gobblinMCEWriterWithAcceptClusters.writeEnvelope(new RecordEnvelope<>(gmce,
+        new KafkaStreamingExtractor.KafkaWatermark(
+            new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+            new LongWatermark(10L))));
+    //Test that when the accepted clusters do not contain the GMCE's cluster, the event is skipped
+    Assert.assertEquals(catalog.listTables(Namespace.of(dbName)).size(), 0);
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
         new KafkaStreamingExtractor.KafkaWatermark(
             new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),