You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/10/11 21:54:01 UTC

samza git commit: SAMZA-1948: Updated hybrid table descriptors to take underlying table descriptors

Repository: samza
Updated Branches:
  refs/heads/master 63d33fa06 -> 1eb4c2663


SAMZA-1948: Updated hybrid table descriptors to take underlying table descriptors

Hybrid table descriptors, e.g. CachingTableDescriptor, should take their underlying table descriptors as constructor parameters instead of through withXYZ methods. This is better because the underlying tables are required for the hybrid descriptor to work, and it helps with type inference for the descriptor.

Author: Wei Song <ws...@linkedin.com>

Reviewers: Peng Du <pd...@linkedin.com>

Closes #706 from weisong44/SAMZA-1948 and squashes the following commits:

53444419 [Wei Song] Updated based on review comments
39d9ab00 [Wei Song] SAMZA-1948 Updated hybrid table descriptors to take underlying table descriptors
a56c28dc [Wei Song] Merge remote-tracking branch 'upstream/master'
097958c8 [Wei Song] Merge remote-tracking branch 'upstream/master'
05822f0a [Wei Song] Merge remote-tracking branch 'upstream/master'
f7480505 [Wei Song] Merge remote-tracking branch 'upstream/master'
7706ab1f [Wei Song] Merge remote-tracking branch 'upstream/master'
f5731b10 [Wei Song] Merge remote-tracking branch 'upstream/master'
1e5de45a [Wei Song] Merge remote-tracking branch 'upstream/master'
c85604e0 [Wei Song] Merge remote-tracking branch 'upstream/master'
242d8442 [Wei Song] Merge remote-tracking branch 'upstream/master'
ec7d8409 [Wei Song] Merge remote-tracking branch 'upstream/master'
e19b4dc9 [Wei Song] Merge remote-tracking branch 'upstream/master'
8ee78441 [Wei Song] Merge remote-tracking branch 'upstream/master'
1c6a2eae [Wei Song] Merge remote-tracking branch 'upstream/master'
a6c94add [Wei Song] Merge remote-tracking branch 'upstream/master'
41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master'
239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master'
eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master'
51562391 [Wei Song] Merge remote-tracking branch 'upstream/master'
de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master'
df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master'
f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master'
4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master'
0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master'
aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master'
a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master'
5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master'
3f7ed71f [Wei Song] Added self to committer list


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1eb4c266
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1eb4c266
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1eb4c266

Branch: refs/heads/master
Commit: 1eb4c266339693edf6f13a0da036cc7ca6ff3fc3
Parents: 63d33fa
Author: Wei Song <ws...@linkedin.com>
Authored: Thu Oct 11 14:53:54 2018 -0700
Committer: Wei Song <ws...@linkedin.com>
Committed: Thu Oct 11 14:53:54 2018 -0700

----------------------------------------------------------------------
 .../samza/table/TableDescriptorsProvider.java   |  2 +-
 .../table/caching/CachingTableDescriptor.java   | 47 +++++++++-----------
 .../samza/table/caching/TestCachingTable.java   | 13 +++---
 .../samza/test/table/TestRemoteTable.java       |  6 +--
 4 files changed, 33 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
index 5f8d766..7b8754a 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
@@ -91,7 +91,7 @@ import org.apache.samza.operators.TableDescriptor;
 public interface TableDescriptorsProvider {
   /**
    * Constructs instances of the table descriptors
-   * @param config
+   * @param config the job config
    * @return list of table descriptors
    */
   List<TableDescriptor> getTableDescriptors(Config config);

http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
index 4896e93..f9d4007 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
@@ -47,10 +47,30 @@ public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V
   private boolean isWriteAround;
 
   /**
-   * {@inheritDoc}
+   * Constructs a table descriptor instance with internal cache
+   *
+   * @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ }
+   * @param table target table descriptor
    */
-  public CachingTableDescriptor(String tableId) {
+  public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table) {
     super(tableId);
+    this.table = table;
+  }
+
+  /**
+   * Constructs a table descriptor instance and specify a cache (as Table descriptor)
+   * to be used for caching. Cache get is not synchronized with put for better parallelism
+   * in the read path of {@link CachingTable}. As such, cache table implementation is
+   * expected to be thread-safe for concurrent accesses.
+   *
+   * @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ }
+   * @param table target table descriptor
+   * @param cache cache table descriptor
+   */
+  public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table,
+      TableDescriptor<K, V, ?> cache) {
+    this(tableId, table);
+    this.cache = cache;
   }
 
   @Override
@@ -88,29 +108,6 @@ public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V
   }
 
   /**
-   * Specify a cache (as Table descriptor) to be used for caching.
-   * Cache get is not synchronized with put for better parallelism in the read path
-   * of {@link CachingTable}. As such, cache table implementation is expected to be
-   * thread-safe for concurrent accesses.
-   * @param cache cache table descriptor
-   * @return this descriptor
-   */
-  public CachingTableDescriptor withCache(TableDescriptor<K, V, ?> cache) {
-    this.cache = cache;
-    return this;
-  }
-
-  /**
-   * Specify the target table descriptor for the actual table input/output.
-   * @param table the target table descriptor
-   * @return this descriptor
-   */
-  public CachingTableDescriptor withTable(TableDescriptor<K, V, ?> table) {
-    this.table = table;
-    return this;
-  }
-
-  /**
    * Specify the TTL for each read access, ie. record is expired after
    * the TTL duration since last read access of each key.
    * @param readTtl read TTL

http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index dc13d00..128b938 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -82,14 +82,15 @@ public class TestCachingTable {
   }
 
   private void doTestSerialize(TableDescriptor cache) {
-    CachingTableDescriptor desc = new CachingTableDescriptor("1");
-    desc.withTable(createDummyTableDescriptor("2"));
+    CachingTableDescriptor desc;
+    TableDescriptor table = createDummyTableDescriptor("2");
     if (cache == null) {
+      desc = new CachingTableDescriptor("1", table);
       desc.withReadTtl(Duration.ofMinutes(3));
       desc.withWriteTtl(Duration.ofMinutes(3));
       desc.withCacheSize(1000);
     } else {
-      desc.withCache(cache);
+      desc = new CachingTableDescriptor("1", table, cache);
     }
 
     desc.withWriteAround();
@@ -150,9 +151,9 @@ public class TestCachingTable {
   }
 
   private void doTestCacheOps(boolean isWriteAround) {
-    CachingTableDescriptor desc = new CachingTableDescriptor("1");
-    desc.withTable(createDummyTableDescriptor("realTable"));
-    desc.withCache(createDummyTableDescriptor("cacheTable"));
+    CachingTableDescriptor desc = new CachingTableDescriptor("1",
+        createDummyTableDescriptor("realTable"),
+        createDummyTableDescriptor("cacheTable"));
     if (isWriteAround) {
       desc.withWriteAround();
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index a48bb7f..a42f2e6 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -144,17 +144,17 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
   }
 
   private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
-    CachingTableDescriptor<K, V> cachingDesc = new CachingTableDescriptor<>("caching-table-" + id);
+    CachingTableDescriptor<K, V> cachingDesc;
     if (defaultCache) {
+      cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc);
       cachingDesc.withReadTtl(Duration.ofMinutes(5));
       cachingDesc.withWriteTtl(Duration.ofMinutes(5));
     } else {
       GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id);
       guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
-      cachingDesc.withCache(guavaTableDesc);
+      cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc, guavaTableDesc);
     }
 
-    cachingDesc.withTable(actualTableDesc);
     return appDesc.getTable(cachingDesc);
   }