You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/10/12 07:39:21 UTC
[hudi] branch master updated: [HUDI-4994] Fix bug that prevents re-ingestion of soft-deleted Datahub entities (#6886)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c407beee01 [HUDI-4994] Fix bug that prevents re-ingestion of soft-deleted Datahub entities (#6886)
c407beee01 is described below
commit c407beee014652cc7b8949e47a2d342f01d1f165
Author: Pramod Biligiri <pr...@gmail.com>
AuthorDate: Wed Oct 12 13:09:12 2022 +0530
[HUDI-4994] Fix bug that prevents re-ingestion of soft-deleted Datahub entities (#6886)
---
.../hudi/sync/datahub/DataHubSyncClient.java | 91 +++++++++------
.../hudi/sync/datahub/DatahubResponseLogger.java | 51 ++++++++
.../sync/datahub/DummyPartitionValueExtractor.java | 34 ++++++
.../hudi/sync/datahub/TestDataHubSyncClient.java | 130 +++++++++++++++++++++
4 files changed, 273 insertions(+), 33 deletions(-)
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index 4c050451c5..35a19d799c 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -19,6 +19,7 @@
package org.apache.hudi.sync.datahub;
+import com.linkedin.common.Status;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
@@ -51,7 +52,6 @@ import datahub.event.MetadataChangeProposalWrapper;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.parquet.schema.MessageType;
-
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -61,6 +61,7 @@ public class DataHubSyncClient extends HoodieSyncClient {
protected final DataHubSyncConfig config;
private final DatasetUrn datasetUrn;
+ private static final Status SOFT_DELETE_FALSE = new Status().setRemoved(false);
public DataHubSyncClient(DataHubSyncConfig config) {
super(config);
@@ -81,45 +82,33 @@ public class DataHubSyncClient extends HoodieSyncClient {
+  /**
+   * Upserts the dataset's {@code DatasetProperties} aspect, using {@code tableProperties} as its custom properties.
+   * The target dataset is identified by {@code datasetUrn}; {@code tableName} is not used.
+   *
+   * <p>NOTE(review): the value returned by {@code emit(...).get()} is discarded, so a request that completes with an
+   * unsuccessful response appears to be only logged by {@link DatahubResponseLogger}, not raised — confirm intended.
+   *
+   * @throws HoodieDataHubSyncException if obtaining or closing the emitter, emitting, or waiting for the request fails
+   */
@Override
public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
MetadataChangeProposalWrapper propertiesChangeProposal = MetadataChangeProposalWrapper.builder()
- .entityType("dataset")
- .entityUrn(datasetUrn)
- .upsert()
- .aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties)))
- .build();
+ .entityType("dataset")
+ .entityUrn(datasetUrn)
+ .upsert()
+ .aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties)))
+ .build();
+
+ DatahubResponseLogger responseLogger = new DatahubResponseLogger();
try (RestEmitter emitter = config.getRestEmitter()) {
- emitter.emit(propertiesChangeProposal, null).get();
+ emitter.emit(propertiesChangeProposal, responseLogger).get();
} catch (Exception e) {
- throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": " + tableProperties, e);
+ throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": "
+ + tableProperties, e);
}
}
+  /**
+   * Upserts the dataset's {@code SchemaMetadata} aspect, built by {@code createSchemaMetadataUpdate(tableName)}, and
+   * then upserts its "status" aspect built by {@code createUndoSoftDelete()}, so that a previously soft-deleted
+   * dataset becomes visible again. The {@code schema} argument is not used.
+   *
+   * @throws HoodieDataHubSyncException if obtaining or closing the emitter, emitting either proposal, or waiting for
+   *     a request fails
+   */
@Override
public void updateTableSchema(String tableName, MessageType schema) {
- Schema avroSchema = getAvroSchemaWithoutMetadataFields(metaClient);
- List<SchemaField> fields = avroSchema.getFields().stream().map(f -> new SchemaField()
- .setFieldPath(f.name())
- .setType(toSchemaFieldDataType(f.schema().getType()))
- .setDescription(f.doc(), SetMode.IGNORE_NULL)
- .setNativeDataType(f.schema().getType().getName())).collect(Collectors.toList());
-
- final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
- platformSchema.setOtherSchema(new OtherSchema().setRawSchema(avroSchema.toString()));
- MetadataChangeProposalWrapper schemaChangeProposal = MetadataChangeProposalWrapper.builder()
- .entityType("dataset")
- .entityUrn(datasetUrn)
- .upsert()
- .aspect(new SchemaMetadata()
- .setSchemaName(tableName)
- .setVersion(0)
- .setHash("")
- .setPlatform(datasetUrn.getPlatformEntity())
- .setPlatformSchema(platformSchema)
- .setFields(new SchemaFieldArray(fields)))
- .build();
-
try (RestEmitter emitter = config.getRestEmitter()) {
- emitter.emit(schemaChangeProposal, null).get();
+ DatahubResponseLogger responseLogger = new DatahubResponseLogger();
+ MetadataChangeProposalWrapper schemaChange = createSchemaMetadataUpdate(tableName);
+ emitter.emit(schemaChange, responseLogger).get();
+
+ // When updating an entity, it is necessary to set its soft-delete status to false, or else the update won't get
+ // reflected in the UI.
+ MetadataChangeProposalWrapper softDeleteUndoProposal = createUndoSoftDelete();
+ emitter.emit(softDeleteUndoProposal, responseLogger).get();
} catch (Exception e) {
throw new HoodieDataHubSyncException("Fail to change schema for Dataset " + datasetUrn, e);
}
@@ -127,7 +116,7 @@ public class DataHubSyncClient extends HoodieSyncClient {
+  /** Not supported by the DataHub sync client: always throws {@link UnsupportedOperationException}. */
@Override
public Map<String, String> getMetastoreSchema(String tableName) {
- throw new UnsupportedOperationException("Not supported: `getTableSchema`");
+ throw new UnsupportedOperationException("Not supported: `getMetastoreSchema`");
}
@Override
@@ -135,7 +124,43 @@ public class DataHubSyncClient extends HoodieSyncClient {
// no op;
}
- static Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
+  /** Builds an upsert of the dataset's "status" aspect carrying {@code SOFT_DELETE_FALSE}, undoing a soft delete. */
+  private MetadataChangeProposalWrapper createUndoSoftDelete() {
+    return MetadataChangeProposalWrapper.builder()
+        .entityType("dataset")
+        .entityUrn(datasetUrn)
+        .upsert()
+        .aspect(SOFT_DELETE_FALSE)
+        .aspectName("status")
+        .build();
+  }
+
+  /**
+   * Builds an upsert of the dataset's {@code SchemaMetadata} aspect: one {@code SchemaField} per top-level Avro
+   * field, plus the raw Avro schema string as the platform schema. The version is fixed at 0 and the hash is empty.
+   *
+   * <p>NOTE(review): {@code getAvroSchemaWithoutMetadataFields} calls {@code getTableAvroSchema(true)}; if that flag
+   * means "include metadata fields", the emitted schema contains them despite the helper's name — confirm.
+   *
+   * @param tableName value written as the schema name
+   * @return the proposal to hand to the REST emitter
+   */
+  private MetadataChangeProposalWrapper createSchemaMetadataUpdate(String tableName) {
+    final Schema avroSchema = getAvroSchemaWithoutMetadataFields(metaClient);
+    final List<SchemaField> fields = avroSchema.getFields().stream()
+        .map(this::toDataHubSchemaField)
+        .collect(Collectors.toList());
+
+    final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
+    platformSchema.setOtherSchema(new OtherSchema().setRawSchema(avroSchema.toString()));
+
+    final SchemaMetadata schemaMetadata = new SchemaMetadata()
+        .setSchemaName(tableName)
+        .setVersion(0)
+        .setHash("")
+        .setPlatform(datasetUrn.getPlatformEntity())
+        .setPlatformSchema(platformSchema)
+        .setFields(new SchemaFieldArray(fields));
+
+    return MetadataChangeProposalWrapper.builder()
+        .entityType("dataset")
+        .entityUrn(datasetUrn)
+        .upsert()
+        .aspect(schemaMetadata)
+        .build();
+  }
+
+  /** Maps one Avro field to a DataHub {@code SchemaField} (path, type, optional doc, native type name). */
+  private SchemaField toDataHubSchemaField(Schema.Field avroField) {
+    final Schema.Type avroType = avroField.schema().getType();
+    return new SchemaField()
+        .setFieldPath(avroField.name())
+        .setType(toSchemaFieldDataType(avroType))
+        .setDescription(avroField.doc(), SetMode.IGNORE_NULL)
+        .setNativeDataType(avroType.getName());
+  }
+
+ Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
try {
return new TableSchemaResolver(metaClient).getTableAvroSchema(true);
} catch (Exception e) {
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
new file mode 100644
index 0000000000..e99e7109e5
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub;
+
+import datahub.client.Callback;
+import datahub.client.MetadataWriteResponse;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link Callback} for DataHub {@code RestEmitter} requests that only logs the outcome.
+ *
+ * <p>An unsuccessful response is logged at ERROR level but is not turned into an exception; code that must react to
+ * failed requests has to inspect the {@link MetadataWriteResponse} itself.
+ */
+public class DatahubResponseLogger implements Callback {
+  private static final Logger LOG = LogManager.getLogger(DatahubResponseLogger.class);
+
+  /**
+   * Logs the status of a completed request at INFO level, the full response at ERROR level when the request did not
+   * succeed, and the full response at DEBUG level when debug logging is enabled.
+   *
+   * @param response the response reported for the request
+   */
+  @Override
+  public void onCompletion(MetadataWriteResponse response) {
+    final boolean succeeded = response.isSuccess();
+    LOG.info("Completed Datahub RestEmitter request. Status: " + (succeeded ? "succeeded" : "failed"));
+    if (!succeeded) {
+      LOG.error("Request failed. " + response);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Response details: " + response);
+    }
+  }
+
+  /**
+   * Logs an error reported by the emitter for a request, including the throwable's stack trace.
+   *
+   * @param e the error reported by the emitter
+   */
+  @Override
+  public void onFailure(Throwable e) {
+    LOG.error("Error during Datahub RestEmitter request", e);
+  }
+}
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java
new file mode 100644
index 0000000000..3c00e313a9
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub;
+
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test-only {@link PartitionValueExtractor} that reports no partition values for any path.
+ */
+public class DummyPartitionValueExtractor implements PartitionValueExtractor {
+
+  /** Always answers with an empty list, whatever {@code partitionPath} is. */
+  @Override
+  public List<String> extractPartitionValuesInPath(String partitionPath) {
+    return Collections.<String>emptyList();
+  }
+}
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
new file mode 100644
index 0000000000..279167fc04
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub;
+
+import datahub.client.MetadataWriteResponse;
+import datahub.client.rest.RestEmitter;
+import datahub.event.MetadataChangeProposalWrapper;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit tests for {@link DataHubSyncClient} that replace the DataHub REST emitter with a Mockito mock.
+ */
+public class TestDataHubSyncClient {
+
+  private static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
+      + "{\"name\": \"ts\",\"type\": \"long\"}]}";
+
+  @TempDir
+  static java.nio.file.Path tmpDir;
+
+  private static Schema avroSchema;
+  private static String tableBasePath;
+
+  @Mock
+  RestEmitter restEmitterMock;
+
+  @BeforeAll
+  public static void beforeAll() throws IOException {
+    avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
+
+    // Create an empty Hudi table on disk so the sync client can build a meta client for its base path.
+    Properties props = new Properties();
+    props.put("hoodie.table.name", "some_table");
+    tableBasePath = Paths.get(tmpDir.toString(), "some_table").toString();
+    HoodieTableMetaClient.initTableAndGetMetaClient(new Configuration(), tableBasePath, props);
+  }
+
+  @BeforeEach
+  public void beforeEach() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @AfterEach
+  public void afterEach() {
+    // Report Mockito API misuse (e.g. unfinished stubbing or verification) against the test that caused it.
+    Mockito.validateMockitoUsage();
+  }
+
+  @Test
+  public void testUpdateTableSchemaInvokesRestEmitter() throws IOException {
+    Properties props = new Properties();
+    props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+    props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+    Mockito.when(restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), Mockito.any()))
+        .thenReturn(CompletableFuture.completedFuture(MetadataWriteResponse.builder().build()));
+
+    DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+    DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+    dhClient.updateTableSchema("some_table", null);
+
+    // Two proposals are expected: the schema upsert, then the "status" upsert that undoes a soft delete.
+    org.mockito.ArgumentCaptor<MetadataChangeProposalWrapper> proposals =
+        org.mockito.ArgumentCaptor.forClass(MetadataChangeProposalWrapper.class);
+    verify(restEmitterMock, times(2)).emit(proposals.capture(), Mockito.any());
+    org.junit.jupiter.api.Assertions.assertEquals("status", proposals.getAllValues().get(1).getAspectName());
+  }
+
+  /** Client stub that serves the in-memory test schema instead of reading it from the Hudi table. */
+  public static class DataHubSyncClientStub extends DataHubSyncClient {
+
+    public DataHubSyncClientStub(DataHubSyncConfig config) {
+      super(config);
+    }
+
+    @Override
+    Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
+      return avroSchema;
+    }
+  }
+
+  /** Config stub whose {@link #getRestEmitter()} hands back the supplied emitter mock. */
+  public static class DatahubSyncConfigStub extends DataHubSyncConfig {
+
+    private final RestEmitter emitterMock;
+
+    public DatahubSyncConfigStub(Properties props, RestEmitter emitterMock) {
+      super(props);
+      this.emitterMock = emitterMock;
+    }
+
+    @Override
+    public RestEmitter getRestEmitter() {
+      return emitterMock;
+    }
+  }
+}