You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2019/05/02 18:34:56 UTC

[samza] branch master updated: added table delete api

This is an automated email from the ASF dual-hosted git repository.

weisong44 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 12e4263  added table delete api
12e4263 is described below

commit 12e426345da6d2cc1d4f8d8ae234f0e8514ca093
Author: Boris S <bs...@linkedin.com>
AuthorDate: Thu May 2 11:34:48 2019 -0700

    added table delete api
    
    Call sendto(key, null) to delete from a table by key.
    
    Author: Boris S <bs...@linkedin.com>
    Author: Boris S <bo...@apache.org>
    Author: Boris Shkolnik <bs...@linkedin.com>
    
    Reviewers: Wei Song <ws...@linkedin.com>
    
    Closes #1013 from sborya/EspressoDelete
---
 .../org/apache/samza/operators/impl/SendToTableOperatorImpl.java  | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
index f3aba5b..a5e8ea1 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -55,7 +55,13 @@ public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, KV<K,
   @Override
   protected CompletionStage<Collection<KV<K, V>>> handleMessageAsync(KV<K, V> message, MessageCollector collector,
       TaskCoordinator coordinator) {
-    table.put(message.getKey(), message.getValue());
+    K key = message.getKey();
+    V val = message.getValue();
+    if (val == null) {
+      table.delete(key);
+    } else {
+      table.put(key, val);
+    }
     return CompletableFuture.completedFuture(Collections.singleton(message));
   }