You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/05/16 18:31:11 UTC

[GitHub] [samza] weisong44 commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table

weisong44 commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table
URL: https://github.com/apache/samza/pull/1034#discussion_r284841742
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
 ##########
 @@ -241,64 +249,76 @@ public void putAll(List<Entry<K, V>> entries) {
 
     CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
         ? CompletableFuture.completedFuture(null)
-        : deleteAllAsync(deleteKeys);
+        : deleteAllAsync(deleteKeys, args);
 
     // Return the combined future
     return CompletableFuture.allOf(
         deleteFuture,
-        instrument(() -> asyncTable.putAllAsync(putRecords), metrics.numPutAlls, metrics.putAllNs))
+        instrument(() -> asyncTable.putAllAsync(putRecords, args), metrics.numPutAlls, metrics.putAllNs))
         .exceptionally(e -> {
             String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(","));
             throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e);
           });
   }
 
   @Override
-  public void delete(K key) {
+  public void delete(K key, Object ... args) {
     try {
-      deleteAsync(key).get();
+      deleteAsync(key, args).get();
     } catch (Exception e) {
       throw new SamzaException(e);
     }
   }
 
   @Override
-  public CompletableFuture<Void> deleteAsync(K key) {
+  public CompletableFuture<Void> deleteAsync(K key, Object ... args) {
     Preconditions.checkNotNull(writeFn, "null write function");
     Preconditions.checkNotNull(key, "null key");
-    return instrument(() -> asyncTable.deleteAsync(key), metrics.numDeletes, metrics.deleteNs)
+    return instrument(() -> asyncTable.deleteAsync(key, args), metrics.numDeletes, metrics.deleteNs)
         .exceptionally(e -> {
             throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e);
           });
   }
 
   @Override
-  public void deleteAll(List<K> keys) {
+  public void deleteAll(List<K> keys, Object ... args) {
     try {
-      deleteAllAsync(keys).get();
+      deleteAllAsync(keys, args).get();
     } catch (Exception e) {
       throw new SamzaException(e);
     }
   }
 
   @Override
-  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys, Object ... args) {
     Preconditions.checkNotNull(writeFn, "null write function");
     Preconditions.checkNotNull(keys, "null keys");
     if (keys.isEmpty()) {
       return CompletableFuture.completedFuture(null);
     }
 
-    return instrument(() -> asyncTable.deleteAllAsync(keys), metrics.numDeleteAlls, metrics.deleteAllNs)
+    return instrument(() -> asyncTable.deleteAllAsync(keys, args), metrics.numDeleteAlls, metrics.deleteAllNs)
         .exceptionally(e -> {
-            throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e);
+            throw new SamzaException(String.format("Failed to delete records for " + keys), e);
+          });
+  }
+
+  @Override
+  public <T> CompletableFuture<T> writeAsync(int opId, Object... args) {
+    return (CompletableFuture<T>) instrument(() -> asyncTable.writeAsync(opId, args), metrics.numWrites, metrics.writeNs)
+        .exceptionally(e -> {
+            throw new SamzaException(String.format("Failed to write, opId=%d", opId), e);
           });
   }
 
   @Override
   public void init(Context context) {
     super.init(context);
     asyncTable.init(context);
+    readFn.init(context, this);
 
 Review comment:
   Since I need to pass this here, so that table functions get a reference of the remote table to get rate limiter, etc. hooked in during invocation. I removed the init part from AsyncRemoteTable and added a comment to indicate the move.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services