You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/11 20:54:02 UTC

[gobblin] branch master updated: [GOBBLIN-1613] Add metadata writers field to GMCE schema (#3490)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b32746af0 [GOBBLIN-1613] Add metadata writers field to GMCE schema (#3490)
b32746af0 is described below

commit b32746af0d6e0700428d074b84c95702ff57c3f5
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Mon Apr 11 13:53:58 2022 -0700

    [GOBBLIN-1613] Add metadata writers field to GMCE schema (#3490)
    
    * [GOBBLIN-1613] Add metadata writers field to GMCE schema
    
    * generalize dataset, platform, and table naming
    * more test coverage for GMCE writer
    
    * Reverting data.json syntax change.
    - Avro doesn't follow regular json syntax
    
    * Clean up random semi colon
    
    * Improve naming
---
 gobblin-iceberg/build.gradle                       |   2 +
 .../gobblin/iceberg/writer/GobblinMCEWriter.java   |  51 ++++-
 .../iceberg/publisher/GobblinMCEPublisherTest.java |   2 +-
 .../iceberg/writer/GobblinMCEWriterTest.java       | 251 +++++++++++++++++++++
 .../iceberg/writer/HiveMetadataWriterTest.java     |  12 +-
 .../iceberg/writer/IcebergMetadataWriterTest.java  |  34 +--
 .../src/main/avro/GobblinMetadataChangeEvent.avsc  |  10 +
 7 files changed, 337 insertions(+), 25 deletions(-)

diff --git a/gobblin-iceberg/build.gradle b/gobblin-iceberg/build.gradle
index fe030f80c..8405a3c92 100644
--- a/gobblin-iceberg/build.gradle
+++ b/gobblin-iceberg/build.gradle
@@ -56,6 +56,8 @@ dependencies {
     testCompile project(path: ':gobblin-modules:gobblin-kafka-common', configuration: 'tests')
     testCompile externalDependency.testng
     testCompile externalDependency.mockito
+    // Added to mock static methods for GobblinMCEWriterTest
+    testCompile externalDependency.powermock
 }
 
 configurations {
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 5f8683651..f9bb21c97 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,6 +29,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -35,6 +37,8 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.commons.collections.CollectionUtils;
+import com.google.common.annotations.VisibleForTesting;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -288,18 +292,37 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
             ((LongWatermark)watermark.getWatermark()).getValue()-1, ((LongWatermark)watermark.getWatermark()).getValue()));
       }
       tableOperationTypeMap.get(tableString).gmceHighWatermark = ((LongWatermark)watermark.getWatermark()).getValue();
