Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/11/18 20:05:45 UTC

[GitHub] [samza] xinyuiscool commented on a change in pull request #1560: SAMZA-2709: Adding partial update api to Table API

xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r752535518



##########
File path: samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
##########
@@ -298,6 +298,25 @@
    */
   <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);
 
+  /**
+   * Allows sending messages in this {@link MessageStream} to a {@link Table} and then propagates this
+   * {@link MessageStream} to the next chained operator. The type of input message is expected to be {@link KV},
+   * otherwise a {@link ClassCastException} will be thrown. The value is an update pair: an update and an optional default.
+   * <p>
+   * Note: The update will be written but may not be flushed to the underlying table before it is propagated to the
+   * chained operators. Whether the message can be read back from the Table in the chained operator depends on whether
+   * it was flushed and whether the Table offers read-after-write consistency. Messages retain the original partitioning
+   * scheme when propagated to the next operator.
+   *
+   * @param table the table to write messages to
+   * @param args additional arguments passed to the table
+   * @param <K> the type of key in the table
+   * @param <V> the type of record value in the table
+   * @param <U> the type of update value for the table
+   * @return this {@link MessageStream}
+   */
+  <K, V, U> MessageStream<KV<K, UpdatePair<U, V>>> sendUpdateTo(Table<KV<K, UpdatePair<U, V>>> table, Object ... args);

Review comment:
       I think the table should be of type Table<KV<K, V>>. The value of the table shouldn't be UpdatePair. If you are concerned about the type of the input MessageStream, it's not bound to the table anyway; e.g., I can write something like MessageStream<Integer>.sendTo(Table<KV<Integer, String>>).
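To illustrate the suggested signature, here is a minimal self-contained sketch in which the table's record type is KV<K, V> while the stream elements are KV<K, UpdatePair<U, V>>. KV, Table and UpdatePair below are simplified stand-ins, not the real Samza classes, and InMemoryTable, its merge function and SendUpdateSketch are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Simplified stand-ins for Samza's KV and the PR's UpdatePair (hypothetical, not the real classes).
final class KV<K, V> {
  final K key;
  final V value;

  KV(K key, V value) {
    this.key = key;
    this.value = value;
  }
}

final class UpdatePair<U, V> {
  final U update;
  final V defaultValue; // null when no default is supplied

  UpdatePair(U update, V defaultValue) {
    this.update = update;
    this.defaultValue = defaultValue;
  }
}

// Marker type parameterized by the record type, in the spirit of Samza's Table<R>.
interface Table<R> {}

// Hypothetical in-memory table: records are KV<K, V>; updates of type U are merged into V.
final class InMemoryTable<K, V, U> implements Table<KV<K, V>> {
  final Map<K, V> store = new HashMap<>();
  private final BiFunction<V, U, V> merge;

  InMemoryTable(BiFunction<V, U, V> merge) {
    this.merge = merge;
  }

  void update(K key, U update, V defaultValue) {
    V existing = store.get(key);
    if (existing != null) {
      store.put(key, merge.apply(existing, update));
    } else if (defaultValue != null) {
      store.put(key, defaultValue);
    }
  }
}

final class SendUpdateSketch {
  // Shape suggested in the review: the table is Table<KV<K, V>>, while the
  // stream elements carry KV<K, UpdatePair<U, V>>.
  @SuppressWarnings("unchecked")
  static <K, V, U> List<KV<K, UpdatePair<U, V>>> sendUpdateTo(
      List<KV<K, UpdatePair<U, V>>> stream, Table<KV<K, V>> table) {
    // Stands in for the operator resolving the table to a writable implementation.
    InMemoryTable<K, V, U> writable = (InMemoryTable<K, V, U>) table;
    for (KV<K, UpdatePair<U, V>> message : stream) {
      writable.update(message.key, message.value.update, message.value.defaultValue);
    }
    // Messages are propagated unchanged to the next operator.
    return stream;
  }
}
```

The stream's element type never has to match the table's record type; only the key and value types inside the UpdatePair tie the two together.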

##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +92,33 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates the record associated with the specified update for a given key.
+   * If the update is attempted on a non-existent record, a default value can be inserted in the table depending
+   * on the table's implementation.
+   *
+   * @param key the key of the record to be updated.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param defaultValue the default value that is inserted in the table if the update is applied to a non-existent record.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, @Nullable V defaultValue, Object ... args);

Review comment:
       As discussed offline, it's better to leave the default to put. We can throw some kind of RecordNotExistException here if post fails, and throw RecordExistException if put fails. I think this will simplify the logic quite a bit. Please consider whether this covers all the cases.
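A minimal in-memory sketch of the contract proposed above, reading "post" in the comment as the update operation: updateAsync fails with RecordNotExistException when the key is absent, and the put used for the default fails with RecordExistException when the key is present. Both exception types and the StrictTable class are hypothetical (the comment only says "some kind of" exception), and the Object... args parameter is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical exception types named after the review comment; not part of the Samza API.
class RecordNotExistException extends RuntimeException {
  RecordNotExistException(Object key) {
    super("No record exists for key " + key);
  }
}

class RecordExistException extends RuntimeException {
  RecordExistException(Object key) {
    super("A record already exists for key " + key);
  }
}

// In-memory sketch of the proposed contract: updateAsync never writes a default and fails
// when the record is missing; putAsync (used for the default) fails when the record exists.
final class StrictTable<K, V, U> {
  private final ConcurrentHashMap<K, V> store = new ConcurrentHashMap<>();
  private final BiFunction<V, U, V> merge;

  StrictTable(BiFunction<V, U, V> merge) {
    this.merge = merge;
  }

  CompletableFuture<Void> updateAsync(K key, U update) {
    // computeIfPresent merges atomically and returns null when the key is absent
    // (or when the merge itself returns null, which removes the entry).
    V merged = store.computeIfPresent(key, (k, v) -> merge.apply(v, update));
    return merged != null ? done() : failed(new RecordNotExistException(key));
  }

  CompletableFuture<Void> putAsync(K key, V value) {
    // putIfAbsent returns the previous value, so non-null means the record already existed.
    V previous = store.putIfAbsent(key, value);
    return previous == null ? done() : failed(new RecordExistException(key));
  }

  V get(K key) {
    return store.get(key);
  }

  private static CompletableFuture<Void> done() {
    return CompletableFuture.completedFuture(null);
  }

  private static CompletableFuture<Void> failed(Throwable error) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    future.completeExceptionally(error);
    return future;
  }
}
```

With this split, neither method needs a nullable default argument: the caller decides whether to follow a failed update with a put.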

##########
File path: samza-api/src/main/java/org/apache/samza/operators/UpdatePair.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import com.google.common.base.MoreObjects;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+
+/**
+ * An UpdatePair representing the update and an optional default record to be inserted for a key,
+ * if the update is applied to a non-existent record.
+ *
+ * @param <U> type of the update record
+ * @param <V> type of the default record
+ */
+public final class UpdatePair<U, V> {

Review comment:
       Just to be consistent with our MessageStream API, maybe name it UpdateMessage?
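A sketch of what the renamed class might look like, assuming it keeps UpdatePair's shape and its getUpdate()/getDefault() accessors (both used in SendUpdateToTableOperatorImpl). The of(...) factory methods and hasDefault() are hypothetical additions, and plain Object methods replace Guava's MoreObjects to keep the example self-contained.

```java
import java.util.Objects;

// Hypothetical UpdateMessage: an update plus an optional default record for a key.
final class UpdateMessage<U, V> {
  private final U update;
  private final V defaultValue; // null when no default is supplied

  private UpdateMessage(U update, V defaultValue) {
    this.update = update;
    this.defaultValue = defaultValue;
  }

  // Update without a default record.
  static <U, V> UpdateMessage<U, V> of(U update) {
    return new UpdateMessage<>(update, null);
  }

  // Update with a default record to insert if the key has no record yet.
  static <U, V> UpdateMessage<U, V> of(U update, V defaultValue) {
    return new UpdateMessage<>(update, defaultValue);
  }

  U getUpdate() {
    return update;
  }

  V getDefault() {
    return defaultValue;
  }

  boolean hasDefault() {
    return defaultValue != null;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof UpdateMessage)) {
      return false;
    }
    UpdateMessage<?, ?> that = (UpdateMessage<?, ?>) o;
    return Objects.equals(update, that.update) && Objects.equals(defaultValue, that.defaultValue);
  }

  @Override
  public int hashCode() {
    return Objects.hash(update, defaultValue);
  }

  @Override
  public String toString() {
    return "UpdateMessage{update=" + update + ", default=" + defaultValue + "}";
  }
}
```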

##########
File path: samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
##########
@@ -116,6 +119,66 @@ default void putAll(List<Entry<K, V>> records) {
     throw new SamzaException("Not supported");
   }
 
+  /**
+   * Asynchronously update the record with specified {@code key} and additional arguments.
+   * This method must be thread-safe.
+   *
+   * @param key key for the table record
+   * @param update update record for the given key
+   * @param record default record to be written if the update is applied to a non-existent record
+   * @return CompletableFuture for the update request
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, @Nullable V record);

Review comment:
       Shall we make it a default method that throws a not-implemented exception?
   
   Remove the default record parameter too, if possible.
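One way to read this suggestion, as a sketch: updateAsync becomes a default interface method that throws unless a write function overrides it, and the default-record parameter is gone. WriteFunctionSketch and CountingWriteFunction are hypothetical trimmed-down types, and UnsupportedOperationException stands in for the SamzaException("Not supported") that the existing putAll default throws.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down write function; the real TableWriteFunction has more methods.
interface WriteFunctionSketch<K, V, U> {
  CompletableFuture<Void> putAsync(K key, V record);

  // No default-record parameter. Write functions that support partial updates override
  // this; the default keeps existing implementations compiling.
  default CompletableFuture<Void> updateAsync(K key, U update) {
    // Stand-in for SamzaException("Not supported"), used to keep the sketch self-contained.
    throw new UnsupportedOperationException("updateAsync is not supported by " + getClass().getName());
  }
}

// A write function that opts in to partial updates by overriding the default method.
final class CountingWriteFunction implements WriteFunctionSketch<String, Integer, Integer> {
  final Map<String, Integer> store = new ConcurrentHashMap<>();

  @Override
  public CompletableFuture<Void> putAsync(String key, Integer record) {
    store.put(key, record);
    return CompletableFuture.completedFuture(null);
  }

  @Override
  public CompletableFuture<Void> updateAsync(String key, Integer delta) {
    // Only existing records are updated; missing keys are left untouched in this sketch.
    store.computeIfPresent(key, (k, v) -> v + delta);
    return CompletableFuture.completedFuture(null);
  }
}
```

Throwing synchronously mirrors the existing putAll default; returning an exceptionally completed future instead would let async callers handle "not supported" in the same place as other update failures.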

##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/SendUpdateToTableOperatorImpl.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.UpdatePair;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendUpdateToTableOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Implementation of a send-update-stream-to-table operator that applies updates to existing records
+ * in the table. If there is no pre-existing record, it attempts to write a new record, depending on the Table's implementation.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ * @param <U> the type of the update applied to this table
+ */
+public class SendUpdateToTableOperatorImpl<K, V, U> extends OperatorImpl<KV<K, UpdatePair<U, V>>, KV<K, UpdatePair<U, V>>> {
+  private final SendUpdateToTableOperatorSpec<K, V, U> sendUpdateToTableOpSpec;
+  private final ReadWriteTable<K, V, U> table;
+
+  public SendUpdateToTableOperatorImpl(SendUpdateToTableOperatorSpec<K, V, U>  sendUpdateToTableOpSpec, Context context) {
+    this.sendUpdateToTableOpSpec = sendUpdateToTableOpSpec;
+    this.table = context.getTaskContext().getTable(sendUpdateToTableOpSpec.getTableId());
+  }
+
+  @Override
+  protected void handleInit(Context context) {
+  }
+
+  @Override
+  protected CompletionStage<Collection<KV<K, UpdatePair<U, V>>>> handleMessageAsync(KV<K, UpdatePair<U, V>> message,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    return table.updateAsync(message.getKey(), message.getValue().getUpdate(), message.getValue().getDefault(),

Review comment:
       As discussed, here we can add the logic to post the default and catch the exception if needed.
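A sketch of the fallback logic this comment describes, under the contract proposed in the earlier AsyncReadWriteTable comment: try the update, put the default only if the record is missing, and retry the update once if the put loses a race with another writer. The single retry is an assumption, not something the PR specifies. UpdatableTable, MapTable, SendUpdateLogic and both exception types are hypothetical stand-ins; the real operator would use the ReadWriteTable resolved from the task context and return the KV message.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical exceptions and table contract; not the Samza API.
class RecordNotExistException extends RuntimeException {}

class RecordExistException extends RuntimeException {}

interface UpdatableTable<K, V, U> {
  CompletableFuture<Void> updateAsync(K key, U update); // fails with RecordNotExistException if absent
  CompletableFuture<Void> putAsync(K key, V value);     // fails with RecordExistException if present
}

// Minimal in-memory table honoring that contract.
final class MapTable<K, V, U> implements UpdatableTable<K, V, U> {
  final ConcurrentHashMap<K, V> store = new ConcurrentHashMap<>();
  private final BiFunction<V, U, V> merge;

  MapTable(BiFunction<V, U, V> merge) {
    this.merge = merge;
  }

  public CompletableFuture<Void> updateAsync(K key, U update) {
    return store.computeIfPresent(key, (k, v) -> merge.apply(v, update)) != null
        ? CompletableFuture.<Void>completedFuture(null)
        : SendUpdateLogic.<Void>failed(new RecordNotExistException());
  }

  public CompletableFuture<Void> putAsync(K key, V value) {
    return store.putIfAbsent(key, value) == null
        ? CompletableFuture.<Void>completedFuture(null)
        : SendUpdateLogic.<Void>failed(new RecordExistException());
  }
}

final class SendUpdateLogic {
  static <T> CompletableFuture<T> failed(Throwable error) {
    CompletableFuture<T> future = new CompletableFuture<>();
    future.completeExceptionally(error);
    return future;
  }

  // Dependent stages may wrap failures in CompletionException; unwrap to inspect the cause.
  static Throwable unwrap(Throwable t) {
    return t instanceof CompletionException && t.getCause() != null ? t.getCause() : t;
  }

  static <K, V, U> CompletableFuture<Void> updateOrPutDefault(
      UpdatableTable<K, V, U> table, K key, U update, V defaultValue) {
    return table.updateAsync(key, update)
        .<CompletableFuture<Void>>handle((ignored, error) -> {
          if (error == null) {
            return CompletableFuture.<Void>completedFuture(null);
          }
          Throwable cause = unwrap(error);
          if (!(cause instanceof RecordNotExistException) || defaultValue == null) {
            return SendUpdateLogic.<Void>failed(cause);
          }
          // The record is missing: insert the default instead.
          return table.putAsync(key, defaultValue)
              .<CompletableFuture<Void>>handle((ignoredPut, putError) -> {
                if (putError == null) {
                  return CompletableFuture.<Void>completedFuture(null);
                }
                Throwable putCause = unwrap(putError);
                // Another writer created the record in between: retry the update once.
                return putCause instanceof RecordExistException
                    ? table.updateAsync(key, update)
                    : SendUpdateLogic.<Void>failed(putCause);
              })
              .thenCompose(f -> f);
        })
        .thenCompose(f -> f);
  }

  // Shape of handleMessageAsync: apply the update, then pass the message through unchanged.
  static <K, V, U, M> CompletionStage<Collection<M>> handleMessageAsync(
      UpdatableTable<K, V, U> table, K key, U update, V defaultValue, M message) {
    return updateOrPutDefault(table, key, update, defaultValue)
        .<Collection<M>>thenApply(ignored -> Collections.singleton(message));
  }
}
```

handle is used rather than exceptionally so that both outcomes can be turned into a follow-up future; thenCompose(f -> f) then flattens the nested future.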

##########
File path: samza-api/src/main/java/org/apache/samza/table/AsyncReadWriteTable.java
##########
@@ -90,6 +92,33 @@
    */
   CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object ... args);
 
+  /**
+   * Asynchronously updates the record associated with the specified update for a given key.
+   * If the update is attempted on a non-existent record, a default value can be inserted in the table depending
+   * on the table's implementation.
+   *
+   * @param key the key of the record to be updated.
+   * @param update the update applied to the record associated with a given {@code key}.
+   * @param defaultValue the default value that is inserted in the table if the update is applied to a non-existent record.
+   * @param args additional arguments
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAsync(K key, U update, @Nullable V defaultValue, Object ... args);
+
+  /**
+   * Asynchronously updates the records associated with the specified keys with the specified updates.
+   * If the update is applied to a non-existent record, a default value can be inserted in the table depending
+   * on the table's implementation.
+   *
+   * @param updates the key and update mappings.
+   * @param defaults the key and default value mappings.
+   * @param args additional arguments
+   * @throws NullPointerException if any of the specified {@code updates} has {@code null} as key.
+   * @return CompletableFuture for the operation
+   */
+  CompletableFuture<Void> updateAllAsync(List<Entry<K, U>> updates, @Nullable List<Entry<K, V>> defaults, Object ... args);

Review comment:
       Same as above. Leave the default to put.
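Applying the same idea to the batch method, here is a sketch of updateAllAsync without the defaults list, written as a default method that fans out to updateAsync. BatchUpdatableTable is hypothetical, java.util.Map.Entry stands in for the Entry type used in the diff, and the Object... args parameter is omitted.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical table interface; defaults are left to put, so no defaults list here.
interface BatchUpdatableTable<K, U> {
  // Single-key update; expected to fail if the record does not exist.
  CompletableFuture<Void> updateAsync(K key, U update);

  // Fans out to updateAsync and completes when every update completes. If any update
  // fails, the returned future fails too, but updates that already succeeded are kept.
  default CompletableFuture<Void> updateAllAsync(List<Map.Entry<K, U>> updates) {
    CompletableFuture<?>[] futures = new CompletableFuture<?>[updates.size()];
    for (int i = 0; i < futures.length; i++) {
      Map.Entry<K, U> entry = updates.get(i);
      futures[i] = updateAsync(entry.getKey(), entry.getValue());
    }
    return CompletableFuture.allOf(futures);
  }
}
```

CompletableFuture.allOf fails if any single update fails but does not undo the others, so a caller that wants to put defaults for missing keys would still need to learn which keys failed.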




-- 
This is an automated message from the Apache Git Service.