You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2022/01/21 00:31:00 UTC

[samza] branch master updated: SAMZA-2718: TaskContext.getTable should not upcast ReadWriteUpdateTable to ReadWriteTable (#1577)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new aa31bb6  SAMZA-2718: TaskContext.getTable should not upcast ReadWriteUpdateTable to ReadWriteTable (#1577)
aa31bb6 is described below

commit aa31bb6ecedc4790b3607ddce0f3ef10737bda90
Author: ajo thomas <aj...@users.noreply.github.com>
AuthorDate: Thu Jan 20 16:29:14 2022 -0800

    SAMZA-2718: TaskContext.getTable should not upcast ReadWriteUpdateTable to ReadWriteTable (#1577)
---
 .../org/apache/samza/context/TaskContextImpl.java  |   3 +-
 .../apache/samza/table/ReadWriteTableDelegate.java | 120 +++++++++++++++++++++
 2 files changed, 122 insertions(+), 1 deletion(-)

diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
index 6a0510e..7543d8d 100644
--- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
@@ -27,6 +27,7 @@ import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.ReadWriteTableDelegate;
 import org.apache.samza.table.ReadWriteUpdateTable;
 import org.apache.samza.table.TableManager;
 
@@ -101,7 +102,7 @@ public class TaskContextImpl implements TaskContext {
   @Override
   public <K, V> ReadWriteTable<K, V> getTable(String tableId) {
     final ReadWriteUpdateTable table = this.tableManager.getTable(tableId);
-    return (ReadWriteTable) table;
+    return new ReadWriteTableDelegate(table);
   }
 
   @Override
diff --git a/samza-core/src/main/java/org/apache/samza/table/ReadWriteTableDelegate.java b/samza-core/src/main/java/org/apache/samza/table/ReadWriteTableDelegate.java
new file mode 100644
index 0000000..cbc9334
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/ReadWriteTableDelegate.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.SamzaException;
+import org.apache.samza.storage.kv.Entry;
+
+/**
+ * This class delegates all it's method calls to the underlying ReadWriteUpdateTable
+ * except for update methods.
+ * */
+public class ReadWriteTableDelegate<K, V> implements ReadWriteTable<K, V> {
+  private final ReadWriteUpdateTable<K, V, Void> updateTable;
+
+  /**
+   * @param updateTable input table.
+   * */
+  public ReadWriteTableDelegate(ReadWriteUpdateTable<K, V, Void> updateTable) {
+    this.updateTable = updateTable;
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key, Object... args) {
+    return updateTable.getAsync(key, args);
+  }
+
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys, Object... args) {
+    return updateTable.getAllAsync(keys, args);
+  }
+
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value, Object... args) {
+    return updateTable.putAsync(key, value, args);
+  }
+
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object... args) {
+    return updateTable.putAllAsync(entries, args);
+  }
+
+  @Override
+  public CompletableFuture<Void> updateAsync(K key, Void update) {
+    throw new SamzaException("Not supported");
+  }
+
+  @Override
+  public CompletableFuture<Void> updateAllAsync(List<Entry<K, Void>> updates) {
+    throw new SamzaException("Not supported");
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key, Object... args) {
+    return updateTable.deleteAsync(key, args);
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys, Object... args) {
+    return updateTable.deleteAllAsync(keys, args);
+  }
+
+  @Override
+  public void flush() {
+    updateTable.flush();
+  }
+
+  @Override
+  public void close() {
+    updateTable.close();
+  }
+
+  @Override
+  public V get(K key, Object... args) {
+    return updateTable.get(key, args);
+  }
+
+  @Override
+  public Map<K, V> getAll(List<K> keys, Object... args) {
+    return updateTable.getAll(keys, args);
+  }
+
+  @Override
+  public void put(K key, V value, Object... args) {
+    updateTable.put(key, value, args);
+  }
+
+  @Override
+  public void putAll(List<Entry<K, V>> entries, Object... args) {
+    updateTable.putAll(entries, args);
+  }
+
+  @Override
+  public void delete(K key, Object... args) {
+    updateTable.delete(key, args);
+  }
+
+  @Override
+  public void deleteAll(List<K> keys, Object... args) {
+    updateTable.deleteAll(keys, args);
+  }
+}