You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/07/19 22:39:10 UTC
[gobblin] branch master updated: [GOBBLIN-1490] Make metadata
pipeline support consuming GMCE emitted from a different cluster (#3331)
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 08db23e [GOBBLIN-1490] Make metadata pipeline to support consume GMCE emitted from different cluster (#3331)
08db23e is described below
commit 08db23e15ab73654142998ecabb01bbb51f42a61
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Mon Jul 19 15:38:59 2021 -0700
[GOBBLIN-1490] Make metadata pipeline to support consume GMCE emitted from different cluster (#3331)
* [GOBBLIN-1490] Make metadata pipeline to support consume GMCE emitted from different cluster
* add unit test
* address comments
---
.../java/org/apache/gobblin/iceberg/GobblinMCEProducer.java | 3 ++-
.../org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java | 8 ++++++--
.../gobblin/iceberg/writer/IcebergMetadataWriterTest.java | 10 ++++++++++
3 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index 130ee04..4a7f566 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -66,6 +66,7 @@ import static org.apache.gobblin.iceberg.writer.GobblinMCEWriter.HIVE_PARTITION_
public abstract class GobblinMCEProducer implements Closeable {
public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
+ public static final String GMCE_CLUSTER_NAME = "GobblinMCE.cluster.name";
public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
private static final String DATASET_ORIGIN_KEY = "dataset.origin";
@@ -112,7 +113,7 @@ public abstract class GobblinMCEProducer implements Closeable {
.setDataOrigin(DataOrigin.valueOf(origin))
.setNativeName(state.getProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR))
.build());
- gmceBuilder.setCluster(ClustersNames.getInstance().getClusterName());
+ gmceBuilder.setCluster(state.getProp(GMCE_CLUSTER_NAME, ClustersNames.getInstance().getClusterName()));
//retention job does not have job.id
gmceBuilder.setFlowId(
state.getProp(AbstractJob.JOB_ID, new Configuration().get(ConfigurationKeys.AZKABAN_FLOW_ID)));
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 2d8a340..ebbf74b 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -81,6 +82,7 @@ import org.apache.gobblin.writer.DataWriterBuilder;
public class GobblinMCEWriter implements DataWriter<GenericRecord> {
public static final String DEFAULT_HIVE_REGISTRATION_POLICY_KEY = "default.hive.registration.policy";
public static final String FORCE_HIVE_DATABASE_NAME = "force.hive.database.name";
+ public static final String ACCEPTED_CLUSTER_NAMES = "accepted.cluster.names";
public static final String METADATA_REGISTRATION_THREADS = "metadata.registration.threads";
public static final String METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS = "metadata.parallel.runner.timeout.mills";
public static final String HIVE_PARTITION_NAME = "hive.partition.name";
@@ -91,6 +93,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
List<MetadataWriter> metadataWriters;
Map<String, OperationType> tableOperationTypeMap;
Map<String, OperationType> datasetOperationTypeMap;
+ Set<String> acceptedClusters;
protected State state;
private final ParallelRunner parallelRunner;
private int parallelRunnerTimeoutMills;
@@ -103,6 +106,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
newSpecsMaps = new HashMap<>();
oldSpecsMaps = new HashMap<>();
metadataWriters = new ArrayList<>();
+ acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES, ClustersNames.getInstance().getClusterName());
state = properties;
for (String className : state.getPropAsList(GMCE_METADATA_WRITER_CLASSES, IcebergMetadataWriter.class.getName())) {
metadataWriters.add(closer.register(GobblinConstructorUtils.invokeConstructor(MetadataWriter.class, className, state)));
@@ -183,8 +187,8 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
@Override
public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) throws IOException {
GenericRecord genericRecord = recordEnvelope.getRecord();
- //filter out the events that not for this cluster
- if (!genericRecord.get("cluster").equals(ClustersNames.getInstance().getClusterName())) {
+ //filter out the events that not emitted by accepted clusters
+ if (!acceptedClusters.contains(genericRecord.get("cluster"))) {
return;
}
// Use schema from record to avoid issue when schema evolution
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 79cfa5d..066bcc28 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -85,6 +85,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
private String dbName = "hivedb";
private GobblinMCEWriter gobblinMCEWriter;
+ private GobblinMCEWriter gobblinMCEWriterWithAcceptClusters;
GobblinMetadataChangeEvent gmce;
static File tmpDir;
@@ -96,6 +97,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
@AfterClass
public void clean() throws Exception {
gobblinMCEWriter.close();
+ gobblinMCEWriterWithAcceptClusters.close();
FileUtils.forceDeleteOnExit(tmpDir);
}
@BeforeClass
@@ -142,6 +144,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
TestHiveRegistrationPolicyForIceberg.class.getName());
state.setProp("use.data.path.as.table.location", true);
gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
+ state.setProp(GobblinMCEWriter.ACCEPTED_CLUSTER_NAMES, "randomCluster");
+ gobblinMCEWriterWithAcceptClusters = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
((IcebergMetadataWriter) gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
HiveMetastoreTest.catalog);
_avroPartitionSchema =
@@ -150,6 +154,12 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
@Test ( priority = 0 )
public void testWriteAddFileGMCE() throws IOException {
+ gobblinMCEWriterWithAcceptClusters.writeEnvelope(new RecordEnvelope<>(gmce,
+ new KafkaStreamingExtractor.KafkaWatermark(
+ new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(10L))));
+ //Test when accept clusters does not contain the gmce cluster, we will skip
+ Assert.assertEquals(catalog.listTables(Namespace.of(dbName)).size(), 0);
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),