You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2019/05/02 18:34:56 UTC
[samza] branch master updated: added table delete api
This is an automated email from the ASF dual-hosted git repository.
weisong44 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 12e4263 added table delete api
12e4263 is described below
commit 12e426345da6d2cc1d4f8d8ae234f0e8514ca093
Author: Boris S <bs...@linkedin.com>
AuthorDate: Thu May 2 11:34:48 2019 -0700
added table delete api
Call sendto(key, null) to delete from a table by key.
Author: Boris S <bs...@linkedin.com>
Author: Boris S <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: Wei Song <ws...@linkedin.com>
Closes #1013 from sborya/EspressoDelete
---
.../org/apache/samza/operators/impl/SendToTableOperatorImpl.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
index f3aba5b..a5e8ea1 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -55,7 +55,13 @@ public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, KV<K,
@Override
protected CompletionStage<Collection<KV<K, V>>> handleMessageAsync(KV<K, V> message, MessageCollector collector,
TaskCoordinator coordinator) {
- table.put(message.getKey(), message.getValue());
+ K key = message.getKey();
+ V val = message.getValue();
+ if (val == null) {
+ table.delete(key);
+ } else {
+ table.put(key, val);
+ }
return CompletableFuture.completedFuture(Collections.singleton(message));
}