You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/10/19 19:32:32 UTC
samza git commit: SAMZA-1964: Make getTableSpec() in
RemoteTableDescriptor reentrant
Repository: samza
Updated Branches:
refs/heads/master 8242fab0f -> 520da15e6
SAMZA-1964: Make getTableSpec() in RemoteTableDescriptor reentrant
As per subject
Author: Wei Song <ws...@linkedin.com>
Reviewers: Xinyu Liu <xi...@apache.org>
Closes #747 from weisong44/SAMZA-1964
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/520da15e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/520da15e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/520da15e
Branch: refs/heads/master
Commit: 520da15e682d55499224dc2be5b7964e7780eebf
Parents: 8242fab
Author: Wei Song <ws...@linkedin.com>
Authored: Fri Oct 19 12:32:26 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Fri Oct 19 12:32:26 2018 -0700
----------------------------------------------------------------------
.../remote/descriptors/RemoteTableDescriptor.java | 15 ++++++---------
.../descriptors/TestRemoteTableDescriptor.java | 1 +
.../descriptors/TestInMemoryTableDescriptor.java | 7 +++++++
.../kv/descriptors/TestRocksDbTableDescriptor.java | 6 ++++--
4 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/520da15e/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
index 0187b2e..1d1bca6 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
@@ -65,8 +65,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
// Output support for a specific remote store (optional)
private TableWriteFunction<K, V> writeFn;
- // Rate limiter for client-side throttling;
- // can either be constructed indirectly from rates or overridden by withRateLimiter()
+ // Rate limiter for client-side throttling; it is set by withRateLimiter()
private RateLimiter rateLimiter;
// Rates for constructing the default rate limiter when they are non-zero
@@ -113,21 +112,19 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
tableSpecConfig.put(RemoteTableProvider.WRITE_FN, SerdeUtils.serialize("write function", writeFn));
}
- // Serialize the rate limiter if specified
if (!tagCreditsMap.isEmpty()) {
- rateLimiter = new EmbeddedTaggedRateLimiter(tagCreditsMap);
- }
-
- if (rateLimiter != null) {
+ tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter",
+ new EmbeddedTaggedRateLimiter(tagCreditsMap)));
+ } else if (rateLimiter != null) {
tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
}
- // Serialize the readCredit and writeCredit functions
+ // Serialize the readCredit functions
if (readCreditFn != null) {
tableSpecConfig.put(RemoteTableProvider.READ_CREDIT_FN, SerdeUtils.serialize(
"read credit function", readCreditFn));
}
-
+ // Serialize the writeCredit functions
if (writeCreditFn != null) {
tableSpecConfig.put(RemoteTableProvider.WRITE_CREDIT_FN, SerdeUtils.serialize(
"write credit function", writeCreditFn));
http://git-wip-us.apache.org/repos/asf/samza/blob/520da15e/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index c6d969e..fb9d2e7 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -197,6 +197,7 @@ public class TestRemoteTableDescriptor {
}
TableSpec spec = desc.getTableSpec();
+ spec = desc.getTableSpec();
RemoteTableProvider provider = new RemoteTableProvider(spec);
provider.init(createMockContext());
Table table = provider.getTable();
http://git-wip-us.apache.org/repos/asf/samza/blob/520da15e/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java
index 33e3c35..5353837 100644
--- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java
@@ -29,6 +29,13 @@ import org.junit.Test;
public class TestInMemoryTableDescriptor {
@Test
+ public void testMinimal() {
+ InMemoryTableDescriptor tableDescriptor = new InMemoryTableDescriptor("1");
+ tableDescriptor.getTableSpec();
+ tableDescriptor.getTableSpec();
+ }
+
+ @Test
public void testTableSpec() {
TableSpec tableSpec = new InMemoryTableDescriptor("1",
http://git-wip-us.apache.org/repos/asf/samza/blob/520da15e/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
index 86efea5..86f6e5c 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -30,8 +30,10 @@ public class TestRocksDbTableDescriptor {
@Test
public void testMinimal() {
- new RocksDbTableDescriptor<Integer, String>("1")
- .validate();
+ RocksDbTableDescriptor tableDescriptor = new RocksDbTableDescriptor("1");
+ tableDescriptor.validate();
+ tableDescriptor.getTableSpec();
+ tableDescriptor.getTableSpec();
}
@Test