Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/11/16 18:12:28 UTC

[GitHub] [samza] ajothomas opened a new pull request #1560: SAMZA-2709: Adding partial update api to Table API

ajothomas opened a new pull request #1560:
URL: https://github.com/apache/samza/pull/1560


   **Feature**:
   The Samza Table API currently supports PUT, GET and DELETE. A PUT typically writes or overwrites the entire record for a key. Users have frequently requested the ability to perform partial updates, i.e. to update selected fields or a part of the record. This PR adds `update` to the Table API.
   
   **Changes:**
   - Added `updateAsync` and `updateAllAsync` methods to `TableWriteFunction` and to `AsyncReadWriteTable`
   - All implementations of `AsyncReadWriteTable` have been changed to accommodate the API change
   - Added `sendUpdateTo` method to `MessageStream`. Added corresponding operator spec and operator implementation as well
   
   **Tests:**
   - Added tests for changes to all table implementations
   
   **API Changes**
   - `TableWriteFunction` previously used the generic types `K` and `V` to represent the key and record types. With partial updates, a generic type `U` has been added to represent the update, which is a backward compatible change.
   - Similarly, the update generic type `U` was added to the generic signature of `AsyncReadWriteTable`. This affects all implementing classes as well.
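
To illustrate the difference between a PUT and a partial update with the three generic types, here is a minimal sketch. It is not the Samza implementation: `PartialUpdateSketch`, its in-memory map, and the merge function are all hypothetical names introduced for illustration, and the real API in this PR is asynchronous.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory table, not the Samza implementation.
// K = key type, V = full record type, U = partial update type.
class PartialUpdateSketch<K, V, U> {
  private final Map<K, V> records = new HashMap<>();
  private final BiFunction<V, U, V> applyUpdate;

  PartialUpdateSketch(BiFunction<V, U, V> applyUpdate) {
    this.applyUpdate = applyUpdate;
  }

  // PUT writes or overwrites the whole record for the key.
  void put(K key, V record) {
    records.put(key, record);
  }

  V get(K key) {
    return records.get(key);
  }

  // UPDATE changes only part of an existing record.
  void update(K key, U update) {
    V existing = records.get(key);
    if (existing == null) {
      throw new IllegalStateException("No record for key " + key);
    }
    records.put(key, applyUpdate.apply(existing, update));
  }

  // Demo: the record is a map of fields, the update is a map of changed fields.
  static Map<String, String> demo() {
    BiFunction<Map<String, String>, Map<String, String>, Map<String, String>> merge =
        (record, changes) -> {
          Map<String, String> merged = new HashMap<>(record);
          merged.putAll(changes);
          return merged;
        };
    PartialUpdateSketch<String, Map<String, String>, Map<String, String>> table =
        new PartialUpdateSketch<>(merge);

    Map<String, String> profile = new HashMap<>();
    profile.put("name", "Ann");
    profile.put("city", "Oslo");
    table.put("member-1", profile);

    Map<String, String> change = new HashMap<>();
    change.put("city", "Rome");
    table.update("member-1", change);  // only "city" changes, "name" is kept
    return table.get("member-1");
  }
}
```

In this sketch the update touches a single field and leaves the rest of the record intact, whereas a second `put` would have replaced the whole record.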


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r787177828



##########
File path: samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
##########
@@ -116,6 +119,60 @@ default void putAll(List<Entry<K, V>> records) {
     throw new SamzaException("Not supported");
   }
 
+  /**
+   * Asynchronously update the record with specified {@code key} and additional arguments.
+   * This method must be thread-safe.
+   *
+   * If the update operation failed due to the an existing record missing for the key, the implementation can return
+   * a future completed exceptionally with a {@link RecordNotFoundException} which will
+   * allow to Put a default value if one is provided.
+   *
+   * @param key key for the table record
+   * @param update update record for the given key
+   * @return CompletableFuture for the update request
+   */
+  CompletableFuture<Void> updateAsync(K key, U update);
+
+  /**
+   * Asynchronously update the record with specified {@code key} and additional arguments.
+   * This method must be thread-safe.
+   *
+   * If the update operation failed due to the an existing record missing for the key, the implementation can return
+   * a future completed exceptionally with a {@link RecordNotFoundException} which will
+   * allow to Put a default value if one is provided.
+   *
+   * @param key key for the table record
+   * @param update update record for the given key
+   * @param args additional arguments
+   * @return CompletableFuture for the update request
+   */
+  default CompletableFuture<Void> updateAsync(K key, U update, Object ... args) {
+    throw new SamzaException("Not supported");
+  }
+
+  /**
+   * Asynchronously updates the table with {@code records} with specified {@code keys}. This method must be thread-safe.
+   * The default implementation calls updateAsync for each entry and return a combined future.
+   * @param records updates for the table
+   * @return CompletableFuture for the update request
+   */
+  default CompletableFuture<Void> updateAllAsync(Collection<Entry<K, U>> records) {
+    final List<CompletableFuture<Void>> updateFutures = records.stream()
+        .map(entry -> updateAsync(entry.getKey(), entry.getValue()))
+        .collect(Collectors.toList());
+    return CompletableFuture.allOf(Iterables.toArray(updateFutures, CompletableFuture.class));

Review comment:
       Yes, it bubbles up the exception, much like the existing `putAllAsync` does.
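
For reference, the bubbling behaviour comes from `CompletableFuture.allOf` itself. The sketch below uses only JDK calls (instead of Guava's `Iterables.toArray` from the diff); the class and method names are hypothetical and chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AllOfFailureSketch {
  // Combines per-key futures in the same spirit as the default updateAllAsync in the diff.
  static CompletableFuture<Void> combine(List<CompletableFuture<Void>> futures) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
  }

  // Returns the message of the failure surfaced by the combined future, or null on success.
  static String failureMessage() {
    CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("update failed for key-2"));
    try {
      combine(Arrays.asList(ok, failed)).join();
      return null;
    } catch (CompletionException e) {
      // allOf wraps the original failure in a CompletionException.
      return e.getCause().getMessage();
    }
  }
}
```

Note that `allOf` does not cancel the other futures; it only reports that at least one of them failed, so some updates in the batch may still have been applied.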







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r762369406



##########
File path: samza-api/src/main/java/org/apache/samza/table/RecordDoesNotExistException.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table;
+
+/**
+ * Custom exception which can be thrown by implementations of {@link org.apache.samza.table.remote.TableWriteFunction}
+ * when {@link AsyncReadWriteTable#updateAsync(Object, Object, Object...)} fails due an existing record not being
+ * present for the given key. {@link org.apache.samza.operators.MessageStream#sendUpdateTo(Table, Object...)} will
+ * attempt to call {@link AsyncReadWriteTable#putAsync(Object, Object, Object...)} instead to insert a new record if a
+ * default is provided.
+ */
+public class RecordDoesNotExistException extends RuntimeException {

Review comment:
       I did add a test case in `doTestStreamTableJoinRemoteTableWithFirstTimeUpdates` in `TestRemoteTableEndToEnd`. The `TableWriteFunction` maintains a map of records and throws a `RecordNotFoundException` for first-time updates. Let me revisit it to see whether I missed any other scenarios.
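
A rough sketch of the kind of write function described here, assuming a map-backed store with `String` keys and values. `FirstTimeUpdateSketch` and its nested `RecordNotFoundException` are hypothetical stand-ins, not the classes from the PR or the actual test.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class FirstTimeUpdateSketch {
  // Hypothetical stand-in for the exception discussed in this thread.
  static class RecordNotFoundException extends RuntimeException {
    RecordNotFoundException(String message) {
      super(message);
    }
  }

  private final Map<String, String> records = new ConcurrentHashMap<>();

  CompletableFuture<Void> putAsync(String key, String value) {
    records.put(key, value);
    return CompletableFuture.completedFuture(null);
  }

  // The "update" here appends a suffix to the stored value.
  CompletableFuture<Void> updateAsync(String key, String suffix) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    // computeIfPresent returns null when there is no record for the key.
    if (records.computeIfPresent(key, (k, v) -> v + suffix) == null) {
      future.completeExceptionally(new RecordNotFoundException("No record for key " + key));
    } else {
      future.complete(null);
    }
    return future;
  }

  // A first-time update has no record to apply to, so the future fails.
  static boolean firstUpdateFails() {
    return new FirstTimeUpdateSketch().updateAsync("k", "-u").isCompletedExceptionally();
  }

  // After a put, the same update succeeds.
  static String updateAfterPut() {
    FirstTimeUpdateSketch fn = new FirstTimeUpdateSketch();
    fn.putAsync("k", "v").join();
    fn.updateAsync("k", "-u").join();
    return fn.records.get("k");
  }
}
```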








[GitHub] [samza] mynameborat commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r786914705



##########
File path: samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
##########
@@ -291,12 +291,34 @@
    * scheme when propogated to next operator.
    *
    * @param table the table to write messages to
-   * @param args additional arguments passed to the table
    * @param <K> the type of key in the table
    * @param <V> the type of record value in the table
    * @return this {@link MessageStream}
    */
-  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);
+  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table);

Review comment:
       Why does this signature have to change? Is it colliding with the newer signature that takes `UpdateOptions`?
   What are the options for existing API users who pass in varargs? I don't see a path forward for them other than using the new update API, even if their flow doesn't need updates.
   

##########
File path: samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
##########
@@ -116,6 +119,60 @@ default void putAll(List<Entry<K, V>> records) {
     throw new SamzaException("Not supported");
   }
 
+  /**
+   * Asynchronously update the record with specified {@code key} and additional arguments.
+   * This method must be thread-safe.
+   *
+   * If the update operation failed due to the an existing record missing for the key, the implementation can return
+   * a future completed exceptionally with a {@link RecordNotFoundException} which will
+   * allow to Put a default value if one is provided.
+   *
+   * @param key key for the table record
+   * @param update update record for the given key
+   * @return CompletableFuture for the update request
+   */
+  CompletableFuture<Void> updateAsync(K key, U update);
+
+  /**
+   * Asynchronously update the record with specified {@code key} and additional arguments.
+   * This method must be thread-safe.
+   *
+   * If the update operation failed due to the an existing record missing for the key, the implementation can return
+   * a future completed exceptionally with a {@link RecordNotFoundException} which will
+   * allow to Put a default value if one is provided.
+   *
+   * @param key key for the table record
+   * @param update update record for the given key
+   * @param args additional arguments
+   * @return CompletableFuture for the update request
+   */
+  default CompletableFuture<Void> updateAsync(K key, U update, Object ... args) {
+    throw new SamzaException("Not supported");
+  }
+
+  /**
+   * Asynchronously updates the table with {@code records} with specified {@code keys}. This method must be thread-safe.
+   * The default implementation calls updateAsync for each entry and return a combined future.
+   * @param records updates for the table
+   * @return CompletableFuture for the update request
+   */
+  default CompletableFuture<Void> updateAllAsync(Collection<Entry<K, U>> records) {
+    final List<CompletableFuture<Void>> updateFutures = records.stream()
+        .map(entry -> updateAsync(entry.getKey(), entry.getValue()))
+        .collect(Collectors.toList());
+    return CompletableFuture.allOf(Iterables.toArray(updateFutures, CompletableFuture.class));

Review comment:
       What is the failure handling strategy here? Do we just bubble up the exception?

##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +91,27 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates an existing record for a given key with the specified update.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, Object ... args);

Review comment:
       Why are we not passing the `UpdateOptions` here? I understand that you initially had this signature for parity with the other APIs. However, the whole idea of introducing `UpdateOptions` was to make it strongly typed, so it should be passed down.
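
To make the strongly-typed argument concrete, here is a small sketch contrasting the two styles. The enum below is a hypothetical stand-in: `UPDATE_WITH_DEFAULTS` appears in the diff, while `UPDATE_ONLY` and both method names are assumptions made for illustration.

```java
class TypedOptionsSketch {
  // Hypothetical stand-in for the options type; UPDATE_ONLY is an assumed constant.
  enum UpdateOptions { UPDATE_ONLY, UPDATE_WITH_DEFAULTS }

  // Varargs style: the callee must inspect the arguments at runtime,
  // and a wrong argument type is silently ignored.
  static boolean allowsDefaultVarargs(Object... args) {
    return args.length > 0 && args[0] == UpdateOptions.UPDATE_WITH_DEFAULTS;
  }

  // Typed style: the compiler rejects anything that is not an UpdateOptions.
  static boolean allowsDefaultTyped(UpdateOptions options) {
    return options == UpdateOptions.UPDATE_WITH_DEFAULTS;
  }
}
```

With varargs, a caller who passes the string `"UPDATE_WITH_DEFAULTS"` by mistake gets no compile error and no default handling; with the typed parameter the same mistake does not compile.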

##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
##########
@@ -54,7 +56,12 @@ protected void handleInit(Context context) {
   @Override
   protected CompletionStage<Collection<KV<K, V>>> handleMessageAsync(KV<K, V> message, MessageCollector collector,
       TaskCoordinator coordinator) {
-    return table.putAsync(message.getKey(), message.getValue(), sendToTableOpSpec.getArgs())
+    if (message.getValue() instanceof UpdateMessage) {
+      throw new SamzaException("Incorrect use of .sendTo operator with UpdateMessage value type. "
+          + "Please use the following method on MessageStream- sendTo(Table<KV<K, UpdateMessage<U, V>>> table,"
+          + "UpdateOptions updateOptions).");

Review comment:
       Can we perform this check earlier, by not allowing `UpdateMessage` as type `V` during construction, instead of in `handleMessageAsync`?

##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableWithUpdateOperatorImpl.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateOptions;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendToTableWithUpdateOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordNotFoundException;
+import org.apache.samza.table.batching.CompactBatchProvider;
+import org.apache.samza.table.remote.RemoteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of a send-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendToTableWithUpdateOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SendToTableWithUpdateOperatorImpl.class);
+
+  private final SendToTableWithUpdateOperatorSpec<K, V, U> spec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendToTableWithUpdateOperatorImpl(SendToTableWithUpdateOperatorSpec<K, V, U> spec, Context context) {
+    this.spec = spec;
+    this.table = context.getTaskContext().getTable(spec.getTableId());
+    if (context.getTaskContext().getTable(spec.getTableId()) instanceof RemoteTable) {
+      RemoteTable<K, V, U> remoteTable = (RemoteTable<K, V, U>) table;
+      if (remoteTable.getBatchProvider() instanceof CompactBatchProvider) {
+        throw new SamzaException("Batching is not supported with Compact Batches for partial updates");
+      }
+    }
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }

Review comment:
       Don't we need a `table.init()` or some other initialization? I noticed we have `table.close()` below.

##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableWithUpdateOperatorImpl.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateOptions;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendToTableWithUpdateOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordNotFoundException;
+import org.apache.samza.table.batching.CompactBatchProvider;
+import org.apache.samza.table.remote.RemoteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of a send-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendToTableWithUpdateOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SendToTableWithUpdateOperatorImpl.class);
+
+  private final SendToTableWithUpdateOperatorSpec<K, V, U> spec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendToTableWithUpdateOperatorImpl(SendToTableWithUpdateOperatorSpec<K, V, U> spec, Context context) {
+    this.spec = spec;
+    this.table = context.getTaskContext().getTable(spec.getTableId());
+    if (context.getTaskContext().getTable(spec.getTableId()) instanceof RemoteTable) {
+      RemoteTable<K, V, U> remoteTable = (RemoteTable<K, V, U>) table;
+      if (remoteTable.getBatchProvider() instanceof CompactBatchProvider) {
+        throw new SamzaException("Batching is not supported with Compact Batches for partial updates");
+      }
+    }
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final UpdateOptions options = spec.getUpdateOptions();
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate());
+
+    return updateFuture
+        .handle((result, ex) -> {
+          if (ex == null) {
+            // success, no need to Put a default value
+            return false;
+          } else if (ex.getCause() instanceof RecordNotFoundException && message.getValue().hasDefault()) {
+            // If an update fails for a given key due to a RecordDoesNotExistException exception thrown and
+            // a default is provided and the UpdateOptions is set to UPDATE_WITH_DEFAULTS, then attempt
+            // to PUT a default record for the key and then apply the update.
+            if (options == UpdateOptions.UPDATE_WITH_DEFAULTS) {
+              return true;
+            } else {
+              throw new SamzaException("Put default failed for update as the Update options was set to " + options +
+                  ". Please use UpdateOptions.UPDATE_WITH_DEFAULTS instead.");
+            }
+          } else {
+            throw new SamzaException("Update failed with exception: ", ex);
+          }
+        })
+        .thenCompose(shouldPutDefault -> {
+          if (shouldPutDefault) {
+            final CompletableFuture<Void> putFuture = table.putAsync(message.getKey(), message.getValue().getDefault());
+            return putFuture
+                .exceptionally(ex -> {
+                  LOG.warn("PUT default failed due to an exception. Ignoring the exception and proceeding with update. "
+                          + "The exception encountered is: ", ex);
+                  return null;
+                })
+                .thenCompose(res -> table.updateAsync(message.getKey(), message.getValue().getUpdate()));

Review comment:
       What happens if this fails?
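
One way to reason about this question is to reproduce the `handle` / `thenCompose` chain against a toy table. The sketch below is an assumption-laden illustration, not the operator from the PR: the class, the in-memory map, the `failSecondUpdate` hook and the nested `RecordNotFoundException` are all hypothetical, and the exception is unwrapped defensively because it may or may not arrive wrapped in a `CompletionException`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

class UpdateWithDefaultSketch {
  // Hypothetical stand-in for the exception discussed in this thread.
  static class RecordNotFoundException extends RuntimeException { }

  final Map<String, String> records = new ConcurrentHashMap<>();
  boolean failSecondUpdate = false;  // hook to simulate a failing retry
  int updateCalls = 0;

  CompletableFuture<Void> putAsync(String key, String value) {
    records.put(key, value);
    return CompletableFuture.completedFuture(null);
  }

  CompletableFuture<Void> updateAsync(String key, String suffix) {
    updateCalls++;
    CompletableFuture<Void> future = new CompletableFuture<>();
    if (failSecondUpdate && updateCalls == 2) {
      future.completeExceptionally(new IllegalStateException("second update failed"));
    } else if (records.computeIfPresent(key, (k, v) -> v + suffix) == null) {
      future.completeExceptionally(new RecordNotFoundException());
    } else {
      future.complete(null);
    }
    return future;
  }

  // Update; if the record is missing and a default exists, put the default and retry the update.
  CompletableFuture<Void> updateOrPutDefault(String key, String suffix, String defaultValue) {
    return updateAsync(key, suffix)
        .handle((result, ex) -> {
          if (ex == null) {
            return false;  // update succeeded, no default needed
          }
          Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
          if (cause instanceof RecordNotFoundException && defaultValue != null) {
            return true;
          }
          throw new CompletionException(cause);
        })
        .thenCompose(shouldPutDefault -> {
          if (shouldPutDefault) {
            return putAsync(key, defaultValue).thenCompose(res -> updateAsync(key, suffix));
          }
          return CompletableFuture.<Void>completedFuture(null);
        });
  }

  static String missingRecordWithDefault() {
    UpdateWithDefaultSketch table = new UpdateWithDefaultSketch();
    table.updateOrPutDefault("k", "-u", "d").join();
    return table.records.get("k");
  }

  static boolean secondUpdateFailurePropagates() {
    UpdateWithDefaultSketch table = new UpdateWithDefaultSketch();
    table.failSecondUpdate = true;
    return table.updateOrPutDefault("k", "-u", "d").isCompletedExceptionally();
  }
}
```

In this sketch nothing handles a failure of the retried update, so the future returned by `thenCompose` completes exceptionally and the caller sees the error, while the default record written by the PUT stays in the table.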

##########
File path: samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
##########
@@ -291,12 +291,34 @@
    * scheme when propogated to next operator.
    *
    * @param table the table to write messages to
-   * @param args additional arguments passed to the table
    * @param <K> the type of key in the table
    * @param <V> the type of record value in the table
    * @return this {@link MessageStream}
    */
-  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);
+  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table);
+
+  /**
+   * Allows sending update messages in this {@link MessageStream} to a {@link Table} and then propagates this
+   * {@link MessageStream} to the next chained operator. The type of input message is expected to be {@link KV},
+   * otherwise a {@link ClassCastException} will be thrown. The value is an UpdateMessage- update and default value.
+   * Defaults are optional and can be used if the Remote Table integration supports inserting a default through PUT in
+   * the event an update fails due to an existing record being absent.
+   * The user also needs to pass UpdateOptions argument which defines whether the update is an update only operation

Review comment:
       s/`UpdateMessage`/`{@link UpdateMessage}` and move some of the text on the semantics of defaults into the `UpdateMessage` Javadoc.
   
   Similarly link `UpdateOptions` as well.







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r787156643



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableWithUpdateOperatorImpl.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateOptions;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendToTableWithUpdateOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordNotFoundException;
+import org.apache.samza.table.batching.CompactBatchProvider;
+import org.apache.samza.table.remote.RemoteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of a send-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendToTableWithUpdateOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SendToTableWithUpdateOperatorImpl.class);
+
+  private final SendToTableWithUpdateOperatorSpec<K, V, U> spec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendToTableWithUpdateOperatorImpl(SendToTableWithUpdateOperatorSpec<K, V, U> spec, Context context) {
+    this.spec = spec;
+    this.table = context.getTaskContext().getTable(spec.getTableId());
+    if (context.getTaskContext().getTable(spec.getTableId()) instanceof RemoteTable) {
+      RemoteTable<K, V, U> remoteTable = (RemoteTable<K, V, U>) table;
+      if (remoteTable.getBatchProvider() instanceof CompactBatchProvider) {
+        throw new SamzaException("Batching is not supported with Compact Batches for partial updates");
+      }
+    }
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }

Review comment:
       The table is initialized by the `TableProvider` when we get it from `TaskContext`, so there is no need to call `table.init()` again here.







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r752535518



##########
File path: samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
##########
@@ -298,6 +298,25 @@
    */
   <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);
 
+  /**
+   * Allows sending messages in this {@link MessageStream} to a {@link Table} and then propagates this
+   * {@link MessageStream} to the next chained operator. The type of input message is expected to be {@link KV},
+   * otherwise a {@link ClassCastException} will be thrown. The value is an update pair- update and an option default.
+   * <p>
+   * Note: The update will be written but may not be flushed to the underlying table before its propagated to the
+   * chained operators. Whether the message can be read back from the Table in the chained operator depends on whether
+   * it was flushed and whether the Table offers read after write consistency. Messages retain the original partitioning
+   * scheme when propagated to next operator.
+   *
+   * @param table the table to write messages to
+   * @param args additional arguments passed to the table
+   * @param <K> the type of key in the table
+   * @param <V> the type of record value in the table
+   * @param <U> the type of update value for the table
+   * @return this {@link MessageStream}
+   */
+  <K, V, U> MessageStream<KV<K, UpdatePair<U, V>>> sendUpdateTo(Table<KV<K, UpdatePair<U, V>>> table, Object ... args);

Review comment:
       I think `table` should be of type `Table<KV<K, V>>`. The value type of the table shouldn't be `UpdatePair`. If you are concerned about the type of the input `MessageStream`, it isn't bound to the table anyway; e.g. I can write something like `MessageStream<Integer>.sendTo(Table<KV<Integer, String>>)`.

##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +92,33 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates the record associated with the specified update for a given key.
+   * If the update is attempted on a non-existent record, a default value can be inserted in the table depending
+   * on the table's implementation.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param defaultValue the default value that is inserted in the table if the update is applied to a non-existent record.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, @Nullable V defaultValue, Object ... args);

Review comment:
       As discussed offline, it's better to leave the default to put. We can throw some kind of RecordNotExistException here if post fails, and throw RecordExistException if put fails. I think this will simplify the logic quite a bit. Please think about whether this covers all the cases.

##########
File path: samza-api/src/main/java/org/apache/samza/operators/UpdatePair.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import com.google.common.base.MoreObjects;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+
+/**
+ * A UpdatePair representing the update and an optional default record to be inserted for a key,
+ * if the update is applied to a non-existent record.
+ *
+ * @param <U> type of the update record
+ * @param <V> type of the default record
+ */
+public final class UpdatePair<U, V> {

Review comment:
       Just to be consistent with our MessageStream api, maybe name it as UpdateMessage?

##########
File path: samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
##########
@@ -116,6 +119,66 @@ default void putAll(List<Entry<K, V>> records) {
     throw new SamzaException("Not supported");
   }
 
+  /**
+   * Asynchronously update the record with specified {@code key} and additional arguments.
+   * This method must be thread-safe.
+   *
+   * @param key key for the table record
+   * @param update update record for the given key
+   * @param record default record to be written if the update is applied to a non-existent record
+   * @return CompletableFuture for the update request
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, @Nullable V record);

Review comment:
       Shall we make it a default method and throw a not-implemented exception?
   
   Remove default too if possible.

##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdatePair;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it attempts to write a new record.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U> extends OperatorImpl<KV<K, UpdatePair<U, V>>, KV<K, UpdatePair<U, V>>> {
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdatePair<U, V>>>> handleMessageAsync(KV<K, UpdatePair<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    return table.updateAsync(message.getKey(), message.getValue().getUpdate(), message.getValue().getDefault(),

Review comment:
       As discussed, here we can add the logic to post default and catch exception if needed.

##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +92,33 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates the record associated with the specified update for a given key.
+   * If the update is attempted on a non-existent record, a default value can be inserted in the table depending
+   * on the table's implementation.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param defaultValue the default value that is inserted in the table if the update is applied to a non-existent record.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, @Nullable V defaultValue, Object ... args);
+
+  /**
+   * Asynchronously updates the records associated with the specified keys with the specified updates.
+   * If the update is applied to a non-existent record, a default value can be inserted in the table depending
+   * on the table's implementation.
+   *
+   * @param updates the key and update mappings.
+   * @param defaults the key and default value mappings.
+   * @param args additional arguments
+   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAllAsync(List<Entry<K, U>> updates, @Nullable List<Entry<K, V>> defaults, Object ... args);

Review comment:
       Same as above. Leave the default to put.







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r758879856



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.RecordDoesNotExistException;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordAlreadyExistsException;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it attempts to write a new record.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U> extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
+        sendUpdateToTableOpSpec.getArgs());
+    return updateFuture.whenCompleteAsync((result, ex) -> {

Review comment:
       We need to chain these futures properly instead of invoking the future inside a function. To chain the futures, we need to use the .thenCompose() method, e.g.:
   
   updateFuture.handle((r, ex) -> ex == null)
       .thenCompose(success -> {
         if (success) {
           return CompletableFuture.<Void>completedFuture(null);
         }
         // The update failed, so put the default record first
         final CompletableFuture<Void> putFuture = table.putAsync(message.getKey(), message.getValue().getDefault(),
             sendUpdateToTableOpSpec.getArgs());
         // I think we need to update in any case here. The retry is returned directly because the
         // lambda cannot redeclare updateFuture, which is already a local in the enclosing method.
         return putFuture.thenCompose(r ->
             table.updateAsync(message.getKey(), message.getValue().getUpdate(), sendUpdateToTableOpSpec.getArgs()));
       })
       .thenApply(result -> Collections.singleton(message));
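
For readers following the chaining suggestion above, here is a self-contained sketch of the same update, then put default, then update again flow. It is only an illustration under stated assumptions: `UpdateChainSketch`, `MissingRecordException`, and the string-append "update" are hypothetical stand-ins backed by an in-memory map, not the Samza table API.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a table; not the Samza API. */
final class UpdateChainSketch {
  /** Hypothetical exception playing the role of a "record does not exist" failure. */
  static final class MissingRecordException extends RuntimeException {
    MissingRecordException(String key) {
      super("no record for key " + key);
    }
  }

  private final Map<String, String> store = new ConcurrentHashMap<>();

  CompletableFuture<Void> putAsync(String key, String value) {
    store.put(key, value);
    return CompletableFuture.completedFuture(null);
  }

  /** Appends the update to the existing record, or fails if the key is absent. */
  CompletableFuture<Void> updateAsync(String key, String update) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    if (store.computeIfPresent(key, (k, v) -> v + update) == null) {
      future.completeExceptionally(new MissingRecordException(key));
    } else {
      future.complete(null);
    }
    return future;
  }

  /** Tries the update; on a missing record with a default, puts the default and retries. */
  CompletableFuture<Collection<String>> updateWithDefault(String key, String update, String defaultValue) {
    return updateAsync(key, update)
        .handle((result, ex) -> {
          if (ex == null) {
            return false; // update succeeded, no default needed
          }
          Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
          if (cause instanceof MissingRecordException && defaultValue != null) {
            return true; // fall back to put default + update
          }
          throw new CompletionException(cause); // any other failure is propagated
        })
        .thenCompose(needsDefault -> {
          if (!needsDefault) {
            return CompletableFuture.<Void>completedFuture(null);
          }
          CompletableFuture<Void> putFuture = putAsync(key, defaultValue);
          return putFuture.thenCompose(r -> updateAsync(key, update));
        })
        .thenApply(r -> Collections.singleton(key));
  }

  String get(String key) {
    return store.get(key);
  }
}
```

The design point is that every asynchronous step is returned from a `thenCompose` callback, so the resulting future completes only after the retry finishes, and failures other than the missing-record case still surface to the caller.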







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r758656519



##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +92,33 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates the record associated with the specified update for a given key.
+   * If the update is attempted on a non-existent record, a default value can be inserted in the table depending
+   * on the table's implementation.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param defaultValue the default value that is inserted in the table if the update is applied to a non-existent record.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, @Nullable V defaultValue, Object ... args);

Review comment:
       Changed the update API by removing defaults.







[GitHub] [samza] mynameborat commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r761388896



##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +91,27 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates an existing record for a given key with the specified update.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, Object ... args);
+
+  /**
+   * Asynchronously updates the existing records for the given keys with their corresponding updates.
+   *
+   * @param updates the key and update mappings.
+   * @param args additional arguments
+   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAllAsync(List<Entry<K, U>> updates, Object ... args);

Review comment:
       Can you change the return type to CompletionStage instead of CompletableFuture?
   We don't want to accidentally encourage folks to fall back to synchronous programming by calling .get() or .join() on this. Please apply the same comment to all the interfaces.
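
As a rough illustration of this suggestion, the sketch below exposes only `CompletionStage` from a writer-style method and composes on it without blocking. `StageReturnSketch` and its methods are hypothetical, and a `StringBuilder` stands in for a record. Note that `CompletionStage` still offers `toCompletableFuture()`, so blocking remains possible at the edges (for example in tests); the narrower return type only makes the non-blocking style the default.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class StageReturnSketch {
  /** Hypothetical async update whose public contract exposes only CompletionStage. */
  static CompletionStage<Void> updateAsync(StringBuilder record, String update) {
    return CompletableFuture.runAsync(() -> record.append(update));
  }

  /** Callers chain further work instead of calling get() or join(). */
  static CompletionStage<Integer> updateThenMeasure(StringBuilder record, String update) {
    return updateAsync(record, update).thenApply(ignored -> record.length());
  }
}
```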







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r787163539



##########
File path: samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
##########
@@ -291,12 +291,34 @@
    * scheme when propogated to next operator.
    *
    * @param table the table to write messages to
-   * @param args additional arguments passed to the table
    * @param <K> the type of key in the table
    * @param <V> the type of record value in the table
    * @return this {@link MessageStream}
    */
-  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);
+  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table);

Review comment:
       Yeah, I had to modify the signature because it was colliding with the existing signature. 







[GitHub] [samza] mynameborat commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r788898783



##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +91,27 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates an existing record for a given key with the specified update.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, Object ... args);

Review comment:
       Then why do you need `Object ... args`? I don't see a reason to keep it just for the sake of parity. If there is no use for `UpdateOptions` within the `TableWriteFunction`, we can just start with `updateAsync(K key, U update)`?







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r761386429



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordDoesNotExistException;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SendUpdateToTableOperatorImpl.class);
+
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
+        sendUpdateToTableOpSpec.getArgs());
+
+    return updateFuture
+        .handle((result, ex) -> {
+          if (ex == null) {
+            return false;
+          } else if (!(ex.getCause() instanceof RecordDoesNotExistException)) {
+            throw new SamzaException("Update failed with exception: ", ex);
+          } else {
+            return message.getValue().getDefault() != null;
+          }
+        })
+        .thenCompose(shouldPutDefault -> {
+          // If update fails for a given key due to a RecordDoesNotExistException exception thrown and a default is
+          // provided, then attempt to PUT a default record for the key and then apply the update
+          if (shouldPutDefault) {
+            final CompletableFuture<Void> putFuture = table.putAsync(message.getKey(), message.getValue().getDefault(),
+                sendUpdateToTableOpSpec.getArgs());
+            return putFuture
+                .exceptionally(ex -> {
+                  LOG.error("PUT default failed due the following exception: ", ex);

Review comment:
       Change this to a warning since we are still going to proceed with the update. Please also change the log message to say that we are ignoring this error and proceeding with the update.







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r785030967



##########
File path: samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
##########
@@ -189,10 +190,19 @@ public void sink(SinkFunction<? super M> sinkFn) {
   }
 
   @Override
-  public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args) {
+  public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table) {
     String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
     SendToTableOperatorSpec<K, V> op =
-        OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableId(), opId, args);
+        OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableId(), opId);
+    this.operatorSpec.registerNextOperatorSpec(op);
+    return new MessageStreamImpl<>(this.streamAppDesc, op);
+  }
+
+  @Override
+  public <K, V, U> MessageStream<KV<K, UpdateMessage<U, V>>> sendTo(Table<KV<K, V>> table, UpdateOptions contract) {

Review comment:
       nit: update var name?

##########
File path: samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
##########
@@ -227,14 +228,28 @@ public static InputOperatorSpec createInputOperatorSpec(
    *
    * @param tableId the table Id for the underlying table
    * @param opId the unique ID of the operator
-   * @param args additional arguments passed to the table
    * @param <K> the type of the table record key
    * @param <V> the type of the table record value
    * @return the {@link SendToTableOperatorSpec}
    */
   public static <K, V> SendToTableOperatorSpec<K, V> createSendToTableOperatorSpec(
-     String tableId, String opId, Object ... args) {
-    return new SendToTableOperatorSpec(tableId, opId, args);
+     String tableId, String opId) {
+    return new SendToTableOperatorSpec(tableId, opId);
+  }
+
+  /**
+   * Creates a {@link SendToTableWithUpdateOperatorSpec} with a key extractor and a value extractor function.
+   *
+   * @param tableId the table Id for the underlying table
+   * @param opId the unique ID of the operator
+   * @param <K> the type of the table record key
+   * @param <V> the type of the table record value
+   * @param <U> the type of the table record value
+   * @return the {@link SendToTableOperatorSpec}
+   */
+  public static <K, V, U> SendToTableWithUpdateOperatorSpec<K, V, U> createSendToTableWithUpdateOperatorSpec(
+      String tableId, String opId, UpdateOptions contract) {

Review comment:
       nit: change var name to updateOptions?







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r787163691



##########
File path: samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
##########
@@ -291,12 +291,34 @@
    * scheme when propogated to next operator.
    *
    * @param table the table to write messages to
-   * @param args additional arguments passed to the table
    * @param <K> the type of key in the table
    * @param <V> the type of record value in the table
    * @return this {@link MessageStream}
    */
-  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);
+  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table);
+
+  /**
+   * Allows sending update messages in this {@link MessageStream} to a {@link Table} and then propagates this
+   * {@link MessageStream} to the next chained operator. The type of input message is expected to be {@link KV},
+   * otherwise a {@link ClassCastException} will be thrown. The value is an UpdateMessage- update and default value.
+   * Defaults are optional and can be used if the Remote Table integration supports inserting a default through PUT in
+   * the event an update fails due to an existing record being absent.
+   * The user also needs to pass UpdateOptions argument which defines whether the update is an update only operation

Review comment:
       Done 







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r787188642



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
##########
@@ -54,7 +56,12 @@ protected void handleInit(Context context) {
   @Override
   protected CompletionStage<Collection<KV<K, V>>> handleMessageAsync(KV<K, V> message, MessageCollector collector,
       TaskCoordinator coordinator) {
-    return table.putAsync(message.getKey(), message.getValue(), sendToTableOpSpec.getArgs())
+    if (message.getValue() instanceof UpdateMessage) {
+      throw new SamzaException("Incorrect use of .sendTo operator with UpdateMessage value type. "
+          + "Please use the following method on MessageStream- sendTo(Table<KV<K, UpdateMessage<U, V>>> table,"
+          + "UpdateOptions updateOptions).");

Review comment:
       I would definitely have preferred to do it during construction rather than in `handleMessageAsync`. Type erasure in Java complicates this: the concrete type behind the generic parameter V is not available to the operator, so the value can only be inspected once a message arrives. I think it would be possible with code like the below if we had a `Class<V>` as a parameter.
   ```
   public static class Container<E> {
       // Class token that carries the erased type parameter at runtime
       private final Class<E> clazz;
   
       public Container(Class<E> clazz) {
           this.clazz = clazz;
       }
   
       public E createContents() throws Exception {
           // Class.newInstance() is deprecated since Java 9; go through the no-arg constructor
           return clazz.getDeclaredConstructor().newInstance();
       }
   }
   ```
   Please let me know if you know of any nifty reflection hacks to get around this.
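
One way the class-token idea could move the check to construction time is sketched below. This is only an assumption about how it might look: `TypedSenderSketch` and `UpdateEnvelope` are hypothetical names, with `UpdateEnvelope` standing in for an update-message wrapper type, and nothing here is part of the PR.

```java
final class TypedSenderSketch<V> {
  /** Hypothetical marker type standing in for an update-message wrapper. */
  static final class UpdateEnvelope { }

  private final Class<V> valueClass;

  /** Rejects update-wrapper value types up front instead of per message. */
  TypedSenderSketch(Class<V> valueClass) {
    if (UpdateEnvelope.class.isAssignableFrom(valueClass)) {
      throw new IllegalArgumentException(
          "Plain send does not accept update-wrapper values: " + valueClass.getName());
    }
    this.valueClass = valueClass;
  }

  Class<V> valueClass() {
    return valueClass;
  }
}
```

The trade-off is that callers would have to pass the class token explicitly, which changes the public signature; the per-message `instanceof` check avoids that at the cost of failing later.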







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r763267072



##########
File path: samza-api/src/main/java/org/apache/samza/table/RecordDoesNotExistException.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table;
+
+/**
+ * Custom exception which can be thrown by implementations of {@link org.apache.samza.table.remote.TableWriteFunction}
+ * when {@link AsyncReadWriteTable#updateAsync(Object, Object, Object...)} fails due an existing record not being
+ * present for the given key. {@link org.apache.samza.operators.MessageStream#sendUpdateTo(Table, Object...)} will
+ * attempt to call {@link AsyncReadWriteTable#putAsync(Object, Object, Object...)} instead to insert a new record if a
+ * default is provided.
+ */
+public class RecordDoesNotExistException extends RuntimeException {

Review comment:
       Cool. If there is a test then it is fine. I was searching for this exception in the test cases but I didn't find it. I guess that is because there are too many files in this PR. My suggestion is that next time we separate large PRs into smaller ones. For example, this one could be split into the core changes and the batching changes.







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r763263015



##########
File path: samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
##########
@@ -68,8 +71,10 @@ public int size() {
 
     if (operation instanceof GetOperation) {
       queries.putIfAbsent(operation.getKey(), operation);
+    } else if (operation instanceof UpdateOperation) {
+      throw new BatchingNotSupportedException("Batching not supported for Updates with CompactBatches");

Review comment:
       Right, that makes sense. It should be possible to do it within the constructor of `SendUpdateToTableOperatorImpl`. The only way to do that within the constructor is to expose the `BatchProvider` from the `RemoteTable` and check if it is an instance of `CompactBatchProvider`.
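
A minimal sketch of the fail-fast idea discussed here: validate the batch provider once in the constructor instead of failing on the first update. The `BatchProvider`, `CompactProvider` and `CompleteProvider` types below are hypothetical stand-ins introduced for illustration, not the Samza classes, and the exception type is an assumption.

```java
/** Illustrative only: the nested types are hypothetical stand-ins, not Samza classes. */
final class ConstructorValidationSketch {
  interface BatchProvider { }

  /** Stand-in for a provider whose batches keep only the latest operation per key. */
  static final class CompactProvider implements BatchProvider { }

  /** Stand-in for a provider whose batches keep every operation. */
  static final class CompleteProvider implements BatchProvider { }

  private final BatchProvider provider;

  ConstructorValidationSketch(BatchProvider provider) {
    // Reject the unsupported configuration up front, before any message is processed.
    if (provider instanceof CompactProvider) {
      throw new IllegalArgumentException("Compact batches are not supported for partial updates");
    }
    this.provider = provider;
  }

  BatchProvider provider() {
    return provider;
  }

  public static void main(String[] args) {
    new ConstructorValidationSketch(new CompleteProvider()); // accepted
    try {
      new ConstructorValidationSketch(new CompactProvider());
    } catch (IllegalArgumentException e) {
      System.out.println("rejected at construction: " + e.getMessage());
    }
  }
}
```

The trade-off is that the operator needs access to the table's batch provider, which is why the provider would have to be exposed.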







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r758655423



##########
File path: samza-api/src/main/java/org/apache/samza/operators/UpdatePair.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import com.google.common.base.MoreObjects;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+
+/**
+ * A UpdatePair representing the update and an optional default record to be inserted for a key,
+ * if the update is applied to a non-existent record.
+ *
+ * @param <U> type of the update record
+ * @param <V> type of the default record
+ */
+public final class UpdatePair<U, V> {

Review comment:
       Makes sense.







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r758879856



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.RecordDoesNotExistException;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordAlreadyExistsException;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it attempts to write a a new record.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U> extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
+        sendUpdateToTableOpSpec.getArgs());
+    return updateFuture.whenCompleteAsync((result, ex) -> {

Review comment:
       We need to chain these futures properly, instead of invoking the future inside a function. To chain the futures, we need to use the .thenCompose() method, e.g.:
   
   `updateFuture.handle((r, ex) -> ex == null)
   .thenCompose(success -> {
       if (!success) {
          final CompletableFuture<Void> putFuture = table.putAsync(message.getKey(), message.getValue().getDefault(),
               sendUpdateToTableOpSpec.getArgs());
          // I think we need to update in any case here.
          // Returned directly: a local named updateFuture would clash with the outer variable.
          return putFuture.thenCompose(r ->
               table.updateAsync(message.getKey(), message.getValue().getUpdate(), sendUpdateToTableOpSpec.getArgs()));
       } else {
          return CompletableFuture.completedFuture(null);
       }
   }).thenApply(result -> Collections.singleton(message));`
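
A self-contained sketch of the same chaining pattern, for readers who want to run it. It assumes a hypothetical in-memory table (`ChainedUpdateSketch`, with string values and an "append" update) rather than the Samza `ReadWriteTable`; only the `CompletableFuture` calls (`handle`, `thenCompose`, `thenApply`) are real API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a table; not the Samza API. */
final class ChainedUpdateSketch {
  private final Map<String, String> store = new ConcurrentHashMap<>();

  CompletableFuture<Void> putAsync(String key, String value) {
    store.put(key, value);
    return CompletableFuture.completedFuture(null);
  }

  /** Appends the update to an existing record; completes exceptionally if the key is absent. */
  CompletableFuture<Void> updateAsync(String key, String update) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    if (store.computeIfPresent(key, (k, v) -> v + update) == null) {
      future.completeExceptionally(new IllegalStateException("no record for " + key));
    } else {
      future.complete(null);
    }
    return future;
  }

  private CompletableFuture<Void> putDefaultThenUpdate(String key, String update, String defaultValue) {
    return putAsync(key, defaultValue).thenCompose(ignored -> updateAsync(key, update));
  }

  /** Tries the update; if it fails, puts the default and retries, all as one chained future. */
  CompletableFuture<String> updateWithDefault(String key, String update, String defaultValue) {
    return updateAsync(key, update)
        .handle((ignored, ex) -> ex == null)
        .thenCompose(succeeded -> {
          if (succeeded) {
            return CompletableFuture.<Void>completedFuture(null);
          }
          return putDefaultThenUpdate(key, update, defaultValue);
        })
        .thenApply(ignored -> store.get(key));
  }

  public static void main(String[] args) {
    ChainedUpdateSketch table = new ChainedUpdateSketch();
    System.out.println(table.updateWithDefault("k", "+u", "default").join());
  }
}
```

Because the fallback is composed into the returned future, the caller only sees completion after the put and the retried update have both finished, which is the point of using `thenCompose` rather than starting a new future inside `whenCompleteAsync`.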
   







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r761578873



##########
File path: samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
##########
@@ -68,8 +71,10 @@ public int size() {
 
     if (operation instanceof GetOperation) {
       queries.putIfAbsent(operation.getKey(), operation);
+    } else if (operation instanceof UpdateOperation) {
+      throw new BatchingNotSupportedException("Batching not supported for Updates with CompactBatches");

Review comment:
       This exception is typically caught in the `updateAsync` method: when batching fails, it falls back to the underlying table's standard `updateAsync`.
   ```
   try {
     return batchProcessor.processPutDeleteOrUpdateOperations(new UpdateOperation<>(key, update, args));
   } catch (BatchingNotSupportedException e) {
     return table.updateAsync(key, update, args);
   } catch (Exception e) {
     throw new SamzaException(e);
   }
   ```
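
The fallback can be reproduced in isolation with a sketch like the following. Everything here is a hypothetical stand-in (the `BatchingNotSupported` exception, the two lists that record which path was taken); it only illustrates the "try to batch, otherwise update directly" control flow, not the actual Samza classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Illustrative stand-in for a batching table wrapper; not the Samza implementation. */
final class BatchFallbackSketch {
  /** Hypothetical counterpart of the batching-not-supported exception. */
  static final class BatchingNotSupported extends RuntimeException {
    BatchingNotSupported(String message) {
      super(message);
    }
  }

  final List<String> batchedUpdates = new ArrayList<>();
  final List<String> directUpdates = new ArrayList<>();
  private final boolean batchSupportsUpdates;

  BatchFallbackSketch(boolean batchSupportsUpdates) {
    this.batchSupportsUpdates = batchSupportsUpdates;
  }

  private CompletableFuture<Void> enqueueInBatch(String key) {
    if (!batchSupportsUpdates) {
      throw new BatchingNotSupported("updates cannot be batched");
    }
    batchedUpdates.add(key);
    return CompletableFuture.completedFuture(null);
  }

  private CompletableFuture<Void> updateDirectly(String key) {
    directUpdates.add(key);
    return CompletableFuture.completedFuture(null);
  }

  /** Tries the batch path first and falls back to a direct update if batching is unsupported. */
  CompletableFuture<Void> updateAsync(String key) {
    try {
      return enqueueInBatch(key);
    } catch (BatchingNotSupported e) {
      return updateDirectly(key);
    }
  }

  public static void main(String[] args) {
    BatchFallbackSketch table = new BatchFallbackSketch(false);
    table.updateAsync("k").join();
    System.out.println("direct updates: " + table.directUpdates);
  }
}
```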

##########
File path: samza-core/src/main/java/org/apache/samza/table/batching/UpdateOperation.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Update operation.
+ *
+ * @param <K> The type of the key.
+ * @param <U> The type of the update
+ */
+public class UpdateOperation<K, V, U> implements Operation<K, V, U> {
+  final private K key;
+  final private U val;

Review comment:
       Done.







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r761407979



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordDoesNotExistException;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SendUpdateToTableOperatorImpl.class);
+
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
+        sendUpdateToTableOpSpec.getArgs());
+
+    return updateFuture
+        .handle((result, ex) -> {
+          if (ex == null) {
+            return false;
+          } else if (!(ex.getCause() instanceof RecordDoesNotExistException)) {
+            throw new SamzaException("Update failed with exception: ", ex);
+          } else {
+            return message.getValue().getDefault() != null;

Review comment:
       Right, thanks for catching it. This has been addressed.







[GitHub] [samza] mynameborat commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r788938969



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
##########
@@ -54,7 +56,12 @@ protected void handleInit(Context context) {
   @Override
   protected CompletionStage<Collection<KV<K, V>>> handleMessageAsync(KV<K, V> message, MessageCollector collector,
       TaskCoordinator coordinator) {
-    return table.putAsync(message.getKey(), message.getValue(), sendToTableOpSpec.getArgs())
+    if (message.getValue() instanceof UpdateMessage) {
+      throw new SamzaException("Incorrect use of .sendTo operator with UpdateMessage value type. "
+          + "Please use the following method on MessageStream- sendTo(Table<KV<K, UpdateMessage<U, V>>> table,"
+          + "UpdateOptions updateOptions).");

Review comment:
       Looks like that's the case. Maybe we can skip it for now; ideally, we should have this as part of the `OperatorSpec` so that we can enable validations on these key and value types.







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r759687357



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.RecordDoesNotExistException;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordAlreadyExistsException;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it attempts to write a a new record.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U> extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
+        sendUpdateToTableOpSpec.getArgs());
+    return updateFuture.whenCompleteAsync((result, ex) -> {

Review comment:
       This is definitely a cleaner pattern. I changed it and added a few more tests to check that PUT is invoked in the absence of a pre-existing record.







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r761385678



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordDoesNotExistException;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SendUpdateToTableOperatorImpl.class);
+
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
+        sendUpdateToTableOpSpec.getArgs());
+
+    return updateFuture
+        .handle((result, ex) -> {
+          if (ex == null) {
+            return false;
+          } else if (!(ex.getCause() instanceof RecordDoesNotExistException)) {
+            throw new SamzaException("Update failed with exception: ", ex);
+          } else {
+            return message.getValue().getDefault() != null;

Review comment:
       It seems the logic is still not entirely correct. If the default value is not provided, we should still throw the exception. I suggest changing the logic to:
   ```
   if (exception == null) {
     return false;
   } else if (ex ... instanceof ... && default != null) {
     return true;
   } else {
     throw ...
   }
   ```
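
A runnable sketch of that three-way decision, under stated assumptions: `RecordMissing` is a hypothetical stand-in for the record-does-not-exist exception, and `IllegalStateException` stands in for whatever exception the operator would rethrow. The unwrapping step reflects standard `CompletableFuture` behaviour: `handle` on the original future receives the raw exception, while dependent stages receive it wrapped in a `CompletionException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Illustrative decision helper; the exception types are stand-ins, not Samza classes. */
final class UpdateOutcomeSketch {
  /** Hypothetical counterpart of a "record does not exist" failure. */
  static final class RecordMissing extends RuntimeException { }

  /**
   * Returns false if the update succeeded, true if a default should be written first,
   * and rethrows in every other case (including a missing record with no default).
   */
  static boolean shouldPutDefault(Throwable ex, Object defaultValue) {
    if (ex == null) {
      return false;
    }
    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    if (cause instanceof RecordMissing && defaultValue != null) {
      return true;
    }
    throw new IllegalStateException("Update failed", cause);
  }

  public static void main(String[] args) {
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new RecordMissing());
    boolean putDefault = failed.handle((r, ex) -> shouldPutDefault(ex, "default")).join();
    System.out.println("put default: " + putDefault);
  }
}
```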
   







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r761410249



##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +91,27 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates an existing record for a given key with the specified update.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, Object ... args);
+
+  /**
+   * Asynchronously updates the existing records for the given keys with their corresponding updates.
+   *
+   * @param updates the key and update mappings.
+   * @param args additional arguments
+   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAllAsync(List<Entry<K, U>> updates, Object ... args);

Review comment:
       Hey, I used `CompletableFuture` just to be in line with the existing code (putAsync, getAsync, etc.). 







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r787259429



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableWithUpdateOperatorImpl.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateOptions;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendToTableWithUpdateOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordNotFoundException;
+import org.apache.samza.table.batching.CompactBatchProvider;
+import org.apache.samza.table.remote.RemoteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of a send-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendToTableWithUpdateOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SendToTableWithUpdateOperatorImpl.class);
+
+  private final SendToTableWithUpdateOperatorSpec<K, V, U> spec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendToTableWithUpdateOperatorImpl(SendToTableWithUpdateOperatorSpec<K, V, U> spec, Context context) {
+    this.spec = spec;
+    this.table = context.getTaskContext().getTable(spec.getTableId());
+    if (table instanceof RemoteTable) {
+      RemoteTable<K, V, U> remoteTable = (RemoteTable<K, V, U>) table;
+      if (remoteTable.getBatchProvider() instanceof CompactBatchProvider) {
+        throw new SamzaException("Batching is not supported with Compact Batches for partial updates");
+      }
+    }
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final UpdateOptions options = spec.getUpdateOptions();
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate());
+
+    return updateFuture
+        .handle((result, ex) -> {
+          if (ex == null) {
+            // success, no need to Put a default value
+            return false;
+          } else if (ex.getCause() instanceof RecordNotFoundException && message.getValue().hasDefault()) {
+            // If an update fails for a given key because a RecordNotFoundException was thrown, a default
+            // is provided, and the UpdateOptions is set to UPDATE_WITH_DEFAULTS, then attempt
+            // to PUT a default record for the key and then apply the update.
+            if (options == UpdateOptions.UPDATE_WITH_DEFAULTS) {
+              return true;
+            } else {
+              throw new SamzaException("Put default failed for update as the Update options was set to " + options +
+                  ". Please use UpdateOptions.UPDATE_WITH_DEFAULTS instead.");
+            }
+          } else {
+            throw new SamzaException("Update failed with exception: ", ex);
+          }
+        })
+        .thenCompose(shouldPutDefault -> {
+          if (shouldPutDefault) {
+            final CompletableFuture<Void> putFuture = table.putAsync(message.getKey(), message.getValue().getDefault());
+            return putFuture
+                .exceptionally(ex -> {
+                  LOG.warn("PUT default failed due to an exception. Ignoring the exception and proceeding with update. "
+                          + "The exception encountered is: ", ex);
+                  return null;
+                })
+                .thenCompose(res -> table.updateAsync(message.getKey(), message.getValue().getUpdate()));

Review comment:
       Added an exception to indicate that the update after the default PUT failed.
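
   As an illustration of that change, a minimal Java sketch follows. It is not the code from this PR: `RetryUpdateSketch`, `retryUpdate`, and `UpdateAfterDefaultException` are hypothetical names, and the sketch only assumes that the retried update returns a `CompletableFuture<Void>`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class RetryUpdateSketch {
  // Hypothetical exception type marking a failure of the update retried after the default PUT.
  static class UpdateAfterDefaultException extends RuntimeException {
    UpdateAfterDefaultException(Throwable cause) {
      super("Update after PUT default failed", cause);
    }
  }

  // Runs the retried update and wraps any failure in a dedicated exception,
  // so callers can tell it apart from a failure of the first update attempt.
  static CompletableFuture<Void> retryUpdate(Supplier<CompletableFuture<Void>> update) {
    return update.get().handle((res, ex) -> {
      if (ex != null) {
        throw new UpdateAfterDefaultException(ex);
      }
      return res;
    });
  }
}
```

   A caller that joins the returned future would see a `CompletionException` whose cause is the dedicated exception type.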







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r787182661



##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +91,27 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates an existing record for a given key with the specified update.
+   *
+   * @param key the key of the record to which the specified {@code update} is applied.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, Object ... args);

Review comment:
       `UpdateOptions` is a parameter that defines how the platform handles updates, and it is currently used only within the operator implementation.
   Any `updateAsync` call is eventually delegated to the `updateAsync` method of `TableWriteFunction`, which users define for their table integrations. I don't see a use for `UpdateOptions` beyond the operator right now.
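
   To make that layering concrete, here is a minimal sketch under stated assumptions. `CounterTable` and `send` are hypothetical stand-ins for a table integration and the operator, and `UPDATE_ONLY` is an assumed name for the non-default option (only `UPDATE_WITH_DEFAULTS` appears in the diff). The point is that the table methods never receive the options; only the operator-level `send` consults them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class OptionsAtOperatorSketch {
  // UPDATE_WITH_DEFAULTS appears in the quoted diff; UPDATE_ONLY is an assumed name.
  enum Options { UPDATE_ONLY, UPDATE_WITH_DEFAULTS }

  // Hypothetical in-memory table of counters. Its methods never see Options.
  static class CounterTable {
    final Map<String, Integer> store = new HashMap<>();

    CompletableFuture<Void> updateAsync(String key, int delta) {
      CompletableFuture<Void> future = new CompletableFuture<>();
      if (store.containsKey(key)) {
        store.merge(key, delta, Integer::sum);
        future.complete(null);
      } else {
        future.completeExceptionally(new IllegalStateException("No record for key " + key));
      }
      return future;
    }

    CompletableFuture<Void> putAsync(String key, int value) {
      store.put(key, value);
      return CompletableFuture.completedFuture(null);
    }
  }

  // Operator-level logic: the only place that consults Options.
  static CompletableFuture<Void> send(CounterTable table, Options options, String key, int delta,
      int defaultValue) {
    CompletableFuture<Boolean> needsDefault = table.updateAsync(key, delta).handle((res, ex) -> {
      if (ex == null) {
        return false;
      }
      if (options == Options.UPDATE_WITH_DEFAULTS) {
        return true;
      }
      throw new IllegalStateException("Update failed and defaults are disabled", ex);
    });
    return needsDefault.thenCompose(putDefault -> {
      if (putDefault) {
        return table.putAsync(key, defaultValue).thenCompose(ignored -> table.updateAsync(key, delta));
      }
      return CompletableFuture.<Void>completedFuture(null);
    });
  }
}
```

   With `UPDATE_WITH_DEFAULTS`, a missing key is first written with the default and then updated; with the assumed `UPDATE_ONLY`, the returned future fails and nothing is written.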







[GitHub] [samza] mynameborat commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r788896284



##########
File path: samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
##########
@@ -291,12 +291,34 @@
    * scheme when propogated to next operator.
    *
    * @param table the table to write messages to
-   * @param args additional arguments passed to the table
    * @param <K> the type of key in the table
    * @param <V> the type of record value in the table
    * @return this {@link MessageStream}
    */
-  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);
+  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table);

Review comment:
       So what is the option for users to get the same functionality from the new API? That is, does `UpdateOptions` support the parameters that could previously be passed as varargs?







[GitHub] [samza] mynameborat merged pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
mynameborat merged pull request #1560:
URL: https://github.com/apache/samza/pull/1560


   





[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r761566938



##########
File path: samza-core/src/main/java/org/apache/samza/table/batching/UpdateOperation.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Update operation.
+ *
+ * @param <K> The type of the key.
+ * @param <U> The type of the update
+ */
+public class UpdateOperation<K, V, U> implements Operation<K, V, U> {
+  final private K key;
+  final private U val;

Review comment:
       Let's rename this to `update`?

##########
File path: samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
##########
@@ -68,8 +71,10 @@ public int size() {
 
     if (operation instanceof GetOperation) {
       queries.putIfAbsent(operation.getKey(), operation);
+    } else if (operation instanceof UpdateOperation) {
+      throw new BatchingNotSupportedException("Batching not supported for Updates with CompactBatches");

Review comment:
       Is it possible to throw the exception when we detect a `.sendUpdateTo(table)` call and the config allows compact batching? Right now it is thrown at runtime, which might be hard to debug.
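
   A minimal sketch of the fail-fast idea, assuming the check can run when the operator is constructed. All types here (`BatchProvider`, `CompactProvider`, `CompleteProvider`, `UpdateOperator`) are hypothetical stand-ins, not Samza classes.

```java
class FailFastSketch {
  // Hypothetical stand-ins for batch provider types.
  interface BatchProvider { }

  static class CompactProvider implements BatchProvider { }

  static class CompleteProvider implements BatchProvider { }

  // Hypothetical operator that validates its configuration when it is constructed,
  // so a bad combination surfaces before any message is processed.
  static class UpdateOperator {
    private final BatchProvider provider;

    UpdateOperator(BatchProvider provider) {
      if (provider instanceof CompactProvider) {
        throw new IllegalArgumentException("Partial updates are not supported with compact batching");
      }
      this.provider = provider;
    }
  }
}
```

   Failing in the constructor ties the error to the job's setup rather than to the first update that happens to reach the batch.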







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r760618384



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.RecordDoesNotExistException;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
+        sendUpdateToTableOpSpec.getArgs());
+    return updateFuture
+        .handle((result, ex) -> {
+          // RecordDoesNotExistException is wrapped in java.util.concurrent.CompletionException
+          return ex != null
+              && ex.getCause() instanceof RecordDoesNotExistException
+              && message.getValue().getDefault() != null;
+        })
+        .thenCompose(shouldPutDefault -> {
+          // If update fails for a given key due to a RecordDoesNotExistException exception thrown and a default is
+          // provided, then attempt to PUT a default record for the key and then apply the update
+          if (shouldPutDefault) {
+            final CompletableFuture<Void> putFuture = table.putAsync(message.getKey(), message.getValue().getDefault(),
+                sendUpdateToTableOpSpec.getArgs());
+            return putFuture.thenCompose(res -> table.updateAsync(message.getKey(), message.getValue().getUpdate(),

Review comment:
       Done.







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r760618254



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.RecordDoesNotExistException;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
+        sendUpdateToTableOpSpec.getArgs());
+    return updateFuture
+        .handle((result, ex) -> {
+          // RecordDoesNotExistException is wrapped in java.util.concurrent.CompletionException
+          return ex != null

Review comment:
       Done. Other exceptions are now rethrown wrapped in `SamzaException`.







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r761578873



##########
File path: samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
##########
@@ -68,8 +71,10 @@ public int size() {
 
     if (operation instanceof GetOperation) {
       queries.putIfAbsent(operation.getKey(), operation);
+    } else if (operation instanceof UpdateOperation) {
+      throw new BatchingNotSupportedException("Batching not supported for Updates with CompactBatches");

Review comment:
       This exception is typically caught in the `updateAsync` method of `AsyncBatchingTable` when batching fails. On such an exception, the code simply delegates to the standard `updateAsync` method.
   ```
   try {
     return batchProcessor.processPutDeleteOrUpdateOperations(new UpdateOperation<>(key, update, args));
   } catch (BatchingNotSupportedException e) {
     return table.updateAsync(key, update, args);
   } catch (Exception e) {
     throw new SamzaException(e);
   }
   ```







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r784410391



##########
File path: samza-api/src/main/java/org/apache/samza/table/Table.java
##########
@@ -38,7 +38,7 @@
  * <p>

Review comment:
       I think we should add a section to the Table API page to cover the partial update feature. Since this PR is already very big, I suggest opening another PR for the doc change. What do you think?

##########
File path: samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
##########
@@ -291,12 +291,34 @@
    * scheme when propogated to next operator.
    *
    * @param table the table to write messages to
-   * @param args additional arguments passed to the table
    * @param <K> the type of key in the table
    * @param <V> the type of record value in the table
    * @return this {@link MessageStream}
    */
-  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);
+  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table);
+
+  /**
+   * Allows sending update messages in this {@link MessageStream} to a {@link Table} and then propagates this
+   * {@link MessageStream} to the next chained operator. The type of input message is expected to be {@link KV},
+   * otherwise a {@link ClassCastException} will be thrown. The value is an UpdateMessage holding an update and an
+   * optional default value. Defaults can be used if the Remote Table integration supports inserting a default
+   * through PUT in the event an update fails because no existing record is present.
+   * The user also needs to pass an UpdateContract argument, which defines whether the operation is update-only
+   * or an update with a default.
+   * <p>
+   * Note: The update will be written but may not be flushed to the underlying table before it is propagated to the
+   * chained operators. Whether the message can be read back from the Table in the chained operator depends on whether
+   * it was flushed and whether the Table offers read after write consistency. Messages retain the original partitioning
+   * scheme when propagated to next operator.
+   *
+   * @param table the table to write update messages to
+   * @param contract Update contract which defines how the update will be performed
+   * @param <K> the type of key in the table
+   * @param <V> the type of record value in the table
+   * @param <U> the type of update value for the table
+   * @return this {@link MessageStream}
+   */
+  <K, V, U> MessageStream<KV<K, UpdateMessage<U, V>>> sendTo(Table<KV<K, V>> table, UpdateContract contract);

Review comment:
       Nit-pick: this name is a bit hard for me to understand. Maybe `UpdateOptions` or `UpdateArguments`?







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r784999315



##########
File path: samza-api/src/main/java/org/apache/samza/table/Table.java
##########
@@ -38,7 +38,7 @@
  * <p>

Review comment:
       Added a ticket to track it: https://issues.apache.org/jira/browse/SAMZA-2717







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r758656719



##########
File path: samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
##########
@@ -298,6 +298,25 @@
    */
   <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);
 
+  /**
+   * Allows sending messages in this {@link MessageStream} to a {@link Table} and then propagates this
+   * {@link MessageStream} to the next chained operator. The type of input message is expected to be {@link KV},
+   * otherwise a {@link ClassCastException} will be thrown. The value is an update pair: an update and an optional default.
+   * <p>
+   * Note: The update will be written but may not be flushed to the underlying table before it is propagated to the
+   * chained operators. Whether the message can be read back from the Table in the chained operator depends on whether
+   * it was flushed and whether the Table offers read after write consistency. Messages retain the original partitioning
+   * scheme when propagated to next operator.
+   *
+   * @param table the table to write messages to
+   * @param args additional arguments passed to the table
+   * @param <K> the type of key in the table
+   * @param <V> the type of record value in the table
+   * @param <U> the type of update value for the table
+   * @return this {@link MessageStream}
+   */
+  <K, V, U> MessageStream<KV<K, UpdatePair<U, V>>> sendUpdateTo(Table<KV<K, UpdatePair<U, V>>> table, Object ... args);

Review comment:
       Done.







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r760422259



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.RecordDoesNotExistException;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
+        sendUpdateToTableOpSpec.getArgs());
+    return updateFuture
+        .handle((result, ex) -> {
+          // RecordDoesNotExistException is wrapped in java.util.concurrent.CompletionException
+          return ex != null
+              && ex.getCause() instanceof RecordDoesNotExistException
+              && message.getValue().getDefault() != null;
+        })
+        .thenCompose(shouldPutDefault -> {
+          // If update fails for a given key due to a RecordDoesNotExistException exception thrown and a default is
+          // provided, then attempt to PUT a default record for the key and then apply the update
+          if (shouldPutDefault) {
+            final CompletableFuture<Void> putFuture = table.putAsync(message.getKey(), message.getValue().getDefault(),
+                sendUpdateToTableOpSpec.getArgs());
+            return putFuture.thenCompose(res -> table.updateAsync(message.getKey(), message.getValue().getUpdate(),

Review comment:
       We might also want to ignore an exception from `putFuture`. You can add something like:
   ```
   putFuture.exceptionally(e -> {
       // log the exception here
       return null;
   }).thenCompose(....)
   ```
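
   A self-contained version of that pattern might look like the sketch below. It is an illustration only: `PutThenUpdateSketch` and its methods are hypothetical, the PUT is hard-wired to fail so the recovery path runs, and a list of strings stands in for real logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class PutThenUpdateSketch {
  // Records the order of calls so the flow can be inspected.
  final List<String> calls = new ArrayList<>();

  // Hypothetical PUT that always fails, to exercise the recovery path.
  CompletableFuture<Void> putAsync(String key) {
    calls.add("put:" + key);
    CompletableFuture<Void> future = new CompletableFuture<>();
    future.completeExceptionally(new RuntimeException("put failed"));
    return future;
  }

  // Hypothetical update that always succeeds.
  CompletableFuture<Void> updateAsync(String key) {
    calls.add("update:" + key);
    return CompletableFuture.completedFuture(null);
  }

  // exceptionally() turns the failed PUT into a normal completion,
  // so the chained update still runs.
  CompletableFuture<Void> putDefaultThenUpdate(String key) {
    return putAsync(key)
        .exceptionally(e -> {
          calls.add("put-error-logged");
          return null;
        })
        .thenCompose(ignored -> updateAsync(key));
  }
}
```

   Without the `exceptionally` stage, `thenCompose` would be skipped when the PUT fails and the returned future would carry the PUT's exception.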
   

##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.RecordDoesNotExistException;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
+        sendUpdateToTableOpSpec.getArgs());
+    return updateFuture
+        .handle((result, ex) -> {
+          // RecordDoesNotExistException is wrapped in java.util.concurrent.CompletionException
+          return ex != null

Review comment:
       This might swallow the exception. We need to rethrow when there is an exception that does not match this condition.
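
       One way to avoid swallowing unrelated failures is sketched below. This is an illustration under assumptions, not the PR's implementation: `MissingRecordException` is a hypothetical stand-in for `RecordDoesNotExistException`, and `classify` is a made-up helper name. The handler unwraps a `CompletionException` if present, maps the "record missing" case to `false`, and rethrows everything else.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class SelectiveHandleSketch {
  // Hypothetical stand-in for RecordDoesNotExistException.
  static class MissingRecordException extends RuntimeException { }

  // true: update succeeded; false: record was missing; any other failure is propagated.
  static CompletableFuture<Boolean> classify(CompletableFuture<Void> updateFuture) {
    return updateFuture.handle((result, ex) -> {
      if (ex == null) {
        return true;
      }
      // Dependent stages see the failure wrapped in CompletionException; unwrap if so.
      Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
      if (cause instanceof MissingRecordException) {
        return false;
      }
      throw new CompletionException(cause);
    });
  }

  // Helper that builds an already-failed future.
  static CompletableFuture<Void> failed(Throwable t) {
    CompletableFuture<Void> f = new CompletableFuture<>();
    f.completeExceptionally(t);
    return f;
  }

  public static void main(String[] args) {
    System.out.println(classify(CompletableFuture.completedFuture(null)).join());
    System.out.println(classify(failed(new MissingRecordException())).join());
    System.out.println(classify(failed(new IllegalStateException("boom"))).isCompletedExceptionally());
  }
}
```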







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r758879856



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.RecordDoesNotExistException;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordAlreadyExistsException;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it attempts to write a new record.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U> extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate(),
+        sendUpdateToTableOpSpec.getArgs());
+    return updateFuture.whenCompleteAsync((result, ex) -> {

Review comment:
       We need to chain these futures properly instead of invoking a future inside a callback. To chain the futures, we need to use the .thenCompose() method, e.g.:
   
   ```
   updateFuture.handle((r, ex) -> ex == null)
       .thenCompose(success -> {
           if (success) {
               return CompletableFuture.<Void>completedFuture(null);
           }
           final CompletableFuture<Void> putFuture = table.putAsync(message.getKey(), message.getValue().getDefault(),
               sendUpdateToTableOpSpec.getArgs());
           // I think we need to update in any case here.
           // Named retryFuture because a lambda local cannot redeclare the enclosing updateFuture.
           final CompletableFuture<Void> retryFuture = putFuture.thenCompose(r ->
               table.updateAsync(message.getKey(), message.getValue().getUpdate(), sendUpdateToTableOpSpec.getArgs()));
           return retryFuture;
       }).thenApply(result -> Collections.singleton(message));
   ```
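
       To illustrate why the chaining matters, here is a small self-contained sketch (the class and variable names are invented for this example). A follow-up future started inside a `whenComplete` callback is not tracked by the returned future, whereas `thenCompose` makes the returned future wait for it.

```java
import java.util.concurrent.CompletableFuture;

class ChainingSketch {
  // Returns {notChainedDoneEarly, chainedDoneEarly, chainedDoneAfter}.
  static boolean[] demo() {
    CompletableFuture<Void> first = CompletableFuture.completedFuture(null);
    // Stands in for a follow-up async call that has not finished yet.
    CompletableFuture<Void> inner = new CompletableFuture<>();

    // Follow-up started inside the callback: the returned future ignores it.
    CompletableFuture<Void> notChained = first.whenComplete((r, ex) -> inner.thenRun(() -> { }));
    // Follow-up chained: the returned future completes only when inner does.
    CompletableFuture<Void> chained = first.thenCompose(r -> inner);

    boolean notChainedDoneEarly = notChained.isDone();
    boolean chainedDoneEarly = chained.isDone();
    inner.complete(null);
    boolean chainedDoneAfter = chained.isDone();
    return new boolean[] {notChainedDoneEarly, chainedDoneEarly, chainedDoneAfter};
  }

  public static void main(String[] args) {
    boolean[] r = demo();
    System.out.println("whenComplete result done before inner: " + r[0]);
    System.out.println("thenCompose result done before inner:  " + r[1]);
    System.out.println("thenCompose result done after inner:   " + r[2]);
  }
}
```

       In the operator, the unchained form would let `handleMessageAsync` report completion while the put or the retried update is still in flight.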
   







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r762346311



##########
File path: samza-api/src/main/java/org/apache/samza/table/RecordDoesNotExistException.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table;
+
+/**
+ * Custom exception which can be thrown by implementations of {@link org.apache.samza.table.remote.TableWriteFunction}
+ * when {@link AsyncReadWriteTable#updateAsync(Object, Object, Object...)} fails due to an existing record not being
+ * present for the given key. {@link org.apache.samza.operators.MessageStream#sendUpdateTo(Table, Object...)} will
+ * attempt to call {@link AsyncReadWriteTable#putAsync(Object, Object, Object...)} instead to insert a new record if a
+ * default is provided.
+ */
+public class RecordDoesNotExistException extends RuntimeException {

Review comment:
       Is it possible to add a test for the case where this exception is thrown? It would confirm that we first do the put with the default value and then apply the update again.
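
       A rough sketch of what such a test could check, using plain Java instead of the project's test harness. Everything here is hypothetical: `FakeTable` is an in-memory stand-in that records call order, `RecordNotFound` stands in for the PR's exception, and `updateWithDefault` mirrors the update, put-default, update-again flow discussed in this review rather than the actual operator code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class UpdateFallbackTestSketch {
  // Hypothetical stand-in for the "record missing" exception.
  static class RecordNotFound extends RuntimeException { }

  // In-memory stand-in for a table that records the order of calls.
  static class FakeTable {
    final Map<String, Integer> store = new HashMap<>();
    final List<String> calls = new ArrayList<>();

    CompletableFuture<Void> putAsync(String key, int value) {
      calls.add("put");
      store.put(key, value);
      return CompletableFuture.completedFuture(null);
    }

    // An "update" here adds delta to the stored value; fails if the key is absent.
    CompletableFuture<Void> updateAsync(String key, int delta) {
      calls.add("update");
      CompletableFuture<Void> f = new CompletableFuture<>();
      if (store.containsKey(key)) {
        store.merge(key, delta, Integer::sum);
        f.complete(null);
      } else {
        f.completeExceptionally(new RecordNotFound());
      }
      return f;
    }
  }

  // Update; if the record is missing, put the default and update again.
  static CompletableFuture<Void> updateWithDefault(FakeTable table, String key, int delta, int dflt) {
    return table.updateAsync(key, delta)
        .handle((r, ex) -> {
          if (ex == null) {
            return true;
          }
          Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
          if (cause instanceof RecordNotFound) {
            return false;
          }
          throw new CompletionException(cause);
        })
        .thenCompose(updated -> {
          if (updated) {
            return CompletableFuture.<Void>completedFuture(null);
          }
          return table.putAsync(key, dflt).thenCompose(r -> table.updateAsync(key, delta));
        });
  }

  public static void main(String[] args) {
    FakeTable table = new FakeTable();
    updateWithDefault(table, "k", 5, 10).join();
    System.out.println(table.calls + " -> " + table.store.get("k"));
  }
}
```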

##########
File path: samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
##########
@@ -68,8 +71,10 @@ public int size() {
 
     if (operation instanceof GetOperation) {
       queries.putIfAbsent(operation.getKey(), operation);
+    } else if (operation instanceof UpdateOperation) {
+      throw new BatchingNotSupportedException("Batching not supported for Updates with CompactBatches");

Review comment:
       Oh, what I meant is that if we don't support update with a certain table mode, such as batching with compaction here, we should catch it at the pipeline-building stage instead of the running stage. So I think we can add a check in SendUpdateToTableOperatorImpl, or wherever input.sendUpdateToTable() (the high-level API) is used, and validate there that compact batching is not turned on. That way the user will see the exception before running the pipeline.
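
       As a rough illustration of failing early, the sketch below rejects an unsupported batching mode in a constructor rather than when the first update is processed. The types are simplified stand-ins declared locally for this example; they only borrow names from the discussion and are not Samza's classes.

```java
class BuildTimeValidationSketch {
  // Local stand-ins; not the Samza types of the same names.
  interface BatchProvider { }
  static class CompleteBatchProvider implements BatchProvider { }
  static class CompactBatchProvider implements BatchProvider { }

  // Hypothetical operator that validates its configuration when constructed.
  static class UpdateOperator {
    UpdateOperator(BatchProvider provider) {
      if (provider instanceof CompactBatchProvider) {
        throw new IllegalStateException("Partial updates are not supported with compact batching");
      }
    }
  }

  // Returns true if constructing the operator with this provider is rejected.
  static boolean rejects(BatchProvider provider) {
    try {
      new UpdateOperator(provider);
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("compact rejected:  " + rejects(new CompactBatchProvider()));
    System.out.println("complete rejected: " + rejects(new CompleteBatchProvider()));
  }
}
```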

##########
File path: samza-api/src/main/java/org/apache/samza/table/RecordDoesNotExistException.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table;
+
+/**
+ * Custom exception which can be thrown by implementations of {@link org.apache.samza.table.remote.TableWriteFunction}
+ * when {@link AsyncReadWriteTable#updateAsync(Object, Object, Object...)} fails due to an existing record not being
+ * present for the given key. {@link org.apache.samza.operators.MessageStream#sendUpdateTo(Table, Object...)} will
+ * attempt to call {@link AsyncReadWriteTable#putAsync(Object, Object, Object...)} instead to insert a new record if a
+ * default is provided.
+ */
+public class RecordDoesNotExistException extends RuntimeException {

Review comment:
       Let's name it RecordNotFoundException; that seems to be the more common name.







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r763277850



##########
File path: samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
##########
@@ -68,8 +71,10 @@ public int size() {
 
     if (operation instanceof GetOperation) {
       queries.putIfAbsent(operation.getKey(), operation);
+    } else if (operation instanceof UpdateOperation) {
+      throw new BatchingNotSupportedException("Batching not supported for Updates with CompactBatches");

Review comment:
       `RemoteTableDescriptor` has a `withBatchProvider` which can be used to specify a Complete/Compact batch right now.







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r785060632



##########
File path: samza-api/src/main/java/org/apache/samza/table/Table.java
##########
@@ -38,7 +38,7 @@
  * <p>

Review comment:
       Thanks!







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r763263015



##########
File path: samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
##########
@@ -68,8 +71,10 @@ public int size() {
 
     if (operation instanceof GetOperation) {
       queries.putIfAbsent(operation.getKey(), operation);
+    } else if (operation instanceof UpdateOperation) {
+      throw new BatchingNotSupportedException("Batching not supported for Updates with CompactBatches");

Review comment:
       Right, that makes sense. It should be possible to do it within the constructor of `SendUpdateToTableOperatorImpl`. The only way to do that is to expose the `BatchProvider` from the `RemoteTable` and check if it is an instance of `CompactBatchProvider`.







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r763265659



##########
File path: samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
##########
@@ -68,8 +71,10 @@ public int size() {
 
     if (operation instanceof GetOperation) {
       queries.putIfAbsent(operation.getKey(), operation);
+    } else if (operation instanceof UpdateOperation) {
+      throw new BatchingNotSupportedException("Batching not supported for Updates with CompactBatches");

Review comment:
       Hmm, how does a user specify that compact batching should be used? I think there should be either an API or a config to specify that. Could you double check?







[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r761418272



##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +91,27 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates an existing record for a given key with the specified update.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, Object ... args);
+
+  /**
+   * Asynchronously updates the existing records for the given keys with their corresponding updates.
+   *
+   * @param updates the key and update mappings.
+   * @param args additional arguments
+   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAllAsync(List<Entry<K, U>> updates, Object ... args);

Review comment:
       @mynameborat: this patch is already getting quite large. Let's do another patch if we want to change the return types of all the table methods, and keep the scope of this patch to the partial update API. Thanks.







[GitHub] [samza] ajothomas commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r763277850



##########
File path: samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
##########
@@ -68,8 +71,10 @@ public int size() {
 
     if (operation instanceof GetOperation) {
       queries.putIfAbsent(operation.getKey(), operation);
+    } else if (operation instanceof UpdateOperation) {
+      throw new BatchingNotSupportedException("Batching not supported for Updates with CompactBatches");

Review comment:
       `RemoteTableDescriptor` has a `withBatchProvider` setter which can be used to specify a Complete/Compact batch right now.







[GitHub] [samza] mynameborat commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r788900407



##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableWithUpdateOperatorImpl.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateOptions;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendToTableWithUpdateOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordNotFoundException;
+import org.apache.samza.table.batching.CompactBatchProvider;
+import org.apache.samza.table.remote.RemoteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of a send-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendToTableWithUpdateOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SendToTableWithUpdateOperatorImpl.class);
+
+  private final SendToTableWithUpdateOperatorSpec<K, V, U> spec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendToTableWithUpdateOperatorImpl(SendToTableWithUpdateOperatorSpec<K, V, U> spec, Context context) {
+    this.spec = spec;
+    this.table = context.getTaskContext().getTable(spec.getTableId());
+    if (context.getTaskContext().getTable(spec.getTableId()) instanceof RemoteTable) {
+      RemoteTable<K, V, U> remoteTable = (RemoteTable<K, V, U>) table;
+      if (remoteTable.getBatchProvider() instanceof CompactBatchProvider) {
+        throw new SamzaException("Batching is not supported with Compact Batches for partial updates");
+      }
+    }
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }

Review comment:
       Interesting, so we have different paths for the lifecycle of `init` vs `close`, is that right?



