You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 00:23:08 UTC

[09/29] samza git commit: SAMZA-1849: Table Descriptors should take Serde at construction time so that descriptors can be typed

SAMZA-1849: Table Descriptors should take Serde at construction time so that descriptors can be typed

Changed table descriptor to take serde in constructor, and removed withSerde() from all table descriptors.

Author: Wei Song <ws...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #649 from weisong44/SAMZA-1849 and squashes the following commits:

a3ba2f70 [Wei Song] Merge branch 'master' into SAMZA-1849
41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master'
e7a716c0 [Wei Song] Updated based on review comments
0601566f [Wei Song] SAMZA-1849: Table Descriptors should take Serde at construction time so that descriptors can be typed
239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master'
eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master'
51562391 [Wei Song] Merge remote-tracking branch 'upstream/master'
de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master'
df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master'
f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master'
4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master'
0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master'
aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master'
a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master'
5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master'
3f7ed71f [Wei Song] Added self to committer list


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d8939123
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d8939123
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d8939123

Branch: refs/heads/NewKafkaSystemConsumer
Commit: d89391231144b1700895673448a13d83b1c92a3a
Parents: c48bcd2
Author: Wei Song <ws...@linkedin.com>
Authored: Thu Sep 20 10:17:49 2018 -0700
Committer: Wei Song <ws...@linkedin.com>
Committed: Thu Sep 20 10:17:49 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/operators/TableDescriptor.java  | 13 ++-----------
 .../samza/table/TableDescriptorsProvider.java    |  8 +++-----
 .../samza/operators/BaseTableDescriptor.java     | 19 ++++++++++---------
 .../table/remote/RemoteTableDescriptor.java      | 13 ++++++++++++-
 .../kv/inmemory/InMemoryTableDescriptor.java     | 13 ++++++++++++-
 .../kv/inmemory/TestInMemoryTableDescriptor.java |  4 ++--
 .../samza/storage/kv/RocksDbTableDescriptor.java | 13 ++++++++++++-
 .../storage/kv/TestRocksDbTableDescriptor.java   | 11 +++++------
 .../kv/BaseLocalStoreBackedTableDescriptor.java  | 13 +++++++++++--
 .../sql/impl/ConfigBasedIOResolverFactory.java   |  7 +++----
 .../sql/testutil/TestIOResolverFactory.java      |  7 +++----
 .../apache/samza/test/table/TestLocalTable.java  |  9 ++++-----
 .../test/table/TestLocalTableWithSideInputs.java |  6 ++----
 .../test/table/TestTableDescriptorsProvider.java | 10 ++++------
 14 files changed, 85 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
index a60b6a9..dbcd65e 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
@@ -19,7 +19,6 @@
 package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.serializers.KVSerde;
 
 /**
  * User facing class to collect metadata that fully describes a
@@ -30,8 +29,8 @@ import org.apache.samza.serializers.KVSerde;
  *
  * <pre>
  * {@code
- * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl")
- *     .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
+ * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl",
+ *         KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
  *     .withBlockSize(1024)
  *     .withConfig("some-key", "some-value");
  * }
@@ -55,14 +54,6 @@ public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
   String getTableId();
 
   /**
-   * Set the Serde for this table
-   * @param serde the serde
-   * @return this table descriptor instance
-   * @throws IllegalArgumentException if null is provided
-   */
-  D withSerde(KVSerde<K, V> serde);
-
-  /**
    * Add a configuration entry for the table
    * @param key the key
    * @param value the value

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
index 766a4b4..5f8d766 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
@@ -44,13 +44,11 @@ import org.apache.samza.operators.TableDescriptor;
  *   public List<TableDescriptor> getTableDescriptors() {
  *     List<TableDescriptor> tableDescriptors = new ArrayList<>();
  *     final TableReadFunction readRemoteTableFn = new MyStoreReadFunction();
- *     tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
- *       .withReadFunction(readRemoteTableFn)
- *       .withSerde(KVSerde.of(new StringSerde(), new StringSerde())));
+ *     tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1", KVSerde.of(new StringSerde(), new StringSerde()))
+ *       .withReadFunction(readRemoteTableFn);
  *
- *     tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")
+ *     tableDescriptors.add(new RocksDbTableDescriptor("local-table-1", KVSerde.of(new LongSerde(), new StringSerde<>()))
  *       .withBlockSize(4096)
- *       .withSerde(KVSerde.of(new LongSerde(), new StringSerde<>())));
  *       .withConfig("some-key", "some-value");
  *     return tableDescriptors;
  *   }

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
index f81f3b8..1e4194a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
@@ -51,18 +51,19 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
     this.tableId = tableId;
   }
 
-  @Override
-  public D withConfig(String key, String value) {
-    config.put(key, value);
-    return (D) this;
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
+   * @param serde the serde for key and value
+   */
+  protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    this.tableId = tableId;
+    this.serde = serde;
   }
 
   @Override
