Posted to commits@hudi.apache.org by xu...@apache.org on 2022/10/12 07:39:21 UTC

[hudi] branch master updated: [HUDI-4994] Fix bug that prevents re-ingestion of soft-deleted Datahub entities (#6886)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c407beee01 [HUDI-4994] Fix bug that prevents re-ingestion of soft-deleted Datahub entities (#6886)
c407beee01 is described below

commit c407beee014652cc7b8949e47a2d342f01d1f165
Author: Pramod Biligiri <pr...@gmail.com>
AuthorDate: Wed Oct 12 13:09:12 2022 +0530

    [HUDI-4994] Fix bug that prevents re-ingestion of soft-deleted Datahub entities (#6886)
---
 .../hudi/sync/datahub/DataHubSyncClient.java       |  91 +++++++++------
 .../hudi/sync/datahub/DatahubResponseLogger.java   |  51 ++++++++
 .../sync/datahub/DummyPartitionValueExtractor.java |  34 ++++++
 .../hudi/sync/datahub/TestDataHubSyncClient.java   | 130 +++++++++++++++++++++
 4 files changed, 273 insertions(+), 33 deletions(-)
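
The gist of the change: after emitting the schema update, the sync client now
also emits a "status" aspect with removed=false, which undoes any prior soft
delete so the re-ingested dataset becomes visible in the DataHub UI again.
Below is a minimal standalone sketch of that emission pattern, assuming the
DataHub Java client is on the classpath; the dataset URN and the default
emitter endpoint are illustrative placeholders, not values from this commit.

    import com.linkedin.common.Status;
    import com.linkedin.common.urn.DatasetUrn;
    import datahub.client.rest.RestEmitter;
    import datahub.event.MetadataChangeProposalWrapper;

    public class UndoSoftDeleteSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical URN; point this at a dataset in your own deployment.
        DatasetUrn urn = DatasetUrn.createFromString(
            "urn:li:dataset:(urn:li:dataPlatform:hudi,mydb.mytable,PROD)");

        // Upserting the "status" aspect with removed=false flips the
        // soft-delete flag back, making the entity visible again.
        MetadataChangeProposalWrapper undoSoftDelete = MetadataChangeProposalWrapper.builder()
            .entityType("dataset")
            .entityUrn(urn)
            .upsert()
            .aspect(new Status().setRemoved(false))
            .aspectName("status")
            .build();

        // RestEmitter is AutoCloseable (the commit relies on this in its
        // try-with-resources blocks); createWithDefaults() targets a local
        // DataHub GMS instance.
        try (RestEmitter emitter = RestEmitter.createWithDefaults()) {
          emitter.emit(undoSoftDelete, null).get(); // block until the write completes
        }
      }
    }

In the commit itself this proposal is built by createUndoSoftDelete() and
emitted right after the schema change, as the DataHubSyncClient diff below
shows.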

diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index 4c050451c5..35a19d799c 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.sync.datahub;
 
+import com.linkedin.common.Status;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
@@ -51,7 +52,6 @@ import datahub.event.MetadataChangeProposalWrapper;
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
 import org.apache.parquet.schema.MessageType;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +61,7 @@ public class DataHubSyncClient extends HoodieSyncClient {
 
   protected final DataHubSyncConfig config;
   private final DatasetUrn datasetUrn;
+  private static final Status SOFT_DELETE_FALSE = new Status().setRemoved(false);
 
   public DataHubSyncClient(DataHubSyncConfig config) {
     super(config);
@@ -81,45 +82,33 @@ public class DataHubSyncClient extends HoodieSyncClient {
   @Override
   public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
     MetadataChangeProposalWrapper propertiesChangeProposal = MetadataChangeProposalWrapper.builder()
-        .entityType("dataset")
-        .entityUrn(datasetUrn)
-        .upsert()
-        .aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties)))
-        .build();
+            .entityType("dataset")
+            .entityUrn(datasetUrn)
+            .upsert()
+            .aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties)))
+            .build();
+
+    DatahubResponseLogger responseLogger = new DatahubResponseLogger();
 
     try (RestEmitter emitter = config.getRestEmitter()) {
-      emitter.emit(propertiesChangeProposal, null).get();
+      emitter.emit(propertiesChangeProposal, responseLogger).get();
     } catch (Exception e) {
-      throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": " + tableProperties, e);
+      throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": "
+              + tableProperties, e);
     }
   }
 
   @Override
   public void updateTableSchema(String tableName, MessageType schema) {
-    Schema avroSchema = getAvroSchemaWithoutMetadataFields(metaClient);
-    List<SchemaField> fields = avroSchema.getFields().stream().map(f -> new SchemaField()
-        .setFieldPath(f.name())
-        .setType(toSchemaFieldDataType(f.schema().getType()))
-        .setDescription(f.doc(), SetMode.IGNORE_NULL)
-        .setNativeDataType(f.schema().getType().getName())).collect(Collectors.toList());
-
-    final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
-    platformSchema.setOtherSchema(new OtherSchema().setRawSchema(avroSchema.toString()));
-    MetadataChangeProposalWrapper schemaChangeProposal = MetadataChangeProposalWrapper.builder()
-        .entityType("dataset")
-        .entityUrn(datasetUrn)
-        .upsert()
-        .aspect(new SchemaMetadata()
-            .setSchemaName(tableName)
-            .setVersion(0)
-            .setHash("")
-            .setPlatform(datasetUrn.getPlatformEntity())
-            .setPlatformSchema(platformSchema)
-            .setFields(new SchemaFieldArray(fields)))
-        .build();
-
     try (RestEmitter emitter = config.getRestEmitter()) {
-      emitter.emit(schemaChangeProposal, null).get();
+      DatahubResponseLogger responseLogger = new DatahubResponseLogger();
+      MetadataChangeProposalWrapper schemaChange = createSchemaMetadataUpdate(tableName);
+      emitter.emit(schemaChange, responseLogger).get();
+
+      // When updating an entity, it is necessary to set its soft-delete status to false, or else the update won't
+      // be reflected in the UI.
+      MetadataChangeProposalWrapper softDeleteUndoProposal = createUndoSoftDelete();
+      emitter.emit(softDeleteUndoProposal, responseLogger).get();
     } catch (Exception e) {
       throw new HoodieDataHubSyncException("Fail to change schema for Dataset " + datasetUrn, e);
     }
@@ -127,7 +116,7 @@ public class DataHubSyncClient extends HoodieSyncClient {
 
   @Override
   public Map<String, String> getMetastoreSchema(String tableName) {
-    throw new UnsupportedOperationException("Not supported: `getTableSchema`");
+    throw new UnsupportedOperationException("Not supported: `getMetastoreSchema`");
   }
 
   @Override
@@ -135,7 +124,43 @@ public class DataHubSyncClient extends HoodieSyncClient {
     // no op;
   }
 
-  static Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
+  private MetadataChangeProposalWrapper createUndoSoftDelete() {
+    MetadataChangeProposalWrapper softDeleteUndoProposal = MetadataChangeProposalWrapper.builder()
+            .entityType("dataset")
+            .entityUrn(datasetUrn)
+            .upsert()
+            .aspect(SOFT_DELETE_FALSE)
+            .aspectName("status")
+            .build();
+    return softDeleteUndoProposal;
+  }
+
+  private MetadataChangeProposalWrapper createSchemaMetadataUpdate(String tableName) {
+    Schema avroSchema = getAvroSchemaWithoutMetadataFields(metaClient);
+    List<SchemaField> fields = avroSchema.getFields().stream().map(f -> new SchemaField()
+            .setFieldPath(f.name())
+            .setType(toSchemaFieldDataType(f.schema().getType()))
+            .setDescription(f.doc(), SetMode.IGNORE_NULL)
+            .setNativeDataType(f.schema().getType().getName())).collect(Collectors.toList());
+
+    final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
+    platformSchema.setOtherSchema(new OtherSchema().setRawSchema(avroSchema.toString()));
+
+    return MetadataChangeProposalWrapper.builder()
+            .entityType("dataset")
+            .entityUrn(datasetUrn)
+            .upsert()
+            .aspect(new SchemaMetadata()
+                    .setSchemaName(tableName)
+                    .setVersion(0)
+                    .setHash("")
+                    .setPlatform(datasetUrn.getPlatformEntity())
+                    .setPlatformSchema(platformSchema)
+                    .setFields(new SchemaFieldArray(fields)))
+            .build();
+  }
+
+  Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
     try {
       return new TableSchemaResolver(metaClient).getTableAvroSchema(true);
     } catch (Exception e) {
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
new file mode 100644
index 0000000000..e99e7109e5
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub;
+
+import datahub.client.Callback;
+import datahub.client.MetadataWriteResponse;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles responses to requests sent to the DataHub metadata service by logging them.
+ */
+public class DatahubResponseLogger implements Callback {
+  private static final Logger LOG = LogManager.getLogger(DatahubResponseLogger.class);
+
+  @Override
+  public void onCompletion(MetadataWriteResponse response) {
+    LOG.info("Completed Datahub RestEmitter request. "
+            + "Status: " + (response.isSuccess() ? " succeeded" : " failed"));
+    if (!response.isSuccess()) {
+      LOG.error("Request failed. " + response);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Response details: " + response);
+    }
+  }
+
+  @Override
+  public void onFailure(Throwable e) {
+    LOG.error("Error during Datahub RestEmitter request", e);
+  }
+
+}
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java
new file mode 100644
index 0000000000..3c00e313a9
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub;
+
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class DummyPartitionValueExtractor implements PartitionValueExtractor {
+
+  @Override
+  public List<String> extractPartitionValuesInPath(String partitionPath) {
+    return Collections.emptyList();
+  }
+
+}
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
new file mode 100644
index 0000000000..279167fc04
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub;
+
+import datahub.client.MetadataWriteResponse;
+import datahub.client.rest.RestEmitter;
+import datahub.event.MetadataChangeProposalWrapper;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestDataHubSyncClient {
+
+  @Mock
+  RestEmitter restEmitterMock;
+
+  @TempDir
+  static java.nio.file.Path tmpDir;
+
+  private static String TRIP_EXAMPLE_SCHEMA;
+  private static Schema avroSchema;
+  private static String tableBasePath;
+
+  @BeforeAll
+  public static void beforeAll() throws IOException {
+    TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
+            + "{\"name\": \"ts\",\"type\": \"long\"}]}";
+
+    avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
+
+    Properties props = new Properties();
+    props.put("hoodie.table.name", "some_table");
+    tableBasePath = Paths.get(tmpDir.toString(), "some_table").toString();
+    HoodieTableMetaClient.initTableAndGetMetaClient(new Configuration(),
+            tableBasePath, props);
+  }
+
+  @BeforeEach
+  public void beforeEach() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @AfterEach
+  public void afterEach() {
+  }
+
+  @Test
+  public void testUpdateTableSchemaInvokesRestEmitter() throws IOException {
+    Properties props = new Properties();
+    props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+    props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+    Mockito.when(
+        restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), Mockito.any())
+    ).thenReturn(
+        CompletableFuture.completedFuture(MetadataWriteResponse.builder().build())
+    );
+
+    DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+    DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+    dhClient.updateTableSchema("some_table", null);
+    verify(restEmitterMock, times(2)).emit(any(MetadataChangeProposalWrapper.class),
+            Mockito.any());
+  }
+
+  public class DataHubSyncClientStub extends DataHubSyncClient {
+
+    public DataHubSyncClientStub(DataHubSyncConfig config) {
+      super(config);
+    }
+
+    @Override
+    Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
+      return avroSchema;
+    }
+
+  }
+
+  public class DatahubSyncConfigStub extends DataHubSyncConfig {
+
+    private final RestEmitter emitterMock;
+
+    public DatahubSyncConfigStub(Properties props, RestEmitter emitterMock) {
+      super(props);
+      this.emitterMock = emitterMock;
+    }
+
+    @Override
+    public RestEmitter getRestEmitter() {
+      return emitterMock;
+    }
+  }
+
+}