Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/11 20:54:02 UTC
[gobblin] branch master updated: [GOBBLIN-1613] Add metadata writers field to GMCE schema (#3490)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b32746af0 [GOBBLIN-1613] Add metadata writers field to GMCE schema (#3490)
b32746af0 is described below
commit b32746af0d6e0700428d074b84c95702ff57c3f5
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Mon Apr 11 13:53:58 2022 -0700
[GOBBLIN-1613] Add metadata writers field to GMCE schema (#3490)
* [GOBBLIN-1613] Add metadata writers field to GMCE schema
* generalize dataset, platform, and table naming
* more test coverage for GMCE writer
* Reverting data.json syntax change.
- Avro doesn't follow regular json syntax
* Clean up random semicolon
* Improve naming
---
gobblin-iceberg/build.gradle | 2 +
.../gobblin/iceberg/writer/GobblinMCEWriter.java | 51 ++++-
.../iceberg/publisher/GobblinMCEPublisherTest.java | 2 +-
.../iceberg/writer/GobblinMCEWriterTest.java | 251 +++++++++++++++++++++
.../iceberg/writer/HiveMetadataWriterTest.java | 12 +-
.../iceberg/writer/IcebergMetadataWriterTest.java | 34 +--
.../src/main/avro/GobblinMetadataChangeEvent.avsc | 10 +
7 files changed, 337 insertions(+), 25 deletions(-)
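
For orientation before the diff: a minimal producer-side sketch of what this change enables, using only the generated-builder calls that appear in the tests further down. The writer class name is one example (any MetadataWriter implementation's fully qualified name works), and the field values are placeholders:

import java.util.Collections;

import org.apache.gobblin.metadata.DatasetIdentifier;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;

public class GmceAllowedWritersSketch {
  public static GobblinMetadataChangeEvent buildHiveOnlyGmce() {
    return GobblinMetadataChangeEvent.newBuilder()
        .setDatasetIdentifier(DatasetIdentifier.newBuilder()
            .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
            .setNativeName("testDB/testTable")
            .build())
        .setFlowId("testFlow")
        .setSchemaSource(SchemaSource.EVENT)
        .setOperationType(OperationType.add_files)
        .setCluster("testCluster")
        // New field: only the named writer classes consume this event.
        // Omitting it (or sending an empty list) keeps the old behavior,
        // where every configured metadata writer consumes the event.
        .setAllowedMetadataWriters(Collections.singletonList(
            "org.apache.gobblin.hive.writer.HiveMetadataWriter"))
        .build();
  }
}
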
diff --git a/gobblin-iceberg/build.gradle b/gobblin-iceberg/build.gradle
index fe030f80c..8405a3c92 100644
--- a/gobblin-iceberg/build.gradle
+++ b/gobblin-iceberg/build.gradle
@@ -56,6 +56,8 @@ dependencies {
testCompile project(path: ':gobblin-modules:gobblin-kafka-common', configuration: 'tests')
testCompile externalDependency.testng
testCompile externalDependency.mockito
+ // Added to mock static methods for GobblinMCEWriterTest
+ testCompile externalDependency.powermock
}
configurations {
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 5f8683651..f9bb21c97 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,6 +29,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -35,6 +37,8 @@ import org.apache.avro.specific.SpecificData;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.commons.collections.CollectionUtils;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -288,18 +292,37 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
((LongWatermark)watermark.getWatermark()).getValue()-1, ((LongWatermark)watermark.getWatermark()).getValue()));
}
tableOperationTypeMap.get(tableString).gmceHighWatermark = ((LongWatermark)watermark.getWatermark()).getValue();
- write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+
+ List<MetadataWriter> allowedWriters = getAllowedMetadataWriters(gmce, metadataWriters);
+ writeWithMetadataWriters(recordEnvelope, allowedWriters, newSpecsMap, oldSpecsMap, spec);
}
this.recordCount.incrementAndGet();
}
- // Add fault tolerant ability and make sure we can emit GTE as desired
- private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+ /**
+ * Entry point for calling the allowed metadata writers specified in the GMCE.
+ * Adds fault tolerance and makes sure we can emit GTEs as desired.
+ * Visible for testing because the writeEnvelope method has complicated Hive logic.
+ * @param recordEnvelope
+ * @param allowedWriters metadata writers that will be written to
+ * @param newSpecsMap
+ * @param oldSpecsMap
+ * @param spec
+ * @throws IOException when max number of dataset errors is exceeded
+ */
+ @VisibleForTesting
+ void writeWithMetadataWriters(
+ RecordEnvelope<GenericRecord> recordEnvelope,
+ List<MetadataWriter> allowedWriters,
+ ConcurrentHashMap newSpecsMap,
+ ConcurrentHashMap oldSpecsMap,
+ HiveSpec spec
+ ) throws IOException {
boolean meetException = false;
String dbName = spec.getTable().getDbName();
String tableName = spec.getTable().getTableName();
String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
- for (MetadataWriter writer : metadataWriters) {
+ for (MetadataWriter writer : allowedWriters) {
if (meetException) {
writer.reset(dbName, tableName);
} else {
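
The loop body is truncated by the hunk above; schematically, the fault-tolerance contract it implements looks like the following. This is a hedged, self-contained reconstruction with hypothetical names (Writer, writeAll), not the verbatim method body, but it matches the visible context and the behavior testFaultTolerance verifies below:

import java.io.IOException;
import java.util.List;

public class FaultToleranceSketch {
  interface Writer {
    void write(String db, String table) throws IOException;
    void reset(String db, String table);
  }

  static void writeAll(List<Writer> writers, String db, String table) {
    boolean meetException = false;
    for (Writer writer : writers) {
      if (meetException) {
        // Once one writer has failed for this table, every later writer is
        // reset instead of written, keeping metadata consistent across writers.
        writer.reset(db, table);
      } else {
        try {
          writer.write(db, table);
        } catch (IOException e) {
          meetException = true;
          writer.reset(db, table);
          // GobblinMCEWriter then records the failure per dataset via
          // addOrThrowException, which throws only once the configured max
          // number of datasets with errors is exceeded.
        }
      }
    }
  }
}
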
@@ -314,6 +337,24 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
}
}
+ /**
+ * All metadata writers will be returned if no metadata writers are specified in the GMCE.
+ * @param gmce
+ * @param metadataWriters
+ * @return The metadata writers allowed as specified by the GMCE. Relative order of {@code metadataWriters} is maintained.
+ */
+ @VisibleForTesting
+ static List<MetadataWriter> getAllowedMetadataWriters(GobblinMetadataChangeEvent gmce, List<MetadataWriter> metadataWriters) {
+ if (CollectionUtils.isEmpty(gmce.getAllowedMetadataWriters())) {
+ return metadataWriters;
+ }
+
+ Set<String> allowSet = new HashSet<>(gmce.getAllowedMetadataWriters());
+ return metadataWriters.stream()
+ .filter(writer -> allowSet.contains(writer.getClass().getName()))
+ .collect(Collectors.toList());
+ }
+
private void addOrThrowException(Exception e, String tableString, String dbName, String tableName) throws IOException{
TableStatus tableStatus = tableOperationTypeMap.get(tableString);
Map<String, GobblinMetadataException> tableErrorMap = this.datasetErrorMap.getOrDefault(tableStatus.datasetPath, new HashMap<>());
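
In practice the new helper behaves like this (a hedged usage fragment; icebergWriter and hiveWriter are assumed writer instances, e.g. mocks as in GobblinMCEWriterTest below, and gmceBuilder is a partially populated event builder):

List<MetadataWriter> configured = Arrays.asList(icebergWriter, hiveWriter);
GobblinMetadataChangeEvent gmce = gmceBuilder
    .setAllowedMetadataWriters(
        Collections.singletonList(hiveWriter.getClass().getName()))
    .build();

List<MetadataWriter> allowed =
    GobblinMCEWriter.getAllowedMetadataWriters(gmce, configured);
// allowed -> [hiveWriter]: only the listed classes survive, in configured
// order. Had allowedMetadataWriters been null or empty, `configured` itself
// would have been returned unchanged.
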
@@ -379,7 +420,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
*/
@Override
public void flush() throws IOException {
- log.info(String.format("start to flushing %s records", String.valueOf(recordCount.get())));
+ log.info(String.format("begin flushing %s records", String.valueOf(recordCount.get())));
for (String tableString : tableOperationTypeMap.keySet()) {
List<String> tid = Splitter.on(TABLE_NAME_DELIMITER).splitToList(tableString);
flush(tid.get(0), tid.get(1));
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
index c5a4e382c..476ec36af 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
@@ -101,7 +101,7 @@ public class GobblinMCEPublisherTest {
InputStream dataInputStream = clazz.getClassLoader().getResourceAsStream(schemaPath);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, dataInputStream);
GenericRecord recordContainer = reader.read(null, decoder);
- ;
+
try {
while (recordContainer != null) {
records.add(recordContainer);
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
new file mode 100644
index 000000000..32e8a34e1
--- /dev/null
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import lombok.SneakyThrows;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.MetadataWriter;
+import org.apache.gobblin.metadata.DatasetIdentifier;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Test class uses PowerMockito and TestNG
+ * References:
+ * https://github.com/powermock/powermock/issues/434
+ * https://jivimberg.io/blog/2016/04/03/using-powermock-plus-testng-to-mock-a-static-class/
+ * https://www.igorkromin.net/index.php/2018/10/04/how-to-fix-powermock-exception-linkageerror-loader-constraint-violation/
+ */
+@PrepareForTest({GobblinConstructorUtils.class, FileSystem.class})
+@PowerMockIgnore("javax.management.*")
+public class GobblinMCEWriterTest extends PowerMockTestCase {
+
+ private String dbName = "hivedb";
+ private String tableName = "testTable";
+ private GobblinMCEWriter gobblinMCEWriter;
+ private KafkaStreamingExtractor.KafkaWatermark watermark;
+
+ private GobblinMetadataChangeEvent.Builder gmceBuilder;
+
+ // Not using field injection because they must be different classes
+ private MetadataWriter mockWriter;
+ private MetadataWriter exceptionWriter;
+
+ @Mock
+ private FileSystem fs;
+
+ @Mock
+ private HiveSpec mockHiveSpec;
+
+ @Mock
+ private HiveTable mockTable;
+
+ @AfterMethod
+ public void clean() throws Exception {
+ gobblinMCEWriter.close();
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ initMocks();
+ gmceBuilder = GobblinMetadataChangeEvent.newBuilder()
+ .setDatasetIdentifier(DatasetIdentifier.newBuilder()
+ .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
+ .setNativeName("testDB/testTable")
+ .build())
+ .setFlowId("testFlow")
+ .setSchemaSource(SchemaSource.EVENT)
+ .setOperationType(OperationType.add_files)
+ .setCluster(ClustersNames.getInstance().getClusterName());
+ watermark = new KafkaStreamingExtractor.KafkaWatermark(
+ new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(10L));
+
+ State state = new State();
+ String metadataWriters = String.join(",",
+ Arrays.asList(mockWriter.getClass().getName(), exceptionWriter.getClass().getName()));
+ state.setProp("gmce.metadata.writer.classes", metadataWriters);
+
+ Mockito.doNothing().when(mockWriter)
+ .writeEnvelope(
+ Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+ Mockito.doThrow(new IOException("Test Exception")).when(exceptionWriter)
+ .writeEnvelope(
+ Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+
+ PowerMockito.mockStatic(GobblinConstructorUtils.class);
+ when(GobblinConstructorUtils.invokeConstructor(
+ eq(MetadataWriter.class), eq(mockWriter.getClass().getName()), any(State.class)))
+ .thenReturn(mockWriter);
+ when(GobblinConstructorUtils.invokeConstructor(
+ eq(MetadataWriter.class), eq(exceptionWriter.getClass().getName()), any(State.class)))
+ .thenReturn(exceptionWriter);
+
+ PowerMockito.mockStatic(FileSystem.class);
+ when(FileSystem.get(any()))
+ .thenReturn(fs);
+
+ when(mockTable.getDbName()).thenReturn(dbName);
+ when(mockTable.getTableName()).thenReturn(tableName);
+ when(mockHiveSpec.getTable()).thenReturn(mockTable);
+
+ gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
+ }
+
+ @Test
+ public void testWriteWhenWriterSpecified() throws IOException {
+ gmceBuilder.setAllowedMetadataWriters(Arrays.asList(mockWriter.getClass().getName()));
+ writeWithMetadataWriters(gmceBuilder.build());
+
+ Mockito.verify(mockWriter, Mockito.times(1)).writeEnvelope(
+ Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+ Mockito.verify(exceptionWriter, never()).writeEnvelope(
+ Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+ }
+
+ @Test
+ public void testFaultTolerance() throws IOException {
+
+ gobblinMCEWriter.setMaxErrorDataset(1);
+ gobblinMCEWriter.metadataWriters = Arrays.asList(mockWriter, exceptionWriter, mockWriter);
+ gobblinMCEWriter.tableOperationTypeMap = new HashMap<>();
+
+ String dbName2 = dbName + "2";
+ String otherDb = "someOtherDB";
+ addTableStatus(dbName, "datasetPath");
+ addTableStatus(dbName2, "datasetPath");
+ addTableStatus(otherDb, "otherDatasetPath");
+
+ BiConsumer<String, String> verifyMocksCalled = new BiConsumer<String, String>(){
+ private int timesCalled = 0;
+ @Override
+ @SneakyThrows
+ public void accept(String dbName, String tableName) {
+ timesCalled++;
+
+ // also validates that order is maintained since all writers after an exception should reset instead of write
+ Mockito.verify(mockWriter, Mockito.times(timesCalled)).writeEnvelope(
+ Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+ Mockito.verify(exceptionWriter, Mockito.times(timesCalled)).writeEnvelope(
+ Mockito.any(RecordEnvelope.class), Mockito.anyMap(), Mockito.anyMap(), Mockito.any(HiveSpec.class));
+ Mockito.verify(exceptionWriter, Mockito.times(1)).reset(dbName, tableName);
+ Mockito.verify(mockWriter, Mockito.times(1)).reset(dbName, tableName);
+ }
+ };
+
+ writeWithMetadataWriters(gmceBuilder.build());
+ verifyMocksCalled.accept(dbName, tableName);
+
+ // Another exception for same dataset but different db
+ when(mockTable.getDbName()).thenReturn(dbName2);
+ writeWithMetadataWriters(gmceBuilder.build());
+ verifyMocksCalled.accept(dbName2, tableName);
+
+ // exception thrown because exceeds max number of datasets with errors
+ when(mockTable.getDbName()).thenReturn(otherDb);
+ Assert.expectThrows(IOException.class, () -> writeWithMetadataWriters(gmceBuilder.setDatasetIdentifier(DatasetIdentifier.newBuilder()
+ .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
+ .setNativeName("someOtherDB/testTable")
+ .build()).build()));
+ }
+
+ @Test(dataProvider = "AllowMockMetadataWriter")
+ public void testGetAllowedMetadataWriters(List<String> metadataWriters) {
+ Assert.assertNotEquals(mockWriter.getClass().getName(), exceptionWriter.getClass().getName());
+ gmceBuilder.setAllowedMetadataWriters(metadataWriters);
+ List<MetadataWriter> allowedWriters = GobblinMCEWriter.getAllowedMetadataWriters(
+ gmceBuilder.build(),
+ Arrays.asList(mockWriter, exceptionWriter));
+
+ Assert.assertEquals(allowedWriters.size(), 2);
+ Assert.assertEquals(allowedWriters.get(0).getClass().getName(), mockWriter.getClass().getName());
+ Assert.assertEquals(allowedWriters.get(1).getClass().getName(), exceptionWriter.getClass().getName());
+ }
+
+ @DataProvider(name="AllowMockMetadataWriter")
+ public Object[][] allowMockMetadataWriterParams() {
+ initMocks();
+ return new Object[][] {
+ {Arrays.asList(mockWriter.getClass().getName(), exceptionWriter.getClass().getName())},
+ {Collections.emptyList()}
+ };
+ }
+
+ private void initMocks() {
+ MockitoAnnotations.initMocks(this);
+ // Hacky way to have 2 mock MetadataWriter "classes" with different underlying names
+ mockWriter = Mockito.mock(MetadataWriter.class);
+ exceptionWriter = Mockito.mock(TestExceptionMetadataWriter.class);
+ }
+
+ private static abstract class TestExceptionMetadataWriter implements MetadataWriter { }
+
+ private void writeWithMetadataWriters(GobblinMetadataChangeEvent gmce) throws IOException {
+ List<MetadataWriter> allowedMetadataWriters = GobblinMCEWriter.getAllowedMetadataWriters(
+ gmce, gobblinMCEWriter.getMetadataWriters());
+
+ gobblinMCEWriter.writeWithMetadataWriters(new RecordEnvelope<>(gmce, watermark), allowedMetadataWriters,
+ new ConcurrentHashMap(), new ConcurrentHashMap(), mockHiveSpec);
+ }
+
+ private void addTableStatus(String dbName, String datasetPath) {
+ gobblinMCEWriter.tableOperationTypeMap.put(dbName + "." + tableName, new GobblinMCEWriter.TableStatus(
+ OperationType.add_files, datasetPath, "GobblinMetadataChangeEvent_test-1", 0, 50));
+ }
+
+ @ObjectFactory
+ public IObjectFactory getObjectFactory() {
+ return new PowerMockObjectFactory();
+ }
+}
+
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index d6906d3c4..7e763c60f 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -61,6 +61,7 @@ import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
+import org.apache.gobblin.hive.writer.HiveMetadataWriter;
import org.apache.gobblin.metadata.DataFile;
import org.apache.gobblin.metadata.DataMetrics;
import org.apache.gobblin.metadata.DataOrigin;
@@ -136,11 +137,11 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
client.createDatabase(
new Database(dedupedDbName, "dedupeddatabase", tmpDir.getAbsolutePath() + "/metastore_deduped", Collections.emptyMap()));
}
- hourlyDataFile_1 = new File(tmpDir, "data/tracking/testTable/hourly/2020/03/17/08/data.avro");
+ hourlyDataFile_1 = new File(tmpDir, "testDB/testTable/hourly/2020/03/17/08/data.avro");
Files.createParentDirs(hourlyDataFile_1);
- hourlyDataFile_2 = new File(tmpDir, "data/tracking/testTable/hourly/2020/03/17/09/data.avro");
+ hourlyDataFile_2 = new File(tmpDir, "testDB/testTable/hourly/2020/03/17/09/data.avro");
Files.createParentDirs(hourlyDataFile_2);
- dailyDataFile = new File(tmpDir, "data/tracking/testTable/daily/2020/03/17/data.avro");
+ dailyDataFile = new File(tmpDir, "testDB/testTable/daily/2020/03/17/data.avro");
Files.createParentDirs(dailyDataFile);
dataDir = new File(hourlyDataFile_1.getParent());
Assert.assertTrue(dataDir.exists());
@@ -152,8 +153,8 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
gmce = GobblinMetadataChangeEvent.newBuilder()
.setDatasetIdentifier(DatasetIdentifier.newBuilder()
.setDataOrigin(DataOrigin.EI)
- .setDataPlatformUrn("urn:li:dataPlatform:hdfs")
- .setNativeName("/data/tracking/testTable")
+ .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
+ .setNativeName("/testDB/testTable")
.build())
.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "0-1000").build())
.setFlowId("testFlow")
@@ -169,6 +170,7 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
.setPartitionColumns(Lists.newArrayList("testpartition"))
.setRegistrationPolicy(TestHiveRegistrationPolicy.class.getName())
.setRegistrationProperties(registrationState)
+ .setAllowedMetadataWriters(Collections.singletonList(HiveMetadataWriter.class.getName()))
.build();
state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index c6e5ce579..288d689ea 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.iceberg.writer;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -30,6 +31,7 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificData;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -123,11 +125,11 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
startMetastore();
tmpDir = Files.createTempDir();
- hourlyDataFile_1 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/08/data.avro");
+ hourlyDataFile_1 = new File(tmpDir, "testDB/testIcebergTable/hourly/2020/03/17/08/data.avro");
Files.createParentDirs(hourlyDataFile_1);
- hourlyDataFile_2 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/09/data.avro");
+ hourlyDataFile_2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2020/03/17/09/data.avro");
Files.createParentDirs(hourlyDataFile_2);
- dailyDataFile = new File(tmpDir, "data/tracking/testIcebergTable/daily/2020/03/17/data.avro");
+ dailyDataFile = new File(tmpDir, "testDB/testIcebergTable/daily/2020/03/17/data.avro");
Files.createParentDirs(dailyDataFile);
dataDir = new File(hourlyDataFile_1.getParent());
Assert.assertTrue(dataDir.exists());
@@ -137,8 +139,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
gmce = GobblinMetadataChangeEvent.newBuilder()
.setDatasetIdentifier(DatasetIdentifier.newBuilder()
.setDataOrigin(DataOrigin.EI)
- .setDataPlatformUrn("urn:li:dataPlatform:hdfs")
- .setNativeName(new File(tmpDir, "data/tracking/testIcebergTable").getAbsolutePath())
+ .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
+ .setNativeName(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
.build())
.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "0-1000").build())
.setFlowId("testFlow")
@@ -154,6 +156,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
.setPartitionColumns(Lists.newArrayList("testpartition"))
.setRegistrationPolicy(TestHiveRegistrationPolicyForIceberg.class.getName())
.setRegistrationProperties(ImmutableMap.<String, String>builder().put("hive.database.name", dbName).build())
+ .setAllowedMetadataWriters(Arrays.asList(IcebergMetadataWriter.class.getName()))
.build();
State state = getState();
@@ -219,7 +222,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
Assert.assertEquals(table.location(),
- new File(tmpDir, "data/tracking/testIcebergTable/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
+ new File(tmpDir, "testDB/testIcebergTable/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "1000-2000").build());
GenericRecord genericGmce_1000_2000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
@@ -346,7 +349,10 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Mockito.doThrow(new IOException("Test failure")).when(mockWriter).writeEnvelope(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
gobblinMCEWriter.metadataWriters.add(0, mockWriter);
- GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+ GobblinMetadataChangeEvent gmceWithMockWriter = SpecificData.get().deepCopy(gmce.getSchema(), gmce);
+ gmceWithMockWriter.setAllowedMetadataWriters(Arrays.asList(IcebergMetadataWriter.class.getName(), mockWriter.getClass().getName()));
+
+ GenericRecord genericGmce = GenericData.get().deepCopy(gmceWithMockWriter.getSchema(), gmceWithMockWriter);
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
@@ -358,10 +364,10 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().size(), 1);
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(), 1);
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
- .get(new File(tmpDir, "data/tracking/testIcebergTable").getAbsolutePath())
+ .get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
.get("hivedb.testIcebergTable").lowWatermark, 50L);
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
- .get(new File(tmpDir, "data/tracking/testIcebergTable").getAbsolutePath())
+ .get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
.get("hivedb.testIcebergTable").highWatermark, 52L);
// No events sent yet since the topic has not been flushed
@@ -378,8 +384,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Assert.assertEquals(eventsSent.get(0).getMetadata().get(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK), "52");
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(), 0);
- gmce.getDatasetIdentifier().setNativeName("data/tracking/testFaultTolerant");
- GenericRecord genericGmce_differentDb = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+ gmceWithMockWriter.getDatasetIdentifier().setNativeName("testDB/testFaultTolerant");
+ GenericRecord genericGmce_differentDb = GenericData.get().deepCopy(gmceWithMockWriter.getSchema(), gmceWithMockWriter);
Assert.expectThrows(IOException.class, () -> gobblinMCEWriter.writeEnvelope((new RecordEnvelope<>(genericGmce_differentDb,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
@@ -393,7 +399,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
// Creating a copy of gmce with static type in GenericRecord to work with writeEnvelop method
// without risking running into type cast runtime error.
gmce.setOperationType(OperationType.add_files);
- File hourlyFile = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2021/09/16/10/data.avro");
+ File hourlyFile = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/10/data.avro");
long timestampMillis = 1631811600000L;
Files.createParentDirs(hourlyFile);
writeRecord(hourlyFile);
@@ -429,7 +435,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Assert.assertTrue(dfl.hasNext());
// Test when completeness watermark is still "2021-09-16-10" but have a late file for "2021-09-16-09"
- File hourlyFile1 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2021/09/16/09/data1.avro");
+ File hourlyFile1 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/09/data1.avro");
Files.createParentDirs(hourlyFile1);
writeRecord(hourlyFile1);
gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
@@ -452,7 +458,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Assert.assertEquals((int) dfl.next().partition().get(1, Integer.class), 1);
// Test when completeness watermark will advance to "2021-09-16-11"
- File hourlyFile2 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2021/09/16/11/data.avro");
+ File hourlyFile2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/11/data.avro");
long timestampMillis1 = timestampMillis + TimeUnit.HOURS.toMillis(1);
Files.createParentDirs(hourlyFile2);
writeRecord(hourlyFile2);
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
index aa3dd8b45..c5f80ea43 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
@@ -276,6 +276,16 @@
"doc": "The avro schema with iceberg schema id for the data, this is used to match the field id in the file metrics to the field name",
"default": null,
"optional": true
+ },
+ {
+ "name": "allowedMetadataWriters",
+ "type": ["null",{
+ "type": "array",
+ "items" : "string"
+ } ],
+ "default": null,
+ "doc": "Array of the metadata writers that are allowed to consume this GMCE. If this field is missing, all metadata writers are allowed.",
+ "optional": true
}
]
}
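
Because the new field is a nullable union with a null default and marked optional, events written before this change still decode under the new schema; the field simply surfaces as null. A hedged consumer-side sketch (the helper name is hypothetical, not a Gobblin API):

import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;

public class SchemaCompatSketch {
  // Mirrors the writer-side contract: a null (field absent, i.e. the schema
  // default) or empty allow-list means every configured metadata writer may
  // consume the event.
  public static boolean allowsAllWriters(GobblinMetadataChangeEvent gmce) {
    return gmce.getAllowedMetadataWriters() == null
        || gmce.getAllowedMetadataWriters().isEmpty();
  }
}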