-      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+
+      List<MetadataWriter> allowedWriters = getAllowedMetadataWriters(gmce, metadataWriters);
+      writeWithMetadataWriters(recordEnvelope, allowedWriters, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
-  // Add fault tolerant ability and make sure we can emit GTE as desired
-  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+  /**
+   * Entry point for calling the allowed metadata writers specified in the GMCE
+   * Adds fault tolerant ability and make sure we can emit GTE as desired
+   * Visible for testing because the WriteEnvelope method has complicated hive logic
+   * @param recordEnvelope
+   * @param allowedWriters metadata writers that will be written to
+   * @param newSpecsMap
+   * @param oldSpecsMap
+   * @param spec
+   * @throws IOException when max number of dataset errors is exceeded
+   */
+  @VisibleForTesting
+  void writeWithMetadataWriters(
+      RecordEnvelope<GenericRecord> recordEnvelope,
+      List<MetadataWriter> allowedWriters,
+      ConcurrentHashMap newSpecsMap,
+      ConcurrentHashMap oldSpecsMap,
+      HiveSpec spec
+  ) throws IOException {
     boolean meetException = false;
     String dbName = spec.getTable().getDbName();
     String tableName = spec.getTable().getTableName();
     String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
-    for (MetadataWriter writer : metadataWriters) {
+    for (MetadataWriter writer : allowedWriters) {
       if (meetException) {
         writer.reset(dbName, tableName);
       } else {
@@ -314,6 +337,24 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
     }
   }
 
+  /**
+   *   All metadata writers will be returned if no metadata writers are specified in gmce
+   * @param gmce
+   * @param metadataWriters
+   * @return The metadata writers allowed as specified by GMCE. Relative order of {@code metadataWriters} is maintained
+   */
+  @VisibleForTesting
+  static List<MetadataWriter> getAllowedMetadataWriters(GobblinMetadataChangeEvent gmce, List<MetadataWriter> metadataWriters) {
+    if (CollectionUtils.isEmpty(gmce.getAllowedMetadataWriters())) {
+      return metadataWriters;
+    }
+
+    Set<String> allowSet = new HashSet<>(gmce.getAllowedMetadataWriters());
+    return metadataWriters.stream()
+        .filter(writer -> allowSet.contains(writer.getClass().getName()))
+        .collect(Collectors.toList());
+  }
+
   private void addOrThrowException(Exception e, String tableString, String dbName, String tableName) throws IOException{
     TableStatus tableStatus = tableOperationTypeMap.get(tableString);
     Map<String, GobblinMetadataException> tableErrorMap = this.datasetErrorMap.getOrDefault(tableStatus.datasetPath, new HashMap<>());
@@ -379,7 +420,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
    */
   @Override
   public void flush() throws IOException {
-    log.info(String.format("start to flushing %s records", String.valueOf(recordCount.get())));
+    log.info(String.format("begin flushing %s records", String.valueOf(recordCount.get())));
     for (String tableString : tableOperationTypeMap.keySet()) {
       List<String> tid = Splitter.on(TABLE_NAME_DELIMITER).splitToList(tableString);
       flush(tid.get(0), tid.get(1));
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
index c5a4e382c..476ec36af 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
@@ -101,7 +101,7 @@ public class GobblinMCEPublisherTest {
     InputStream dataInputStream = clazz.getClassLoader().getResourceAsStream(schemaPath);
     Decoder decoder = DecoderFactory.get().jsonDecoder(schema, dataInputStream);
     GenericRecord recordContainer = reader.read(null, decoder);
-    ;
+
     try {
       while (recordContainer != null) {
         records.add(recordContainer);
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
new file mode 100644
index 000000000..32e8a34e1
--- /dev/null
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import lombok.SneakyThrows;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.MetadataWriter;
+import org.apache.gobblin.metadata.DatasetIdentifier;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Test class uses PowerMockito and Testng
+ * References:
+ * https://github.com/powermock/powermock/issues/434
+ * https://jivimberg.io/blog/2016/04/03/using-powermock-plus-testng-to-mock-a-static-class/
+ * https://www.igorkromin.net/index.php/2018/10/04/how-to-fix-powermock-exception-linkageerror-loader-constraint-violation/
+ */
+@PrepareForTest({GobblinConstructorUtils.class, FileSystem.class})
+@PowerMockIgnore("javax.management.*")
+public class GobblinMCEWriterTest extends PowerMockTestCase {
+
+  private String dbName = "hivedb";
+  private String tableName = "testTable";
+  private GobblinMCEWriter gobblinMCEWriter;
+  private KafkaStreamingExtractor.KafkaWatermark watermark;
+
+  private GobblinMetadataChangeEvent.Builder gmceBuilder;
+
+  // Not using field injection because they must be different classes
+  private MetadataWriter mockWriter;
+  private MetadataWriter exceptionWriter;
+
+  @Mock
+  private FileSystem fs;
+
+  @Mock
+  private HiveSpec mockHiveSpec;
+
+  @Mock
+  private HiveTable mockTable;
+
+  @AfterMethod
+  public void clean() throws Exception {
+    gobblinMCEWriter.close();
+  }
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    initMocks();
+    gmceBuilder = GobblinMetadataChangeEvent.newBuilder()
+        .setDatasetIdentifier(DatasetIdentifier.newBuilder()
+            .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
+            .setNativeName("testDB/testTable")
+            .build())
+        .setFlowId("testFlow")
+        .setSchemaSource(SchemaSource.EVENT)
+        .setOperationType(OperationType.add_files)
+        .setCluster(ClustersNames.getInstance().getClusterName());
+    watermark = new KafkaStreamingExtractor.KafkaWatermark(
+        new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+        new LongWatermark(10L));
+
+    State state = new State();
+    String metadataWriters = String.join(",",
+        Arrays.asList(mockWriter.getClass().getName(), exceptionWriter.getClass().getName()));
+    state.setProp("gmce.metadata.writer.classes", metadataWriters);
+
+    Mockito.doNothing().when(mockWriter)
+        .writeEnvelope(
+            Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+    Mockito.doThrow(new IOException("Test Exception")).when(exceptionWriter)
+        .writeEnvelope(
+            Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+
+    PowerMockito.mockStatic(GobblinConstructorUtils.class);
+    when(GobblinConstructorUtils.invokeConstructor(
+            eq(MetadataWriter.class), eq(mockWriter.getClass().getName()), any(State.class)))
+        .thenReturn(mockWriter);
+    when(GobblinConstructorUtils.invokeConstructor(
+        eq(MetadataWriter.class), eq(exceptionWriter.getClass().getName()), any(State.class)))
+        .thenReturn(exceptionWriter);
+
+    PowerMockito.mockStatic(FileSystem.class);
+    when(FileSystem.get(any()))
+        .thenReturn(fs);
+
+    when(mockTable.getDbName()).thenReturn(dbName);
+    when(mockTable.getTableName()).thenReturn(tableName);
+    when(mockHiveSpec.getTable()).thenReturn(mockTable);
+
+    gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
+  }
+
+  @Test
+  public void testWriteWhenWriterSpecified() throws IOException {
+    gmceBuilder.setAllowedMetadataWriters(Arrays.asList(mockWriter.getClass().getName()));
+    writeWithMetadataWriters(gmceBuilder.build());
+
+    Mockito.verify(mockWriter, Mockito.times(1)).writeEnvelope(
+        Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+    Mockito.verify(exceptionWriter, never()).writeEnvelope(
+        Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+  }
+
+  @Test
+  public void testFaultTolerance() throws IOException {
+
+    gobblinMCEWriter.setMaxErrorDataset(1);
+    gobblinMCEWriter.metadataWriters = Arrays.asList(mockWriter, exceptionWriter, mockWriter);
+    gobblinMCEWriter.tableOperationTypeMap = new HashMap<>();
+
+    String dbName2 = dbName + "2";
+    String otherDb = "someOtherDB";
+    addTableStatus(dbName, "datasetPath");
+    addTableStatus(dbName2, "datasetPath");
+    addTableStatus(otherDb, "otherDatasetPath");
+
+    BiConsumer<String, String> verifyMocksCalled = new BiConsumer<String, String>(){
+      private int timesCalled = 0;
+      @Override
+      @SneakyThrows
+      public void accept(String dbName, String tableName) {
+        timesCalled++;
+
+        // also validates that order is maintained since all writers after an exception should reset instead of write
+        Mockito.verify(mockWriter, Mockito.times(timesCalled)).writeEnvelope(
+            Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+        Mockito.verify(exceptionWriter, Mockito.times(timesCalled)).writeEnvelope(
+            Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+        Mockito.verify(exceptionWriter, Mockito.times(1)).reset(dbName, tableName);
+        Mockito.verify(mockWriter, Mockito.times(1)).reset(dbName, tableName);
+      }
+    };
+
+    writeWithMetadataWriters(gmceBuilder.build());
+    verifyMocksCalled.accept(dbName, tableName);
+
+    // Another exception for same dataset but different db
+    when(mockTable.getDbName()).thenReturn(dbName2);
+    writeWithMetadataWriters(gmceBuilder.build());
+    verifyMocksCalled.accept(dbName2, tableName);
+
+    // exception thrown because exceeds max number of datasets with errors
+    when(mockTable.getDbName()).thenReturn(otherDb);
+    Assert.expectThrows(IOException.class, () -> writeWithMetadataWriters(gmceBuilder.setDatasetIdentifier(DatasetIdentifier.newBuilder()
+        .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
+        .setNativeName("someOtherDB/testTable")
+        .build()).build()));
+  }
+
+  @Test(dataProvider = "AllowMockMetadataWriter")
+  public void testGetAllowedMetadataWriters(List<String> metadataWriters) {
+    Assert.assertNotEquals(mockWriter.getClass().getName(), exceptionWriter.getClass().getName());
+    gmceBuilder.setAllowedMetadataWriters(metadataWriters);
+    List<MetadataWriter> allowedWriters = GobblinMCEWriter.getAllowedMetadataWriters(
+        gmceBuilder.build(),
+        Arrays.asList(mockWriter, exceptionWriter));
+
+    Assert.assertEquals(allowedWriters.size(), 2);
+    Assert.assertEquals(allowedWriters.get(0).getClass().getName(), mockWriter.getClass().getName());
+    Assert.assertEquals(allowedWriters.get(1).getClass().getName(), exceptionWriter.getClass().getName());
+  }
+
+  @DataProvider(name="AllowMockMetadataWriter")
+  public Object[][] allowMockMetadataWriterParams() {
+    initMocks();
+    return new Object[][] {
+        {Arrays.asList(mockWriter.getClass().getName(), exceptionWriter.getClass().getName())},
+        {Collections.emptyList()}
+    };
+  }
+
+  private void initMocks() {
+    MockitoAnnotations.initMocks(this);
+    // Hacky way to have 2 mock MetadataWriter "classes" with different underlying names
+    mockWriter = Mockito.mock(MetadataWriter.class);
+    exceptionWriter = Mockito.mock(TestExceptionMetadataWriter.class);
+  }
+
+  private static abstract class TestExceptionMetadataWriter implements MetadataWriter { }
+
+  private void writeWithMetadataWriters(GobblinMetadataChangeEvent gmce) throws IOException {
+    List<MetadataWriter> allowedMetadataWriters = GobblinMCEWriter.getAllowedMetadataWriters(
+        gmce, gobblinMCEWriter.getMetadataWriters());
+
+    gobblinMCEWriter.writeWithMetadataWriters(new RecordEnvelope<>(gmce, watermark), allowedMetadataWriters,
+        new ConcurrentHashMap(), new ConcurrentHashMap(), mockHiveSpec);
+  }
+
+  private void addTableStatus(String dbName, String datasetPath) {
+    gobblinMCEWriter.tableOperationTypeMap.put(dbName + "." + tableName, new GobblinMCEWriter.TableStatus(
+        OperationType.add_files, datasetPath, "GobblinMetadataChangeEvent_test-1", 0, 50));
+  }
+
+  @ObjectFactory
+  public IObjectFactory getObjectFactory() {
+    return new PowerMockObjectFactory();
+  }
+}
+
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index d6906d3c4..7e763c60f 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -61,6 +61,7 @@ import org.apache.gobblin.hive.HiveRegister;
 import org.apache.gobblin.hive.HiveRegistrationUnit;
 import org.apache.gobblin.hive.HiveTable;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
+import org.apache.gobblin.hive.writer.HiveMetadataWriter;
 import org.apache.gobblin.metadata.DataFile;
 import org.apache.gobblin.metadata.DataMetrics;
 import org.apache.gobblin.metadata.DataOrigin;
@@ -136,11 +137,11 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
       client.createDatabase(
           new Database(dedupedDbName, "dedupeddatabase", tmpDir.getAbsolutePath() + "/metastore_deduped", Collections.emptyMap()));
     }
-    hourlyDataFile_1 = new File(tmpDir, "data/tracking/testTable/hourly/2020/03/17/08/data.avro");
+    hourlyDataFile_1 = new File(tmpDir, "testDB/testTable/hourly/2020/03/17/08/data.avro");
     Files.createParentDirs(hourlyDataFile_1);
-    hourlyDataFile_2 = new File(tmpDir, "data/tracking/testTable/hourly/2020/03/17/09/data.avro");
+    hourlyDataFile_2 = new File(tmpDir, "testDB/testTable/hourly/2020/03/17/09/data.avro");
     Files.createParentDirs(hourlyDataFile_2);
-    dailyDataFile = new File(tmpDir, "data/tracking/testTable/daily/2020/03/17/data.avro");
+    dailyDataFile = new File(tmpDir, "testDB/testTable/daily/2020/03/17/data.avro");
     Files.createParentDirs(dailyDataFile);
     dataDir = new File(hourlyDataFile_1.getParent());
     Assert.assertTrue(dataDir.exists());
@@ -152,8 +153,8 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
     gmce = GobblinMetadataChangeEvent.newBuilder()
         .setDatasetIdentifier(DatasetIdentifier.newBuilder()
             .setDataOrigin(DataOrigin.EI)
-            .setDataPlatformUrn("urn:li:dataPlatform:hdfs")
-            .setNativeName("/data/tracking/testTable")
+            .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
+            .setNativeName("/testDB/testTable")
             .build())
         .setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "0-1000").build())
         .setFlowId("testFlow")
@@ -169,6 +170,7 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
         .setPartitionColumns(Lists.newArrayList("testpartition"))
         .setRegistrationPolicy(TestHiveRegistrationPolicy.class.getName())
         .setRegistrationProperties(registrationState)
+        .setAllowedMetadataWriters(Collections.singletonList(HiveMetadataWriter.class.getName()))
         .build();
     state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
         KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index c6e5ce579..288d689ea 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.iceberg.writer;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -30,6 +31,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificData;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -123,11 +125,11 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     startMetastore();
 
     tmpDir = Files.createTempDir();
-    hourlyDataFile_1 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/08/data.avro");
+    hourlyDataFile_1 = new File(tmpDir, "testDB/testIcebergTable/hourly/2020/03/17/08/data.avro");
     Files.createParentDirs(hourlyDataFile_1);
-    hourlyDataFile_2 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/09/data.avro");
+    hourlyDataFile_2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2020/03/17/09/data.avro");
     Files.createParentDirs(hourlyDataFile_2);
-    dailyDataFile = new File(tmpDir, "data/tracking/testIcebergTable/daily/2020/03/17/data.avro");
+    dailyDataFile = new File(tmpDir, "testDB/testIcebergTable/daily/2020/03/17/data.avro");
     Files.createParentDirs(dailyDataFile);
     dataDir = new File(hourlyDataFile_1.getParent());
     Assert.assertTrue(dataDir.exists());
@@ -137,8 +139,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     gmce = GobblinMetadataChangeEvent.newBuilder()
         .setDatasetIdentifier(DatasetIdentifier.newBuilder()
             .setDataOrigin(DataOrigin.EI)
-            .setDataPlatformUrn("urn:li:dataPlatform:hdfs")
-            .setNativeName(new File(tmpDir, "data/tracking/testIcebergTable").getAbsolutePath())
+            .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
+            .setNativeName(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
             .build())
         .setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "0-1000").build())
         .setFlowId("testFlow")
@@ -154,6 +156,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
         .setPartitionColumns(Lists.newArrayList("testpartition"))
         .setRegistrationPolicy(TestHiveRegistrationPolicyForIceberg.class.getName())
         .setRegistrationProperties(ImmutableMap.<String, String>builder().put("hive.database.name", dbName).build())
+        .setAllowedMetadataWriters(Arrays.asList(IcebergMetadataWriter.class.getName()))
         .build();
 
     State state = getState();
@@ -219,7 +222,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
     Assert.assertEquals(table.location(),
-        new File(tmpDir, "data/tracking/testIcebergTable/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
+        new File(tmpDir, "testDB/testIcebergTable/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
 
     gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "1000-2000").build());
     GenericRecord genericGmce_1000_2000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
@@ -346,7 +349,10 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Mockito.doThrow(new IOException("Test failure")).when(mockWriter).writeEnvelope(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
     gobblinMCEWriter.metadataWriters.add(0, mockWriter);
 
-    GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+    GobblinMetadataChangeEvent gmceWithMockWriter = SpecificData.get().deepCopy(gmce.getSchema(), gmce);
+    gmceWithMockWriter.setAllowedMetadataWriters(Arrays.asList(IcebergMetadataWriter.class.getName(), mockWriter.getClass().getName()));
+
+    GenericRecord genericGmce = GenericData.get().deepCopy(gmceWithMockWriter.getSchema(), gmceWithMockWriter);
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
         new KafkaStreamingExtractor.KafkaWatermark(
             new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
@@ -358,10 +364,10 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().size(), 1);
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(), 1);
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
-        .get(new File(tmpDir, "data/tracking/testIcebergTable").getAbsolutePath())
+        .get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
         .get("hivedb.testIcebergTable").lowWatermark, 50L);
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
-        .get(new File(tmpDir, "data/tracking/testIcebergTable").getAbsolutePath())
+        .get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
         .get("hivedb.testIcebergTable").highWatermark, 52L);
 
     // No events sent yet since the topic has not been flushed
@@ -378,8 +384,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertEquals(eventsSent.get(0).getMetadata().get(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK), "52");
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(), 0);
 
-    gmce.getDatasetIdentifier().setNativeName("data/tracking/testFaultTolerant");
-    GenericRecord genericGmce_differentDb = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+    gmceWithMockWriter.getDatasetIdentifier().setNativeName("testDB/testFaultTolerant");
+    GenericRecord genericGmce_differentDb = GenericData.get().deepCopy(gmceWithMockWriter.getSchema(), gmceWithMockWriter);
     Assert.expectThrows(IOException.class, () -> gobblinMCEWriter.writeEnvelope((new RecordEnvelope<>(genericGmce_differentDb,
         new KafkaStreamingExtractor.KafkaWatermark(
             new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
@@ -393,7 +399,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     // Creating a copy of gmce with static type in GenericRecord to work with writeEnvelop method
     // without risking running into type cast runtime error.
     gmce.setOperationType(OperationType.add_files);
-    File hourlyFile = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2021/09/16/10/data.avro");
+    File hourlyFile = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/10/data.avro");
     long timestampMillis = 1631811600000L;
     Files.createParentDirs(hourlyFile);
     writeRecord(hourlyFile);
@@ -429,7 +435,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertTrue(dfl.hasNext());
 
     // Test when completeness watermark is still "2021-09-16-10" but have a late file for "2021-09-16-09"
-    File hourlyFile1 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2021/09/16/09/data1.avro");
+    File hourlyFile1 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/09/data1.avro");
     Files.createParentDirs(hourlyFile1);
     writeRecord(hourlyFile1);
     gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
@@ -452,7 +458,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertEquals((int) dfl.next().partition().get(1, Integer.class), 1);
 
     // Test when completeness watermark will advance to "2021-09-16-11"
-    File hourlyFile2 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2021/09/16/11/data.avro");
+    File hourlyFile2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/11/data.avro");
     long timestampMillis1 = timestampMillis + TimeUnit.HOURS.toMillis(1);
     Files.createParentDirs(hourlyFile2);
     writeRecord(hourlyFile2);
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
index aa3dd8b45..c5f80ea43 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
@@ -276,6 +276,16 @@
         "doc": "The avro schema with iceberg schema id for the data, this is used to match the field id in the file metrics to the field name",
         "default": null,
         "optional": true
+      },
+      {
+        "name": "allowedMetadataWriters",
+        "type": ["null",{
+          "type": "array",
+          "items" : "string"
+        } ],
+        "default": null,
+        "doc": "Array of the metadata writers that are allowed to consume this GMCE. If this field is missing, all metadata writers are allowed.",
+        "optional": true
       }
     ]
   }