You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/16 01:55:33 UTC
samza git commit: SAMZA-1954: User provided configuration should have
higher precedence than Samza generated configuration.
Repository: samza
Updated Branches:
refs/heads/master 96658ee7e -> 989f8f6ad
SAMZA-1954: User provided configuration should have higher precedence than Samza generated configuration.
Author: Prateek Maheshwari <pm...@apache.org>
Reviewers: Cameron Lee <ca...@linkedin.com>
Closes #728 from prateekm/config-precedence
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/989f8f6a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/989f8f6a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/989f8f6a
Branch: refs/heads/master
Commit: 989f8f6ad1e983a81cce8178196306a3ac72fb02
Parents: 96658ee
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Mon Oct 15 18:55:28 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Mon Oct 15 18:55:28 2018 -0700
----------------------------------------------------------------------
.../descriptors/GenericInputDescriptor.java | 2 +-
.../descriptors/GenericOutputDescriptor.java | 2 +-
.../descriptors/GenericSystemDescriptor.java | 2 +-
.../system/descriptors/InputDescriptor.java | 2 +-
.../system/descriptors/OutputDescriptor.java | 2 +-
.../system/descriptors/StreamDescriptor.java | 2 +-
.../system/descriptors/SystemDescriptor.java | 2 +-
.../descriptors/EventHubsInputDescriptor.java | 9 +-
.../descriptors/EventHubsOutputDescriptor.java | 7 +-
.../descriptors/EventHubsSystemDescriptor.java | 2 +-
.../JobNodeConfigurationGenerator.java | 129 +++++++------------
.../org/apache/samza/execution/JobPlanner.java | 66 ++++++----
.../apache/samza/execution/LocalJobPlanner.java | 6 +-
.../samza/execution/RemoteJobPlanner.java | 2 +-
.../org/apache/samza/config/JobConfig.scala | 1 -
.../samza/execution/TestExecutionPlanner.java | 3 +-
.../TestJobNodeConfigurationGenerator.java | 52 +-------
.../kafka/descriptors/KafkaInputDescriptor.java | 2 +-
.../descriptors/KafkaOutputDescriptor.java | 2 +-
.../descriptors/KafkaSystemDescriptor.java | 2 +-
.../apache/samza/test/framework/TestRunner.java | 19 +--
.../descriptors/InMemoryInputDescriptor.java | 4 +-
.../descriptors/InMemorySystemDescriptor.java | 18 +--
23 files changed, 123 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
index 6d18afc..aa5c8d2 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
@@ -30,7 +30,7 @@ import org.apache.samza.serializers.Serde;
* Otherwise, this {@link GenericInputDescriptor} may be used to provide Samza-specific properties of the input stream.
* Additional system stream specific properties may be provided using {@link #withStreamConfigs}
* <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
index 5fe3a98..1d81525 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
@@ -30,7 +30,7 @@ import org.apache.samza.serializers.Serde;
* Otherwise, this {@link GenericOutputDescriptor} may be used to provide Samza-specific properties of the output stream.
* Additional system stream specific properties may be provided using {@link #withStreamConfigs}
* <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
index 59b2a12..eb86877 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
@@ -28,7 +28,7 @@ import org.apache.samza.serializers.Serde;
* Otherwise, this {@link GenericSystemDescriptor} may be used to provide Samza-specific properties of the system.
* Additional system specific properties may be provided using {@link #withSystemConfigs}
* <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * System properties provided in configuration override corresponding properties configured using a descriptor.
*/
public final class GenericSystemDescriptor extends SystemDescriptor<GenericSystemDescriptor>
implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
index 2c0ca88..fd7a50c 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
@@ -28,7 +28,7 @@ import org.apache.samza.system.SystemStreamMetadata.OffsetType;
/**
* The base descriptor for an input stream. Allows setting properties that are common to all input streams.
* <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream.
* @param <SubClass> type of the concrete sub-class
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
index 70a5d0f..898be1e 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
@@ -23,7 +23,7 @@ import org.apache.samza.serializers.Serde;
/**
* The base descriptor for an output stream. Allows setting properties that are common to all output streams.
* <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream.
* @param <SubClass> type of the concrete sub-class
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
index d2b25f9..e8e586f 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
@@ -31,7 +31,7 @@ import org.apache.samza.serializers.Serde;
/**
* The base descriptor for an input or output stream. Allows setting properties that are common to all streams.
* <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream.
* @param <SubClass> type of the concrete sub-class
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
index 4b93a32..9db2544 100644
--- a/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
/**
* The base descriptor for a system. Allows setting properties that are common to all systems.
* <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * System properties provided in configuration override corresponding properties configured using a descriptor.
* <p>
* Systems may provide an {@link InputTransformer} to be used for input streams on the system. An
* {@link InputTransformer} transforms an {@code IncomingMessageEnvelope} with deserialized key and message
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
index cce716c..462cc05 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
@@ -31,11 +31,10 @@ import org.apache.samza.system.eventhub.EventHubConfig;
/**
* A descriptor for the Event Hubs output stream
- *<p>
- * An instance of this descriptor may be obtained from an {@link EventHubsSystemDescriptor}
- *</p>
- * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
- * in configuration.
+ * <p>
+ * An instance of this descriptor may be obtained from an {@link EventHubsSystemDescriptor}.
+ * <p>
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
index cd17033..ddbf79c 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
@@ -31,10 +31,9 @@ import org.apache.samza.system.eventhub.EventHubConfig;
/**
* A descriptor for an Event Hubs output stream
* <p>
- * An instance of this descriptor may be obtained from and {@link EventHubsSystemDescriptor}
- * </p>
- * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
- * in configuration.
+ * An instance of this descriptor may be obtained from an {@link EventHubsSystemDescriptor}.
+ * <p>
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
index 4e292d9..80bdfae 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
@@ -33,7 +33,7 @@ import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.Partitio
/**
* A descriptor for a Event Hubs system.
* <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * System properties provided in configuration override corresponding properties configured using a descriptor.
*/
public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemDescriptor> {
private static final String FACTORY_CLASS_NAME = EventHubSystemFactory.class.getName();
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 676d28e..70c9b23 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -66,67 +67,80 @@ import org.slf4j.LoggerFactory;
static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan";
- static JobConfig mergeJobConfig(Config originalConfig, Config generatedConfig) {
- JobConfig jobConfig = new JobConfig(originalConfig);
- String jobNameAndId = JobNode.createJobNameAndId(jobConfig.getName().get(), jobConfig.getJobId());
- return new JobConfig(Util.rewriteConfig(extractScopedConfig(originalConfig, generatedConfig,
- String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), jobNameAndId))));
+ static Config mergeConfig(Map<String, String> originalConfig, Map<String, String> generatedConfig) {
+ Map<String, String> mergedConfig = new HashMap<>(generatedConfig);
+ originalConfig.forEach((k, v) -> {
+ if (generatedConfig.containsKey(k) &&
+ !Objects.equals(generatedConfig.get(k), v)) {
+ LOG.info("Replacing generated config for key: {} value: {} with original config value: {}",
+ k, generatedConfig.get(k), v);
+ }
+ mergedConfig.put(k, v);
+ });
+
+ return Util.rewriteConfig(new MapConfig(mergedConfig));
}
JobConfig generateJobConfig(JobNode jobNode, String executionPlanJson) {
- Map<String, String> configs = new HashMap<>();
+ if (jobNode.isLegacyTaskApplication()) {
+ return new JobConfig(jobNode.getConfig());
+ }
+
+ Map<String, String> generatedConfig = new HashMap<>();
// set up job name and job ID
- configs.put(JobConfig.JOB_NAME(), jobNode.getJobName());
- configs.put(JobConfig.JOB_ID(), jobNode.getJobId());
+ generatedConfig.put(JobConfig.JOB_NAME(), jobNode.getJobName());
+ generatedConfig.put(JobConfig.JOB_ID(), jobNode.getJobId());
Map<String, StreamEdge> inEdges = jobNode.getInEdges();
Map<String, StreamEdge> outEdges = jobNode.getOutEdges();
Collection<OperatorSpec> reachableOperators = jobNode.getReachableOperators();
List<StoreDescriptor> stores = getStoreDescriptors(reachableOperators);
Map<String, TableSpec> reachableTables = getReachableTables(reachableOperators, jobNode);
- Config config = jobNode.getConfig();
+
+ // config passed by the JobPlanner. user-provided + system-stream descriptor config + misc. other config
+ Config originalConfig = jobNode.getConfig();
// check all inputs to the node for broadcast and input streams
final Set<String> inputs = new HashSet<>();
- final Set<String> broadcasts = new HashSet<>();
+ final Set<String> broadcastInputs = new HashSet<>();
for (StreamEdge inEdge : inEdges.values()) {
String formattedSystemStream = inEdge.getName();
if (inEdge.isBroadcast()) {
- broadcasts.add(formattedSystemStream + "#0");
+ broadcastInputs.add(formattedSystemStream + "#0");
} else {
inputs.add(formattedSystemStream);
}
}
- configureBroadcastInputs(configs, config, broadcasts);
+ configureBroadcastInputs(generatedConfig, originalConfig, broadcastInputs);
// compute window and join operator intervals in this node
- configureWindowInterval(configs, config, reachableOperators);
+ configureWindowInterval(generatedConfig, originalConfig, reachableOperators);
// set store configuration for stateful operators.
- stores.forEach(sd -> configs.putAll(sd.getStorageConfigs()));
+ stores.forEach(sd -> generatedConfig.putAll(sd.getStorageConfigs()));
// set the execution plan in json
- configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
+ generatedConfig.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
// write intermediate input/output streams to configs
- inEdges.values().stream().filter(StreamEdge::isIntermediate).forEach(edge -> configs.putAll(edge.generateConfig()));
+ inEdges.values().stream().filter(StreamEdge::isIntermediate)
+ .forEach(intermediateEdge -> generatedConfig.putAll(intermediateEdge.generateConfig()));
// write serialized serde instances and stream, store, and table serdes to configs
// serde configuration generation has to happen before table configuration, since the serde configuration
// is required when generating configurations for some TableProvider (i.e. local store backed tables)
- configureSerdes(configs, inEdges, outEdges, stores, reachableTables.keySet(), jobNode);
+ configureSerdes(generatedConfig, inEdges, outEdges, stores, reachableTables.keySet(), jobNode);
// generate table configuration and potential side input configuration
- configureTables(configs, config, reachableTables, inputs);
+ configureTables(generatedConfig, originalConfig, reachableTables, inputs);
- // finalize the task.inputs configuration
- configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+ // generate the task.inputs configuration
+ generatedConfig.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
- LOG.info("Job {} has generated configs {}", jobNode.getJobNameAndId(), configs);
+ LOG.info("Job {} has generated configs {}", jobNode.getJobNameAndId(), generatedConfig);
- // apply configure rewriters and user configure overrides
- return applyConfigureRewritersAndOverrides(configs, config, jobNode);
+ return new JobConfig(mergeConfig(originalConfig, generatedConfig));
}
private Map<String, TableSpec> getReachableTables(Collection<OperatorSpec> reachableOperators, JobNode jobNode) {
@@ -140,9 +154,9 @@ import org.slf4j.LoggerFactory;
if (broadcastStreams.isEmpty()) {
return;
}
- final String taskBroadcasts = config.get(TaskConfigJava.BROADCAST_INPUT_STREAMS);
- if (StringUtils.isNoneEmpty(taskBroadcasts)) {
- broadcastStreams.add(taskBroadcasts);
+ String broadcastInputs = config.get(TaskConfigJava.BROADCAST_INPUT_STREAMS);
+ if (StringUtils.isNotBlank(broadcastInputs)) {
+ broadcastStreams.add(broadcastInputs);
}
configs.put(TaskConfigJava.BROADCAST_INPUT_STREAMS, Joiner.on(',').join(broadcastStreams));
}
@@ -155,12 +169,10 @@ import org.slf4j.LoggerFactory;
}
// set triggering interval if a window or join is defined. Only applies to high-level applications
- if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) {
- long triggerInterval = computeTriggerInterval(reachableOperators);
- LOG.info("Using triggering interval: {}", triggerInterval);
+ long triggerInterval = computeTriggerInterval(reachableOperators);
+ LOG.info("Using triggering interval: {}", triggerInterval);
- configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval));
- }
+ configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval));
}
/**
@@ -190,68 +202,27 @@ import org.slf4j.LoggerFactory;
return MathUtil.gcd(candidateTimerIntervals);
}
- private JobConfig applyConfigureRewritersAndOverrides(Map<String, String> configs, Config config, JobNode jobNode) {
- // Disallow user specified job inputs/outputs. This info comes strictly from the user application.
- Map<String, String> allowedConfigs = new HashMap<>(config);
- if (!jobNode.isLegacyTaskApplication()) {
- if (allowedConfigs.containsKey(TaskConfig.INPUT_STREAMS())) {
- LOG.warn("Specifying task inputs in configuration is not allowed for SamzaApplication. "
- + "Ignoring configured value for " + TaskConfig.INPUT_STREAMS());
- allowedConfigs.remove(TaskConfig.INPUT_STREAMS());
- }
- }
-
- LOG.debug("Job {} has allowed configs {}", jobNode.getJobNameAndId(), allowedConfigs);
- return mergeJobConfig(new MapConfig(allowedConfigs), new MapConfig(configs));
- }
-
- /**
- * This function extract the subset of configs from the full config, and use it to override the generated configs
- * from the job.
- * @param fullConfig full config
- * @param generatedConfig config generated for the job
- * @param configPrefix prefix to extract the subset of the config overrides
- * @return config that merges the generated configs and overrides
- */
- private static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
- Config scopedConfig = fullConfig.subset(configPrefix);
-
- Config[] configPrecedence = new Config[] {fullConfig, generatedConfig, scopedConfig};
- // Strip empty configs so they don't override the configs before them.
- Map<String, String> mergedConfig = new HashMap<>();
- for (Map<String, String> config : configPrecedence) {
- for (Map.Entry<String, String> property : config.entrySet()) {
- String value = property.getValue();
- if (!(value == null || value.isEmpty())) {
- mergedConfig.put(property.getKey(), property.getValue());
- }
- }
- }
- scopedConfig = new MapConfig(mergedConfig);
- LOG.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
-
- return scopedConfig;
- }
-
private List<StoreDescriptor> getStoreDescriptors(Collection<OperatorSpec> reachableOperators) {
return reachableOperators.stream().filter(operatorSpec -> operatorSpec instanceof StatefulOperatorSpec)
.map(operatorSpec -> ((StatefulOperatorSpec) operatorSpec).getStoreDescriptors()).flatMap(Collection::stream)
.collect(Collectors.toList());
}
- private void configureTables(Map<String, String> configs, Config config, Map<String, TableSpec> tables, Set<String> inputs) {
- configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(configs),
- tables.values().stream().collect(Collectors.toList())));
+ private void configureTables(Map<String, String> generatedConfig, Config originalConfig,
+ Map<String, TableSpec> tables, Set<String> inputs) {
+ generatedConfig.putAll(
+ TableConfigGenerator.generateConfigsForTableSpecs(
+ new MapConfig(generatedConfig), new ArrayList<>(tables.values())));
// Add side inputs to the inputs and mark the stream as bootstrap
tables.values().forEach(tableSpec -> {
List<String> sideInputs = tableSpec.getSideInputs();
if (sideInputs != null && !sideInputs.isEmpty()) {
sideInputs.stream()
- .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(config, sideInput))
+ .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(originalConfig, sideInput))
.forEach(systemStream -> {
inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
- configs.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
+ generatedConfig.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
systemStream.getSystem(), systemStream.getStream()), "true");
});
}
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index dc0fc59..7ea7367 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -32,9 +32,9 @@ import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.TaskConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,11 +49,11 @@ public abstract class JobPlanner {
private static final Logger LOG = LoggerFactory.getLogger(JobPlanner.class);
protected final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
- protected final Config config;
+ protected final Config userConfig;
JobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
this.appDesc = descriptor;
- this.config = descriptor.getConfig();
+ this.userConfig = descriptor.getConfig();
}
public abstract List<JobConfig> prepareJobs();
@@ -70,35 +70,22 @@ public abstract class JobPlanner {
/* package private */
ExecutionPlan getExecutionPlan(String runId) {
+ Map<String, String> generatedConfig = getGeneratedConfig(runId);
- // update application configs
- Map<String, String> cfg = new HashMap<>();
- if (StringUtils.isNoneEmpty(runId)) {
- cfg.put(ApplicationConfig.APP_RUN_ID, runId);
+ // merge user-provided configuration with generated configuration. generated configuration has lower priority.
+ // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
+ Map<String, String> allowedUserConfig = new HashMap<>(userConfig);
+ if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
+ LOG.warn("SamzaApplications should not specify task.inputs in configuration. " +
+ "Ignoring configured value of " + userConfig.get(TaskConfig.INPUT_STREAMS()));
+ allowedUserConfig.remove(TaskConfig.INPUT_STREAMS()); // must be set using descriptors or operators
}
- StreamConfig streamConfig = new StreamConfig(config);
- Set<String> inputStreams = new HashSet<>(appDesc.getInputStreamIds());
- inputStreams.removeAll(appDesc.getOutputStreamIds());
- ApplicationConfig.ApplicationMode mode = inputStreams.stream().allMatch(streamConfig::getIsBounded)
- ? ApplicationConfig.ApplicationMode.BATCH : ApplicationConfig.ApplicationMode.STREAM;
- cfg.put(ApplicationConfig.APP_MODE, mode.name());
-
- // merge user-provided configuration with input/output descriptor generated configuration
- // descriptor generated configuration has higher priority
- Map<String, String> systemStreamConfigs = expandSystemStreamConfigs(appDesc);
- cfg.putAll(systemStreamConfigs);
-
- // adding app.class in the configuration, unless it is LegacyTaskApplication
- if (!LegacyTaskApplication.class.getName().equals(appDesc.getAppClass().getName())) {
- cfg.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName());
- }
+ Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
- // create the physical execution plan and merge with overrides. This works for a single-stage job now
- // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
- Config mergedConfig = JobNodeConfigurationGenerator.mergeJobConfig(config, new MapConfig(cfg));
// creating the StreamManager to get all input/output streams' metadata for planning
StreamManager streamManager = buildAndStartStreamManager(mergedConfig);
+
try {
ExecutionPlanner planner = new ExecutionPlanner(mergedConfig, streamManager);
return planner.plan(appDesc);
@@ -128,7 +115,32 @@ public abstract class JobPlanner {
}
}
- private Map<String, String> expandSystemStreamConfigs(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+ private Map<String, String> getGeneratedConfig(String runId) {
+ Map<String, String> generatedConfig = new HashMap<>();
+ if (StringUtils.isNoneEmpty(runId)) {
+ generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
+ }
+
+ StreamConfig streamConfig = new StreamConfig(userConfig);
+ Set<String> inputStreamIds = new HashSet<>(appDesc.getInputStreamIds());
+ inputStreamIds.removeAll(appDesc.getOutputStreamIds()); // exclude intermediate streams
+ ApplicationConfig.ApplicationMode mode =
+ inputStreamIds.stream().allMatch(streamConfig::getIsBounded)
+ ? ApplicationConfig.ApplicationMode.BATCH
+ : ApplicationConfig.ApplicationMode.STREAM;
+ generatedConfig.put(ApplicationConfig.APP_MODE, mode.name());
+
+ Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc);
+ generatedConfig.putAll(systemStreamConfigs);
+
+ // adding app.class in the configuration, unless it is LegacyTaskApplication
+ if (!LegacyTaskApplication.class.getName().equals(appDesc.getAppClass().getName())) {
+ generatedConfig.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName());
+ }
+ return generatedConfig;
+ }
+
+ private Map<String, String> generateSystemStreamConfigs(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
Map<String, String> systemStreamConfigs = new HashMap<>();
appDesc.getInputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig()));
appDesc.getOutputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig()));
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
index 6ca5f3d..9e2f745 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -107,10 +107,10 @@ public class LocalJobPlanner extends JobPlanner {
LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", uid);
// Move the scope of coordination utils within stream creation to address long idle connection problem.
// Refer SAMZA-1385 for more details
- JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
- String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
+ JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(userConfig);
+ String coordinationId = new ApplicationConfig(userConfig).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
CoordinationUtils coordinationUtils =
- jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, config);
+ jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, userConfig);
if (coordinationUtils == null) {
LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid);
// each application process will try creating the streams, which
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
index 13b29df..c51fd85 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
@@ -87,7 +87,7 @@ public class RemoteJobPlanner extends JobPlanner {
}
private Config getConfigFromPrevRun() {
- CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
+ CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(userConfig, new MetricsRegistryMap());
consumer.register();
consumer.start();
consumer.bootstrap();
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index d7b71b5..4f19ade 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -39,7 +39,6 @@ object JobConfig {
*/
val CONFIG_REWRITERS = "job.config.rewriters" // streaming.job_config_rewriters
val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // streaming.job_config_rewriter_class - regex, system, config
- val CONFIG_OVERRIDE_JOBS_PREFIX = "jobs.%s."
val JOB_NAME = "job.name" // streaming.job_name
val JOB_ID = "job.id" // streaming.job_id
val SAMZA_FWK_PATH = "samza.fwk.path"
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index f49958c..6d017cb 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -672,9 +672,8 @@ public class TestExecutionPlanner {
}
@Test
- public void testTriggerIntervalWithInvalidWindowMs() {
+ public void testTriggerIntervalWithNoWindowMs() {
Map<String, String> map = new HashMap<>(config);
- map.put(TaskConfig.WINDOW_MS(), "-1");
map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
Config cfg = new MapConfig(map);
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
index dda0ee1..404a39b 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -239,59 +239,9 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
}
@Test
- public void testTaskInputsRemovedFromOriginalConfig() {
- Map<String, String> configs = new HashMap<>(mockConfig);
- configs.put(TaskConfig.INPUT_STREAMS(), "not.allowed1,not.allowed2");
- mockConfig = spy(new MapConfig(configs));
-
- mockStreamAppDesc = new StreamApplicationDescriptorImpl(getBroadcastOnlyStreamApplication(defaultSerde), mockConfig);
- configureJobNode(mockStreamAppDesc);
-
- JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
- JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson");
- Config expectedConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges());
- validateJobConfig(expectedConfig, jobConfig);
- }
-
- @Test
- public void testTaskInputsRetainedForLegacyTaskApplication() {
- Map<String, String> originConfig = new HashMap<>(mockConfig);
- originConfig.put(TaskConfig.INPUT_STREAMS(), "must.retain1,must.retain2");
- mockConfig = new MapConfig(originConfig);
- TaskApplicationDescriptorImpl taskAppDesc = new TaskApplicationDescriptorImpl(getLegacyTaskApplication(), mockConfig);
- configureJobNode(taskAppDesc);
-
- // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode
- JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
- JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "");
- // jobConfig should be exactly the same as original config
- Map<String, String> generatedConfig = new HashMap<>(jobConfig);
- assertEquals(originConfig, generatedConfig);
- }
-
- @Test
- public void testOverrideConfigs() {
- Map<String, String> configs = new HashMap<>(mockConfig);
- String streamCfgToOverride = String.format("streams.%s.samza.system", intermediateInputDescriptor.getStreamId());
- String overrideCfgKey = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()) + streamCfgToOverride;
- configs.put(overrideCfgKey, "customized-system");
- mockConfig = spy(new MapConfig(configs));
- mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig);
- configureJobNode(mockStreamAppDesc);
-
- JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
- JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson");
- Config expectedConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges());
- validateJobConfig(expectedConfig, jobConfig);
- assertEquals("customized-system", jobConfig.get(streamCfgToOverride));
- }
-
- @Test
- public void testConfigureRewriter() {
+ public void testConfigRewriter() {
Map<String, String> configs = new HashMap<>(mockConfig);
String streamCfgToOverride = String.format("streams.%s.samza.system", intermediateInputDescriptor.getStreamId());
- String overrideCfgKey = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()) + streamCfgToOverride;
- configs.put(overrideCfgKey, "customized-system");
configs.put(String.format(JobConfig.CONFIG_REWRITER_CLASS(), "mock"), MockConfigRewriter.class.getName());
configs.put(JobConfig.CONFIG_REWRITERS(), "mock");
configs.put(String.format("job.config.rewriter.mock.%s", streamCfgToOverride), "rewritten-system");
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
index 1896fd3..fb279ab 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
@@ -32,7 +32,7 @@ import org.apache.samza.serializers.Serde;
* <p>
* An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}.
* <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java
index 0ec5ce7..f13352c 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java
@@ -27,7 +27,7 @@ import org.apache.samza.serializers.Serde;
* <p>
* An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}.
* <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
*
* @param <StreamMessageType> type of messages in this stream.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
index 0c6eaeb..6fb8c1c 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
@@ -36,7 +36,7 @@ import org.apache.samza.system.kafka.KafkaSystemFactory;
/**
* A descriptor for a Kafka system.
* <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * System properties provided in configuration override corresponding properties configured using a descriptor.
*/
@SuppressWarnings("unchecked")
public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescriptor>
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index ba7128a..531b0ef 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -168,8 +168,7 @@ public class TestRunner {
public TestRunner addConfig(String key, String value) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);
- String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
- configs.put(String.format("%s%s", configPrefix, key), value);
+ configs.put(key, value);
return this;
}
@@ -180,8 +179,7 @@ public class TestRunner {
*/
public TestRunner addConfig(Map<String, String> config) {
Preconditions.checkNotNull(config);
- String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
- config.forEach((key, value) -> this.configs.put(String.format("%s%s", configPrefix, key), value));
+ configs.putAll(config);
return this;
}
@@ -204,10 +202,6 @@ public class TestRunner {
return this;
}
- private String getJobNameAndId() {
- return String.format("%s-%s", JOB_NAME, configs.getOrDefault(JobConfig.JOB_ID(), "1"));
- }
-
/**
* Adds the provided input stream with mock data to the test application. Default configs and user added configs have
* a higher precedence over system and stream descriptor generated configs.
@@ -348,12 +342,11 @@ public class TestRunner {
/**
* Creates an in memory stream with {@link InMemorySystemFactory} and feeds its partition with stream of messages
- * @param partitonData key of the map represents partitionId and value represents
- * messages in the partition
+ * @param partitionData key of the map represents partitionId and value represents messages in the partition
* @param descriptor describes a stream to initialize with the in memory system
*/
private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor<?> descriptor,
- Map<Integer, Iterable<StreamMessageType>> partitonData) {
+ Map<Integer, Iterable<StreamMessageType>> partitionData) {
String systemName = descriptor.getSystemName();
String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId());
if (configs.containsKey(TaskConfig.INPUT_STREAMS())) {
@@ -366,13 +359,13 @@ public class TestRunner {
imsd.withInMemoryScope(this.inMemoryScope);
addConfig(descriptor.toConfig());
addConfig(descriptor.getSystemDescriptor().toConfig());
- StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size());
+ StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitionData.size());
SystemFactory factory = new InMemorySystemFactory();
Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig());
factory.getAdmin(systemName, config).createStream(spec);
SystemProducer producer = factory.getProducer(systemName, config, null);
SystemStream sysStream = new SystemStream(systemName, streamName);
- partitonData.forEach((partitionId, partition) -> {
+ partitionData.forEach((partitionId, partition) -> {
partition.forEach(e -> {
Object key = e instanceof KV ? ((KV) e).getKey() : null;
Object value = e instanceof KV ? ((KV) e).getValue() : e;
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java
index 0e49550..c446083 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java
@@ -25,8 +25,8 @@ import org.apache.samza.serializers.NoOpSerde;
/**
* A descriptor for an in memory stream of messages that can either have single or multiple partitions.
* <p>
- * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
- * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
+ *
* @param <StreamMessageType> type of messages in input stream
*/
public class InMemoryInputDescriptor<StreamMessageType>
http://git-wip-us.apache.org/repos/asf/samza/blob/989f8f6a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
index 96a8aca..a74d0ba 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
@@ -31,8 +31,8 @@ import org.apache.samza.system.inmemory.InMemorySystemFactory;
import org.apache.samza.config.JavaSystemConfig;
/**
- * A descriptor for InMemorySystem.
- * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * Descriptor for an InMemorySystem.
+ * System properties provided in configuration override corresponding properties configured using a descriptor.
* <p>
* Following system level configs are set by default
* <ol>
@@ -44,20 +44,6 @@ import org.apache.samza.config.JavaSystemConfig;
public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDescriptor>
implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
private static final String FACTORY_CLASS_NAME = InMemorySystemFactory.class.getName();
- /**
- * <p>
- * The "systems.*" configs are required since the planner uses the system to get metadata about streams during
- * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided
- * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*"
- * scope have the highest precedence.
- *
- * For this case, it generates following overridden configs
- * <ol>
- * <li>"jobs.<job-name>.systems.%s.default.stream.samza.offset.default" = "oldest"</li>
- * <li>"jobs.<job-name>.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
- * </ol>
- *
- **/
private String inMemoryScope;
/**