You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/11/21 01:22:35 UTC
[2/3] samza git commit: SAMZA-1998: Table API refactoring
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
index bf032a2..1e44fa5 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
@@ -22,8 +22,6 @@ import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.table.TableSpec;
-
/**
* The spec for operator that writes a stream to a table by extracting keys and values
@@ -35,21 +33,21 @@ import org.apache.samza.table.TableSpec;
@InterfaceStability.Unstable
public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Void> {
- private final TableSpec tableSpec;
+ private final String tableId;
/**
* Constructor for a {@link SendToTableOperatorSpec}.
*
- * @param tableSpec the table spec of the table written to
+ * @param tableId the Id of the table written to
* @param opId the unique ID for this operator
*/
- SendToTableOperatorSpec(TableSpec tableSpec, String opId) {
+ SendToTableOperatorSpec(String tableId, String opId) {
super(OpCode.SEND_TO, opId);
- this.tableSpec = tableSpec;
+ this.tableId = tableId;
}
- public TableSpec getTableSpec() {
- return tableSpec;
+ public String getTableId() {
+ return tableId;
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
index 1849c64..a4f6068 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
@@ -22,8 +22,6 @@ import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.table.TableSpec;
-
/**
* The spec for stream-table join operator that retrieves a record from the table using key
@@ -36,24 +34,24 @@ import org.apache.samza.table.TableSpec;
@InterfaceStability.Unstable
public class StreamTableJoinOperatorSpec<K, M, R, JM> extends OperatorSpec<M, JM> {
- private final TableSpec tableSpec;
+ private final String tableId;
private final StreamTableJoinFunction<K, M, R, JM> joinFn;
/**
* Constructor for {@link StreamTableJoinOperatorSpec}.
*
- * @param tableSpec the table spec for the table on the right side of the join
+ * @param tableId the Id of the table on the right side of the join
* @param joinFn the user-defined join function to get join keys and results
* @param opId the unique ID for this operator
*/
- StreamTableJoinOperatorSpec(TableSpec tableSpec, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) {
+ StreamTableJoinOperatorSpec(String tableId, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) {
super(OpCode.JOIN, opId);
- this.tableSpec = tableSpec;
+ this.tableId = tableId;
this.joinFn = joinFn;
}
- public TableSpec getTableSpec() {
- return tableSpec;
+ public String getTableId() {
+ return tableId;
}
public StreamTableJoinFunction<K, M, R, JM> getJoinFn() {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
index 6fa040b..bb83e9c 100644
--- a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
@@ -18,10 +18,7 @@
*/
package org.apache.samza.table;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.context.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,38 +31,31 @@ abstract public class BaseTableProvider implements TableProvider {
final protected Logger logger = LoggerFactory.getLogger(getClass());
- final protected TableSpec tableSpec;
+ final protected String tableId;
- protected Context context;
+ // Job config
+ final protected Config config;
- public BaseTableProvider(TableSpec tableSpec) {
- this.tableSpec = tableSpec;
- }
+ protected Context context;
/**
- * {@inheritDoc}
+ * Construct the table provider using table Id and job configuration
+ * @param tableId Id of the table
+ * @param config job configuration
*/
+ public BaseTableProvider(String tableId, Config config) {
+ this.tableId = tableId;
+ this.config = config;
+ }
+
@Override
public void init(Context context) {
this.context = context;
+ logger.info("Initializing table provider for table " + tableId);
}
- /**
- * {@inheritDoc}
- */
@Override
- public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
- Map<String, String> tableConfig = new HashMap<>();
-
- // Insert table_id prefix to config entries
- tableSpec.getConfig().forEach((k, v) -> {
- String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
- tableConfig.put(realKey, v);
- });
-
- logger.info("Generated configuration for table " + tableSpec.getId());
-
- return tableConfig;
+ public void close() {
+ logger.info("Closing table provider for table " + tableId);
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
index 55db637..05cfacd 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
@@ -19,80 +19,26 @@
package org.apache.samza.table;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.operators.TableImpl;
-import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Helper class to generate table configs.
*/
public class TableConfigGenerator {
- private static final Logger LOG = LoggerFactory.getLogger(TableConfigGenerator.class);
-
- /**
- * Generate table configurations given a list of table descriptors
- * @param config the job configuration
- * @param tableDescriptors the list of tableDescriptors
- * @return configuration for the tables
- */
- static public Map<String, String> generateConfigsForTableDescs(Config config, List<TableDescriptor> tableDescriptors) {
- return generateConfigsForTableSpecs(config, getTableSpecs(tableDescriptors));
- }
-
- /**
- * Generate table configurations given a list of table specs
- * @param config the job configuration
- * @param tableSpecs the list of tableSpecs
- * @return configuration for the tables
- */
- static public Map<String, String> generateConfigsForTableSpecs(Config config, List<TableSpec> tableSpecs) {
- Map<String, String> tableConfigs = new HashMap<>();
- tableSpecs.forEach(tableSpec -> {
- // Add table provider factory config
- tableConfigs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()),
- tableSpec.getTableProviderFactoryClassName());
-
- // Generate additional configuration
- TableProviderFactory tableProviderFactory =
- Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
- TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec);
- tableConfigs.putAll(tableProvider.generateConfig(config, tableConfigs));
- });
+ private static final Logger LOG = LoggerFactory.getLogger(TableConfigGenerator.class);
- LOG.info("TableConfigGenerator has generated configs {}", tableConfigs);
- return tableConfigs;
+ static public Map<String, String> generate(Config jobConfig, List<TableDescriptor> tableDescriptors) {
+ Map<String, String> tableConfig = new HashMap<>();
+ tableDescriptors.forEach(tableDescriptor -> tableConfig.putAll(tableDescriptor.toConfig(jobConfig)));
+ LOG.info("TableConfigGenerator has generated configs {}", tableConfig);
+ return tableConfig;
}
- /**
- * Get list of table specs given a list of table descriptors.
- * @param tableDescs the list of tableDescriptors
- * @return list of tableSpecs
- */
- static public List<TableSpec> getTableSpecs(List<TableDescriptor> tableDescs) {
- Map<TableSpec, TableImpl> tableSpecs = new LinkedHashMap<>();
-
- tableDescs.forEach(tableDesc -> {
- TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec();
-
- if (tableSpecs.containsKey(tableSpec)) {
- throw new IllegalStateException(
- String.format("getTable() invoked multiple times with the same tableId: %s", tableDesc.getTableId()));
- }
- tableSpecs.put(tableSpec, new TableImpl(tableSpec));
- });
- return new ArrayList<>(tableSpecs.keySet());
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index d7b15a4..cae7548 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -23,8 +23,6 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.context.Context;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,12 +33,12 @@ import java.util.Map;
/**
* A {@link TableManager} manages tables within a Samza task. For each table, it maintains
- * the {@link TableSpec}, the {@link TableProvider} and the {@link Table} instance.
+ * the {@link TableProvider} and the {@link Table} instance.
* It is used at execution for {@link org.apache.samza.container.TaskInstance} to retrieve
* table instances for read/write operations.
*
- * A {@link TableManager} is constructed from job configuration, the {@link TableSpec}
- * and {@link TableProvider} are constructed by processing the job configuration
+ * A {@link TableManager} is constructed from job configuration, the
+ * {@link TableProvider} are constructed by processing the job configuration
* during initialization. The {@link Table} is constructed when {@link #getTable(String)}
* is called and cached.
*
@@ -54,8 +52,7 @@ import java.util.Map;
*/
public class TableManager {
- static public class TableCtx {
- private TableSpec tableSpec;
+ static class TableCtx {
private TableProvider tableProvider;
private Table table;
}
@@ -70,27 +67,13 @@ public class TableManager {
/**
* Construct a table manager instance
* @param config job configuration
- * @param serdes Serde instances for tables
*/
- public TableManager(Config config, Map<String, Serde<Object>> serdes) {
+ public TableManager(Config config) {
new JavaTableConfig(config).getTableIds().forEach(tableId -> {
-
- // Construct the table provider
- String tableProviderFactory = config.get(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableId));
-
- // Construct the KVSerde
- JavaTableConfig tableConfig = new JavaTableConfig(config);
- KVSerde serde = KVSerde.of(
- serdes.get(tableConfig.getKeySerde(tableId)),
- serdes.get(tableConfig.getValueSerde(tableId)));
-
- TableSpec tableSpec = new TableSpec(tableId, serde, tableProviderFactory,
- config.subset(String.format(JavaTableConfig.TABLE_ID_PREFIX, tableId) + "."));
-
- addTable(tableSpec);
-
- logger.info("Added table " + tableSpec.getId());
+ addTable(tableId, config);
+ logger.debug("Added table " + tableId);
});
+ logger.info(String.format("Added %d tables", tableContexts.size()));
}
/**
@@ -102,20 +85,17 @@ public class TableManager {
initialized = true;
}
- /**
- * Add a table to the table manager
- * @param tableSpec the table spec
- */
- private void addTable(TableSpec tableSpec) {
- if (tableContexts.containsKey(tableSpec.getId())) {
- throw new SamzaException("Table " + tableSpec.getId() + " already exists");
+ private void addTable(String tableId, Config config) {
+ if (tableContexts.containsKey(tableId)) {
+ throw new SamzaException("Table " + tableId + " already exists");
}
- TableCtx ctx = new TableCtx();
+ JavaTableConfig tableConfig = new JavaTableConfig(config);
+ String providerFactoryClassName = tableConfig.getTableProviderFactory(tableId);
TableProviderFactory tableProviderFactory =
- Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
- ctx.tableProvider = tableProviderFactory.getTableProvider(tableSpec);
- ctx.tableSpec = tableSpec;
- tableContexts.put(tableSpec.getId(), ctx);
+ Util.getObj(providerFactoryClassName, TableProviderFactory.class);
+ TableCtx ctx = new TableCtx();
+ ctx.tableProvider = tableProviderFactory.getTableProvider(tableId, config);
+ tableContexts.put(tableId, ctx);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
index 32d2bed..703a6ff 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
@@ -85,9 +85,6 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
this.isWriteAround = isWriteAround;
}
- /**
- * {@inheritDoc}
- */
@Override
public void init(Context context) {
readMetrics = new DefaultTableReadMetrics(context, this, tableId);
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
index 1a400a4..f78347a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
@@ -23,10 +23,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
import org.apache.samza.table.descriptors.CachingTableDescriptor;
import org.apache.samza.table.caching.guava.GuavaCacheTable;
import org.apache.samza.table.BaseTableProvider;
@@ -41,16 +42,17 @@ public class CachingTableProvider extends BaseTableProvider {
// Store the cache instances created by default
private final List<ReadWriteTable> defaultCaches = new ArrayList<>();
- public CachingTableProvider(TableSpec tableSpec) {
- super(tableSpec);
+ public CachingTableProvider(String tableId, Config config) {
+ super(tableId, config);
}
@Override
public Table getTable() {
- String realTableId = tableSpec.getConfig().get(CachingTableDescriptor.REAL_TABLE_ID);
+ JavaTableConfig tableConfig = new JavaTableConfig(config);
+ String realTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.REAL_TABLE_ID);
ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
- String cacheTableId = tableSpec.getConfig().get(CachingTableDescriptor.CACHE_TABLE_ID);
+ String cacheTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.CACHE_TABLE_ID);
ReadWriteTable cache;
if (cacheTableId != null) {
@@ -60,21 +62,23 @@ public class CachingTableProvider extends BaseTableProvider {
defaultCaches.add(cache);
}
- boolean isWriteAround = Boolean.parseBoolean(tableSpec.getConfig().get(CachingTableDescriptor.WRITE_AROUND));
- CachingTable cachingTable = new CachingTable(tableSpec.getId(), table, cache, isWriteAround);
+ boolean isWriteAround = Boolean.parseBoolean(tableConfig.getForTable(tableId, CachingTableDescriptor.WRITE_AROUND));
+ CachingTable cachingTable = new CachingTable(tableId, table, cache, isWriteAround);
cachingTable.init(this.context);
return cachingTable;
}
@Override
public void close() {
+ super.close();
defaultCaches.forEach(c -> c.close());
}
private ReadWriteTable createDefaultCacheTable(String tableId) {
- long readTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(CachingTableDescriptor.READ_TTL_MS, "-1"));
- long writeTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(CachingTableDescriptor.WRITE_TTL_MS, "-1"));
- long cacheSize = Long.parseLong(tableSpec.getConfig().getOrDefault(CachingTableDescriptor.CACHE_SIZE, "-1"));
+ JavaTableConfig tableConfig = new JavaTableConfig(config);
+ long readTtlMs = Long.parseLong(tableConfig.getForTable(tableId, CachingTableDescriptor.READ_TTL_MS, "-1"));
+ long writeTtlMs = Long.parseLong(tableConfig.getForTable(tableId, CachingTableDescriptor.WRITE_TTL_MS, "-1"));
+ long cacheSize = Long.parseLong(tableConfig.getForTable(tableId, CachingTableDescriptor.CACHE_SIZE, "-1"));
CacheBuilder cacheBuilder = CacheBuilder.newBuilder();
if (readTtlMs != -1) {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
index 2ac3694..f421538 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
@@ -19,16 +19,17 @@
package org.apache.samza.table.caching;
+import org.apache.samza.config.Config;
import org.apache.samza.table.TableProvider;
import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
/**
* Table provider factory for {@link org.apache.samza.table.caching.CachingTable}.
*/
public class CachingTableProviderFactory implements TableProviderFactory {
@Override
- public TableProvider getTableProvider(TableSpec tableSpec) {
- return new CachingTableProvider(tableSpec);
+ public TableProvider getTableProvider(String tableId, Config config) {
+ return new CachingTableProvider(tableId, config);
}
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
index 5f77ee4..391f068 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
@@ -48,9 +48,6 @@ public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> {
this.cache = cache;
}
- /**
- * {@inheritDoc}
- */
@Override
public void init(Context context) {
TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
index 5c9b2af..21c78cc 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
@@ -22,8 +22,9 @@ package org.apache.samza.table.caching.guava;
import java.util.ArrayList;
import java.util.List;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
import org.apache.samza.table.BaseTableProvider;
import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
import org.apache.samza.table.utils.SerdeUtils;
@@ -38,15 +39,16 @@ public class GuavaCacheTableProvider extends BaseTableProvider {
private List<GuavaCacheTable> guavaTables = new ArrayList<>();
- public GuavaCacheTableProvider(TableSpec tableSpec) {
- super(tableSpec);
+ public GuavaCacheTableProvider(String tableId, Config config) {
+ super(tableId, config);
}
@Override
public Table getTable() {
+ JavaTableConfig tableConfig = new JavaTableConfig(config);
Cache guavaCache = SerdeUtils.deserialize(GuavaCacheTableDescriptor.GUAVA_CACHE,
- tableSpec.getConfig().get(GuavaCacheTableDescriptor.GUAVA_CACHE));
- GuavaCacheTable table = new GuavaCacheTable(tableSpec.getId(), guavaCache);
+ tableConfig.getForTable(tableId, GuavaCacheTableDescriptor.GUAVA_CACHE));
+ GuavaCacheTable table = new GuavaCacheTable(tableId, guavaCache);
table.init(this.context);
guavaTables.add(table);
return table;
@@ -54,6 +56,7 @@ public class GuavaCacheTableProvider extends BaseTableProvider {
@Override
public void close() {
+ super.close();
guavaTables.forEach(t -> t.close());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
index ac060c4..d5323f5 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
@@ -19,16 +19,16 @@
package org.apache.samza.table.caching.guava;
+import org.apache.samza.config.Config;
import org.apache.samza.table.TableProvider;
import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
/**
* Table provider factory for {@link org.apache.samza.table.caching.guava.GuavaCacheTable}.
*/
public class GuavaCacheTableProviderFactory implements TableProviderFactory {
@Override
- public TableProvider getTableProvider(TableSpec tableSpec) {
- return new GuavaCacheTableProvider(tableSpec);
+ public TableProvider getTableProvider(String tableId, Config config) {
+ return new GuavaCacheTableProvider(tableId, config);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index 52bdc71..60ac4b7 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -56,9 +56,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
this.writeRateLimiter = writeRateLimiter;
}
- /**
- * {@inheritDoc}
- */
@Override
public void init(Context context) {
super.init(context);
@@ -67,9 +64,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
}
- /**
- * {@inheritDoc}
- */
@Override
public void put(K key, V value) {
try {
@@ -79,9 +73,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
}
}
- /**
- * {@inheritDoc}
- */
@Override
public CompletableFuture<Void> putAsync(K key, V value) {
Preconditions.checkNotNull(key);
@@ -96,9 +87,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
});
}
- /**
- * {@inheritDoc}
- */
@Override
public void putAll(List<Entry<K, V>> entries) {
try {
@@ -108,9 +96,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
}
}
- /**
- * {@inheritDoc}
- */
@Override
public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
Preconditions.checkNotNull(records);
@@ -139,9 +124,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
});
}
- /**
- * {@inheritDoc}
- */
@Override
public void delete(K key) {
try {
@@ -151,9 +133,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
}
}
- /**
- * {@inheritDoc}
- */
@Override
public CompletableFuture<Void> deleteAsync(K key) {
Preconditions.checkNotNull(key);
@@ -164,9 +143,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
});
}
- /**
- * {@inheritDoc}
- */
@Override
public void deleteAll(List<K> keys) {
try {
@@ -176,9 +152,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
}
}
- /**
- * {@inheritDoc}
- */
@Override
public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
Preconditions.checkNotNull(keys);
@@ -193,9 +166,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
});
}
- /**
- * {@inheritDoc}
- */
@Override
public void flush() {
try {
@@ -210,9 +180,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
}
}
- /**
- * {@inheritDoc}
- */
@Override
public void close() {
writeFn.close();
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index f0d781a..1b6bfea 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -103,9 +103,6 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
this.logger = LoggerFactory.getLogger(getClass().getName() + "-" + tableId);
}
- /**
- * {@inheritDoc}
- */
@Override
public void init(Context context) {
readMetrics = new DefaultTableReadMetrics(context, this, tableId);
@@ -113,9 +110,6 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
}
- /**
- * {@inheritDoc}
- */
@Override
public V get(K key) {
try {
@@ -141,9 +135,6 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
});
}
- /**
- * {@inheritDoc}
- */
@Override
public Map<K, V> getAll(List<K> keys) {
readMetrics.numGetAlls.inc();
@@ -297,9 +288,6 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
return ioFuture;
}
- /**
- * {@inheritDoc}
- */
@Override
public void close() {
readFn.close();
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index 4b17e23..8f0b2fd 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -19,8 +19,9 @@
package org.apache.samza.table.remote;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.retry.RetriableReadFunction;
import org.apache.samza.table.retry.RetriableWriteFunction;
@@ -38,7 +39,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-
/**
* Provide for remote table instances
*/
@@ -57,31 +57,30 @@ public class RemoteTableProvider extends BaseTableProvider {
private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
private static ScheduledExecutorService retryExecutor;
- public RemoteTableProvider(TableSpec tableSpec) {
- super(tableSpec);
- this.readOnly = !tableSpec.getConfig().containsKey(RemoteTableDescriptor.WRITE_FN);
+ public RemoteTableProvider(String tableId, Config config) {
+ super(tableId, config);
+ JavaTableConfig tableConfig = new JavaTableConfig(config);
+ this.readOnly = tableConfig.getForTable(tableId, RemoteTableDescriptor.WRITE_FN) == null;
}
- /**
- * {@inheritDoc}
- */
@Override
public Table getTable() {
RemoteReadableTable table;
- String tableId = tableSpec.getId();
- TableReadFunction readFn = getReadFn();
- RateLimiter rateLimiter = deserializeObject(RemoteTableDescriptor.RATE_LIMITER);
+ JavaTableConfig tableConfig = new JavaTableConfig(config);
+
+ TableReadFunction readFn = getReadFn(tableConfig);
+ RateLimiter rateLimiter = deserializeObject(tableConfig, RemoteTableDescriptor.RATE_LIMITER);
if (rateLimiter != null) {
rateLimiter.init(this.context);
}
- TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(RemoteTableDescriptor.READ_CREDIT_FN);
- TableRateLimiter readRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
+ TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN);
+ TableRateLimiter readRateLimiter = new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
TableRateLimiter writeRateLimiter = null;
- TableRetryPolicy readRetryPolicy = deserializeObject(RemoteTableDescriptor.READ_RETRY_POLICY);
+ TableRetryPolicy readRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.READ_RETRY_POLICY);
TableRetryPolicy writeRetryPolicy = null;
if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
@@ -97,21 +96,21 @@ public class RemoteTableProvider extends BaseTableProvider {
readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
}
- TableWriteFunction writeFn = getWriteFn();
+ TableWriteFunction writeFn = getWriteFn(tableConfig);
boolean isRateLimited = readRateLimiter.isRateLimited();
if (!readOnly) {
- writeCreditFn = deserializeObject(RemoteTableDescriptor.WRITE_CREDIT_FN);
- writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
+ writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
+ writeRateLimiter = new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
isRateLimited |= writeRateLimiter.isRateLimited();
- writeRetryPolicy = deserializeObject(RemoteTableDescriptor.WRITE_RETRY_POLICY);
+ writeRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_RETRY_POLICY);
if (writeRetryPolicy != null) {
writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
}
}
// Optional executor for future callback/completion. Shared by both read and write operations.
- int callbackPoolSize = Integer.parseInt(tableSpec.getConfig().get(RemoteTableDescriptor.ASYNC_CALLBACK_POOL_SIZE));
+ int callbackPoolSize = Integer.parseInt(tableConfig.getForTable(tableId, RemoteTableDescriptor.ASYNC_CALLBACK_POOL_SIZE, "-1"));
if (callbackPoolSize > 0) {
callbackExecutors.computeIfAbsent(tableId, (arg) ->
Executors.newFixedThreadPool(callbackPoolSize, (runnable) -> {
@@ -133,10 +132,10 @@ public class RemoteTableProvider extends BaseTableProvider {
}
if (readOnly) {
- table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter,
+ table = new RemoteReadableTable(tableId, readFn, readRateLimiter,
tableExecutors.get(tableId), callbackExecutors.get(tableId));
} else {
- table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter,
+ table = new RemoteReadWriteTable(tableId, readFn, writeFn, readRateLimiter,
writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
}
@@ -153,34 +152,32 @@ public class RemoteTableProvider extends BaseTableProvider {
return table;
}
- /**
- * {@inheritDoc}
- */
@Override
public void close() {
+ super.close();
tables.forEach(t -> t.close());
tableExecutors.values().forEach(e -> e.shutdown());
callbackExecutors.values().forEach(e -> e.shutdown());
}
- private <T> T deserializeObject(String key) {
- String entry = tableSpec.getConfig().getOrDefault(key, "");
+ private <T> T deserializeObject(JavaTableConfig tableConfig, String key) {
+ String entry = tableConfig.getForTable(tableId, key, "");
if (entry.isEmpty()) {
return null;
}
return SerdeUtils.deserialize(key, entry);
}
- private TableReadFunction<?, ?> getReadFn() {
- TableReadFunction<?, ?> readFn = deserializeObject(RemoteTableDescriptor.READ_FN);
+ private TableReadFunction<?, ?> getReadFn(JavaTableConfig tableConfig) {
+ TableReadFunction<?, ?> readFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_FN);
if (readFn != null) {
readFn.init(this.context);
}
return readFn;
}
- private TableWriteFunction<?, ?> getWriteFn() {
- TableWriteFunction<?, ?> writeFn = deserializeObject(RemoteTableDescriptor.WRITE_FN);
+ private TableWriteFunction<?, ?> getWriteFn(JavaTableConfig tableConfig) {
+ TableWriteFunction<?, ?> writeFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_FN);
if (writeFn != null) {
writeFn.init(this.context);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
index 0eb88fd..723288d 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
@@ -19,20 +19,16 @@
package org.apache.samza.table.remote;
+import org.apache.samza.config.Config;
import org.apache.samza.table.TableProvider;
import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
-
-import com.google.common.base.Preconditions;
-
/**
* Factory class for a remote table provider
*/
public class RemoteTableProviderFactory implements TableProviderFactory {
@Override
- public TableProvider getTableProvider(TableSpec tableSpec) {
- Preconditions.checkNotNull(tableSpec, "null table spec");
- return new RemoteTableProvider(tableSpec);
+ public TableProvider getTableProvider(String tableId, Config config) {
+ return new RemoteTableProvider(tableId, config);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 87ab86b..7b4410e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -637,7 +637,7 @@ object SamzaContainer extends Logging {
new SystemClock)
}
- val tableManager = new TableManager(config, serdes.asJava)
+ val tableManager = new TableManager(config)
info("Got table manager")
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
index 3b680fc..141d31b 100644
--- a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
@@ -54,7 +54,6 @@ import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.junit.Test;
@@ -514,16 +513,14 @@ public class TestStreamApplicationDescriptorImpl {
public void testGetTable() throws Exception {
Config mockConfig = getConfig();
+ String tableId = "t1";
BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
- TableSpec testTableSpec = new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>());
- when(mockTableDescriptor.getTableSpec()).thenReturn(testTableSpec);
- when(mockTableDescriptor.getTableId()).thenReturn(testTableSpec.getId());
- when(mockTableDescriptor.getSerde()).thenReturn(testTableSpec.getSerde());
+ when(mockTableDescriptor.getTableId()).thenReturn(tableId);
AtomicReference<TableImpl> table = new AtomicReference<>();
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
table.set((TableImpl) appDesc.getTable(mockTableDescriptor));
}, mockConfig);
- assertEquals(testTableSpec.getId(), table.get().getTableSpec().getId());
+ assertEquals(tableId, table.get().getTableId());
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/application/descriptors/TestTaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestTaskApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestTaskApplicationDescriptorImpl.java
index 14b480d..c870e80 100644
--- a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestTaskApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestTaskApplicationDescriptorImpl.java
@@ -33,7 +33,6 @@ import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
-import org.apache.samza.serializers.KVSerde;
import org.apache.samza.task.TaskFactory;
import org.junit.Before;
import org.junit.Test;
@@ -73,8 +72,6 @@ public class TestTaskApplicationDescriptorImpl {
BaseTableDescriptor mock2 = mock(BaseTableDescriptor.class);
when(mock1.getTableId()).thenReturn("test-table1");
when(mock2.getTableId()).thenReturn("test-table2");
- when(mock1.getSerde()).thenReturn(mock(KVSerde.class));
- when(mock2.getSerde()).thenReturn(mock(KVSerde.class));
this.add(mock1);
this.add(mock2);
} };
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 299d631..7908764 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -41,12 +41,12 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
+import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericOutputDescriptor;
import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.OutputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
@@ -65,7 +65,7 @@ import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.TestLocalTableDescriptor;
import org.apache.samza.testUtils.StreamTestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -264,7 +264,8 @@ public class TestExecutionPlanner {
MessageStream<KV<Object, Object>> messageStream3 = appDesc.getInputStream(input3Descriptor);
OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
- TableDescriptor tableDescriptor = new TestTableDescriptor("table-id");
+ TableDescriptor tableDescriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor(
+ "table-id", new KVSerde(new StringSerde(), new StringSerde()));
Table table = appDesc.getTable(tableDescriptor);
messageStream2
@@ -352,7 +353,8 @@ public class TestExecutionPlanner {
MessageStream<KV<Object, Object>> messageStream2 = appDesc.getInputStream(input2Descriptor);
OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
- TableDescriptor tableDescriptor = new TestTableDescriptor("table-id");
+ TableDescriptor tableDescriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor(
+ "table-id", new KVSerde(new StringSerde(), new StringSerde()));
Table table = appDesc.getTable(tableDescriptor);
messageStream1.sendTo(table);
@@ -378,8 +380,10 @@ public class TestExecutionPlanner {
MessageStream<KV<Object, Object>> messageStream2 = appDesc.getInputStream(input2Descriptor);
OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
- TableDescriptor tableDescriptor = new TestTableDescriptor("table-id", Arrays.asList("input1"),
- (message, store) -> Collections.emptyList());
+ TableDescriptor tableDescriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor(
+ "table-id", new KVSerde(new StringSerde(), new StringSerde()))
+ .withSideInputs(Arrays.asList("input1"))
+ .withSideInputsProcessor(mock(SideInputsProcessor.class));
Table table = appDesc.getTable(tableDescriptor);
messageStream2
@@ -404,8 +408,10 @@ public class TestExecutionPlanner {
MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor);
OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
- TableDescriptor tableDescriptor = new TestTableDescriptor("table-id", Arrays.asList("input2"),
- (message, store) -> Collections.emptyList());
+ TableDescriptor tableDescriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor(
+ "table-id", new KVSerde(new StringSerde(), new StringSerde()))
+ .withSideInputs(Arrays.asList("input2"))
+ .withSideInputsProcessor(mock(SideInputsProcessor.class));
Table table = appDesc.getTable(tableDescriptor);
messageStream1
@@ -822,26 +828,4 @@ public class TestExecutionPlanner {
}
}
-
- private static class TestTableDescriptor extends BaseTableDescriptor implements TableDescriptor {
- private final List<String> sideInputs;
- private final SideInputsProcessor sideInputsProcessor;
-
- public TestTableDescriptor(String tableId) {
- this(tableId, Collections.emptyList(), null);
- }
-
- public TestTableDescriptor(String tableId, List<String> sideInputs, SideInputsProcessor sideInputsProcessor) {
- super(tableId);
- this.sideInputs = sideInputs;
- this.sideInputsProcessor = sideInputsProcessor;
- }
-
- @Override
- public TableSpec getTableSpec() {
- validate();
- return new TableSpec(tableId, serde, "dummyTableProviderFactoryClassName",
- Collections.emptyMap(), sideInputs, sideInputsProcessor);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index b47014d..b620901 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -48,6 +48,8 @@ import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.table.descriptors.TestLocalTableDescriptor;
import org.apache.samza.testUtils.StreamTestUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.hamcrest.Matchers;
@@ -76,6 +78,8 @@ public class TestJobGraphJsonGenerator {
private GenericInputDescriptor<KV<String, Object>> input1Descriptor;
private GenericInputDescriptor<KV<String, Object>> input2Descriptor;
private GenericOutputDescriptor<KV<String, Object>> outputDescriptor;
+ private TableDescriptor<String, Object, ?> table1Descriptor;
+ private TableDescriptor<String, Object, ?> table2Descriptor;
@Before
public void setUp() {
@@ -93,6 +97,8 @@ public class TestJobGraphJsonGenerator {
input1Descriptor = inputSystemDescriptor.getInputDescriptor("input1", defaultSerde);
input2Descriptor = inputSystemDescriptor.getInputDescriptor("input2", defaultSerde);
outputDescriptor = outputSystemDescriptor.getOutputDescriptor("output", defaultSerde);
+ table1Descriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor("table1", defaultSerde);
+ table2Descriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor("table2", defaultSerde);
Map<String, String> configs = new HashMap<>();
configs.put(JobConfig.JOB_NAME(), "jobName");
@@ -117,6 +123,11 @@ public class TestJobGraphJsonGenerator {
when(mockJobNode.getJobName()).thenReturn("jobName");
when(mockJobNode.getJobId()).thenReturn("jobId");
when(mockJobNode.getJobNameAndId()).thenReturn(JobNode.createJobNameAndId("jobName", "jobId"));
+
+ Map<String, TableDescriptor> tables = new HashMap<>();
+ tables.put(table1Descriptor.getTableId(), table1Descriptor);
+ tables.put(table2Descriptor.getTableId(), table2Descriptor);
+ when(mockJobNode.getTables()).thenReturn(tables);
}
@Test
@@ -305,6 +316,8 @@ public class TestJobGraphJsonGenerator {
when(mockJobGraph.getInputStreams()).thenReturn(inEdges);
when(mockJobGraph.getOutputStreams()).thenReturn(outEdges);
when(mockJobGraph.getIntermediateStreamEdges()).thenReturn(intermediateEdges);
+ Set<TableDescriptor> tables = new HashSet<>(mockJobNode.getTables().values());
+ when(mockJobGraph.getTables()).thenReturn(tables);
when(mockJobGraph.getJobNodes()).thenReturn(Collections.singletonList(mockJobNode));
String graphJson = jsonGenerator.toJson(mockJobGraph);
ObjectMapper objectMapper = new ObjectMapper();
@@ -317,6 +330,8 @@ public class TestJobGraphJsonGenerator {
assertThat(jsonObject.sinkStreams.keySet(), Matchers.containsInAnyOrder(outStreamIds.toArray()));
Set<String> intStreamIds = intermediateEdges.stream().map(stream -> stream.getStreamSpec().getId()).collect(Collectors.toSet());
assertThat(jsonObject.intermediateStreams.keySet(), Matchers.containsInAnyOrder(intStreamIds.toArray()));
+ Set<String> tableIds = tables.stream().map(t -> t.getTableId()).collect(Collectors.toSet());
+ assertThat(jsonObject.tables.keySet(), Matchers.containsInAnyOrder(tableIds.toArray()));
JobGraphJsonGenerator.JobNodeJson expectedNodeJson = new JobGraphJsonGenerator.JobNodeJson();
expectedNodeJson.jobId = mockJobNode.getJobId();
expectedNodeJson.jobName = mockJobNode.getJobName();
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
index 86d339e..1c4abc6 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -19,6 +19,7 @@
package org.apache.samza.execution;
import com.google.common.base.Joiner;
+import java.util.Arrays;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
import org.apache.samza.config.Config;
@@ -28,38 +29,31 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.config.TaskConfigJava;
-import org.apache.samza.context.Context;
+import org.apache.samza.storage.SideInputsProcessor;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.operators.KV;
+import org.apache.samza.table.descriptors.TestLocalTableDescriptor.MockLocalTableDescriptor;
+import org.apache.samza.table.descriptors.TestLocalTableDescriptor.MockTableProviderFactory;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.operators.impl.store.TimestampedValueSerde;
import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.StreamSpec;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
/**
@@ -161,17 +155,10 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig);
// add table to the RepartitionJoinStreamApplication
GenericInputDescriptor<KV<String, Object>> sideInput1 = inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde);
- BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
- TableSpec mockTableSpec = mock(TableSpec.class);
- when(mockTableSpec.getId()).thenReturn("testTable");
- when(mockTableSpec.getSerde()).thenReturn((KVSerde) defaultSerde);
- when(mockTableSpec.getTableProviderFactoryClassName()).thenReturn(MockTableProviderFactory.class.getName());
- List<String> sideInputs = new ArrayList<>();
- sideInputs.add(sideInput1.getStreamId());
- when(mockTableSpec.getSideInputs()).thenReturn(sideInputs);
- when(mockTableDescriptor.getTableId()).thenReturn("testTable");
- when(mockTableDescriptor.getTableSpec()).thenReturn(mockTableSpec);
- when(mockTableDescriptor.getSerde()).thenReturn(defaultSerde);
+ BaseTableDescriptor mockTableDescriptor = new MockLocalTableDescriptor("testTable", defaultSerde)
+ .withSideInputs(Arrays.asList(sideInput1.getStreamId()))
+ .withSideInputsProcessor(mock(SideInputsProcessor.class, withSettings().serializable()))
+ .withConfig("mock.table.provider.config", "mock.config.value");
// add side input and terminate at table in the appplication
mockStreamAppDesc.getInputStream(sideInput1).sendTo(mockStreamAppDesc.getTable(mockTableDescriptor));
StreamEdge sideInputEdge = new StreamEdge(new StreamSpec(sideInput1.getStreamId(), "sideInput1",
@@ -198,17 +185,10 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
public void testTaskApplicationWithTableAndSideInput() {
// add table to the RepartitionJoinStreamApplication
GenericInputDescriptor<KV<String, Object>> sideInput1 = inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde);
- BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
- TableSpec mockTableSpec = mock(TableSpec.class);
- when(mockTableSpec.getId()).thenReturn("testTable");
- when(mockTableSpec.getSerde()).thenReturn((KVSerde) defaultSerde);
- when(mockTableSpec.getTableProviderFactoryClassName()).thenReturn(MockTableProviderFactory.class.getName());
- List<String> sideInputs = new ArrayList<>();
- sideInputs.add(sideInput1.getStreamId());
- when(mockTableSpec.getSideInputs()).thenReturn(sideInputs);
- when(mockTableDescriptor.getTableId()).thenReturn("testTable");
- when(mockTableDescriptor.getTableSpec()).thenReturn(mockTableSpec);
- when(mockTableDescriptor.getSerde()).thenReturn(defaultSerde);
+ BaseTableDescriptor mockTableDescriptor = new MockLocalTableDescriptor("testTable", defaultSerde)
+ .withSideInputs(Arrays.asList(sideInput1.getStreamId()))
+ .withSideInputsProcessor(mock(SideInputsProcessor.class, withSettings().serializable()))
+ .withConfig("mock.table.provider.config", "mock.config.value");
StreamEdge sideInputEdge = new StreamEdge(new StreamSpec(sideInput1.getStreamId(), "sideInput1",
inputSystemDescriptor.getSystemName()), false, false, mockConfig);
// need to put the sideInput related stream configuration to the original config
@@ -260,9 +240,7 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
TableDescriptor tableDescriptor) {
Config tableConfig = jobConfig.subset(String.format("tables.%s.", tableDescriptor.getTableId()));
assertEquals(MockTableProviderFactory.class.getName(), tableConfig.get("provider.factory"));
- MockTableProvider mockTableProvider =
- (MockTableProvider) new MockTableProviderFactory().getTableProvider(((BaseTableDescriptor) tableDescriptor).getTableSpec());
- assertEquals(mockTableProvider.configMap.get("mock.table.provider.config"), jobConfig.get("mock.table.provider.config"));
+ assertEquals("mock.config.value", jobConfig.get("mock.table.provider.config"));
validateTableSerdeConfigure(tableDescriptor.getTableId(), jobConfig, deserializedSerdes);
}
@@ -316,9 +294,9 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
}
private void validateTableSerdeConfigure(String tableId, Config config, Map<String, Serde> deserializedSerdes) {
- Config streamConfig = config.subset(String.format("tables.%s.", tableId));
+ Config streamConfig = config.subset(String.format("stores.%s.", tableId));
String keySerdeName = streamConfig.get("key.serde");
- String valueSerdeName = streamConfig.get("value.serde");
+ String valueSerdeName = streamConfig.get("msg.serde");
assertTrue(String.format("Serialized serdes should contain %s key serde", tableId), deserializedSerdes.containsKey(keySerdeName));
assertTrue(String.format("Serialized %s key serde should be a StringSerde", tableId), keySerdeName.startsWith(StringSerde.class.getSimpleName()));
assertTrue(String.format("Serialized serdes should contain %s value serde", tableId), deserializedSerdes.containsKey(valueSerdeName));
@@ -387,46 +365,7 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
assertEquals("3600000", joinStoreConfig.get("rocksdb.ttl.ms"));
}
- private static class MockTableProvider implements TableProvider {
- private final Map<String, String> configMap;
-
- MockTableProvider(Map<String, String> configMap) {
- this.configMap = configMap;
- }
-
- @Override
- public void init(Context context) {
-
- }
-
- @Override
- public Table getTable() {
- return null;
- }
-
- @Override
- public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
- return configMap;
- }
-
- @Override
- public void close() {
-
- }
- }
-
- public static class MockTableProviderFactory implements TableProviderFactory {
-
- @Override
- public TableProvider getTableProvider(TableSpec tableSpec) {
- Map<String, String> configMap = new HashMap<>();
- configMap.put("mock.table.provider.config", "mock.config.value");
- return new MockTableProvider(configMap);
- }
- }
-
public static class MockConfigRewriter implements ConfigRewriter {
-
@Override
public Config rewrite(String name, Config config) {
Map<String, String> configMap = new HashMap<>(config);
@@ -434,4 +373,5 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
return new MapConfig(configMap);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index ce9435f..ab7cf5f 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -52,7 +52,6 @@ import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -292,8 +291,7 @@ public class TestMessageStreamImpl {
OperatorSpec inputOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> source = new MessageStreamImpl<>(mockGraph, inputOpSpec);
- TableSpec tableSpec = new TableSpec();
- TableImpl table = new TableImpl(tableSpec);
+ TableImpl table = new TableImpl("t1");
source.sendTo(table);
@@ -305,7 +303,7 @@ public class TestMessageStreamImpl {
SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) registeredOpSpec;
assertEquals(OpCode.SEND_TO, sendToTableOperatorSpec.getOpCode());
- assertEquals(tableSpec, sendToTableOperatorSpec.getTableSpec());
+ assertEquals(table.getTableId(), sendToTableOperatorSpec.getTableId());
}
@Test
@@ -316,8 +314,7 @@ public class TestMessageStreamImpl {
OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph, rightInputOpSpec);
- TableSpec tableSpec = new TableSpec();
- TableImpl table = new TableImpl(tableSpec);
+ TableImpl table = new TableImpl("t1");
source2.sendTo(table);
@@ -333,7 +330,7 @@ public class TestMessageStreamImpl {
StreamTableJoinOperatorSpec joinOpSpec = (StreamTableJoinOperatorSpec) leftRegisteredOpSpec;
assertEquals(OpCode.JOIN, joinOpSpec.getOpCode());
assertEquals(mockJoinFn, joinOpSpec.getJoinFn());
- assertEquals(tableSpec, joinOpSpec.getTableSpec());
+ assertEquals(table.getTableId(), joinOpSpec.getTableId());
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
index 9083495..4112c8b 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
@@ -27,7 +27,6 @@ import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.TableSpec;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
@@ -43,11 +42,9 @@ public class TestStreamTableJoinOperatorImpl {
public void testHandleMessage() {
String tableId = "t1";
- TableSpec tableSpec = mock(TableSpec.class);
- when(tableSpec.getId()).thenReturn(tableId);
StreamTableJoinOperatorSpec mockJoinOpSpec = mock(StreamTableJoinOperatorSpec.class);
- when(mockJoinOpSpec.getTableSpec()).thenReturn(tableSpec);
+ when(mockJoinOpSpec.getTableId()).thenReturn(tableId);
when(mockJoinOpSpec.getJoinFn()).thenReturn(
new StreamTableJoinFunction<String, KV<String, String>, KV<String, String>, String>() {
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
index 454a661..b93544f 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
@@ -31,7 +31,6 @@ import org.apache.samza.operators.TableImpl;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.table.TableSpec;
import static org.junit.Assert.*;
@@ -90,17 +89,9 @@ public class OperatorSpecTestUtils {
assertNotEquals(scheduledFn, scheduledFn1);
}
- private static void assertClonedTables(Map<TableSpec, TableImpl> originalTables, Map<TableSpec, TableImpl> clonedTables) {
- assertEquals(originalTables.size(), clonedTables.size());
- assertEquals(originalTables.keySet(), clonedTables.keySet());
- Iterator<TableImpl> oIter = originalTables.values().iterator();
- Iterator<TableImpl> nIter = clonedTables.values().iterator();
- oIter.forEachRemaining(oTable -> assertClonedTableImpl(oTable, nIter.next()));
- }
-
private static void assertClonedTableImpl(TableImpl oTable, TableImpl nTable) {
assertNotEquals(oTable, nTable);
- assertEquals(oTable.getTableSpec(), nTable.getTableSpec());
+ assertEquals(oTable.getTableId(), nTable.getTableId());
}
private static void assertClonedOutputs(Map<String, OutputStreamImpl> originalOutputs,
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
index 6e91e2a..fd908ca 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
@@ -20,9 +20,7 @@ package org.apache.samza.operators.spec;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.data.TestMessageEnvelope;
@@ -36,19 +34,13 @@ import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.table.TableSpec;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
@@ -305,32 +297,28 @@ public class TestOperatorSpec {
public void testStreamTableJoinOperatorSpec() {
StreamTableJoinFunction<String, Object, Object, TestOutputMessageEnvelope> joinFn = new TestStreamTableJoinFunction();
- TableSpec tableSpec = new TableSpec("table-0", KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>()), "my.table.provider.class",
- new MapConfig(new HashMap<String, String>() { { this.put("config1", "value1"); this.put("config2", "value2"); } }));
+ String tableId = "t1";
StreamTableJoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope> joinOperatorSpec =
- new StreamTableJoinOperatorSpec<>(tableSpec, joinFn, "join-3");
+ new StreamTableJoinOperatorSpec<>(tableId, joinFn, "join-3");
StreamTableJoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope> joinOpSpecCopy =
(StreamTableJoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(joinOperatorSpec);
assertNotEquals(joinOpSpecCopy, joinOperatorSpec);
assertEquals(joinOpSpecCopy.getOpId(), joinOperatorSpec.getOpId());
- assertTrue(joinOpSpecCopy.getTableSpec() != joinOperatorSpec.getTableSpec());
- assertEquals(joinOpSpecCopy.getTableSpec().getId(), joinOperatorSpec.getTableSpec().getId());
- assertEquals(joinOpSpecCopy.getTableSpec().getTableProviderFactoryClassName(), joinOperatorSpec.getTableSpec().getTableProviderFactoryClassName());
+ assertEquals(joinOpSpecCopy.getTableId(), joinOperatorSpec.getTableId());
}
@Test
public void testSendToTableOperatorSpec() {
- TableSpec tableSpec = new TableSpec("table-0", KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>()), "my.table.provider.class",
- new MapConfig(new HashMap<String, String>() { { this.put("config1", "value1"); this.put("config2", "value2"); } }));
+ String tableId = "t1";
SendToTableOperatorSpec<String, Integer> sendOpSpec =
- new SendToTableOperatorSpec<>(tableSpec, "output-1");
+ new SendToTableOperatorSpec<>(tableId, "output-1");
SendToTableOperatorSpec<String, Integer> sendToCopy = (SendToTableOperatorSpec<String, Integer>) OperatorSpecTestUtils
.copyOpSpec(sendOpSpec);
assertNotEquals(sendToCopy, sendOpSpec);
assertEquals(sendToCopy.getOpId(), sendOpSpec.getOpId());
- assertTrue(sendToCopy.getTableSpec() != sendOpSpec.getTableSpec() && sendToCopy.getTableSpec().equals(sendOpSpec.getTableSpec()));
+ assertTrue(sendToCopy.getTableId().equals(sendOpSpec.getTableId()));
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index 0952a87..1bbf3af 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -20,6 +20,7 @@ package org.apache.samza.table;
import junit.framework.Assert;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
@@ -53,7 +54,7 @@ public class TestTableManager {
static TableProvider tableProvider;
@Override
- public TableProvider getTableProvider(TableSpec tableSpec) {
+ public TableProvider getTableProvider(String tableId, Config config) {
table = mock(ReadableTable.class);
tableProvider = mock(TableProvider.class);
when(tableProvider.getTable()).thenReturn(table);
@@ -79,47 +80,16 @@ public class TestTableManager {
doTestInit(map);
}
- @Test(expected = Exception.class)
- public void testInitFailsWithoutKeySerde() {
- Map<String, String> map = new HashMap<>();
- map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), DummyTableProviderFactory.class.getName());
- addValueSerde(map);
- doTestInit(map);
- }
-
- @Test(expected = Exception.class)
- public void testInitFailsWithoutValueSerde() {
- Map<String, String> map = new HashMap<>();
- map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), DummyTableProviderFactory.class.getName());
- addValueSerde(map);
- doTestInit(map);
- }
-
@Test(expected = IllegalStateException.class)
- public void testInitFailsWithoutInitializingLocalTables() {
- TableManager tableManager = new TableManager(new MapConfig(new HashMap<>()), new HashMap<>());
+ public void testInitFailsWithoutInitializingLocalStores() {
+ TableManager tableManager = new TableManager(new MapConfig(new HashMap<>()));
tableManager.getTable("dummy");
}
private void doTestInit(Map<String, String> map) {
Map<String, StorageEngine> storageEngines = new HashMap<>();
storageEngines.put(TABLE_ID, mock(StorageEngine.class));
-
- Map<String, Serde<Object>> serdeMap = new HashMap<>();
- SerializableSerde<Serde> serializableSerde = new SerializableSerde();
- map.keySet().stream()
- .filter(k -> k.endsWith(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX()))
- .forEach(k -> {
- String serdeName = k
- .replace(String.format(SerializerConfig.SERIALIZER_PREFIX(), ""), "")
- .replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), "");
- String serializedSerde = map.get(k);
- byte[] bytes = Base64.getDecoder().decode(serializedSerde);
- Serde serde = serializableSerde.fromBytes(bytes);
- serdeMap.put(serdeName, serde);
- });
-
- TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
+ TableManager tableManager = new TableManager(new MapConfig(map));
tableManager.init(new MockContext());
for (int i = 0; i < 2; i++) {
@@ -131,13 +101,7 @@ public class TestTableManager {
Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tableContexts");
TableManager.TableCtx ctx = ctxMap.get(TABLE_ID);
-
- TableSpec tableSpec = getFieldValue(ctx, "tableSpec");
- Assert.assertEquals(TABLE_ID, tableSpec.getId());
- Assert.assertEquals(DummyTableProviderFactory.class.getName(), tableSpec.getTableProviderFactoryClassName());
- Assert.assertEquals(IntegerSerde.class, tableSpec.getSerde().getKeySerde().getClass());
- Assert.assertEquals(StringSerde.class, tableSpec.getSerde().getValueSerde().getClass());
- Assert.assertEquals("xyz", tableSpec.getConfig().get("some.config"));
+ Assert.assertEquals(TABLE_ID, ctxMap.keySet().iterator().next());
TableProvider tableProvider = getFieldValue(ctx, "tableProvider");
Assert.assertNotNull(tableProvider);
@@ -147,14 +111,14 @@ public class TestTableManager {
String serdeId = "key-serde";
map.put(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeId),
serializeSerde(new IntegerSerde()));
- map.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, TABLE_ID), serdeId);
+ map.put(String.format(JavaTableConfig.STORE_KEY_SERDE, TABLE_ID), serdeId);
}
private void addValueSerde(Map<String, String> map) {
String serdeId = "value-serde";
map.put(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeId),
serializeSerde(new StringSerde("UTF-8")));
- map.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, TABLE_ID), serdeId);
+ map.put(String.format(JavaTableConfig.STORE_MSG_SERDE, TABLE_ID), serdeId);
}
private String serializeSerde(Serde serde) {
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index e29f6e6..36ec46f 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -22,6 +22,9 @@ package org.apache.samza.table.caching;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.context.Context;
import org.apache.samza.context.MockContext;
import org.apache.samza.metrics.Counter;
@@ -33,7 +36,6 @@ import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.TableSpec;
import org.apache.samza.table.caching.guava.GuavaCacheTable;
import org.apache.samza.table.descriptors.CachingTableDescriptor;
import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
@@ -65,7 +67,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
public class TestCachingTable {
@Test
public void testSerializeSimple() {
@@ -74,10 +75,11 @@ public class TestCachingTable {
@Test
public void testSerializeWithCacheInstance() {
- GuavaCacheTableDescriptor guavaTableDesc = new GuavaCacheTableDescriptor("guavaCacheId");
- guavaTableDesc.withCache(CacheBuilder.newBuilder().build());
- TableSpec spec = guavaTableDesc.getTableSpec();
- Assert.assertTrue(spec.getConfig().containsKey(GuavaCacheTableDescriptor.GUAVA_CACHE));
+ String tableId = "guavaCacheId";
+ GuavaCacheTableDescriptor guavaTableDesc = new GuavaCacheTableDescriptor(tableId)
+ .withCache(CacheBuilder.newBuilder().build());
+ Map<String, String> tableConfig = guavaTableDesc.toConfig(new MapConfig());
+ assertExists(GuavaCacheTableDescriptor.GUAVA_CACHE, tableId, tableConfig);
doTestSerialize(guavaTableDesc);
}
@@ -85,29 +87,28 @@ public class TestCachingTable {
CachingTableDescriptor desc;
TableDescriptor table = createDummyTableDescriptor("2");
if (cache == null) {
- desc = new CachingTableDescriptor("1", table);
- desc.withReadTtl(Duration.ofMinutes(3));
- desc.withWriteTtl(Duration.ofMinutes(3));
- desc.withCacheSize(1000);
+ desc = new CachingTableDescriptor("1", table)
+ .withReadTtl(Duration.ofMinutes(3))
+ .withWriteTtl(Duration.ofMinutes(4))
+ .withCacheSize(1000);
} else {
desc = new CachingTableDescriptor("1", table, cache);
}
desc.withWriteAround();
- TableSpec spec = desc.getTableSpec();
- Assert.assertTrue(spec.getConfig().containsKey(CachingTableDescriptor.REAL_TABLE_ID));
+ Map<String, String> tableConfig = desc.toConfig(new MapConfig());
+
+ assertEquals("2", CachingTableDescriptor.REAL_TABLE_ID, "1", tableConfig);
if (cache == null) {
- Assert.assertTrue(spec.getConfig().containsKey(CachingTableDescriptor.READ_TTL_MS));
- Assert.assertTrue(spec.getConfig().containsKey(CachingTableDescriptor.WRITE_TTL_MS));
+ assertEquals("180000", CachingTableDescriptor.READ_TTL_MS, "1", tableConfig);
+ assertEquals("240000", CachingTableDescriptor.WRITE_TTL_MS, "1", tableConfig);
} else {
- Assert.assertTrue(spec.getConfig().containsKey(CachingTableDescriptor.CACHE_TABLE_ID));
+ assertEquals(cache.getTableId(), CachingTableDescriptor.CACHE_TABLE_ID, "1", tableConfig);
}
- Assert.assertEquals("true", spec.getConfig().get(CachingTableDescriptor.WRITE_AROUND));
-
- desc.validate();
+ assertEquals("true", CachingTableDescriptor.WRITE_AROUND, "1", tableConfig);
}
private static Pair<ReadWriteTable<String, String>, Map<String, String>> getMockCache() {
@@ -157,7 +158,9 @@ public class TestCachingTable {
if (isWriteAround) {
desc.withWriteAround();
}
- CachingTableProvider tableProvider = new CachingTableProvider(desc.getTableSpec());
+
+ Config config = new MapConfig(desc.toConfig(new MapConfig()));
+ CachingTableProvider tableProvider = new CachingTableProvider(desc.getTableId(), config);
Context context = new MockContext();
final ReadWriteTable cacheTable = getMockCache().getLeft();
@@ -364,8 +367,16 @@ public class TestCachingTable {
private TableDescriptor createDummyTableDescriptor(String tableId) {
BaseTableDescriptor tableDescriptor = mock(BaseTableDescriptor.class);
when(tableDescriptor.getTableId()).thenReturn(tableId);
- when(tableDescriptor.getTableSpec()).thenReturn(
- new TableSpec(tableId, null, null, new HashMap<>()));
return tableDescriptor;
}
+
+ private void assertExists(String key, String tableId, Map<String, String> config) {
+ String realKey = JavaTableConfig.buildKey(tableId, key);
+ Assert.assertTrue(config.containsKey(realKey));
+ }
+
+ private void assertEquals(String expectedValue, String key, String tableId, Map<String, String> config) {
+ String realKey = JavaTableConfig.buildKey(tableId, key);
+ Assert.assertEquals(expectedValue, config.get(realKey));
+ }
}
\ No newline at end of file