You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/10/19 19:32:32 UTC

samza git commit: SAMZA-1964: Make getTableSpec() in RemoteTableDescriptor reentrant

Repository: samza
Updated Branches:
  refs/heads/master 8242fab0f -> 520da15e6


SAMZA-1964: Make getTableSpec() in RemoteTableDescriptor reentrant

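As per subject: getTableSpec() in RemoteTableDescriptor no longer assigns the
EmbeddedTaggedRateLimiter it builds from the configured rates to the
rateLimiter field; it serializes the limiter directly into the table spec
config instead, so the call does not modify the descriptor's rate limiter.
The descriptor tests now call getTableSpec() twice to exercise repeated calls.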

Author: Wei Song <ws...@linkedin.com>

Reviewers: Xinyu Liu <xi...@apache.org>

Closes #747 from weisong44/SAMZA-1964


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/520da15e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/520da15e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/520da15e

Branch: refs/heads/master
Commit: 520da15e682d55499224dc2be5b7964e7780eebf
Parents: 8242fab
Author: Wei Song <ws...@linkedin.com>
Authored: Fri Oct 19 12:32:26 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Fri Oct 19 12:32:26 2018 -0700

----------------------------------------------------------------------
 .../remote/descriptors/RemoteTableDescriptor.java    | 15 ++++++---------
 .../descriptors/TestRemoteTableDescriptor.java       |  1 +
 .../descriptors/TestInMemoryTableDescriptor.java     |  7 +++++++
 .../kv/descriptors/TestRocksDbTableDescriptor.java   |  6 ++++--
 4 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/520da15e/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
index 0187b2e..1d1bca6 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
@@ -65,8 +65,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
   // Output support for a specific remote store (optional)
   private TableWriteFunction<K, V> writeFn;
 
-  // Rate limiter for client-side throttling;
-  // can either be constructed indirectly from rates or overridden by withRateLimiter()
+  // Rate limiter for client-side throttling; it is set by withRateLimiter()
   private RateLimiter rateLimiter;
 
   // Rates for constructing the default rate limiter when they are non-zero
@@ -113,21 +112,19 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
       tableSpecConfig.put(RemoteTableProvider.WRITE_FN, SerdeUtils.serialize("write function", writeFn));
     }
 
-    // Serialize the rate limiter if specified
     if (!tagCreditsMap.isEmpty()) {
-      rateLimiter = new EmbeddedTaggedRateLimiter(tagCreditsMap);
-    }
-
-    if (rateLimiter != null) {
+      tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter",
+          new EmbeddedTaggedRateLimiter(tagCreditsMap)));
+    } else if (rateLimiter != null) {
       tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
     }
 
-    // Serialize the readCredit and writeCredit functions
+    // Serialize the readCredit functions
     if (readCreditFn != null) {
       tableSpecConfig.put(RemoteTableProvider.READ_CREDIT_FN, SerdeUtils.serialize(
           "read credit function", readCreditFn));
     }
-
+    // Serialize the writeCredit functions
     if (writeCreditFn != null) {
       tableSpecConfig.put(RemoteTableProvider.WRITE_CREDIT_FN, SerdeUtils.serialize(
           "write credit function", writeCreditFn));

http://git-wip-us.apache.org/repos/asf/samza/blob/520da15e/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index c6d969e..fb9d2e7 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -197,6 +197,7 @@ public class TestRemoteTableDescriptor {
     }
 
     TableSpec spec = desc.getTableSpec();
+    spec = desc.getTableSpec();
     RemoteTableProvider provider = new RemoteTableProvider(spec);
     provider.init(createMockContext());
     Table table = provider.getTable();

http://git-wip-us.apache.org/repos/asf/samza/blob/520da15e/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java
index 33e3c35..5353837 100644
--- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableDescriptor.java
@@ -29,6 +29,13 @@ import org.junit.Test;
 
 public class TestInMemoryTableDescriptor {
   @Test
+  public void testMinimal() {
+    InMemoryTableDescriptor tableDescriptor = new InMemoryTableDescriptor("1");
+    tableDescriptor.getTableSpec();
+    tableDescriptor.getTableSpec();
+  }
+
+  @Test
   public void testTableSpec() {
 
     TableSpec tableSpec = new InMemoryTableDescriptor("1",

http://git-wip-us.apache.org/repos/asf/samza/blob/520da15e/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
index 86efea5..86f6e5c 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -30,8 +30,10 @@ public class TestRocksDbTableDescriptor {
 
   @Test
   public void testMinimal() {
-    new RocksDbTableDescriptor<Integer, String>("1")
-        .validate();
+    RocksDbTableDescriptor tableDescriptor = new RocksDbTableDescriptor("1");
+    tableDescriptor.validate();
+    tableDescriptor.getTableSpec();
+    tableDescriptor.getTableSpec();
   }
 
   @Test