You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2021/01/03 12:39:03 UTC

[hudi] branch master updated: [HUDI-1423] Support delete in hudi-java-client (#2353)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ff8313c  [HUDI-1423] Support delete in hudi-java-client (#2353)
ff8313c is described below

commit ff8313caf1dfe0b13fa9f160489a74600b0e8756
Author: Shen Hong <sh...@126.com>
AuthorDate: Sun Jan 3 20:38:45 2021 +0800

    [HUDI-1423] Support delete in hudi-java-client (#2353)
---
 .../apache/hudi/client/HoodieJavaWriteClient.java  |   6 +-
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |   3 +-
 .../commit/JavaDeleteCommitActionExecutor.java     |  46 ++++++++
 .../hudi/table/action/commit/JavaDeleteHelper.java | 125 +++++++++++++++++++++
 .../java/HoodieJavaWriteClientExample.java         |  10 ++
 5 files changed, 188 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 71a85de..2b5e607 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -167,7 +167,11 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
   @Override
   public List<WriteStatus> delete(List<HoodieKey> keys,
                                   String instantTime) {
-    throw new HoodieNotSupportedException("Delete is not supported in HoodieJavaClient");
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable =
+        getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
+    setOperationType(WriteOperationType.DELETE);
+    HoodieWriteMetadata<List<WriteStatus>> writeMetadata = hoodieTable.delete(context, instantTime, keys);
+    return postWrite(writeMetadata, instantTime, hoodieTable);
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index ddc995a..9a22f75 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -38,6 +38,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.clean.JavaCleanActionExecutor;
+import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
@@ -81,7 +82,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
   public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context,
                                                        String instantTime,
                                                        List<HoodieKey> keys) {
-    throw new HoodieNotSupportedException("Delete is not supported yet");
+    // Delegate the delete of `keys` under `instantTime` to a dedicated commit action executor for this table.
+    return new JavaDeleteCommitActionExecutor<>(context, config, this, instantTime, keys).execute();
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteCommitActionExecutor.java
new file mode 100644
index 0000000..ca61af4
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteCommitActionExecutor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+public class JavaDeleteCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
+  private final List<HoodieKey> keys;
+
+  public JavaDeleteCommitActionExecutor(HoodieEngineContext context,
+                                         HoodieWriteConfig config, HoodieTable table,
+                                         String instantTime, List<HoodieKey> keys) {
+    super(context, config, table, instantTime, WriteOperationType.DELETE);
+    this.keys = keys;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    return JavaDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, this);
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
new file mode 100644
index 0000000..0de1111
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Java-engine implementation of {@link AbstractDeleteHelper}. It turns a list of {@link HoodieKey}s
+ * into delete records with an {@link EmptyHoodieRecordPayload}, tags them against the table's index,
+ * and hands the records with a known location to the commit action executor.
+ */
+@SuppressWarnings("checkstyle:LineLength")
+public class JavaDeleteHelper<R> extends
+    AbstractDeleteHelper<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> {
+
+  private JavaDeleteHelper() {
+  }
+
+  /** Holder class so the shared instance is created on first use of {@link #newInstance()}. */
+  private static class DeleteHelperHolder {
+    private static final JavaDeleteHelper JAVA_DELETE_HELPER = new JavaDeleteHelper();
+  }
+
+  /**
+   * Returns the shared helper instance; despite the name, the same object is returned on every call.
+   * NOTE(review): the raw return type appears to be what lets JavaDeleteCommitActionExecutor pass its
+   * generically-typed table/executor; confirm before generifying.
+   */
+  public static JavaDeleteHelper newInstance() {
+    return DeleteHelperHolder.JAVA_DELETE_HELPER;
+  }
+
+  /**
+   * Returns a new list with duplicate keys removed; {@code keys} itself is never modified.
+   *
+   * <p>When the table's index is global, keys are de-duplicated by record key alone: the first key
+   * seen for each record key is kept, whatever its partition path. Otherwise keys are de-duplicated
+   * using {@link HoodieKey#equals}. Encounter order is preserved in both cases.
+   *
+   * @param keys        keys to de-duplicate; left untouched
+   * @param table       table whose index decides which de-duplication rule applies
+   * @param parallelism not used by this implementation
+   * @return a new list containing each distinct key once
+   */
+  @Override
+  public List<HoodieKey> deduplicateKeys(List<HoodieKey> keys,
+                                         HoodieTable<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>> table,
+                                         int parallelism) {
+    if (table.getIndex().isGlobal()) {
+      HashSet<String> seenRecordKeys = new HashSet<>();
+      List<HoodieKey> deduplicatedKeys = new LinkedList<>();
+      for (HoodieKey key : keys) {
+        // Set#add returns false when this record key has already been kept.
+        if (seenRecordKeys.add(key.getRecordKey())) {
+          deduplicatedKeys.add(key);
+        }
+      }
+      return deduplicatedKeys;
+    }
+    // Build a fresh list instead of clear()/addAll() on the caller's list, which may be shared or immutable.
+    return keys.stream().distinct().collect(Collectors.toList());
+  }
+
+  /**
+   * Deletes the records identified by {@code keys} as part of commit {@code instantTime}.
+   *
+   * <p>The steps are:
+   * <ul>
+   *   <li>If {@code config.shouldCombineBeforeDelete()} is set, the keys are de-duplicated.</li>
+   *   <li>The keys are turned into {@link EmptyHoodieRecordPayload} records and tagged against the index.</li>
+   *   <li>Records whose current location is unknown are dropped.</li>
+   * </ul>
+   * If no records remain, an empty workload profile is saved to the inflight instant. An empty result
+   * is then passed to {@code commitOnAutoCommit}.
+   *
+   * @return the write metadata produced by {@code deleteExecutor}, or an empty result if no key exists
+   * @throws HoodieUpsertException if any step fails; any other {@link Throwable} is wrapped in one
+   */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute(String instantTime,
+                                                        List<HoodieKey> keys,
+                                                        HoodieEngineContext context,
+                                                        HoodieWriteConfig config,
+                                                        HoodieTable<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>> table,
+                                                        BaseCommitActionExecutor<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> deleteExecutor) {
+    try {
+      List<HoodieKey> dedupedKeys = keys;
+      final int parallelism = config.getDeleteShuffleParallelism();
+      if (config.shouldCombineBeforeDelete()) {
+        // De-dupe/merge if needed
+        dedupedKeys = deduplicateKeys(keys, table, parallelism);
+      }
+
+      List<HoodieRecord<EmptyHoodieRecordPayload>> dedupedRecords =
+          dedupedKeys.stream().map(key -> new HoodieRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList());
+      Instant beginTag = Instant.now();
+      // perform index look up to get existing location of records
+      List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords =
+          table.getIndex().tagLocation(dedupedRecords, context, table);
+      Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
+
+      // filter out non existent keys/records
+      List<HoodieRecord<EmptyHoodieRecordPayload>> taggedValidRecords = taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).collect(Collectors.toList());
+      final HoodieWriteMetadata<List<WriteStatus>> result;
+      if (!taggedValidRecords.isEmpty()) {
+        result = deleteExecutor.execute(taggedValidRecords);
+        result.setIndexLookupDuration(tagLocationDuration);
+      } else {
+        // if entire set of keys are non existent
+        deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
+        result = new HoodieWriteMetadata<>();
+        // Type-safe empty list instead of the raw Collections.EMPTY_LIST.
+        result.setWriteStatuses(Collections.emptyList());
+        deleteExecutor.commitOnAutoCommit(result);
+      }
+      return result;
+    } catch (Throwable e) {
+      if (e instanceof HoodieUpsertException) {
+        throw (HoodieUpsertException) e;
+      }
+      throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e);
+    }
+  }
+
+}
diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
index 31fccfa..6cb1ea9 100644
--- a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
+++ b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -104,6 +105,15 @@ public class HoodieJavaWriteClientExample {
         recordsSoFar.stream().map(r -> new HoodieRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
     client.upsert(writeRecords, newCommitTime);
 
+    // Delete
+    newCommitTime = client.startCommit();
+    LOG.info("Starting commit " + newCommitTime);
+    // just delete half of the records
+    int numToDelete = recordsSoFar.size() / 2;
+    List<HoodieKey> toBeDeleted =
+        recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
+    client.delete(toBeDeleted, newCommitTime);
+
     client.close();
   }
 }