You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 00:23:08 UTC
[09/29] samza git commit: SAMZA-1849: Table Descriptors should take
Serde at construction time so that descriptors can be typed
SAMZA-1849: Table Descriptors should take Serde at construction time so that descriptors can be typed
Changed table descriptor to take serde in constructor, and removed withSerde() from all table descriptors.
Author: Wei Song <ws...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #649 from weisong44/SAMZA-1849 and squashes the following commits:
a3ba2f70 [Wei Song] Merge branch 'master' into SAMZA-1849
41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master'
e7a716c0 [Wei Song] Updated based on review comments
0601566f [Wei Song] SAMZA-1849: Table Descriptors should take Serde at construction time so that descriptors can be typed
239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master'
eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master'
51562391 [Wei Song] Merge remote-tracking branch 'upstream/master'
de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master'
df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master'
f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master'
4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master'
0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master'
aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master'
a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master'
5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master'
3f7ed71f [Wei Song] Added self to committer list
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d8939123
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d8939123
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d8939123
Branch: refs/heads/NewKafkaSystemConsumer
Commit: d89391231144b1700895673448a13d83b1c92a3a
Parents: c48bcd2
Author: Wei Song <ws...@linkedin.com>
Authored: Thu Sep 20 10:17:49 2018 -0700
Committer: Wei Song <ws...@linkedin.com>
Committed: Thu Sep 20 10:17:49 2018 -0700
----------------------------------------------------------------------
.../apache/samza/operators/TableDescriptor.java | 13 ++-----------
.../samza/table/TableDescriptorsProvider.java | 8 +++-----
.../samza/operators/BaseTableDescriptor.java | 19 ++++++++++---------
.../table/remote/RemoteTableDescriptor.java | 13 ++++++++++++-
.../kv/inmemory/InMemoryTableDescriptor.java | 13 ++++++++++++-
.../kv/inmemory/TestInMemoryTableDescriptor.java | 4 ++--
.../samza/storage/kv/RocksDbTableDescriptor.java | 13 ++++++++++++-
.../storage/kv/TestRocksDbTableDescriptor.java | 11 +++++------
.../kv/BaseLocalStoreBackedTableDescriptor.java | 13 +++++++++++--
.../sql/impl/ConfigBasedIOResolverFactory.java | 7 +++----
.../sql/testutil/TestIOResolverFactory.java | 7 +++----
.../apache/samza/test/table/TestLocalTable.java | 9 ++++-----
.../test/table/TestLocalTableWithSideInputs.java | 6 ++----
.../test/table/TestTableDescriptorsProvider.java | 10 ++++------
14 files changed, 85 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
index a60b6a9..dbcd65e 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
@@ -19,7 +19,6 @@
package org.apache.samza.operators;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.serializers.KVSerde;
/**
* User facing class to collect metadata that fully describes a
@@ -30,8 +29,8 @@ import org.apache.samza.serializers.KVSerde;
*
* <pre>
* {@code
- * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl")
- * .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
+ * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl",
+ * KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
* .withBlockSize(1024)
* .withConfig("some-key", "some-value");
* }
@@ -55,14 +54,6 @@ public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
String getTableId();
/**
- * Set the Serde for this table
- * @param serde the serde
- * @return this table descriptor instance
- * @throws IllegalArgumentException if null is provided
- */
- D withSerde(KVSerde<K, V> serde);
-
- /**
* Add a configuration entry for the table
* @param key the key
* @param value the value
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
index 766a4b4..5f8d766 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
@@ -44,13 +44,11 @@ import org.apache.samza.operators.TableDescriptor;
* public List<TableDescriptor> getTableDescriptors() {
* List<TableDescriptor> tableDescriptors = new ArrayList<>();
* final TableReadFunction readRemoteTableFn = new MyStoreReadFunction();
- * tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
- * .withReadFunction(readRemoteTableFn)
- * .withSerde(KVSerde.of(new StringSerde(), new StringSerde())));
+ * tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1", KVSerde.of(new StringSerde(), new StringSerde()))
+ * .withReadFunction(readRemoteTableFn));
*
- * tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")
+ * tableDescriptors.add(new RocksDbTableDescriptor("local-table-1", KVSerde.of(new LongSerde(), new StringSerde<>()))
* .withBlockSize(4096)
- * .withSerde(KVSerde.of(new LongSerde(), new StringSerde<>())));
* .withConfig("some-key", "some-value");
* return tableDescriptors;
* }
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
index f81f3b8..1e4194a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
@@ -51,18 +51,19 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
this.tableId = tableId;
}
- @Override
- public D withConfig(String key, String value) {
- config.put(key, value);
- return (D) this;
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param serde the serde for key and value
+ */
+ protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) {
+ this.tableId = tableId;
+ this.serde = serde;
}
@Override
- public D withSerde(KVSerde<K, V> serde) {
- if (serde == null) {
- throw new IllegalArgumentException("Serde cannot be null");
- }
- this.serde = serde;
+ public D withConfig(String key, String value) {
+ config.put(key, value);
return (D) this;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
index 537ff87..c31348f 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.serializers.KVSerde;
import org.apache.samza.table.TableSpec;
import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.table.utils.SerdeUtils;
@@ -79,12 +80,22 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
private int asyncCallbackPoolSize = -1;
/**
- * {@inheritDoc}
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
*/
public RemoteTableDescriptor(String tableId) {
super(tableId);
}
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param serde the serde for key and value
+ */
+ public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
+ super(tableId, serde);
+ }
+
@Override
public TableSpec getTableSpec() {
validate();
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
index 8328417..d364234 100644
--- a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.samza.storage.kv.inmemory;
import java.util.HashMap;
import java.util.Map;
+import org.apache.samza.serializers.KVSerde;
import org.apache.samza.storage.kv.BaseLocalStoreBackedTableDescriptor;
import org.apache.samza.table.TableSpec;
@@ -34,12 +35,22 @@ import org.apache.samza.table.TableSpec;
public class InMemoryTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, InMemoryTableDescriptor<K, V>> {
/**
- * {@inheritDoc}
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
*/
public InMemoryTableDescriptor(String tableId) {
super(tableId);
}
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param serde the serde for key and value
+ */
+ public InMemoryTableDescriptor(String tableId, KVSerde<K, V> serde) {
+ super(tableId, serde);
+ }
+
@Override
protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
super.generateTableSpecConfig(tableSpecConfig);
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
index 840fb70..89bd058 100644
--- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
@@ -31,8 +31,8 @@ public class TestInMemoryTableDescriptor {
@Test
public void testTableSpec() {
- TableSpec tableSpec = new InMemoryTableDescriptor<Integer, String>("1")
- .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+ TableSpec tableSpec = new InMemoryTableDescriptor("1",
+ KVSerde.of(new IntegerSerde(), new StringSerde()))
.withConfig("inmemory.abc", "xyz")
.getTableSpec();
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
index 9b81605..325d023 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.samza.storage.kv;
import java.util.HashMap;
import java.util.Map;
+import org.apache.samza.serializers.KVSerde;
import org.apache.samza.table.TableSpec;
@@ -57,13 +58,23 @@ public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescr
private String compactionStyle;
/**
- * {@inheritDoc}
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
*/
public RocksDbTableDescriptor(String tableId) {
super(tableId);
}
/**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param serde the serde for key and value
+ */
+ public RocksDbTableDescriptor(String tableId, KVSerde<K, V> serde) {
+ super(tableId, serde);
+ }
+
+ /**
* Refer to <code>stores.store-name.write.batch.size</code> in Samza configuration guide
* @param writeBatchSize write batch size
* @return this table descriptor instance
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
index 50f0920..35a66e8 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
@@ -37,8 +37,8 @@ public class TestRocksDbTableDescriptor {
@Test
public void testSerde() {
- TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
- .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+ TableSpec tableSpec = new RocksDbTableDescriptor("1",
+ KVSerde.of(new IntegerSerde(), new StringSerde()))
.getTableSpec();
Assert.assertNotNull(tableSpec.getSerde());
Assert.assertEquals(tableSpec.getSerde().getKeySerde().getClass(), IntegerSerde.class);
@@ -48,8 +48,8 @@ public class TestRocksDbTableDescriptor {
@Test
public void testTableSpec() {
- TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
- .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+ TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1",
+ KVSerde.of(new IntegerSerde(), new StringSerde()))
.withBlockSize(1)
.withCacheSize(2)
.withCompactionStyle("fifo")
@@ -85,8 +85,7 @@ public class TestRocksDbTableDescriptor {
@Test
public void testTableSpecWithChangelogEnabled() {
- TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
- .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+ TableSpec tableSpec = new RocksDbTableDescriptor("1", KVSerde.of(new IntegerSerde(), new StringSerde()))
.withChangelogStream("changelog-$tream")
.withChangelogReplicationFactor(10)
.getTableSpec();
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
index c46f9e1..96057d6 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.serializers.KVSerde;
import org.apache.samza.storage.SideInputsProcessor;
@@ -49,13 +50,21 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo
/**
* Constructs a table descriptor instance
- *
- * @param tableId Id of the table
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
*/
public BaseLocalStoreBackedTableDescriptor(String tableId) {
super(tableId);
}
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param serde the serde for key and value
+ */
+ public BaseLocalStoreBackedTableDescriptor(String tableId, KVSerde<K, V> serde) {
+ super(tableId, serde);
+ }
+
public D withSideInputs(List<String> sideInputs) {
this.sideInputs = sideInputs;
// Disable changelog
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 1ada813..a1c1bdd 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -100,10 +100,9 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
TableDescriptor tableDescriptor = null;
if (isTable) {
- tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name)
- .withSerde(KVSerde.of(
- new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
- new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+ tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name, KVSerde.of(
+ new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
+ new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
}
return new SqlIOConfig(systemName, streamName, fetchSystemConfigs(systemName), tableDescriptor);
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
index bd61afd..8318e8a 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -202,10 +202,9 @@ public class TestIOResolverFactory implements SqlIOResolverFactory {
tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size());
} else {
String tableId = "InputTable-" + ioName.replace(".", "-").replace("$", "-");
- tableDescriptor = new RocksDbTableDescriptor(tableId)
- .withSerde(KVSerde.of(
- new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
- new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+ tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(
+ new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
+ new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
}
tableDescMap.put(ioName, tableDescriptor);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index fbf0539..e1386c8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -96,8 +96,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
final StreamApplication app = appDesc -> {
- Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1")
- .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+ Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
+ KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
@@ -134,7 +134,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
final StreamApplication app = appDesc -> {
Table<KV<Integer, Profile>> table = appDesc.getTable(
- new InMemoryTableDescriptor("t1").withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+ new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
appDesc.getInputStream(profileISD)
@@ -209,8 +209,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
final StreamApplication app = appDesc -> {
- Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1")
- .withSerde(profileKVSerde));
+ Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index adcea48..3c22818 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -143,8 +143,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
}
protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
- return new InMemoryTableDescriptor<Integer, Profile>(PROFILE_TABLE)
- .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+ return new InMemoryTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
.withSideInputs(ImmutableList.of(PROFILE_STREAM))
.withSideInputsProcessor((msg, store) -> {
Profile profile = (Profile) msg.getMessage();
@@ -158,8 +157,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
static class DurablePageViewProfileJoin extends PageViewProfileJoin {
@Override
protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
- return new RocksDbTableDescriptor<Integer, Profile>(PROFILE_TABLE)
- .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+ return new RocksDbTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
.withSideInputs(ImmutableList.of(PROFILE_STREAM))
.withSideInputsProcessor((msg, store) -> {
TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
index 34ffbd4..d123cee 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
@@ -137,13 +137,11 @@ public class TestTableDescriptorsProvider {
final RateLimiter readRateLimiter = mock(RateLimiter.class);
final MyReadFunction readFn = new MyReadFunction();
- tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
+ tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1", KVSerde.of(new StringSerde(), new LongSerde()))
.withReadFunction(readFn)
- .withRateLimiter(readRateLimiter, null, null)
- .withSerde(KVSerde.of(new StringSerde(), new LongSerde())));
- tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")
- .withBlockSize(4096)
- .withSerde(KVSerde.of(new StringSerde(), new StringSerde())));
+ .withRateLimiter(readRateLimiter, null, null));
+ tableDescriptors.add(new RocksDbTableDescriptor("local-table-1", KVSerde.of(new StringSerde(), new StringSerde()))
+ .withBlockSize(4096));
return tableDescriptors;
}
}