You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/11/21 01:22:36 UTC
[3/3] samza git commit: SAMZA-1998: Table API refactoring
SAMZA-1998: Table API refactoring
Table API refactoring
- Removed TableSpec
- Consolidated configuration generation for tables to table descriptors
- Refactored constructors so that only local tables require serdes
- Removed table provider for RocksDB- and in-memory tables, and added LocalTableProvider
- Updates to unit tests
- Various refactoring
Author: Wei Song <ws...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #807 from weisong44/SAMZA-1998
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5f7a22c3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5f7a22c3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5f7a22c3
Branch: refs/heads/master
Commit: 5f7a22c3bb991352cba58bc83d0df01a0ba38835
Parents: cce2b6f
Author: Wei Song <ws...@linkedin.com>
Authored: Tue Nov 20 17:22:18 2018 -0800
Committer: Wei Song <ws...@linkedin.com>
Committed: Tue Nov 20 17:22:18 2018 -0800
----------------------------------------------------------------------
.../apache/samza/config/JavaTableConfig.java | 118 +++++++++++++
.../org/apache/samza/table/TableProvider.java | 15 --
.../samza/table/TableProviderFactory.java | 10 +-
.../java/org/apache/samza/table/TableSpec.java | 172 -------------------
.../table/descriptors/BaseTableDescriptor.java | 68 +++-----
.../descriptors/CachingTableDescriptor.java | 70 ++++----
.../descriptors/GuavaCacheTableDescriptor.java | 32 ++--
.../descriptors/HybridTableDescriptor.java | 3 +-
.../table/descriptors/LocalTableDescriptor.java | 105 ++++++++---
.../descriptors/RemoteTableDescriptor.java | 122 ++++++-------
.../table/descriptors/TableDescriptor.java | 17 ++
.../samza/table/TestBaseTableDescriptor.java | 73 ++++++++
.../StreamApplicationDescriptorImpl.java | 10 +-
.../TaskApplicationDescriptorImpl.java | 8 +-
.../apache/samza/config/JavaTableConfig.java | 88 ----------
.../samza/execution/ExecutionPlanner.java | 35 ++--
.../org/apache/samza/execution/JobGraph.java | 14 +-
.../samza/execution/JobGraphJsonGenerator.java | 41 ++---
.../org/apache/samza/execution/JobNode.java | 10 +-
.../JobNodeConfigurationGenerator.java | 38 ++--
.../execution/OperatorSpecGraphAnalyzer.java | 29 ++--
.../samza/operators/MessageStreamImpl.java | 6 +-
.../org/apache/samza/operators/TableImpl.java | 18 +-
.../operators/impl/SendToTableOperatorImpl.java | 2 +-
.../impl/StreamTableJoinOperatorImpl.java | 2 +-
.../samza/operators/spec/OperatorSpecs.java | 14 +-
.../operators/spec/SendToTableOperatorSpec.java | 14 +-
.../spec/StreamTableJoinOperatorSpec.java | 14 +-
.../apache/samza/table/BaseTableProvider.java | 40 ++---
.../samza/table/TableConfigGenerator.java | 66 +------
.../org/apache/samza/table/TableManager.java | 54 ++----
.../samza/table/caching/CachingTable.java | 3 -
.../table/caching/CachingTableProvider.java | 24 +--
.../caching/CachingTableProviderFactory.java | 7 +-
.../table/caching/guava/GuavaCacheTable.java | 3 -
.../caching/guava/GuavaCacheTableProvider.java | 13 +-
.../guava/GuavaCacheTableProviderFactory.java | 6 +-
.../table/remote/RemoteReadWriteTable.java | 33 ----
.../samza/table/remote/RemoteReadableTable.java | 12 --
.../samza/table/remote/RemoteTableProvider.java | 57 +++---
.../remote/RemoteTableProviderFactory.java | 10 +-
.../apache/samza/container/SamzaContainer.scala | 2 +-
.../TestStreamApplicationDescriptorImpl.java | 9 +-
.../TestTaskApplicationDescriptorImpl.java | 3 -
.../samza/execution/TestExecutionPlanner.java | 44 ++---
.../execution/TestJobGraphJsonGenerator.java | 15 ++
.../TestJobNodeConfigurationGenerator.java | 94 ++--------
.../samza/operators/TestMessageStreamImpl.java | 11 +-
.../impl/TestStreamTableJoinOperatorImpl.java | 5 +-
.../operators/spec/OperatorSpecTestUtils.java | 11 +-
.../samza/operators/spec/TestOperatorSpec.java | 26 +--
.../apache/samza/table/TestTableManager.java | 52 +-----
.../samza/table/caching/TestCachingTable.java | 53 +++---
.../descriptors/TestLocalTableDescriptor.java | 162 +++++++++++++++++
.../descriptors/TestRemoteTableDescriptor.java | 88 ++++++----
.../descriptors/InMemoryTableDescriptor.java | 38 ++--
.../descriptors/InMemoryTableProvider.java | 71 --------
.../InMemoryTableProviderFactory.java | 33 ----
.../inmemory/TestInMemoryTableDescriptor.java | 65 +++++++
.../TestInMemoryTableDescriptor.java | 55 ------
.../descriptors/TestInMemoryTableProvider.java | 67 --------
.../kv/descriptors/RocksDbTableDescriptor.java | 64 +++----
.../kv/descriptors/RocksDbTableProvider.java | 75 --------
.../RocksDbTableProviderFactory.java | 31 ----
.../descriptors/TestRocksDbTableDescriptor.java | 97 ++++++-----
.../descriptors/TestRocksDbTableProvider.java | 68 --------
.../samza/storage/kv/LocalReadWriteTable.java | 3 -
.../samza/storage/kv/LocalReadableTable.java | 3 -
.../samza/storage/kv/LocalTableProvider.java | 103 +----------
.../storage/kv/LocalTableProviderFactory.java | 31 ++++
.../kv/descriptors/TestLocalTableProvider.java | 103 +----------
.../sql/translator/TranslatorTestBase.java | 11 +-
.../samza/example/TaskApplicationExample.java | 5 +-
73 files changed, 1185 insertions(+), 1789 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
new file mode 100644
index 0000000..c4381a1
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A helper class for handling table configuration
+ */
+public class JavaTableConfig extends MapConfig {
+
+ // Prefix
+ public static final String TABLES_PREFIX = "tables.";
+ public static final String TABLE_ID_PREFIX = TABLES_PREFIX + "%s";
+
+ // Suffix
+ public static final String TABLE_PROVIDER_FACTORY_SUFFIX = ".provider.factory";
+
+ // Config keys
+ public static final String TABLE_PROVIDER_FACTORY = TABLE_ID_PREFIX + TABLE_PROVIDER_FACTORY_SUFFIX;
+ public static final String STORE_KEY_SERDE = "stores.%s.key.serde";
+ public static final String STORE_MSG_SERDE = "stores.%s.msg.serde";
+
+ public JavaTableConfig(Map<String, String> config) {
+ super(config);
+ }
+
+ /**
+ * Get Ids of all tables, i.e. every {@code <id>} that has a {@code tables.<id>.provider.factory} entry
+ * @return list of table Ids, in no particular order
+ */
+ public List<String> getTableIds() {
+ Config subConfig = subset(TABLES_PREFIX, true);
+ Set<String> tableIds = subConfig.keySet().stream()
+ .filter(k -> k.endsWith(TABLE_PROVIDER_FACTORY_SUFFIX))
+ .map(k -> k.substring(0, k.length() - TABLE_PROVIDER_FACTORY_SUFFIX.length()))
+ .collect(Collectors.toSet());
+ return new LinkedList<>(tableIds);
+ }
+
+ /**
+ * Get the {@link org.apache.samza.table.TableProviderFactory} class name for a table
+ * @param tableId Id of the table
+ * @return the {@link org.apache.samza.table.TableProviderFactory} class name, or {@code null} if not configured
+ */
+ public String getTableProviderFactory(String tableId) {
+ return get(String.format(TABLE_PROVIDER_FACTORY, tableId), null);
+ }
+
+ /**
+ * Get the serde registry key of the key serde for this table ({@code stores.<tableId>.key.serde})
+ * @param tableId Id of the table
+ * @return key serde registry key, or {@code null} if not configured
+ */
+ public String getKeySerde(String tableId) {
+ return get(String.format(STORE_KEY_SERDE, tableId), null);
+ }
+
+ /**
+ * Get the serde registry key of the value serde for this table ({@code stores.<tableId>.msg.serde})
+ * @param tableId Id of the table
+ * @return value serde registry key, or {@code null} if not configured
+ */
+ public String getValueSerde(String tableId) {
+ return get(String.format(STORE_MSG_SERDE, tableId), null);
+ }
+
+ /**
+ * Get the value of table config item {@code tables.<tableId>.<key>}
+ * @param tableId Id of the table
+ * @param key Key for the config item, relative to the table
+ * @param defaultValue default value if absent in config
+ * @return config value for the key, or {@code defaultValue} if absent
+ */
+ public String getForTable(String tableId, String key, String defaultValue) {
+ return get(buildKey(tableId, key), defaultValue);
+ }
+
+ /**
+ * Get the value of table config item {@code tables.<tableId>.<key>}
+ * @param tableId Id of the table
+ * @param key Key for the config item, relative to the table
+ * @return config value for the key, or {@code null} if absent
+ */
+ public String getForTable(String tableId, String key) {
+ return getForTable(tableId, key, null);
+ }
+
+ /**
+ * Build the complete config key {@code tables.<tableId>.<key>} for a table config item
+ * @param tableId Id of the table
+ * @param key Key for the config item, relative to the table
+ * @return the complete config key
+ */
+ public static String buildKey(String tableId, String key) {
+ return String.format(TABLE_ID_PREFIX + ".%s", tableId, key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
index 350324c..2dec989 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -18,9 +18,7 @@
*/
package org.apache.samza.table;
-import java.util.Map;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
/**
@@ -42,19 +40,6 @@ public interface TableProvider {
Table getTable();
/**
- * Generate any configuration for this table, the generated configuration
- * is used by Samza container to construct this table and any components
- * necessary. Instead of manipulating the input parameters, this method
- * should return the generated configuration.
- *
- * @param jobConfig the job config
- * @param generatedConfig config generated by earlier processing, but has
- * not yet been merged to job config
- * @return configuration for this table
- */
- Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig);
-
- /**
* Shutdown the underlying table
*/
void close();
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
index 1bb0196..86214a5 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
@@ -19,6 +19,7 @@
package org.apache.samza.table;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
/**
@@ -27,9 +28,10 @@ import org.apache.samza.annotation.InterfaceStability;
@InterfaceStability.Unstable
public interface TableProviderFactory {
/**
- * Constructs an instances of the table provider based on a given table spec
- * @param tableSpec the table spec
- * @return the table provider
+ * Construct a table provider based on job configuration
+ * @param tableId Id of the table
+ * @param config Job configuration
+ * @return the constructed table provider
*/
- TableProvider getTableProvider(TableSpec tableSpec);
+ TableProvider getTableProvider(String tableId, Config config);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
deleted file mode 100644
index 2883a93..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.storage.SideInputsProcessor;
-
-
-/**
- * TableSpec is a blueprint for creating, validating, or simply describing a table in the runtime environment.
- *
- * It is typically created indirectly by constructing an instance of
- * {@link org.apache.samza.table.descriptors.TableDescriptor}, and then invoke
- * <code>BaseTableDescriptor.getTableSpec()</code>.
- *
- * It has specific attributes for common behaviors that Samza uses.
- *
- * It has the table provider factory, which provides the actual table implementation.
- *
- * It also includes a map of configurations which may be implementation-specific.
- *
- * It is immutable by design.
- */
-@InterfaceStability.Unstable
-public class TableSpec implements Serializable {
-
- private final String id;
- private final String tableProviderFactoryClassName;
-
- /**
- * The following fields are serialized by the ExecutionPlanner when generating the configs for a table, and deserialized
- * once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis
- */
- private transient final KVSerde serde;
- private transient final List<String> sideInputs;
- private transient final SideInputsProcessor sideInputsProcessor;
- private transient final Map<String, String> config = new HashMap<>();
-
- /**
- * Default constructor
- */
- public TableSpec() {
- this.id = null;
- this.serde = null;
- this.tableProviderFactoryClassName = null;
- this.sideInputs = null;
- this.sideInputsProcessor = null;
- }
-
- /**
- * Constructs a {@link TableSpec}
- *
- * @param tableId Id of the table
- * @param tableProviderFactoryClassName table provider factory
- * @param serde the serde
- * @param config implementation specific configuration
- */
- public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, Map<String, String> config) {
- this(tableId, serde, tableProviderFactoryClassName, config, Collections.emptyList(), null);
- }
-
- /**
- * Constructs a {@link TableSpec}
- *
- * @param tableId Id of the table
- * @param tableProviderFactoryClassName table provider factory
- * @param serde the serde
- * @param config implementation specific configuration
- * @param sideInputs list of side inputs for the table
- * @param sideInputsProcessor side input processor for the table
- */
- public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, Map<String, String> config,
- List<String> sideInputs, SideInputsProcessor sideInputsProcessor) {
- this.id = tableId;
- this.serde = serde;
- this.tableProviderFactoryClassName = tableProviderFactoryClassName;
- this.config.putAll(config);
- this.sideInputs = sideInputs;
- this.sideInputsProcessor = sideInputsProcessor;
- }
-
- /**
- * Get the Id of the table
- * @return Id of the table
- */
- public String getId() {
- return id;
- }
-
- /**
- * Get the serde
- * @param <K> the type of the key
- * @param <V> the type of the value
- * @return the key serde
- */
- public <K, V> KVSerde<K, V> getSerde() {
- return serde;
- }
-
- /**
- * Get the class name of the table provider factory
- * @return class name of the table provider factory
- */
- public String getTableProviderFactoryClassName() {
- return tableProviderFactoryClassName;
- }
-
- /**
- * Get implementation configuration for the table
- * @return configuration for the table
- */
- public Map<String, String> getConfig() {
- return Collections.unmodifiableMap(config);
- }
-
- /**
- * Get the list of side inputs for the table.
- *
- * @return a {@link List} of side input streams
- */
- public List<String> getSideInputs() {
- return sideInputs;
- }
-
- /**
- * Get the {@link SideInputsProcessor} associated with the table.
- *
- * @return a {@link SideInputsProcessor}
- */
- public SideInputsProcessor getSideInputsProcessor() {
- return sideInputsProcessor;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || !getClass().equals(o.getClass())) {
- return false;
- }
- return id.equals(((TableSpec) o).id);
- }
-
- @Override
- public int hashCode() {
- return id.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
index 246216b..d660276 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
@@ -19,13 +19,13 @@
package org.apache.samza.table.descriptors;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.table.TableSpec;
-
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
/**
* Base class for all table descriptor implementations.
@@ -34,13 +34,12 @@ import org.apache.samza.table.TableSpec;
* @param <V> the type of the value in this table
* @param <D> the type of the concrete table descriptor
*/
+@InterfaceStability.Unstable
abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>>
implements TableDescriptor<K, V, D> {
protected final String tableId;
- protected KVSerde<K, V> serde = KVSerde.of(new NoOpSerde(), new NoOpSerde());
-
protected final Map<String, String> config = new HashMap<>();
/**
@@ -51,60 +50,49 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
this.tableId = tableId;
}
- /**
- * Constructs a table descriptor instance
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
- * @param serde the serde for key and value
- */
- protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) {
- this.tableId = tableId;
- this.serde = serde;
- }
-
- /**
- * {@inheritDoc}
- */
@Override
public D withConfig(String key, String value) {
config.put(key, value);
return (D) this;
}
- /**
- * {@inheritDoc}
- */
@Override
public String getTableId() {
return tableId;
}
- /**
- * Get the serde assigned to this {@link TableDescriptor}
- *
- * @return {@link KVSerde} used by this table
- */
- public KVSerde<K, V> getSerde() {
- return serde;
+ @Override
+ public Map<String, String> toConfig(Config jobConfig) {
+
+ validate();
+
+ Map<String, String> tableConfig = new HashMap<>(config);
+ tableConfig.put(
+ String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableId),
+ getProviderFactoryClassName());
+
+ return Collections.unmodifiableMap(tableConfig);
}
/**
- * Generate config for {@link TableSpec}; this method is used internally.
- * @param tableSpecConfig configuration for the {@link TableSpec}
+ * Return the fully qualified class name of the {@link org.apache.samza.table.TableProviderFactory}
+ * @return class name of the {@link org.apache.samza.table.TableProviderFactory}
*/
- protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
- tableSpecConfig.putAll(config);
- }
+ abstract public String getProviderFactoryClassName();
/**
* Validate that this table descriptor is constructed properly; this method is used internally.
*/
- protected void validate() {
- }
+ abstract protected void validate();
/**
- * Create a {@link TableSpec} from this table descriptor; this method is used internally.
- *
- * @return the {@link TableSpec}
+ * Helper method to add a config item to table configuration
+ * @param key key of the config item
+ * @param value value of the config item
+ * @param tableConfig table configuration
*/
- abstract public TableSpec getTableSpec();
+ protected void addTableConfig(String key, String value, Map<String, String> tableConfig) {
+ tableConfig.put(JavaTableConfig.buildKey(tableId, key), value);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
index d6248c6..2ca2d44 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/CachingTableDescriptor.java
@@ -21,16 +21,15 @@ package org.apache.samza.table.descriptors;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.config.Config;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-
/**
* Table descriptor for a caching table.
* @param <K> type of the key in the cache
@@ -81,6 +80,10 @@ public class CachingTableDescriptor<K, V> extends HybridTableDescriptor<K, V, Ca
this.cache = cache;
}
+ /**
+ * Retrieve user-defined table descriptors contained in this table
+ * @return table descriptors
+ */
@Override
public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors() {
return cache != null
@@ -88,33 +91,6 @@ public class CachingTableDescriptor<K, V> extends HybridTableDescriptor<K, V, Ca
: Arrays.asList(table);
}
- @Override
- public TableSpec getTableSpec() {
- validate();
-
- Map<String, String> tableSpecConfig = new HashMap<>();
- generateTableSpecConfig(tableSpecConfig);
-
- if (cache != null) {
- tableSpecConfig.put(CACHE_TABLE_ID, ((BaseTableDescriptor) cache).getTableSpec().getId());
- } else {
- if (readTtl != null) {
- tableSpecConfig.put(READ_TTL_MS, String.valueOf(readTtl.toMillis()));
- }
- if (writeTtl != null) {
- tableSpecConfig.put(WRITE_TTL_MS, String.valueOf(writeTtl.toMillis()));
- }
- if (cacheSize > 0) {
- tableSpecConfig.put(CACHE_SIZE, String.valueOf(cacheSize));
- }
- }
-
- tableSpecConfig.put(REAL_TABLE_ID, ((BaseTableDescriptor) table).getTableSpec().getId());
- tableSpecConfig.put(WRITE_AROUND, String.valueOf(isWriteAround));
-
- return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
- }
-
/**
* Specify the TTL for each read access, ie. record is expired after
* the TTL duration since last read access of each key.
@@ -159,9 +135,37 @@ public class CachingTableDescriptor<K, V> extends HybridTableDescriptor<K, V, Ca
}
@Override
- @VisibleForTesting
- public void validate() {
- super.validate();
+ public String getProviderFactoryClassName() {
+ return PROVIDER_FACTORY_CLASS_NAME;
+ }
+
+ @Override
+ public Map<String, String> toConfig(Config jobConfig) {
+
+ Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
+
+ if (cache != null) {
+ addTableConfig(CACHE_TABLE_ID, cache.getTableId(), tableConfig);
+ } else {
+ if (readTtl != null) {
+ addTableConfig(READ_TTL_MS, String.valueOf(readTtl.toMillis()), tableConfig);
+ }
+ if (writeTtl != null) {
+ addTableConfig(WRITE_TTL_MS, String.valueOf(writeTtl.toMillis()), tableConfig);
+ }
+ if (cacheSize > 0) {
+ addTableConfig(CACHE_SIZE, String.valueOf(cacheSize), tableConfig);
+ }
+ }
+
+ addTableConfig(REAL_TABLE_ID, table.getTableId(), tableConfig);
+ addTableConfig(WRITE_AROUND, String.valueOf(isWriteAround), tableConfig);
+
+ return Collections.unmodifiableMap(tableConfig);
+ }
+
+ @Override
+ protected void validate() {
Preconditions.checkNotNull(table, "Actual table is required.");
if (cache == null) {
Preconditions.checkNotNull(readTtl, "readTtl must be specified.");
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
index 192bd7e..3965e6d 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/GuavaCacheTableDescriptor.java
@@ -19,16 +19,16 @@
package org.apache.samza.table.descriptors;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.config.Config;
import org.apache.samza.table.utils.SerdeUtils;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
-
/**
* Table descriptor for Guava-based caching table.
* @param <K> type of the key in the cache
@@ -43,24 +43,13 @@ public class GuavaCacheTableDescriptor<K, V> extends BaseTableDescriptor<K, V, G
private Cache<K, V> cache;
/**
- * {@inheritDoc}
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
*/
public GuavaCacheTableDescriptor(String tableId) {
super(tableId);
}
- @Override
- public TableSpec getTableSpec() {
- validate();
-
- Map<String, String> tableSpecConfig = new HashMap<>();
- generateTableSpecConfig(tableSpecConfig);
-
- tableSpecConfig.put(GUAVA_CACHE, SerdeUtils.serialize("Guava cache", cache));
-
- return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
- }
-
/**
* Specify a pre-configured Guava cache instance to be used for caching table.
* @param cache Guava cache instance
@@ -72,8 +61,19 @@ public class GuavaCacheTableDescriptor<K, V> extends BaseTableDescriptor<K, V, G
}
@Override
+ public String getProviderFactoryClassName() {
+ return PROVIDER_FACTORY_CLASS_NAME;
+ }
+
+ @Override
+ public Map<String, String> toConfig(Config jobConfig) {
+ Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
+ addTableConfig(GUAVA_CACHE, SerdeUtils.serialize("Guava cache", cache), tableConfig);
+ return Collections.unmodifiableMap(tableConfig);
+ }
+
+ @Override
protected void validate() {
- super.validate();
Preconditions.checkArgument(cache != null, "Must provide a Guava cache instance.");
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
index ec8cc67..8708b7f 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/HybridTableDescriptor.java
@@ -33,7 +33,8 @@ abstract public class HybridTableDescriptor<K, V, D extends HybridTableDescripto
extends BaseTableDescriptor<K, V, D> {
/**
- * {@inheritDoc}
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
*/
public HybridTableDescriptor(String tableId) {
super(tableId);
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
index dfcaea4..d194091 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
@@ -20,12 +20,17 @@ package org.apache.samza.table.descriptors;
import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.storage.SideInputsProcessor;
-
+import org.apache.samza.table.utils.SerdeUtils;
/**
* Table descriptor for store backed tables.
@@ -37,23 +42,19 @@ import org.apache.samza.storage.SideInputsProcessor;
abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<K, V, D>>
extends BaseTableDescriptor<K, V, D> {
- static final public String INTERNAL_ENABLE_CHANGELOG = "internal.enable.changelog";
- static final public String INTERNAL_CHANGELOG_STREAM = "internal.changelog.stream";
- static final public String INTERNAL_CHANGELOG_REPLICATION_FACTOR = "internal.changelog.replication.factor";
+ public static final Pattern SYSTEM_STREAM_NAME_PATTERN = Pattern.compile("[\\d\\w-_.]+");
- protected List<String> sideInputs;
- protected SideInputsProcessor sideInputsProcessor;
- protected boolean enableChangelog;
+ // Serdes for this table
+ protected final KVSerde<K, V> serde;
+
+ // changelog parameters
+ protected boolean enableChangelog; // Disabled by default
protected String changelogStream;
protected Integer changelogReplicationFactor;
- /**
- * Constructs a table descriptor instance
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
- */
- public LocalTableDescriptor(String tableId) {
- super(tableId);
- }
+ // Side input parameters
+ protected List<String> sideInputs;
+ protected SideInputsProcessor sideInputsProcessor;
/**
* Constructs a table descriptor instance
@@ -61,7 +62,8 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
* @param serde the serde for key and value
*/
public LocalTableDescriptor(String tableId, KVSerde<K, V> serde) {
- super(tableId, serde);
+ super(tableId);
+ this.serde = serde;
}
public D withSideInputs(List<String> sideInputs) {
@@ -129,27 +131,71 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
return (D) this;
}
+ /**
+ * {@inheritDoc}
+ *
+ * Note: Serdes are expected to be generated during configuration generation
+ * of the job node, which is not handled here. If the changelog is enabled but
+ * no changelog stream was specified, a default stream name of the form
+ * {@code <job.name>-<job.id>-table-<tableId>} is used.
+ */
@Override
- protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
- super.generateTableSpecConfig(tableSpecConfig);
+ public Map<String, String> toConfig(Config jobConfig) {
+
+ Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
- tableSpecConfig.put(INTERNAL_ENABLE_CHANGELOG, String.valueOf(enableChangelog));
+ if (sideInputs != null && !sideInputs.isEmpty()) {
+ sideInputs.forEach(si -> Preconditions.checkState(isValidSystemStreamName(si), String.format(
+ "Side input stream %s doesn't conform to pattern %s", si, SYSTEM_STREAM_NAME_PATTERN)));
+ String formattedSideInputs = String.join(",", sideInputs);
+ addStoreConfig("side.inputs", formattedSideInputs, tableConfig);
+ addStoreConfig("side.inputs.processor.serialized.instance",
+ SerdeUtils.serialize("Side Inputs Processor", sideInputsProcessor), tableConfig);
+ }
+
+ // Changelog configuration
if (enableChangelog) {
- if (changelogStream != null) {
- tableSpecConfig.put(INTERNAL_CHANGELOG_STREAM, changelogStream);
+ // Resolve the stream name locally so that generating config does not mutate this descriptor
+ String changelogStreamName = changelogStream;
+ if (StringUtils.isEmpty(changelogStreamName)) {
+ String jobName = jobConfig.get("job.name");
+ Preconditions.checkNotNull(jobName, "job.name not found in job config");
+ String jobId = jobConfig.get("job.id");
+ Preconditions.checkNotNull(jobId, "job.id not found in job config");
+ changelogStreamName = String.format("%s-%s-table-%s", jobName, jobId, tableId);
}
+
+ Preconditions.checkState(isValidSystemStreamName(changelogStreamName), String.format(
+ "Changelog stream %s doesn't conform to pattern %s", changelogStreamName, SYSTEM_STREAM_NAME_PATTERN));
+ addStoreConfig("changelog", changelogStreamName, tableConfig);
+
if (changelogReplicationFactor != null) {
- tableSpecConfig.put(INTERNAL_CHANGELOG_REPLICATION_FACTOR, String.valueOf(changelogReplicationFactor));
+ addStoreConfig("changelog.replication.factor", changelogReplicationFactor.toString(), tableConfig);
}
}
+
+ return Collections.unmodifiableMap(tableConfig);
+ }
+
+ /**
+ * Get side input stream names
+ * @return side inputs
+ */
+ public List<String> getSideInputs() {
+ return sideInputs;
}
/**
- * Validate that this table descriptor is constructed properly
+ * Get the serde assigned to this {@link TableDescriptor}
+ *
+ * @return {@link KVSerde} used by this table
*/
+ public KVSerde<K, V> getSerde() {
+ return serde;
+ }
+
@Override
protected void validate() {
- super.validate();
if (sideInputs != null || sideInputsProcessor != null) {
Preconditions.checkArgument(sideInputs != null && !sideInputs.isEmpty() && sideInputsProcessor != null,
String.format("Invalid side input configuration for table: %s. " +
@@ -165,4 +211,18 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
}
}
+ /**
+ * Helper method to add a store level config item to table configuration
+ * @param key key of the config item
+ * @param value value of the config item
+ * @param tableConfig table configuration
+ */
+ protected void addStoreConfig(String key, String value, Map<String, String> tableConfig) {
+ tableConfig.put(String.format("stores.%s.%s", tableId, key), value);
+ }
+
+ private boolean isValidSystemStreamName(String name) {
+ return StringUtils.isNotBlank(name) && SYSTEM_STREAM_NAME_PATTERN.matcher(name).matches();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index 8b34ec9..5bed7d6 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -20,12 +20,12 @@
package org.apache.samza.table.descriptors;
import java.lang.reflect.Constructor;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.SamzaException;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.config.Config;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
@@ -35,7 +35,6 @@ import org.apache.samza.util.RateLimiter;
import com.google.common.base.Preconditions;
-
/**
* Table descriptor for remote store backed tables
*
@@ -104,69 +103,6 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
}
/**
- * Constructs a table descriptor instance
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
- * @param serde the serde for key and value
- */
- public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
- super(tableId, serde);
- }
-
- @Override
- public TableSpec getTableSpec() {
- validate();
-
- Map<String, String> tableSpecConfig = new HashMap<>();
- generateTableSpecConfig(tableSpecConfig);
-
- // Serialize and store reader/writer functions
- tableSpecConfig.put(READ_FN, SerdeUtils.serialize("read function", readFn));
-
- if (writeFn != null) {
- tableSpecConfig.put(WRITE_FN, SerdeUtils.serialize("write function", writeFn));
- }
-
- if (!tagCreditsMap.isEmpty()) {
- RateLimiter defaultRateLimiter;
- try {
- Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
- Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
- defaultRateLimiter = ctor.newInstance(tagCreditsMap);
- } catch (Exception ex) {
- throw new SamzaException("Failed to create default rate limiter", ex);
- }
- tableSpecConfig.put(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter));
- } else if (rateLimiter != null) {
- tableSpecConfig.put(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
- }
-
- // Serialize the readCredit functions
- if (readCreditFn != null) {
- tableSpecConfig.put(READ_CREDIT_FN, SerdeUtils.serialize(
- "read credit function", readCreditFn));
- }
- // Serialize the writeCredit functions
- if (writeCreditFn != null) {
- tableSpecConfig.put(WRITE_CREDIT_FN, SerdeUtils.serialize(
- "write credit function", writeCreditFn));
- }
-
- if (readRetryPolicy != null) {
- tableSpecConfig.put(READ_RETRY_POLICY, SerdeUtils.serialize(
- "read retry policy", readRetryPolicy));
- }
-
- if (writeRetryPolicy != null) {
- tableSpecConfig.put(WRITE_RETRY_POLICY, SerdeUtils.serialize(
- "write retry policy", writeRetryPolicy));
- }
-
- tableSpecConfig.put(ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));
-
- return new TableSpec(tableId, serde, PROVIDER_FACTORY_CLASS_NAME, tableSpecConfig);
- }
-
- /**
* Use specified TableReadFunction with remote table and a retry policy.
* @param readFn read function instance
* @return this table descriptor instance
@@ -284,8 +220,66 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
}
@Override
+ public String getProviderFactoryClassName() {
+ return PROVIDER_FACTORY_CLASS_NAME;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Serializes the read and (optional) write functions, the rate limiter, the read/write
+ * credit functions and the retry policies, and records the async callback pool size.
+ */
+ @Override
+ public Map<String, String> toConfig(Config jobConfig) {
+
+ Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
+
+ // Serialize and store reader/writer functions
+ addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), tableConfig);
+
+ if (writeFn != null) {
+ addTableConfig(WRITE_FN, SerdeUtils.serialize("write function", writeFn), tableConfig);
+ }
+
+ if (!tagCreditsMap.isEmpty()) {
+ RateLimiter defaultRateLimiter;
+ try {
+ Class<? extends RateLimiter> clazz = Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME).asSubclass(RateLimiter.class);
+ Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
+ defaultRateLimiter = ctor.newInstance(tagCreditsMap);
+ } catch (Exception ex) {
+ throw new SamzaException("Failed to create default rate limiter", ex);
+ }
+ addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter), tableConfig);
+ } else if (rateLimiter != null) {
+ addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter), tableConfig);
+ }
+
+ // Serialize the readCredit functions
+ if (readCreditFn != null) {
+ addTableConfig(READ_CREDIT_FN, SerdeUtils.serialize("read credit function", readCreditFn), tableConfig);
+ }
+ // Serialize the writeCredit functions
+ if (writeCreditFn != null) {
+ addTableConfig(WRITE_CREDIT_FN, SerdeUtils.serialize("write credit function", writeCreditFn), tableConfig);
+ }
+
+ if (readRetryPolicy != null) {
+ addTableConfig(READ_RETRY_POLICY, SerdeUtils.serialize("read retry policy", readRetryPolicy), tableConfig);
+ }
+
+ if (writeRetryPolicy != null) {
+ addTableConfig(WRITE_RETRY_POLICY, SerdeUtils.serialize("write retry policy", writeRetryPolicy), tableConfig);
+ }
+
+ addTableConfig(ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize), tableConfig);
+
+ return Collections.unmodifiableMap(tableConfig);
+ }
+
+ @Override
protected void validate() {
- super.validate();
Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
"Only one of rateLimiter instance or read/write limits can be specified");
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
index 9798091..806d158 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
@@ -18,10 +18,13 @@
*/
package org.apache.samza.table.descriptors;
+import java.util.Map;
+
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
import org.apache.samza.context.TaskContext;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.InitableFunction;
@@ -72,4 +75,18 @@ public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
*/
D withConfig(String key, String value);
+ /**
+ * Generates the configuration for this table descriptor. The generated configuration
+ * is expected to be the complete configuration for this table, so that it can be
+ * included directly in the job configuration.
+ *
+ * Note: although a table's serdes may already have been set on its descriptor, their
+ * corresponding configuration is generated centrally for consistency and
+ * efficiency reasons. The serde configuration for this table is therefore
+ * expected to already be present in {@code jobConfig}.
+ *
+ * @param jobConfig job configuration
+ * @return table configuration
+ */
+ Map<String, String> toConfig(Config jobConfig);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-api/src/test/java/org/apache/samza/table/TestBaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/table/TestBaseTableDescriptor.java b/samza-api/src/test/java/org/apache/samza/table/TestBaseTableDescriptor.java
new file mode 100644
index 0000000..db46ffd
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/table/TestBaseTableDescriptor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.Map;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests the configuration generated by {@link BaseTableDescriptor#toConfig}.
+ */
+public class TestBaseTableDescriptor {
+
+ private static final String TABLE_ID = "t1";
+
+ @Test
+ public void testMinimal() {
+ Map<String, String> tableConfig = createTableDescriptor(TABLE_ID)
+ .toConfig(new MapConfig());
+ Assert.assertEquals(1, tableConfig.size());
+ }
+
+ @Test
+ public void testProviderFactoryConfig() {
+ Map<String, String> tableConfig = createTableDescriptor(TABLE_ID)
+ .toConfig(new MapConfig());
+ Assert.assertEquals(1, tableConfig.size());
+ assertTableConfig("my.factory", "provider.factory", TABLE_ID, tableConfig);
+ }
+
+ @Test
+ public void testCustomConfig() {
+ Map<String, String> tableConfig = createTableDescriptor(TABLE_ID)
+ .withConfig("abc", "xyz")
+ .toConfig(new MapConfig());
+ Assert.assertEquals(2, tableConfig.size());
+ Assert.assertEquals("xyz", tableConfig.get("abc"));
+ }
+
+ /**
+ * Creates an anonymous {@link BaseTableDescriptor} whose provider factory class name
+ * is {@code my.factory} and whose {@code validate()} is a no-op.
+ */
+ private BaseTableDescriptor createTableDescriptor(String tableId) {
+ return new BaseTableDescriptor(tableId) {
+ @Override
+ public String getProviderFactoryClassName() {
+ return "my.factory";
+ }
+ @Override
+ protected void validate() {
+ }
+ };
+ }
+
+ /**
+ * Asserts that {@code config} maps the key built by {@link JavaTableConfig#buildKey}
+ * from {@code tableId} and {@code key} to {@code expectedValue}.
+ */
+ private void assertTableConfig(String expectedValue, String key, String tableId, Map<String, String> config) {
+ String realKey = JavaTableConfig.buildKey(tableId, key);
+ Assert.assertEquals(expectedValue, config.get(realKey));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
index e57c957..2632f4b 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
@@ -34,12 +34,12 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.OutputDescriptor;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.OutputStream;
+import org.apache.samza.table.descriptors.LocalTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.operators.TableImpl;
import org.apache.samza.system.descriptors.SystemDescriptor;
@@ -119,9 +119,11 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
@Override
public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
addTableDescriptor(tableDescriptor);
- BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
- getOrCreateTableSerdes(baseTableDescriptor.getTableId(), baseTableDescriptor.getSerde());
- return new TableImpl(baseTableDescriptor.getTableSpec());
+ // Only LocalTableDescriptor exposes a serde (getSerde), so table serdes are registered for local tables only
+ if (tableDescriptor instanceof LocalTableDescriptor) {
+ LocalTableDescriptor localTableDescriptor = (LocalTableDescriptor) tableDescriptor;
+ getOrCreateTableSerdes(localTableDescriptor.getTableId(), localTableDescriptor.getSerde());
+ }
+ return new TableImpl(tableDescriptor);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
index b29cec1..b6ef1ef 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
@@ -22,7 +22,7 @@ import org.apache.samza.application.TaskApplication;
import org.apache.samza.config.Config;
import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.OutputDescriptor;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.LocalTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.task.TaskFactory;
@@ -67,8 +67,10 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
@Override
public TaskApplicationDescriptor withTable(TableDescriptor tableDescriptor) {
addTableDescriptor(tableDescriptor);
- BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
- getOrCreateTableSerdes(baseTableDescriptor.getTableId(), baseTableDescriptor.getSerde());
+ // Only LocalTableDescriptor exposes a serde (getSerde), so table serdes are registered for local tables only
+ if (tableDescriptor instanceof LocalTableDescriptor) {
+ LocalTableDescriptor localTableDescriptor = (LocalTableDescriptor) tableDescriptor;
+ getOrCreateTableSerdes(localTableDescriptor.getTableId(), localTableDescriptor.getSerde());
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
deleted file mode 100644
index 3a3005a..0000000
--- a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.config;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-
-/**
- * A helper class for handling table configuration
- */
-public class JavaTableConfig extends MapConfig {
-
- // Prefix
- public static final String TABLES_PREFIX = "tables.";
- public static final String TABLE_ID_PREFIX = TABLES_PREFIX + "%s";
-
- // Suffix
- public static final String TABLE_PROVIDER_FACTORY_SUFFIX = ".provider.factory";
-
- // Config keys
- public static final String TABLE_PROVIDER_FACTORY = String.format("%s.provider.factory", TABLE_ID_PREFIX);
- public static final String TABLE_KEY_SERDE = String.format("%s.key.serde", TABLE_ID_PREFIX);
- public static final String TABLE_VALUE_SERDE = String.format("%s.value.serde", TABLE_ID_PREFIX);
-
-
- public JavaTableConfig(Config config) {
- super(config);
- }
-
- /**
- * Get Id's of all tables
- * @return list of table Id's
- */
- public List<String> getTableIds() {
- Config subConfig = subset(TABLES_PREFIX, true);
- Set<String> tableNames = subConfig.keySet().stream()
- .filter(k -> k.endsWith(TABLE_PROVIDER_FACTORY_SUFFIX))
- .map(k -> k.replace(TABLE_PROVIDER_FACTORY_SUFFIX, ""))
- .collect(Collectors.toSet());
- return new LinkedList<>(tableNames);
- }
-
- /**
- * Get the {@link org.apache.samza.table.TableProviderFactory} class for a table
- * @param tableId Id of the table
- * @return the {@link org.apache.samza.table.TableProviderFactory} class name
- */
- public String getTableProviderFactory(String tableId) {
- return get(String.format(TABLE_PROVIDER_FACTORY, tableId), null);
- }
-
- /**
- * Get registry keys of key serde for this table
- * @param tableId Id of the table
- * @return serde retistry key
- */
- public String getKeySerde(String tableId) {
- return get(String.format(TABLE_KEY_SERDE, tableId), null);
- }
-
- /**
- * Get registry keys of value serde for this table
- * @param tableId Id of the table
- * @return serde retistry key
- */
- public String getValueSerde(String tableId) {
- return get(String.format(TABLE_VALUE_SERDE, tableId), null);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 10a4215..0110551 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections4.ListUtils;
import org.apache.samza.SamzaException;
@@ -43,12 +44,12 @@ import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StreamConfig;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.system.StreamSpec;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.LocalTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,8 +123,7 @@ public class ExecutionPlanner {
Set<StreamSpec> inputStreams = Sets.difference(sourceStreams, intermediateStreams);
Set<StreamSpec> outputStreams = Sets.difference(sinkStreams, intermediateStreams);
- Set<TableSpec> tables = appDesc.getTableDescriptors().stream()
- .map(tableDescriptor -> ((BaseTableDescriptor) tableDescriptor).getTableSpec()).collect(Collectors.toSet());
+ Set<TableDescriptor> tables = appDesc.getTableDescriptors();
// For this phase, we have a single job node for the whole DAG
String jobName = config.get(JobConfig.JOB_NAME());
@@ -140,12 +140,15 @@ public class ExecutionPlanner {
intermediateStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
// Add tables
- for (TableSpec table : tables) {
+ for (TableDescriptor table : tables) {
jobGraph.addTable(table, node);
// Add side-input streams (if any)
- Iterable<String> sideInputs = ListUtils.emptyIfNull(table.getSideInputs());
- for (String sideInput : sideInputs) {
- jobGraph.addSideInputStream(getStreamSpec(sideInput, streamConfig));
+ // Side inputs are only exposed by LocalTableDescriptor (getSideInputs); other tables have none
+ if (table instanceof LocalTableDescriptor) {
+ LocalTableDescriptor localTable = (LocalTableDescriptor) table;
+ Iterable<String> sideInputs = ListUtils.emptyIfNull(localTable.getSideInputs());
+ for (String sideInput : sideInputs) {
+ jobGraph.addSideInputStream(getStreamSpec(sideInput, streamConfig));
+ }
}
}
@@ -208,6 +211,9 @@ public class ExecutionPlanner {
OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(
jobGraph.getApplicationDescriptorImpl().getInputOperators().values());
+ // Stream-table join specs reference their table only by id, so index the graph's tables by id.
+ // Table ids are assumed unique: Collectors.toMap throws IllegalStateException on duplicate keys.
+ Map<String, TableDescriptor> tableDescriptors = jobGraph.getTables().stream()
+ .collect(Collectors.toMap(TableDescriptor::getTableId, Function.identity()));
+
// Convert every group of input operator specs into a group of corresponding stream edges.
List<StreamSet> streamSets = new ArrayList<>();
for (OperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) {
@@ -218,11 +224,14 @@ public class ExecutionPlanner {
// streams associated with the joined table (if any).
if (joinOpSpec instanceof StreamTableJoinOperatorSpec) {
StreamTableJoinOperatorSpec streamTableJoinOperatorSpec = (StreamTableJoinOperatorSpec) joinOpSpec;
-
- Collection<String> sideInputs = ListUtils.emptyIfNull(streamTableJoinOperatorSpec.getTableSpec().getSideInputs());
- Iterable<StreamEdge> sideInputStreams = sideInputs.stream().map(jobGraph::getStreamEdge)::iterator;
- Iterable<StreamEdge> streams = streamSet.getStreamEdges();
- streamSet = new StreamSet(streamSet.getSetId(), Iterables.concat(streams, sideInputStreams));
+ TableDescriptor tableDescriptor = tableDescriptors.get(streamTableJoinOperatorSpec.getTableId());
+ // A missing (null) or non-local table fails the instanceof check and leaves the stream set unchanged
+ if (tableDescriptor instanceof LocalTableDescriptor) {
+ LocalTableDescriptor localTableDescriptor = (LocalTableDescriptor) tableDescriptor;
+ Collection<String> sideInputs = ListUtils.emptyIfNull(localTableDescriptor.getSideInputs());
+ Iterable<StreamEdge> sideInputStreams = sideInputs.stream().map(jobGraph::getStreamEdge)::iterator;
+ Iterable<StreamEdge> streams = streamSet.getStreamEdges();
+ streamSet = new StreamSet(streamSet.getSetId(), Iterables.concat(streams, sideInputStreams));
+ }
}
streamSets.add(streamSet);
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 7944dd3..0da1fd5 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -36,7 +36,7 @@ import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.system.StreamSpec;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.TableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
private final Set<StreamEdge> outputStreams = new HashSet<>();
private final Set<StreamEdge> intermediateStreams = new HashSet<>();
private final Set<StreamEdge> sideInputStreams = new HashSet<>();
- private final Set<TableSpec> tables = new HashSet<>();
+ private final Set<TableDescriptor> tables = new HashSet<>();
private final Config config;
private final JobGraphJsonGenerator jsonGenerator;
private final JobNodeConfigurationGenerator configGenerator;
@@ -150,9 +150,9 @@ import org.slf4j.LoggerFactory;
intermediateStreams.add(edge);
}
- void addTable(TableSpec tableSpec, JobNode node) {
- tables.add(tableSpec);
- node.addTable(tableSpec);
+ /**
+ * Adds a table to this graph and to the given job node.
+ * @param tableDescriptor descriptor of the table to add
+ * @param node the job node the table is added to
+ */
+ void addTable(TableDescriptor tableDescriptor, JobNode node) {
+ tables.add(tableDescriptor);
+ node.addTable(tableDescriptor);
}
/**
@@ -237,9 +237,9 @@ import org.slf4j.LoggerFactory;
/**
* Returns the tables in the graph
- * @return unmodifiable set of {@link TableSpec}
+ * @return unmodifiable set of {@link TableDescriptor}
*/
- Set<TableSpec> getTables() {
+ Set<TableDescriptor> getTables() {
return Collections.unmodifiableSet(tables);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 4a2a235..4b11174 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -36,7 +36,8 @@ import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.PartitionByOperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
@@ -59,10 +60,8 @@ import org.codehaus.jackson.map.ObjectMapper;
static final class TableSpecJson {
@JsonProperty("id")
String id;
- @JsonProperty("tableProviderFactory")
- String tableProviderFactory;
- @JsonProperty("config")
- Map<String, String> config;
+ @JsonProperty("providerFactory")
+ String providerFactory;
}
static final class StreamEdgeJson {
@@ -183,13 +182,13 @@ import org.codehaus.jackson.map.ObjectMapper;
}
if (spec instanceof StreamTableJoinOperatorSpec) {
- TableSpec tableSpec = ((StreamTableJoinOperatorSpec) spec).getTableSpec();
- map.put("tableId", tableSpec.getId());
+ String tableId = ((StreamTableJoinOperatorSpec) spec).getTableId();
+ map.put("tableId", tableId);
}
if (spec instanceof StreamTableJoinOperatorSpec) {
- TableSpec tableSpec = ((StreamTableJoinOperatorSpec) spec).getTableSpec();
- map.put("tableId", tableSpec.getId());
+ String tableId = ((StreamTableJoinOperatorSpec) spec).getTableId();
+ map.put("tableId", tableId);
}
if (spec instanceof JoinOperatorSpec) {
@@ -270,27 +269,30 @@ import org.codehaus.jackson.map.ObjectMapper;
return edgeJson;
}
- /**
- * Get or create the JSON POJO for a {@link TableSpec}
- * @param tableSpec the {@link TableSpec}
- * @param tableSpecs a map of tableId to {@link TableSpecJson}
- * @return JSON representation of the {@link TableSpec}
- */
- private TableSpecJson buildTableJson(TableSpec tableSpec, Map<String, TableSpecJson> tableSpecs) {
- String tableId = tableSpec.getId();
- return tableSpecs.computeIfAbsent(tableId, k -> buildTableJson(tableSpec));
+ /**
+ * Get or create the JSON POJO for a {@link TableDescriptor}
+ * @param tableDescriptor the {@link TableDescriptor}
+ * @param tableSpecs a map of tableId to {@link TableSpecJson}
+ * @return JSON representation of the {@link TableDescriptor}
+ */
+ private TableSpecJson buildTableJson(TableDescriptor tableDescriptor, Map<String, TableSpecJson> tableSpecs) {
+ String tableId = tableDescriptor.getTableId();
+ return tableSpecs.computeIfAbsent(tableId, k -> buildTableJson(tableDescriptor));
}
- /**
- * Create the JSON POJO for a {@link TableSpec}
- * @param tableSpec the {@link TableSpec}
- * @return JSON representation of the {@link TableSpec}
- */
- private TableSpecJson buildTableJson(TableSpec tableSpec) {
+ /**
+ * Create the JSON POJO for a {@link TableDescriptor}
+ * @param tableDescriptor the {@link TableDescriptor}
+ * @return JSON representation of the {@link TableDescriptor}; {@code providerFactory} is
+ * left {@code null} when the descriptor is not a {@link BaseTableDescriptor}
+ */
+ private TableSpecJson buildTableJson(TableDescriptor tableDescriptor) {
TableSpecJson tableSpecJson = new TableSpecJson();
- tableSpecJson.id = tableSpec.getId();
- tableSpecJson.tableProviderFactory = tableSpec.getTableProviderFactoryClassName();
- tableSpecJson.config = tableSpec.getConfig();
+ tableSpecJson.id = tableDescriptor.getTableId();
+ // Only BaseTableDescriptor exposes the provider factory; avoid a ClassCastException for other implementations
+ if (tableDescriptor instanceof BaseTableDescriptor) {
+ tableSpecJson.providerFactory = ((BaseTableDescriptor) tableDescriptor).getProviderFactoryClassName();
+ }
return tableSpecJson;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 82b4178..e4fbdba 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -35,7 +35,7 @@ import org.apache.samza.operators.KV;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.TableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +56,7 @@ public class JobNode {
private final Map<String, StreamEdge> inEdges = new HashMap<>();
private final Map<String, StreamEdge> outEdges = new HashMap<>();
// Similarly, tables uses tableId as the key
- private final Map<String, TableSpec> tables = new HashMap<>();
+ private final Map<String, TableDescriptor> tables = new HashMap<>();
private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
JobNode(String jobName, String jobId, Config config, ApplicationDescriptorImpl appDesc,
@@ -97,8 +97,8 @@ public class JobNode {
outEdges.put(out.getStreamSpec().getId(), out);
}
- void addTable(TableSpec tableSpec) {
- tables.put(tableSpec.getId(), tableSpec);
+ void addTable(TableDescriptor tableDescriptor) {
+ // Keyed by table ID; adding a descriptor whose ID is already present replaces the earlier entry.
+ tables.put(tableDescriptor.getTableId(), tableDescriptor);
}
Map<String, StreamEdge> getInEdges() {
@@ -109,7 +109,7 @@ public class JobNode {
return outEdges;
}
- Map<String, TableSpec> getTables() {
+ Map<String, TableDescriptor> getTables() {
return tables;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 215177b..2fed979 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -50,7 +50,8 @@ import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.table.TableConfigGenerator;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.LocalTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.util.MathUtil;
import org.apache.samza.util.StreamUtil;
import org.apache.samza.util.Util;
@@ -93,7 +94,7 @@ import org.slf4j.LoggerFactory;
Map<String, StreamEdge> outEdges = jobNode.getOutEdges();
Collection<OperatorSpec> reachableOperators = jobNode.getReachableOperators();
List<StoreDescriptor> stores = getStoreDescriptors(reachableOperators);
- Map<String, TableSpec> reachableTables = getReachableTables(reachableOperators, jobNode);
+ Map<String, TableDescriptor> reachableTables = getReachableTables(reachableOperators, jobNode);
// config passed by the JobPlanner. user-provided + system-stream descriptor config + misc. other config
Config originalConfig = jobNode.getConfig();
@@ -141,7 +142,7 @@ import org.slf4j.LoggerFactory;
return new JobConfig(mergeConfig(originalConfig, generatedConfig));
}
- private Map<String, TableSpec> getReachableTables(Collection<OperatorSpec> reachableOperators, JobNode jobNode) {
+ private Map<String, TableDescriptor> getReachableTables(Collection<OperatorSpec> reachableOperators, JobNode jobNode) {
// TODO: Fix this in SAMZA-1893. For now, returning all tables for single-job execution plan
return jobNode.getTables();
}
@@ -207,22 +208,25 @@ import org.slf4j.LoggerFactory;
}
private void configureTables(Map<String, String> generatedConfig, Config originalConfig,
- Map<String, TableSpec> tables, Set<String> inputs) {
+ Map<String, TableDescriptor> tables, Set<String> inputs) {
generatedConfig.putAll(
- TableConfigGenerator.generateConfigsForTableSpecs(
+ TableConfigGenerator.generate(
new MapConfig(generatedConfig), new ArrayList<>(tables.values())));
// Add side inputs to the inputs and mark the stream as bootstrap
- tables.values().forEach(tableSpec -> {
- List<String> sideInputs = tableSpec.getSideInputs();
- if (sideInputs != null && !sideInputs.isEmpty()) {
- sideInputs.stream()
- .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(originalConfig, sideInput))
- .forEach(systemStream -> {
- inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
- generatedConfig.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
- systemStream.getSystem(), systemStream.getStream()), "true");
- });
+ tables.values().forEach(tableDescriptor -> {
+ // Side inputs are only read from LocalTableDescriptors; other descriptor types add no inputs here.
+ if (tableDescriptor instanceof LocalTableDescriptor) {
+ LocalTableDescriptor localTableDescriptor = (LocalTableDescriptor) tableDescriptor;
+ List<String> sideInputs = localTableDescriptor.getSideInputs();
+ if (sideInputs != null && !sideInputs.isEmpty()) {
+ sideInputs.stream()
+ .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(originalConfig, sideInput))
+ .forEach(systemStream -> {
+ inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
+ generatedConfig.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
+ systemStream.getSystem(), systemStream.getStream()), "true");
+ });
+ }
}
});
}
@@ -306,12 +310,12 @@ import org.slf4j.LoggerFactory;
// set key and msg serdes for stores to the serde names generated above
tableKeySerdes.forEach((tableId, serde) -> {
- String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId);
+ String keySerdeConfigKey = String.format(JavaTableConfig.STORE_KEY_SERDE, tableId);
configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
});
tableMsgSerdes.forEach((tableId, serde) -> {
- String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId);
+ String valueSerdeConfigKey = String.format(JavaTableConfig.STORE_MSG_SERDE, tableId);
configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
});
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
index 123244b..70a5848 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
@@ -33,7 +33,6 @@ import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.SendToTableOperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.TableSpec;
/**
@@ -56,7 +55,7 @@ import org.apache.samza.table.TableSpec;
Multimap<OperatorSpec, InputOperatorSpec> joinToInputOpSpecs = HashMultimap.create();
// Create a getNextOpSpecs() function that emulates connections between every SendToTableOperatorSpec
- // — which are terminal OperatorSpecs — and all StreamTableJoinOperatorSpecs referencing the same TableSpec.
+ // — which are terminal OperatorSpecs — and all StreamTableJoinOperatorSpecs referencing the same table.
//
// This is necessary to support Stream-Table Join scenarios because it allows us to associate streams behind
// SendToTableOperatorSpecs with streams participating in Stream-Table Joins, a connection that would not be
@@ -96,14 +95,14 @@ import org.apache.samza.table.TableSpec;
* {@code operatorSpecGraph}.
*
* Calling the returned function with any {@link SendToTableOperatorSpec} will return a collection of all
- * {@link StreamTableJoinOperatorSpec}s that reference the same {@link TableSpec} as the specified
+ * {@link StreamTableJoinOperatorSpec}s that reference the same table as the specified
* {@link SendToTableOperatorSpec}, as if they were actually connected.
*/
private static Function<OperatorSpec, Iterable<OperatorSpec>> getCustomGetNextOpSpecs(
Iterable<InputOperatorSpec> inputOpSpecs) {
// Traverse operatorSpecGraph to create mapping between every SendToTableOperatorSpec and all
- // StreamTableJoinOperatorSpecs referencing the same TableSpec.
+ // StreamTableJoinOperatorSpecs referencing the same table.
TableJoinVisitor tableJoinVisitor = new TableJoinVisitor();
for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
traverse(inputOpSpec, tableJoinVisitor, opSpec -> opSpec.getRegisteredOperatorSpecs());
@@ -113,7 +112,7 @@ import org.apache.samza.table.TableSpec;
tableJoinVisitor.getSendToTableOpSpecToStreamTableJoinOpSpecs();
return operatorSpec -> {
- // If this is a SendToTableOperatorSpec, return all StreamTableJoinSpecs referencing the same TableSpec.
+ // If this is a SendToTableOperatorSpec, return all StreamTableJoinSpecs referencing the same table.
// For all other types of operator specs, return the next registered operator specs.
if (operatorSpec instanceof SendToTableOperatorSpec) {
SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) operatorSpec;
@@ -145,21 +144,21 @@ import org.apache.samza.table.TableSpec;
/**
* An {@link OperatorSpec} visitor that records associations between every {@link SendToTableOperatorSpec}
- * and all {@link StreamTableJoinOperatorSpec}s that reference the same {@link TableSpec}.
+ * and all {@link StreamTableJoinOperatorSpec}s that reference the same table.
*/
private static class TableJoinVisitor implements Consumer<OperatorSpec> {
- private final Multimap<TableSpec, SendToTableOperatorSpec> tableSpecToSendToTableOpSpecs = HashMultimap.create();
- private final Multimap<TableSpec, StreamTableJoinOperatorSpec> tableSpecToStreamTableJoinOpSpecs = HashMultimap.create();
+ // Both multimaps are keyed by table ID, so operator specs that reference the same table are grouped together.
+ private final Multimap<String, SendToTableOperatorSpec> tableToSendToTableOpSpecs = HashMultimap.create();
+ private final Multimap<String, StreamTableJoinOperatorSpec> tableToStreamTableJoinOpSpecs = HashMultimap.create();
@Override
public void accept(OperatorSpec opSpec) {
- // Record all SendToTableOperatorSpecs, StreamTableJoinOperatorSpecs, and their corresponding TableSpecs.
+ // Record all SendToTableOperatorSpecs, StreamTableJoinOperatorSpecs, and their corresponding tables.
if (opSpec instanceof SendToTableOperatorSpec) {
SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) opSpec;
- tableSpecToSendToTableOpSpecs.put(sendToTableOperatorSpec.getTableSpec(), sendToTableOperatorSpec);
+ tableToSendToTableOpSpecs.put(sendToTableOperatorSpec.getTableId(), sendToTableOperatorSpec);
} else if (opSpec instanceof StreamTableJoinOperatorSpec) {
StreamTableJoinOperatorSpec streamTableJoinOpSpec = (StreamTableJoinOperatorSpec) opSpec;
- tableSpecToStreamTableJoinOpSpecs.put(streamTableJoinOpSpec.getTableSpec(), streamTableJoinOpSpec);
+ tableToStreamTableJoinOpSpecs.put(streamTableJoinOpSpec.getTableId(), streamTableJoinOpSpec);
}
}
@@ -167,11 +166,11 @@ import org.apache.samza.table.TableSpec;
Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> sendToTableOpSpecToStreamTableJoinOpSpecs =
HashMultimap.create();
- // Map every SendToTableOperatorSpec to all StreamTableJoinOperatorSpecs referencing the same TableSpec.
- for (TableSpec tableSpec : tableSpecToSendToTableOpSpecs.keySet()) {
- Collection<SendToTableOperatorSpec> sendToTableOpSpecs = tableSpecToSendToTableOpSpecs.get(tableSpec);
+ // Map every SendToTableOperatorSpec to all StreamTableJoinOperatorSpecs referencing the same table.
+ for (String tableId : tableToSendToTableOpSpecs.keySet()) {
+ Collection<SendToTableOperatorSpec> sendToTableOpSpecs = tableToSendToTableOpSpecs.get(tableId);
Collection<StreamTableJoinOperatorSpec> streamTableJoinOpSpecs =
- tableSpecToStreamTableJoinOpSpecs.get(tableSpec);
+ tableToStreamTableJoinOpSpecs.get(tableId);
for (SendToTableOperatorSpec sendToTableOpSpec : sendToTableOpSpecs) {
sendToTableOpSpecToStreamTableJoinOpSpecs.putAll(sendToTableOpSpec, streamTableJoinOpSpecs);
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 0f43c5e..b652d68 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -50,7 +50,6 @@ import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
/**
@@ -147,9 +146,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn) {
String opId = this.streamAppDesc.getNextOpId(OpCode.JOIN);
- TableSpec tableSpec = ((TableImpl) table).getTableSpec();
+ // Only the table ID is captured in the join operator spec.
StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec = OperatorSpecs.createStreamTableJoinOperatorSpec(
- tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, opId);
+ ((TableImpl) table).getTableId(), (StreamTableJoinFunction<K, M, R, JM>) joinFn, opId);
this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
return new MessageStreamImpl<>(this.streamAppDesc, joinOpSpec);
}
@@ -183,7 +181,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
public <K, V> void sendTo(Table<KV<K, V>> table) {
String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
SendToTableOperatorSpec<K, V> op =
- OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableSpec(), opId);
+ OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableId(), opId);
this.operatorSpec.registerNextOperatorSpec(op);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
index 8ceada0..654968b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
@@ -20,22 +20,26 @@ package org.apache.samza.operators;
import java.io.Serializable;
import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.TableDescriptor;
/**
- * This class is the holder of a {@link TableSpec}
+ * This class is the holder of a table ID.
*/
public class TableImpl implements Table, Serializable {
- private final TableSpec tableSpec;
+ private final String tableId;
- public TableImpl(TableSpec tableSpec) {
- this.tableSpec = tableSpec;
+ /**
+ * Creates a holder for the given table ID.
+ * @param tableId the ID of the table
+ */
+ public TableImpl(String tableId) {
+ this.tableId = tableId;
}
- public TableSpec getTableSpec() {
- return tableSpec;
+ /**
+ * Creates a holder for the ID of the given table descriptor.
+ * @param tableDescriptor the descriptor whose table ID is kept; the descriptor itself is not retained
+ */
+ public TableImpl(TableDescriptor tableDescriptor) {
+ this.tableId = tableDescriptor.getTableId();
+ }
+
+ /**
+ * @return the ID of the table
+ */
+ public String getTableId() {
+ return tableId;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
index be3e0a3..0d39c1b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -44,7 +44,7 @@ public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Void>
SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Context context) {
this.sendToTableOpSpec = sendToTableOpSpec;
- this.table = (ReadWriteTable) context.getTaskContext().getTable(sendToTableOpSpec.getTableSpec().getId());
+ // Resolve the table by the operator spec's table ID from the task context.
+ this.table = (ReadWriteTable) context.getTaskContext().getTable(sendToTableOpSpec.getTableId());
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
index d89f2b3..96f07d1 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
@@ -46,7 +46,7 @@ class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M
StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec, Context context) {
this.joinOpSpec = joinOpSpec;
- this.table = (ReadableTable) context.getTaskContext().getTable(joinOpSpec.getTableSpec().getId());
+ // Resolve the table by the join operator spec's table ID from the task context.
+ this.table = (ReadableTable) context.getTaskContext().getTable(joinOpSpec.getTableId());
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/5f7a22c3/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 8d3ff60..ff3fe67 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -29,8 +29,6 @@ import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
-
/**
* Factory methods for creating {@link OperatorSpec} instances.
@@ -192,7 +190,7 @@ public class OperatorSpecs {
/**
* Creates a {@link StreamTableJoinOperatorSpec} with a join function.
*
- * @param tableSpec the table spec for the table on the right side of the join
+ * @param tableId the table ID for the table on the right side of the join
* @param joinFn the user-defined join function to get join keys and results
* @param opId the unique ID of the operator
* @param <K> the type of join key
@@ -202,23 +200,23 @@ public class OperatorSpecs {
* @return the {@link StreamTableJoinOperatorSpec}
*/
public static <K, M, R, JM> StreamTableJoinOperatorSpec<K, M, R, JM> createStreamTableJoinOperatorSpec(
- TableSpec tableSpec, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) {
- return new StreamTableJoinOperatorSpec(tableSpec, joinFn, opId);
+ String tableId, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) {
+ return new StreamTableJoinOperatorSpec(tableId, joinFn, opId);
}
/**
* Creates a {@link SendToTableOperatorSpec} with a key extractor and a value extractor function,
* the type of incoming message is expected to be KV<K, V>.
*
- * @param tableSpec the table spec for the underlying table
+ * @param tableId the table ID for the underlying table
* @param opId the unique ID of the operator
* @param <K> the type of the table record key
* @param <V> the type of the table record value
* @return the {@link SendToTableOperatorSpec}
*/
public static <K, V> SendToTableOperatorSpec<K, V> createSendToTableOperatorSpec(
- TableSpec tableSpec, String opId) {
- return new SendToTableOperatorSpec(tableSpec, opId);
+ String tableId, String opId) {
+ return new SendToTableOperatorSpec(tableId, opId);
}
/**