-  public D withSerde(KVSerde<K, V> serde) {
-    if (serde == null) {
-      throw new IllegalArgumentException("Serde cannot be null");
-    }
-    this.serde = serde;
+  public D withConfig(String key, String value) {
+    config.put(key, value);
     return (D) this;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
index 537ff87..c31348f 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.table.utils.SerdeUtils;
@@ -79,12 +80,22 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
   private int asyncCallbackPoolSize = -1;
 
   /**
-   * {@inheritDoc}
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
    */
   public RemoteTableDescriptor(String tableId) {
     super(tableId);
   }
 
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
+   * @param serde the serde for key and value
+   */
+  public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    super(tableId, serde);
+  }
+
   @Override
   public TableSpec getTableSpec() {
     validate();

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
index 8328417..d364234 100644
--- a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.samza.storage.kv.inmemory;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.storage.kv.BaseLocalStoreBackedTableDescriptor;
 import org.apache.samza.table.TableSpec;
 
@@ -34,12 +35,22 @@ import org.apache.samza.table.TableSpec;
 public class InMemoryTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, InMemoryTableDescriptor<K, V>> {
 
   /**
-   * {@inheritDoc}
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
    */
   public InMemoryTableDescriptor(String tableId) {
     super(tableId);
   }
 
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
+   * @param serde the serde for key and value
+   */
+  public InMemoryTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    super(tableId, serde);
+  }
+
   @Override
   protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
     super.generateTableSpecConfig(tableSpecConfig);

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
index 840fb70..89bd058 100644
--- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
@@ -31,8 +31,8 @@ public class TestInMemoryTableDescriptor {
   @Test
   public void testTableSpec() {
 
-    TableSpec tableSpec = new InMemoryTableDescriptor<Integer, String>("1")
-        .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+    TableSpec tableSpec = new InMemoryTableDescriptor("1",
+            KVSerde.of(new IntegerSerde(), new StringSerde()))
         .withConfig("inmemory.abc", "xyz")
         .getTableSpec();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
index 9b81605..325d023 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.samza.storage.kv;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.table.TableSpec;
 
 
@@ -57,13 +58,23 @@ public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescr
   private String compactionStyle;
 
   /**
-   * {@inheritDoc}
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
    */
   public RocksDbTableDescriptor(String tableId) {
     super(tableId);
   }
 
   /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
+   * @param serde the serde for key and value
+   */
+  public RocksDbTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    super(tableId, serde);
+  }
+
+  /**
    * Refer to <code>stores.store-name.write.batch.size</code> in Samza configuration guide
    * @param writeBatchSize write batch size
    * @return this table descriptor instance

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
index 50f0920..35a66e8 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
@@ -37,8 +37,8 @@ public class TestRocksDbTableDescriptor {
 
   @Test
   public void testSerde() {
-    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
-        .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+    TableSpec tableSpec = new RocksDbTableDescriptor("1",
+            KVSerde.of(new IntegerSerde(), new StringSerde()))
         .getTableSpec();
     Assert.assertNotNull(tableSpec.getSerde());
     Assert.assertEquals(tableSpec.getSerde().getKeySerde().getClass(), IntegerSerde.class);
@@ -48,8 +48,8 @@ public class TestRocksDbTableDescriptor {
   @Test
   public void testTableSpec() {
 
-    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
-        .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1",
+            KVSerde.of(new IntegerSerde(), new StringSerde()))
         .withBlockSize(1)
         .withCacheSize(2)
         .withCompactionStyle("fifo")
@@ -85,8 +85,7 @@ public class TestRocksDbTableDescriptor {
   @Test
   public void testTableSpecWithChangelogEnabled() {
 
-    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
-        .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+    TableSpec tableSpec = new RocksDbTableDescriptor("1", KVSerde.of(new IntegerSerde(), new StringSerde()))
         .withChangelogStream("changelog-$tream")
         .withChangelogReplicationFactor(10)
         .getTableSpec();

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
index c46f9e1..96057d6 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.storage.SideInputsProcessor;
 
 
@@ -49,13 +50,21 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo
 
   /**
    * Constructs a table descriptor instance
-   *
-   * @param tableId Id of the table
+   * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
    */
   public BaseLocalStoreBackedTableDescriptor(String tableId) {
     super(tableId);
   }
 
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
+   * @param serde the serde for key and value
+   */
+  public BaseLocalStoreBackedTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    super(tableId, serde);
+  }
+
   public D withSideInputs(List<String> sideInputs) {
     this.sideInputs = sideInputs;
     // Disable changelog

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 1ada813..a1c1bdd 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -100,10 +100,9 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
 
       TableDescriptor tableDescriptor = null;
       if (isTable) {
-        tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name)
-            .withSerde(KVSerde.of(
-                new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
-                new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+        tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name, KVSerde.of(
+            new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
+            new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
       }
 
       return new SqlIOConfig(systemName, streamName, fetchSystemConfigs(systemName), tableDescriptor);

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
index bd61afd..8318e8a 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -202,10 +202,9 @@ public class TestIOResolverFactory implements SqlIOResolverFactory {
             tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size());
           } else {
             String tableId = "InputTable-" + ioName.replace(".", "-").replace("$", "-");
-            tableDescriptor = new RocksDbTableDescriptor(tableId)
-                .withSerde(KVSerde.of(
-                    new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
-                    new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+            tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(
+                new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
+                new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
           }
           tableDescMap.put(ioName, tableDescriptor);
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index fbf0539..e1386c8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -96,8 +96,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
 
     final StreamApplication app = appDesc -> {
 
-      Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1")
-          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
+          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
       DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
       GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
 
@@ -134,7 +134,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
       final StreamApplication app = appDesc -> {
 
         Table<KV<Integer, Profile>> table = appDesc.getTable(
-            new InMemoryTableDescriptor("t1").withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+            new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
         DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
         GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
         appDesc.getInputStream(profileISD)
@@ -209,8 +209,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
 
       final StreamApplication app = appDesc -> {
 
-        Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1")
-            .withSerde(profileKVSerde));
+        Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
 
         DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
         GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index adcea48..3c22818 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -143,8 +143,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
     }
 
     protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
-      return new InMemoryTableDescriptor<Integer, Profile>(PROFILE_TABLE)
-          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+      return new InMemoryTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
           .withSideInputs(ImmutableList.of(PROFILE_STREAM))
           .withSideInputsProcessor((msg, store) -> {
               Profile profile = (Profile) msg.getMessage();
@@ -158,8 +157,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
   static class DurablePageViewProfileJoin extends PageViewProfileJoin {
     @Override
     protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
-      return new RocksDbTableDescriptor<Integer, Profile>(PROFILE_TABLE)
-          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+      return new RocksDbTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
           .withSideInputs(ImmutableList.of(PROFILE_STREAM))
           .withSideInputsProcessor((msg, store) -> {
               TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();

http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
index 34ffbd4..d123cee 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
@@ -137,13 +137,11 @@ public class TestTableDescriptorsProvider {
       final RateLimiter readRateLimiter = mock(RateLimiter.class);
       final MyReadFunction readFn = new MyReadFunction();
 
-      tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
+      tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1", KVSerde.of(new StringSerde(), new LongSerde()))
           .withReadFunction(readFn)
-          .withRateLimiter(readRateLimiter, null, null)
-          .withSerde(KVSerde.of(new StringSerde(), new LongSerde())));
-      tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")
-          .withBlockSize(4096)
-          .withSerde(KVSerde.of(new StringSerde(), new StringSerde())));
+          .withRateLimiter(readRateLimiter, null, null));
+      tableDescriptors.add(new RocksDbTableDescriptor("local-table-1", KVSerde.of(new StringSerde(), new StringSerde()))
+          .withBlockSize(4096));
       return tableDescriptors;
     }
   }