You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2021/01/03 12:39:03 UTC
[hudi] branch master updated: [HUDI-1423] Support delete in hudi-java-client (#2353)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ff8313c [HUDI-1423] Support delete in hudi-java-client (#2353)
ff8313c is described below
commit ff8313caf1dfe0b13fa9f160489a74600b0e8756
Author: Shen Hong <sh...@126.com>
AuthorDate: Sun Jan 3 20:38:45 2021 +0800
[HUDI-1423] Support delete in hudi-java-client (#2353)
---
.../apache/hudi/client/HoodieJavaWriteClient.java | 6 +-
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 3 +-
.../commit/JavaDeleteCommitActionExecutor.java | 46 ++++++++
.../hudi/table/action/commit/JavaDeleteHelper.java | 125 +++++++++++++++++++++
.../java/HoodieJavaWriteClientExample.java | 10 ++
5 files changed, 188 insertions(+), 2 deletions(-)
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 71a85de..2b5e607 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -167,7 +167,11 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
@Override
public List<WriteStatus> delete(List<HoodieKey> keys,
String instantTime) {
- throw new HoodieNotSupportedException("Delete is not supported in HoodieJavaClient");
+    // Set up the table for a DELETE, set the operation type, write through the table, then run postWrite.
+    final WriteOperationType deleteOperation = WriteOperationType.DELETE;
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable =
+        getTableAndInitCtx(deleteOperation, instantTime);
+    setOperationType(deleteOperation);
+    HoodieWriteMetadata<List<WriteStatus>> deleteMetadata = hoodieTable.delete(context, instantTime, keys);
+    return postWrite(deleteMetadata, instantTime, hoodieTable);
}
@Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index ddc995a..9a22f75 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -38,6 +38,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.clean.JavaCleanActionExecutor;
+import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
@@ -81,7 +82,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context,
String instantTime,
List<HoodieKey> keys) {
- throw new HoodieNotSupportedException("Delete is not supported yet");
+    // Delegate to JavaDeleteCommitActionExecutor, whose execute() runs the delete through JavaDeleteHelper.
+ return new JavaDeleteCommitActionExecutor<>(context, config, this, instantTime, keys).execute();
}
@Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteCommitActionExecutor.java
new file mode 100644
index 0000000..ca61af4
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteCommitActionExecutor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+public class JavaDeleteCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
+ private final List<HoodieKey> keys;
+
+ public JavaDeleteCommitActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, List<HoodieKey> keys) {
+ super(context, config, table, instantTime, WriteOperationType.DELETE);
+ this.keys = keys;
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute() {
+ return JavaDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, this);
+ }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
new file mode 100644
index 0000000..0de1111
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Java-engine implementation of the delete path.
+ *
+ * <p>Turns a list of {@link HoodieKey}s into records carrying an {@link EmptyHoodieRecordPayload},
+ * tags them with their current location through the table's index, and hands only the keys that
+ * were found to the supplied commit executor. All intermediate data is held in plain {@link List}s.
+ *
+ * @param <R> fifth type argument of the delete executor; only passed through
+ */
+@SuppressWarnings("checkstyle:LineLength")
+public class JavaDeleteHelper<R> extends
+    AbstractDeleteHelper<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> {
+
+  private JavaDeleteHelper() {
+  }
+
+  private static class DeleteHelperHolder {
+    private static final JavaDeleteHelper JAVA_DELETE_HELPER = new JavaDeleteHelper();
+  }
+
+  /**
+   * Returns the shared helper instance (the class has no fields, so one instance serves all callers).
+   */
+  public static JavaDeleteHelper newInstance() {
+    return DeleteHelperHolder.JAVA_DELETE_HELPER;
+  }
+
+  /**
+   * Removes duplicate keys, keeping the first occurrence of each in input order.
+   *
+   * <p>When the table's index is global, keys are compared by record key only (partition path is
+   * ignored), so only the first {@link HoodieKey} seen for each record key survives. Otherwise keys
+   * are compared with {@link HoodieKey#equals}.
+   *
+   * <p>The input list is never modified, so unmodifiable lists are accepted.
+   *
+   * @param keys        keys requested for deletion
+   * @param table       table whose index decides which comparison is used
+   * @param parallelism not used by the Java engine; kept for the engine-agnostic signature
+   * @return a new list of distinct keys
+   */
+  @Override
+  public List<HoodieKey> deduplicateKeys(List<HoodieKey> keys,
+      HoodieTable<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>> table,
+      int parallelism) {
+    if (table.getIndex().isGlobal()) {
+      // HashSet#add returns false for an already-seen record key, so only the first key per record key is kept.
+      HashSet<String> seenRecordKeys = new HashSet<>();
+      List<HoodieKey> deduplicatedKeys = new LinkedList<>();
+      for (HoodieKey key : keys) {
+        if (seenRecordKeys.add(key.getRecordKey())) {
+          deduplicatedKeys.add(key);
+        }
+      }
+      return deduplicatedKeys;
+    }
+    // Builds a fresh list instead of clearing/refilling the caller's list (which fails on immutable lists).
+    return keys.stream().distinct().collect(Collectors.toList());
+  }
+
+  /**
+   * Deletes {@code keys} as part of the commit at {@code instantTime}.
+   *
+   * <p>Keys are deduplicated when {@code config.shouldCombineBeforeDelete()} is set, wrapped into
+   * records with an {@link EmptyHoodieRecordPayload}, and tagged through the table's index. Keys whose
+   * location is not known after tagging are dropped. If any key remains, {@code deleteExecutor} writes
+   * the deletes and the index lookup duration is recorded on the result; otherwise an empty workload
+   * profile is saved to the inflight instant and an empty result goes to {@code commitOnAutoCommit}.
+   *
+   * @return write metadata of the delete
+   * @throws HoodieUpsertException on any failure; non-{@code HoodieUpsertException} throwables are wrapped
+   */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute(String instantTime,
+      List<HoodieKey> keys,
+      HoodieEngineContext context,
+      HoodieWriteConfig config,
+      HoodieTable<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>> table,
+      BaseCommitActionExecutor<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> deleteExecutor) {
+    try {
+      List<HoodieKey> dedupedKeys = keys;
+      final int parallelism = config.getDeleteShuffleParallelism();
+      if (config.shouldCombineBeforeDelete()) {
+        // De-dupe/merge if needed
+        dedupedKeys = deduplicateKeys(keys, table, parallelism);
+      }
+
+      List<HoodieRecord<EmptyHoodieRecordPayload>> dedupedRecords =
+          dedupedKeys.stream().map(key -> new HoodieRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList());
+      Instant beginTag = Instant.now();
+      // perform index look up to get existing location of records
+      List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords =
+          table.getIndex().tagLocation(dedupedRecords, context, table);
+      Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
+
+      // filter out non existent keys/records
+      List<HoodieRecord<EmptyHoodieRecordPayload>> taggedValidRecords =
+          taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).collect(Collectors.toList());
+
+      HoodieWriteMetadata<List<WriteStatus>> result;
+      if (!taggedValidRecords.isEmpty()) {
+        result = deleteExecutor.execute(taggedValidRecords);
+        result.setIndexLookupDuration(tagLocationDuration);
+      } else {
+        // None of the keys exist: still record an (empty) workload and complete the commit.
+        deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
+        result = new HoodieWriteMetadata<>();
+        result.setWriteStatuses(Collections.emptyList());
+        deleteExecutor.commitOnAutoCommit(result);
+      }
+      return result;
+    } catch (HoodieUpsertException e) {
+      throw e;
+    } catch (Throwable e) {
+      throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e);
+    }
+  }
+
+}
diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
index 31fccfa..6cb1ea9 100644
--- a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
+++ b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -104,6 +105,15 @@ public class HoodieJavaWriteClientExample {
recordsSoFar.stream().map(r -> new HoodieRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
client.upsert(writeRecords, newCommitTime);
+ // Delete
+ newCommitTime = client.startCommit();
+ LOG.info("Starting commit " + newCommitTime);
+ // just delete half of the records
+ int numToDelete = recordsSoFar.size() / 2;
+ List<HoodieKey> toBeDeleted =
+ recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
+ client.delete(toBeDeleted, newCommitTime);
+
client.close();
}
}