You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/10/11 21:54:01 UTC
samza git commit: SAMZA-1948: Updated hybrid table descriptors to
take underlying table descriptors
Repository: samza
Updated Branches:
refs/heads/master 63d33fa06 -> 1eb4c2663
SAMZA-1948: Updated hybrid table descriptors to take underlying table descriptors
Hybrid Table Descriptors, e.g. CachingTableDescriptor, should take both tables as a constructor param instead of using withXYZ methods. This is better because they are required for the hybrid descriptor to work, and it helps with type inference for the descriptor.
Author: Wei Song <ws...@linkedin.com>
Reviewers: Peng Du <pd...@linkedin.com>
Closes #706 from weisong44/SAMZA-1948 and squashes the following commits:
53444419 [Wei Song] Updated based on review comments
39d9ab00 [Wei Song] SAMZA-1948 Updated hybrid table descriptors to take underlying table descriptors
a56c28dc [Wei Song] Merge remote-tracking branch 'upstream/master'
097958c8 [Wei Song] Merge remote-tracking branch 'upstream/master'
05822f0a [Wei Song] Merge remote-tracking branch 'upstream/master'
f7480505 [Wei Song] Merge remote-tracking branch 'upstream/master'
7706ab1f [Wei Song] Merge remote-tracking branch 'upstream/master'
f5731b10 [Wei Song] Merge remote-tracking branch 'upstream/master'
1e5de45a [Wei Song] Merge remote-tracking branch 'upstream/master'
c85604e0 [Wei Song] Merge remote-tracking branch 'upstream/master'
242d8442 [Wei Song] Merge remote-tracking branch 'upstream/master'
ec7d8409 [Wei Song] Merge remote-tracking branch 'upstream/master'
e19b4dc9 [Wei Song] Merge remote-tracking branch 'upstream/master'
8ee78441 [Wei Song] Merge remote-tracking branch 'upstream/master'
1c6a2eae [Wei Song] Merge remote-tracking branch 'upstream/master'
a6c94add [Wei Song] Merge remote-tracking branch 'upstream/master'
41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master'
239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master'
eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master'
51562391 [Wei Song] Merge remote-tracking branch 'upstream/master'
de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master'
df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master'
f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master'
4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master'
0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master'
aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master'
a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master'
5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master'
3f7ed71f [Wei Song] Added self to committer list
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1eb4c266
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1eb4c266
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1eb4c266
Branch: refs/heads/master
Commit: 1eb4c266339693edf6f13a0da036cc7ca6ff3fc3
Parents: 63d33fa
Author: Wei Song <ws...@linkedin.com>
Authored: Thu Oct 11 14:53:54 2018 -0700
Committer: Wei Song <ws...@linkedin.com>
Committed: Thu Oct 11 14:53:54 2018 -0700
----------------------------------------------------------------------
.../samza/table/TableDescriptorsProvider.java | 2 +-
.../table/caching/CachingTableDescriptor.java | 47 +++++++++-----------
.../samza/table/caching/TestCachingTable.java | 13 +++---
.../samza/test/table/TestRemoteTable.java | 6 +--
4 files changed, 33 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
index 5f8d766..7b8754a 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
@@ -91,7 +91,7 @@ import org.apache.samza.operators.TableDescriptor;
public interface TableDescriptorsProvider {
/**
* Constructs instances of the table descriptors
- * @param config
+ * @param config the job config
* @return list of table descriptors
*/
List<TableDescriptor> getTableDescriptors(Config config);
http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
index 4896e93..f9d4007 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
@@ -47,10 +47,30 @@ public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V
private boolean isWriteAround;
/**
- * {@inheritDoc}
+ * Constructs a table descriptor instance with internal cache
+ *
+ * @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ }
+ * @param table target table descriptor
*/
- public CachingTableDescriptor(String tableId) {
+ public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table) {
super(tableId);
+ this.table = table;
+ }
+
+ /**
+ * Constructs a table descriptor instance and specify a cache (as Table descriptor)
+ * to be used for caching. Cache get is not synchronized with put for better parallelism
+ * in the read path of {@link CachingTable}. As such, cache table implementation is
+ * expected to be thread-safe for concurrent accesses.
+ *
+ * @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ }
+ * @param table target table descriptor
+ * @param cache cache table descriptor
+ */
+ public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table,
+ TableDescriptor<K, V, ?> cache) {
+ this(tableId, table);
+ this.cache = cache;
}
@Override
@@ -88,29 +108,6 @@ public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V
}
/**
- * Specify a cache (as Table descriptor) to be used for caching.
- * Cache get is not synchronized with put for better parallelism in the read path
- * of {@link CachingTable}. As such, cache table implementation is expected to be
- * thread-safe for concurrent accesses.
- * @param cache cache table descriptor
- * @return this descriptor
- */
- public CachingTableDescriptor withCache(TableDescriptor<K, V, ?> cache) {
- this.cache = cache;
- return this;
- }
-
- /**
- * Specify the target table descriptor for the actual table input/output.
- * @param table the target table descriptor
- * @return this descriptor
- */
- public CachingTableDescriptor withTable(TableDescriptor<K, V, ?> table) {
- this.table = table;
- return this;
- }
-
- /**
* Specify the TTL for each read access, ie. record is expired after
* the TTL duration since last read access of each key.
* @param readTtl read TTL
http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index dc13d00..128b938 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -82,14 +82,15 @@ public class TestCachingTable {
}
private void doTestSerialize(TableDescriptor cache) {
- CachingTableDescriptor desc = new CachingTableDescriptor("1");
- desc.withTable(createDummyTableDescriptor("2"));
+ CachingTableDescriptor desc;
+ TableDescriptor table = createDummyTableDescriptor("2");
if (cache == null) {
+ desc = new CachingTableDescriptor("1", table);
desc.withReadTtl(Duration.ofMinutes(3));
desc.withWriteTtl(Duration.ofMinutes(3));
desc.withCacheSize(1000);
} else {
- desc.withCache(cache);
+ desc = new CachingTableDescriptor("1", table, cache);
}
desc.withWriteAround();
@@ -150,9 +151,9 @@ public class TestCachingTable {
}
private void doTestCacheOps(boolean isWriteAround) {
- CachingTableDescriptor desc = new CachingTableDescriptor("1");
- desc.withTable(createDummyTableDescriptor("realTable"));
- desc.withCache(createDummyTableDescriptor("cacheTable"));
+ CachingTableDescriptor desc = new CachingTableDescriptor("1",
+ createDummyTableDescriptor("realTable"),
+ createDummyTableDescriptor("cacheTable"));
if (isWriteAround) {
desc.withWriteAround();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index a48bb7f..a42f2e6 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -144,17 +144,17 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
}
private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
- CachingTableDescriptor<K, V> cachingDesc = new CachingTableDescriptor<>("caching-table-" + id);
+ CachingTableDescriptor<K, V> cachingDesc;
if (defaultCache) {
+ cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc);
cachingDesc.withReadTtl(Duration.ofMinutes(5));
cachingDesc.withWriteTtl(Duration.ofMinutes(5));
} else {
GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id);
guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
- cachingDesc.withCache(guavaTableDesc);
+ cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc, guavaTableDesc);
}
- cachingDesc.withTable(actualTableDesc);
return appDesc.getTable(cachingDesc);
}