Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/01/18 19:12:07 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

mynameborat commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r786914705



##########
File path: samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
##########
@@ -291,12 +291,34 @@
    * scheme when propogated to next operator.
    *
    * @param table the table to write messages to
-   * @param args additional arguments passed to the table
    * @param <K> the type of key in the table
    * @param <V> the type of record value in the table
    * @return this {@link MessageStream}
    */
-  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);
+  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table);

Review comment:
       Why do you have to modify this signature? Is it colliding with the new signature that takes `UpdateOptions`?
   What are the options for existing API users who pass in varargs? I don't see a path forward for them other than switching to the new update API, even if their flow doesn't need updates.
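The collision question can be checked against Java's overload rules. Below is a minimal sketch that uses hypothetical simplified signatures: a `String` table id stands in for `Table<KV<K, V>>`, and `UPDATE_ONLY` is an assumed constant name (only `UPDATE_WITH_DEFAULTS` appears in the diff). A varargs overload can coexist with a typed `UpdateOptions` overload. A call that passes an `UpdateOptions` resolves to the typed method, because Java tries fixed-arity candidates before variable-arity ones. The real signatures also differ after erasure, `(Table, Object[])` versus `(Table, UpdateOptions)`, so they should not clash.

```java
import java.util.Arrays;

// Hypothetical stand-in for the PR's UpdateOptions; UPDATE_ONLY is an assumed constant name.
enum UpdateOptions { UPDATE_ONLY, UPDATE_WITH_DEFAULTS }

class SendToOverloads {
  // Varargs overload kept for backward compatibility (simplified: a table id, not Table<KV<K, V>>).
  static String sendTo(String tableId, Object... args) {
    return "put:" + tableId + ":" + Arrays.toString(args);
  }

  // New strongly typed overload for partial updates.
  static String sendTo(String tableId, UpdateOptions options) {
    return "update:" + tableId + ":" + options;
  }

  public static void main(String[] argv) {
    System.out.println(sendTo("t"));                                     // varargs, no extra args
    System.out.println(sendTo("t", "ttl", 60));                          // varargs ("ttl", 60 are made up)
    System.out.println(sendTo("t", UpdateOptions.UPDATE_WITH_DEFAULTS)); // typed overload wins
  }
}
```

If this holds for the real types, existing callers could keep the varargs `sendTo` unchanged. It could also be deprecated rather than removed.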
   

##########
File path: samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
##########
@@ -116,6 +119,60 @@ default void putAll(List<Entry<K, V>> records) {
     throw new SamzaException("Not supported");
   }
 
+  /**
+   * Asynchronously update the record with specified {@code key} and additional arguments.
+   * This method must be thread-safe.
+   *
+   * If the update operation failed due to the an existing record missing for the key, the implementation can return
+   * a future completed exceptionally with a {@link RecordNotFoundException} which will
+   * allow to Put a default value if one is provided.
+   *
+   * @param key key for the table record
+   * @param update update record for the given key
+   * @return CompletableFuture for the update request
+   */
+  CompletableFuture<Void> updateAsync(K key, U update);
+
+  /**
+   * Asynchronously update the record with specified {@code key} and additional arguments.
+   * This method must be thread-safe.
+   *
+   * If the update operation failed due to the an existing record missing for the key, the implementation can return
+   * a future completed exceptionally with a {@link RecordNotFoundException} which will
+   * allow to Put a default value if one is provided.
+   *
+   * @param key key for the table record
+   * @param update update record for the given key
+   * @param args additional arguments
+   * @return CompletableFuture for the update request
+   */
+  default CompletableFuture<Void> updateAsync(K key, U update, Object ... args) {
+    throw new SamzaException("Not supported");
+  }
+
+  /**
+   * Asynchronously updates the table with {@code records} with specified {@code keys}. This method must be thread-safe.
+   * The default implementation calls updateAsync for each entry and return a combined future.
+   * @param records updates for the table
+   * @return CompletableFuture for the update request
+   */
+  default CompletableFuture<Void> updateAllAsync(Collection<Entry<K, U>> records) {
+    final List<CompletableFuture<Void>> updateFutures = records.stream()
+        .map(entry -> updateAsync(entry.getKey(), entry.getValue()))
+        .collect(Collectors.toList());
+    return CompletableFuture.allOf(Iterables.toArray(updateFutures, CompletableFuture.class));

Review comment:
       What is the failure handling strategy here? Do we just bubble up the exception?
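The default `updateAllAsync` combines the per-entry futures with `CompletableFuture.allOf`. `allOf` does not fail fast: it waits for every future to complete. If any of them failed, it then completes exceptionally, and `join()` throws a `CompletionException` whose cause is one of the failures. So, as written, the caller learns that some update failed but not which keys were applied. The standalone demo below uses plain JDK futures only, and its key names are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class UpdateAllFailureDemo {
  // Combines per-entry futures the way the default updateAllAsync does, via allOf.
  static CompletableFuture<Void> combine(List<CompletableFuture<Void>> futures) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
  }

  // Returns the cause that join() reports, or null if the combined future succeeded.
  static Throwable failureCause(CompletableFuture<Void> combined) {
    try {
      combined.join();
      return null;
    } catch (CompletionException e) {
      return e.getCause();
    }
  }

  public static void main(String[] args) {
    CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("update failed for key-2"));

    // The combined future fails, but it does not report which entries were applied.
    System.out.println(failureCause(combine(Arrays.asList(ok, failed))).getMessage());
  }
}
```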

##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +91,27 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates an existing record for a given key with the specified update.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, Object ... args);

Review comment:
       Why are we not passing the `UpdateOptions` here? I understand you initially had this signature for parity with the other APIs. However, the whole idea of introducing `UpdateOptions` was to make it strongly typed, so it should be passed down.
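Here is a minimal sketch of the strongly typed shape described above. Every name is hypothetical: `TypedUpdatableTable` takes an `UpdateOptions` instead of `Object ... args`, and `InMemoryCounterTable` is a toy implementation where values and updates are integers and an update adds a delta. For brevity the toy keeps its own default record, whereas the PR takes the default from the `UpdateMessage`. `UPDATE_ONLY` is an assumed constant name.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in; UPDATE_ONLY is an assumed constant, only UPDATE_WITH_DEFAULTS is in the diff.
enum UpdateOptions { UPDATE_ONLY, UPDATE_WITH_DEFAULTS }

// Hypothetical strongly typed variant of updateAsync: options are a typed parameter, not Object... args.
interface TypedUpdatableTable<K, V, U> {
  CompletableFuture<Void> updateAsync(K key, U update, UpdateOptions options);
}

// Toy table: values and updates are Integers, an update adds a delta, and the table keeps its own default.
class InMemoryCounterTable implements TypedUpdatableTable<String, Integer, Integer> {
  final Map<String, Integer> store = new ConcurrentHashMap<>();
  private final int defaultValue;

  InMemoryCounterTable(int defaultValue) {
    this.defaultValue = defaultValue;
  }

  @Override
  public CompletableFuture<Void> updateAsync(String key, Integer delta, UpdateOptions options) {
    if (options == UpdateOptions.UPDATE_WITH_DEFAULTS) {
      // Atomically: insert default + delta when absent, otherwise add delta to the existing value.
      store.merge(key, defaultValue + delta, (old, ignored) -> old + delta);
      return CompletableFuture.completedFuture(null);
    }
    // UPDATE_ONLY: only an existing record may be updated.
    Integer updated = store.computeIfPresent(key, (k, old) -> old + delta);
    if (updated == null) {
      CompletableFuture<Void> failed = new CompletableFuture<>();
      failed.completeExceptionally(new IllegalStateException("No record for key " + key));
      return failed;
    }
    return CompletableFuture.completedFuture(null);
  }
}
```

Because the options are typed, an implementation can branch on them directly, and the compiler rejects a misspelled or wrongly typed option. Neither is true of an untyped `Object ... args`.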

##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
##########
@@ -54,7 +56,12 @@ protected void handleInit(Context context) {
   @Override
   protected CompletionStage<Collection<KV<K, V>>> handleMessageAsync(KV<K, V> message, MessageCollector collector,
       TaskCoordinator coordinator) {
-    return table.putAsync(message.getKey(), message.getValue(), sendToTableOpSpec.getArgs())
+    if (message.getValue() instanceof UpdateMessage) {
+      throw new SamzaException("Incorrect use of .sendTo operator with UpdateMessage value type. "
+          + "Please use the following method on MessageStream- sendTo(Table<KV<K, UpdateMessage<U, V>>> table,"
+          + "UpdateOptions updateOptions).");

Review comment:
       Can we perform this check earlier, during construction, by disallowing `UpdateMessage` as the value type `V`, instead of in `handleMessageAsync`?
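Java erases generic type arguments, so a constructor cannot tell what `V` is from the type parameters alone. An earlier check needs either a type token or an entry point whose signature excludes `UpdateMessage` values. The sketch below shows the type-token variant. `SendToTableOperator`, its `valueClass` parameter and the `UpdateMessage` stub are all hypothetical, and whether such a class token is available when Samza builds the operator is an assumption.

```java
// Hypothetical stand-in for org.apache.samza.operators.UpdateMessage.
final class UpdateMessage<U, V> {
}

// Hypothetical operator that validates its value type once, when it is built.
final class SendToTableOperator<K, V> {
  private final String tableId;

  // valueClass is an assumed extra input; the generic V alone is erased at runtime.
  SendToTableOperator(String tableId, Class<?> valueClass) {
    if (UpdateMessage.class.isAssignableFrom(valueClass)) {
      throw new IllegalArgumentException("Table " + tableId
          + " cannot take UpdateMessage values via sendTo(table); use sendTo(table, updateOptions).");
    }
    this.tableId = tableId;
  }

  String tableId() {
    return tableId;
  }
}
```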

##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableWithUpdateOperatorImpl.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdateOptions;
+import org.apache.samza.operators.UpdateMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendToTableWithUpdateOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.RecordNotFoundException;
+import org.apache.samza.table.batching.CompactBatchProvider;
+import org.apache.samza.table.remote.RemoteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of a send-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, based on Table's implementation it might attempt to write a
+ * default record and then applies an update.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendToTableWithUpdateOperatorImpl<K, V, U>
+    extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SendToTableWithUpdateOperatorImpl.class);
+
+  private final SendToTableWithUpdateOperatorSpec<K, V, U> spec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendToTableWithUpdateOperatorImpl(SendToTableWithUpdateOperatorSpec<K, V, U> spec, Context context) {
+    this.spec = spec;
+    this.table = context.getTaskContext().getTable(spec.getTableId());
+    if (context.getTaskContext().getTable(spec.getTableId()) instanceof RemoteTable) {
+      RemoteTable<K, V, U> remoteTable = (RemoteTable<K, V, U>) table;
+      if (remoteTable.getBatchProvider() instanceof CompactBatchProvider) {
+        throw new SamzaException("Batching is not supported with Compact Batches for partial updates");
+      }
+    }
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }

Review comment:
       Don't we need a `table.init()` call or some other initialization here? I noticed we have `table.close()` below.

##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableWithUpdateOperatorImpl.java
##########
@@ -0,0 +1,123 @@
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    final UpdateOptions options = spec.getUpdateOptions();
+    final CompletableFuture<Void> updateFuture = table.updateAsync(message.getKey(), message.getValue().getUpdate());
+
+    return updateFuture
+        .handle((result, ex) -> {
+          if (ex == null) {
+            // success, no need to Put a default value
+            return false;
+          } else if (ex.getCause() instanceof RecordNotFoundException && message.getValue().hasDefault()) {
+            // If an update fails for a given key due to a RecordDoesNotExistException exception thrown and
+            // a default is provided and the UpdateOptions is set to UPDATE_WITH_DEFAULTS, then attempt
+            // to PUT a default record for the key and then apply the update.
+            if (options == UpdateOptions.UPDATE_WITH_DEFAULTS) {
+              return true;
+            } else {
+              throw new SamzaException("Put default failed for update as the Update options was set to " + options +
+                  ". Please use UpdateOptions.UPDATE_WITH_DEFAULTS instead.");
+            }
+          } else {
+            throw new SamzaException("Update failed with exception: ", ex);
+          }
+        })
+        .thenCompose(shouldPutDefault -> {
+          if (shouldPutDefault) {
+            final CompletableFuture<Void> putFuture = table.putAsync(message.getKey(), message.getValue().getDefault());
+            return putFuture
+                .exceptionally(ex -> {
+                  LOG.warn("PUT default failed due to an exception. Ignoring the exception and proceeding with update. "
+                          + "The exception encountered is: ", ex);
+                  return null;
+                })
+                .thenCompose(res -> table.updateAsync(message.getKey(), message.getValue().getUpdate()));

Review comment:
       What happens if this retried update fails?
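As far as the quoted code shows, the retried `updateAsync` is the last stage in the chain. If it fails, the future returned by `handleMessageAsync` would likely complete exceptionally with that failure, and nothing retries again. The standalone sketch below reproduces that control flow with plain JDK futures, under several assumptions:
- `RecordNotFoundException` is a local stand-in for the Samza class.
- `update` and `putDefault` are hypothetical `Supplier`s in place of `table.updateAsync` and `table.putAsync`.
- The `UpdateOptions` and `hasDefault()` checks are assumed to have passed.

The sketch also unwraps `CompletionException` before checking the exception type. When the table's future is completed directly, `handle()` receives the raw exception, so `ex.getCause()` in the diff would not be the `RecordNotFoundException` in that case.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.samza.table.RecordNotFoundException.
class RecordNotFoundException extends RuntimeException {
  RecordNotFoundException(String message) {
    super(message);
  }
}

class UpdateWithDefaultFlow {
  // handle() sees the raw exception if the upstream future was completed directly,
  // and a CompletionException wrapper if it failed inside a dependent stage.
  static Throwable unwrap(Throwable t) {
    return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
  }

  // Assumes a default exists and UPDATE_WITH_DEFAULTS is set.
  static CompletableFuture<Void> updateWithDefault(
      Supplier<CompletableFuture<Void>> update, Supplier<CompletableFuture<Void>> putDefault) {
    return update.get()
        .handle((ignored, ex) -> {
          if (ex == null) {
            return false; // first update succeeded
          }
          if (unwrap(ex) instanceof RecordNotFoundException) {
            return true; // record missing: put the default, then retry
          }
          throw new CompletionException(unwrap(ex));
        })
        .thenCompose(shouldPutDefault -> {
          if (!shouldPutDefault) {
            return CompletableFuture.<Void>completedFuture(null);
          }
          return putDefault.get()
              .exceptionally(ex -> null)             // as in the PR: a failed PUT is only logged
              .thenCompose(ignored -> update.get()); // a failed retry fails the returned future
        });
  }

  // Returns the cause reported by join(), or null on success.
  static Throwable failureCause(CompletableFuture<Void> f) {
    try {
      f.join();
      return null;
    } catch (CompletionException e) {
      return e.getCause();
    }
  }
}
```

If a failed retry should not fail the whole message, the last stage would need its own `exceptionally` or `handle`. Whether that is desirable is exactly the question above.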

##########
File path: samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
##########
@@ -291,12 +291,34 @@
    * scheme when propogated to next operator.
    *
    * @param table the table to write messages to
-   * @param args additional arguments passed to the table
    * @param <K> the type of key in the table
    * @param <V> the type of record value in the table
    * @return this {@link MessageStream}
    */
-  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);
+  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table);
+
+  /**
+   * Allows sending update messages in this {@link MessageStream} to a {@link Table} and then propagates this
+   * {@link MessageStream} to the next chained operator. The type of input message is expected to be {@link KV},
+   * otherwise a {@link ClassCastException} will be thrown. The value is an UpdateMessage- update and default value.
+   * Defaults are optional and can be used if the Remote Table integration supports inserting a default through PUT in
+   * the event an update fails due to an existing record being absent.
+   * The user also needs to pass UpdateOptions argument which defines whether the update is an update only operation

Review comment:
       s/`UpdateMessage`/`{@link UpdateMessage}`/ and move some of the text on the semantics of defaults into the `UpdateMessage` Javadoc.
   
   Similarly, link `UpdateOptions`.
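Here is one possible shape for that change, as a self-contained sketch rather than the PR's actual class. The semantics of defaults move into the `UpdateMessage` Javadoc, and both types are linked. The accessors `getUpdate()`, `hasDefault()` and `getDefault()` mirror the calls in `SendToTableWithUpdateOperatorImpl`. The `of(...)` factories, the use of `null` for a missing default, and the `UPDATE_ONLY` constant are assumptions.

```java
import java.util.Objects;

/**
 * An update to apply to an existing table record, optionally paired with a default record.
 *
 * <p>The default is optional. It is used only if the update fails because no record exists for the key,
 * the table integration supports inserting a default through PUT, and the stream was sent to the table
 * with {@link UpdateOptions#UPDATE_WITH_DEFAULTS}. The update is then applied on top of the default.
 *
 * @param <U> the type of the update
 * @param <V> the type of the default record
 */
final class UpdateMessage<U, V> {
  private final U update;
  private final V defaultValue; // null encodes "no default provided" in this sketch

  private UpdateMessage(U update, V defaultValue) {
    this.update = Objects.requireNonNull(update, "update");
    this.defaultValue = defaultValue;
  }

  static <U, V> UpdateMessage<U, V> of(U update) {
    return new UpdateMessage<>(update, null);
  }

  static <U, V> UpdateMessage<U, V> of(U update, V defaultValue) {
    return new UpdateMessage<>(update, Objects.requireNonNull(defaultValue, "defaultValue"));
  }

  U getUpdate() {
    return update;
  }

  boolean hasDefault() {
    return defaultValue != null;
  }

  V getDefault() {
    return defaultValue;
  }
}

/** Hypothetical stand-in; only UPDATE_WITH_DEFAULTS appears in the diff. */
enum UpdateOptions { UPDATE_ONLY, UPDATE_WITH_DEFAULTS }
```

With the semantics documented on `UpdateMessage`, the `sendTo(..., UpdateOptions)` Javadoc can shrink to a sentence that links both types.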




-- 
This is an automated message from the Apache Git Service.