You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/09/26 18:00:59 UTC
[4/4] samza git commit: SAMZA-1814: consolidate JobNode and JobGraph
configuration generation for high and low-level API applications
SAMZA-1814: consolidate JobNode and JobGraph configuration generation for high and low-level API applications
High-level changes:
- Move configuration generation to JobNodeConfigurationGenerator
- Move the intermediate stream partition calculation to IntermediateStreamManager
- Consolidate the code in JobPlanner and ExecutionPlanner for high and low-level API plan/configuration generation
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Author: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Author: Yi Pan (Data Infrastructure) <yi...@yipan-ld2.linkedin.biz>
Author: Prateek Maheshwari <pm...@linkedin.com>
Author: Prateek Maheshwari <pr...@utexas.edu>
Author: prateekm <pr...@utexas.edu>
Reviewers: Prateek Maheshwari <pm...@apache.org>, Cameron Lee <ca...@linkedin.com>
Closes #642 from nickpan47/SAMZA-1814 and squashes the following commits:
214373966 [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814. With minor fixes to allow merge correctly.
f8c8108ac [Yi Pan (Data Infrastructure)] SAMZA-1814: Fix merging errors.
c8681a028 [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814
b66b9fa9d [Yi Pan (Data Infrastructure)] SAMZA-1814: moving serde generation to a single top-level configuration generation, not embedded in table. Address review comments
0db5068dd [Yi Pan (Data Infrastructure)] SAMZA-1814: fix merge issue and consolidated some test classes
2c856c5f5 [Yi Pan (Data Infrastructure)] SAMZA-1814: consolidate configuration generation for high and low-level APIs
ffc6f1a70 [Yi Pan (Data Infrastructure)] SAMZA-1814: consolidate configuration generation in ExecutionPlanner between high and low-level API applications
c7fde4a03 [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814
44844635b [Yi Pan (Data Infrastructure)] SAMZA-1814: merge with master
8797cdd4f [Yi Pan (Data Infrastructure)] SAMZA-1814: merge with master
dae98cebe [Yi Pan (Data Infrastructure)] SAMZA-1814: consolidate the configure generation between high and low-level API applications
3a91b9a62 [Yi Pan (Data Infrastructure)] SAMZA-1814: moving some logic to ApplicationDescriptorImpl to simplify the JobGraph/JobNode code
97c00a2e0 [Yi Pan (Data Infrastructure)] SAMZA-1814: WIP unit tests fixed for configure generation.
05637e6e6 [Yi Pan (Data Infrastructure)] SAMZA-1814: WIP consolidate all JobGraph and JobNode Json and Config generation code to support both high- and low-level applications
9d564642a [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814
16bef1b1b [Yi Pan (Data Infrastructure)] SAMZA-1814: WIP fixing the task application configuration generation in the planner
66af5b706 [Yi Pan (Data Infrastructure)] SAMZA-1789: addressing Cameron's review comments.
ec4bb1dca [Yi Pan (Data Infrastructure)] SAMZA-1789: merge with fix for SAMZA-1836
9c89c63dc [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
91fcd73ae [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
34ffda8ae [Yi Pan (Data Infrastructure)] SAMZA-1789: disabling tests due to SAMZA-1836
02076c850 [Yi Pan (Data Infrastructure)] SAMZA-1789: fixed the modifier for the mandatory constructor of ApplicationRunner; Disabled three tests due to wrong configure for test systems
222abf21f [Yi Pan (Data Infrastructure)] SAMZA-1789: added a constructor to StreamProcessor to take a StreamProcessorListenerFactory
7a73992a5 [Yi Pan (Data Infrastructure)] SAMZA-1789: fixing checkstyle and javadoc errors
9997b98bb [Yi Pan (Data Infrastructure)] SAMZA-1789: renamed all ApplicationDescriptor classes with full-spelling of Application
f4b3d43a4 [Yi Pan (Data Infrastructure)] SAMZA-1789: Fxing TaskApplication examples and some checkstyle errors
f2969f8df [Yi Pan (Data Infrastructure)] SAMZA-1789: fixed ApplicationDescriptor to use InputDescriptor and OutputDescriptor; addressed Prateek's comments.
f04404cc2 [Yi Pan (Data Infrastructure)] SAMZA-1789: move createStreams out of the loop in prepareJobs
33753f72d [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
12c09af06 [Yi Pan (Data Infrastructure)] SAMZA-1789: Fix a merging error (with SAMZA-1813)
a072118d0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
e7af6932d [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
8d4d3ffda [Yi Pan (Data Infrastructure)] Merge with master
055bd91e4 [Yi Pan (Data Infrastructure)] SAMZA-1789: fix unit test with ThreadJobFactory
247dcff4c [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
1621c4d00 [Yi Pan (Data Infrastructure)] SAMZA-1789: a few more fixes to address Cameron's reviews
6e446fe6d [Yi Pan (Data Infrastructure)] SAMZA-1789: address Cameron's review comments.
4382d45db [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
3b2f04d54 [Yi Pan (Data Infrastructure)] SAMZA-1789: moved all impl classes from samza-api to samza-core.
db96da830 [Yi Pan (Data Infrastructure)] SAMZA-1789: WIP - revision to address review feedbacks.
014337170 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
a82708bb0 [Yi Pan (Data Infrastructure)] SAMZA-1789: unify ApplicationDescriptor and ApplicationRunner for high- and low-level APIs in YARN and standalone environment
c4bb0dce6 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
f20cdcda6 [Yi Pan (Data Infrastructure)] WIP: adding unit tests. Pending update on StreamProcessorLifecycleListener, LocalContainerRunner, and SamzaContainerListener
973eb5261 [Yi Pan (Data Infrastructure)] WIP: compiles, still working on LocalContainerRunner refactor
fb1bc49e0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-spec-with-app-runtime-Jul-16-18
30a4e5f0a [Yi Pan (Data Infrastructure)] WIP: application runner refactor - proto-type for SEP-13
95577b74c [Yi Pan (Data Infrastructure)] WIP: trying to figure out the two interface classes for spec: a) spec builder in init(); b) spec reader in all other lifecycle methods
42782d815 [Yi Pan (Data Infrastructure)] Merge branch 'prateek-remove-app-runner-stream-spec' into app-spec-with-app-runtime-Jul-16-18
d43e92319 [Yi Pan (Data Infrastructure)] WIP: proto-type with ApplicationRunnable and no ApplicationRunner exposed to user
f1cb8f0eb [Yi Pan (Data Infrastructure)] Merge branch 'master' into single-app-api-May-21-18
7e71dc7e0 [Yi Pan (Data Infrastructure)] Merge with master
856193013 [Prateek Maheshwari] Merge branch 'master' into stream-spec-cleanup
7d7aa5088 [Prateek Maheshwari] Updated with Cameron and Daniel's feedback.
8e6fc2dac [prateekm] Remove all usages of StreamSpec and ApplicationRunner from the operator spec and impl layers.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cfbb9c6e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cfbb9c6e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cfbb9c6e
Branch: refs/heads/master
Commit: cfbb9c6ebabfe1e5e13af50a0487bde3f5c1e925
Parents: b842626
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Wed Sep 26 11:00:51 2018 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Sep 26 11:00:51 2018 -0700
----------------------------------------------------------------------
build.gradle | 1 +
.../application/ApplicationDescriptorImpl.java | 106 +++-
.../StreamApplicationDescriptorImpl.java | 54 +-
.../TaskApplicationDescriptorImpl.java | 14 +
.../samza/execution/ExecutionPlanner.java | 288 ++---------
.../execution/IntermediateStreamManager.java | 297 +++++++++++
.../org/apache/samza/execution/JobGraph.java | 92 ++--
.../samza/execution/JobGraphJsonGenerator.java | 110 ++--
.../org/apache/samza/execution/JobNode.java | 365 +++----------
.../JobNodeConfigurationGenerator.java | 361 +++++++++++++
.../org/apache/samza/execution/JobPlanner.java | 67 +--
.../apache/samza/execution/LocalJobPlanner.java | 21 +-
.../execution/OperatorSpecGraphAnalyzer.java | 12 +-
.../samza/execution/RemoteJobPlanner.java | 20 +-
.../samza/operators/BaseTableDescriptor.java | 9 +
.../samza/operators/OperatorSpecGraph.java | 7 -
.../samza/runtime/LocalContainerRunner.java | 2 +-
.../samza/table/TableConfigGenerator.java | 48 --
.../samza/zk/ZkJobCoordinatorFactory.java | 5 +-
.../org/apache/samza/config/JobConfig.scala | 4 +-
.../apache/samza/container/SamzaContainer.scala | 2 +-
.../MetricsSnapshotReporterFactory.scala | 1 -
.../samza/util/CoordinatorStreamUtil.scala | 2 +-
.../TestStreamApplicationDescriptorImpl.java | 3 +-
.../TestTaskApplicationDescriptorImpl.java | 8 +-
.../execution/ExecutionPlannerTestBase.java | 157 ++++++
.../samza/execution/TestExecutionPlanner.java | 181 ++++++-
.../TestIntermediateStreamManager.java | 68 +++
.../apache/samza/execution/TestJobGraph.java | 34 +-
.../execution/TestJobGraphJsonGenerator.java | 147 +++++-
.../org/apache/samza/execution/TestJobNode.java | 228 ---------
.../TestJobNodeConfigurationGenerator.java | 509 +++++++++++++++++++
.../samza/execution/TestRemoteJobPlanner.java | 2 +-
.../samza/operators/TestOperatorSpecGraph.java | 1 -
.../operators/spec/OperatorSpecTestUtils.java | 1 -
.../runtime/TestLocalApplicationRunner.java | 9 +-
.../runtime/TestRemoteApplicationRunner.java | 2 +-
.../samza/system/hdfs/HdfsSystemFactory.scala | 2 +-
.../kafka/KafkaCheckpointManagerFactory.scala | 2 +-
.../samza/config/KafkaConsumerConfig.java | 13 +-
.../scala/org/apache/samza/util/KafkaUtil.scala | 2 +-
.../kv/BaseLocalStoreBackedTableProvider.java | 17 +-
.../apache/samza/test/framework/TestRunner.java | 58 +--
.../system/InMemorySystemDescriptor.java | 10 +-
.../table/TestTableDescriptorsProvider.java | 6 -
.../samza/validation/YarnJobValidationTool.java | 2 +-
.../org/apache/samza/job/yarn/YarnJob.scala | 4 +-
47 files changed, 2162 insertions(+), 1192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 48a28f2..a45a875 100644
--- a/build.gradle
+++ b/build.gradle
@@ -194,6 +194,7 @@ project(":samza-core_$scalaVersion") {
testCompile "org.powermock:powermock-core:$powerMockVersion"
testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
+ testCompile "org.hamcrest:hamcrest-all:$hamcrestVersion"
}
checkstyle {
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
index 9679136..b58d5a5 100644
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
@@ -19,6 +19,7 @@
package org.apache.samza.application;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
@@ -26,13 +27,20 @@ import java.util.Set;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsReporterFactory;
import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.runtime.ProcessorLifecycleListener;
import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.task.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -46,10 +54,15 @@ import org.apache.samza.task.TaskContext;
*/
public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
implements ApplicationDescriptor<S> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationDescriptorImpl.class);
- final Config config;
private final Class<? extends SamzaApplication> appClass;
private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>();
+ // serdes used by input/output/intermediate streams, keyed by streamId
+ private final Map<String, KV<Serde, Serde>> streamSerdes = new HashMap<>();
+ // serdes used by tables, keyed by tableId
+ private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>();
+ final Config config;
// Default to no-op functions in ContextManager
// TODO: this should be replaced by shared context factory defined in SAMZA-1714
@@ -142,6 +155,35 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
}
/**
+ * Get the key and value {@link Serde}s registered for the stream {@code streamId}
+ *
+ * @param streamId id of the stream
+ * @return the key/value serde pair for the stream, or null if no serdes are registered for {@code streamId}
+ */
+ public KV<Serde, Serde> getStreamSerdes(String streamId) {
+ return streamSerdes.get(streamId);
+ }
+
+ /**
+ * Get the key and value {@link Serde}s registered for the table {@code tableId}
+ *
+ * @param tableId id of the table
+ * @return the key/value serde pair for the table, or null if no serdes are registered for {@code tableId}
+ */
+ public KV<Serde, Serde> getTableSerdes(String tableId) {
+ return tableSerdes.get(tableId);
+ }
+
+ /**
+ * Get the map of all {@link InputOperatorSpec}s in this application
+ *
+ * @return an immutable map from streamId to {@link InputOperatorSpec}. Defaults to an empty map for low-level {@link TaskApplication}
+ */
+ public Map<String, InputOperatorSpec> getInputOperators() {
+ return Collections.emptyMap();
+ }
+
+ /**
* Get all the {@link InputDescriptor}s to this application
*
* @return an immutable map of streamId to {@link InputDescriptor}
@@ -176,4 +218,66 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
*/
public abstract Set<SystemDescriptor> getSystemDescriptors();
+ /**
+ * Get all the unique input streamIds in this application
+ *
+ * @return an immutable set of input streamIds
+ */
+ public abstract Set<String> getInputStreamIds();
+
+ /**
+ * Get all the unique output streamIds in this application
+ *
+ * @return an immutable set of output streamIds
+ */
+ public abstract Set<String> getOutputStreamIds();
+
+ KV<Serde, Serde> getOrCreateStreamSerdes(String streamId, Serde serde) {
+ Serde keySerde, valueSerde;
+
+ KV<Serde, Serde> currentSerdePair = streamSerdes.get(streamId);
+ // a non-KV serde gets a synthesized NoOpSerde key below; reuse the registered one so equals() matches on re-registration
+ if (serde instanceof KVSerde) {
+ keySerde = ((KVSerde) serde).getKeySerde();
+ valueSerde = ((KVSerde) serde).getValueSerde();
+ } else {
+ keySerde = currentSerdePair != null && currentSerdePair.getKey() instanceof NoOpSerde ? currentSerdePair.getKey() : new NoOpSerde();
+ valueSerde = serde;
+ }
+
+ if (currentSerdePair == null) {
+ if (keySerde instanceof NoOpSerde) {
+ LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
+ ". Keys will not be (de)serialized");
+ }
+ if (valueSerde instanceof NoOpSerde) {
+ LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
+ ". Values will not be (de)serialized");
+ }
+ streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
+ } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
+ throw new IllegalArgumentException(String.format("Serde for stream %s is already defined. Cannot change it to "
+ + "different serdes.", streamId));
+ }
+ return streamSerdes.get(streamId);
+ }
+
+ KV<Serde, Serde> getOrCreateTableSerdes(String tableId, KVSerde kvSerde) {
+ Serde keySerde = kvSerde.getKeySerde();
+ Serde valueSerde = kvSerde.getValueSerde();
+ // register the serdes on first use; later registrations for the same table must use the same serdes
+ KV<Serde, Serde> currentSerdePair = tableSerdes.get(tableId);
+ if (currentSerdePair == null) {
+ currentSerdePair = KV.of(keySerde, valueSerde);
+ tableSerdes.put(tableId, currentSerdePair);
+ return currentSerdePair;
+ }
+
+ if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
+ throw new IllegalArgumentException(String.format("Serde for table %s is already defined. Cannot change it to "
+ + "different serdes.", tableId));
+ }
+ return currentSerdePair; // the table's own pair (tableSerdes), never a lookup in streamSerdes
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
index d50b0d0..5129913 100644
--- a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
@@ -51,7 +51,6 @@ import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.table.Table;
import org.apache.samza.table.TableSpec;
@@ -78,7 +77,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
// We use a LHM for deterministic order in initializing and closing operators.
private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
- private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
+ private final Map<String, TableImpl> tables = new LinkedHashMap<>();
private final Set<String> operatorIds = new HashSet<>();
private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
@@ -125,7 +124,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
"getInputStream must not be called multiple times with the same streamId: " + streamId);
Serde serde = inputDescriptor.getSerde();
- KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+ KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
if (outputStreams.containsKey(streamId)) {
OutputStreamImpl outputStream = outputStreams.get(streamId);
Serde keySerde = outputStream.getKeySerde();
@@ -156,7 +155,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
"getOutputStream must not be called multiple times with the same streamId: " + streamId);
Serde serde = outputDescriptor.getSerde();
- KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+ KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
if (inputOperators.containsKey(streamId)) {
InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
Serde keySerde = inputOperatorSpec.getKeySerde();
@@ -186,13 +185,15 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
- TableSpec tableSpec = ((BaseTableDescriptor) tableDescriptor).getTableSpec();
- if (tables.containsKey(tableSpec)) {
+ BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
+ TableSpec tableSpec = baseTableDescriptor.getTableSpec();
+ if (tables.containsKey(tableSpec.getId())) {
throw new IllegalStateException(
String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
}
- tables.put(tableSpec, new TableImpl(tableSpec));
- return tables.get(tableSpec);
+ tables.put(tableSpec.getId(), new TableImpl(tableSpec));
+ getOrCreateTableSerdes(tableSpec.getId(), baseTableDescriptor.getSerde());
+ return tables.get(tableSpec.getId());
}
/**
@@ -247,6 +248,16 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
}
+ @Override
+ public Set<String> getInputStreamIds() {
+ return Collections.unmodifiableSet(new HashSet<>(inputOperators.keySet()));
+ }
+
+ @Override
+ public Set<String> getOutputStreamIds() {
+ return Collections.unmodifiableSet(new HashSet<>(outputStreams.keySet()));
+ }
+
/**
* Get the default {@link SystemDescriptor} in this application
*
@@ -306,7 +317,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
return Collections.unmodifiableMap(outputStreams);
}
- public Map<TableSpec, TableImpl> getTables() {
+ public Map<String, TableImpl> getTables() {
return Collections.unmodifiableMap(tables);
}
@@ -342,7 +353,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
kvSerdes = new KV<>(null, null); // and that key and msg serdes are provided for job.default.system in configs
} else {
isKeyed = serde instanceof KVSerde;
- kvSerdes = getKVSerdes(streamId, serde);
+ kvSerdes = getOrCreateStreamSerdes(streamId, serde);
}
InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor()
@@ -356,29 +367,6 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
}
- private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
- Serde keySerde, valueSerde;
-
- if (serde instanceof KVSerde) {
- keySerde = ((KVSerde) serde).getKeySerde();
- valueSerde = ((KVSerde) serde).getValueSerde();
- } else {
- keySerde = new NoOpSerde();
- valueSerde = serde;
- }
-
- if (keySerde instanceof NoOpSerde) {
- LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
- ". Keys will not be (de)serialized");
- }
- if (valueSerde instanceof NoOpSerde) {
- LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
- ". Values will not be (de)serialized");
- }
-
- return KV.of(keySerde, valueSerde);
- }
-
// check uniqueness of the {@code systemDescriptor} and add if it is unique
private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
index 3597d7c..d140a90 100644
--- a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
@@ -25,6 +25,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.BaseTableDescriptor;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
@@ -65,6 +66,7 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
// TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
+ getOrCreateStreamSerdes(inputDescriptor.getStreamId(), inputDescriptor.getSerde());
inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
addSystemDescriptor(inputDescriptor.getSystemDescriptor());
}
@@ -73,6 +75,7 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
public void addOutputStream(OutputDescriptor outputDescriptor) {
Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
+ getOrCreateStreamSerdes(outputDescriptor.getStreamId(), outputDescriptor.getSerde());
outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
addSystemDescriptor(outputDescriptor.getSystemDescriptor());
}
@@ -81,6 +84,7 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
public void addTable(TableDescriptor tableDescriptor) {
Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
+ getOrCreateTableSerdes(tableDescriptor.getTableId(), ((BaseTableDescriptor) tableDescriptor).getSerde());
tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
}
@@ -111,6 +115,16 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
}
+ @Override
+ public Set<String> getInputStreamIds() {
+ return Collections.unmodifiableSet(new HashSet<>(inputDescriptors.keySet()));
+ }
+
+ @Override
+ public Set<String> getOutputStreamIds() {
+ return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet()));
+ }
+
/**
* Get the user-defined {@link TaskFactory}
* @return the {@link TaskFactory} object
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 46aef8d..eea6387 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -22,72 +22,57 @@ package org.apache.samza.execution;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StreamConfig;
-import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.JoinOperatorSpec;
+import org.apache.samza.operators.BaseTableDescriptor;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.table.TableSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.samza.execution.ExecutionPlanner.StreamEdgeSet.StreamEdgeSetCategory;
import static org.apache.samza.util.StreamUtil.*;
/**
- * The ExecutionPlanner creates the physical execution graph for the {@link OperatorSpecGraph}, and
+ * The ExecutionPlanner creates the physical execution graph for the {@link ApplicationDescriptorImpl}, and
* the intermediate topics needed for the execution.
*/
// TODO: ExecutionPlanner needs to be able to generate single node JobGraph for low-level TaskApplication as well (SAMZA-1811)
public class ExecutionPlanner {
private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
- /* package private */ static final int MAX_INFERRED_PARTITIONS = 256;
-
private final Config config;
- private final StreamConfig streamConfig;
private final StreamManager streamManager;
public ExecutionPlanner(Config config, StreamManager streamManager) {
this.config = config;
this.streamManager = streamManager;
- this.streamConfig = new StreamConfig(config);
}
- public ExecutionPlan plan(OperatorSpecGraph opSpecGraph) {
+ public ExecutionPlan plan(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
validateConfig();
- // Create physical job graph based on stream graph
- JobGraph jobGraph = createJobGraph(opSpecGraph);
-
- // Fetch the external streams partition info
- fetchInputAndOutputStreamPartitions(jobGraph);
+ // create physical job graph based on stream graph
+ JobGraph jobGraph = createJobGraph(config, appDesc);
- // Verify agreement in partition count between all joined input/intermediate streams
- validateJoinInputStreamPartitions(jobGraph);
+ // fetch the external streams partition info
+ setInputAndOutputStreamPartitionCount(jobGraph, streamManager);
- if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
- // Set partition count of intermediate streams not participating in joins
- setIntermediateStreamPartitions(jobGraph);
-
- // Validate partition counts were assigned for all intermediate streams
- validateIntermediateStreamPartitions(jobGraph);
- }
+ // figure out the partitions for internal streams
+ new IntermediateStreamManager(config, appDesc).calculatePartitions(jobGraph);
return jobGraph;
}
@@ -103,21 +88,23 @@ public class ExecutionPlanner {
}
/**
- * Creates the physical graph from {@link OperatorSpecGraph}
+ * Create the physical graph from {@link ApplicationDescriptorImpl}
*/
- /* package private */ JobGraph createJobGraph(OperatorSpecGraph opSpecGraph) {
- JobGraph jobGraph = new JobGraph(config, opSpecGraph);
-
+ /* package private */
+ JobGraph createJobGraph(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+ JobGraph jobGraph = new JobGraph(config, appDesc);
+ StreamConfig streamConfig = new StreamConfig(config);
// Source streams contain both input and intermediate streams.
- Set<StreamSpec> sourceStreams = getStreamSpecs(opSpecGraph.getInputOperators().keySet(), streamConfig);
+ Set<StreamSpec> sourceStreams = getStreamSpecs(appDesc.getInputStreamIds(), streamConfig);
// Sink streams contain both output and intermediate streams.
- Set<StreamSpec> sinkStreams = getStreamSpecs(opSpecGraph.getOutputStreams().keySet(), streamConfig);
+ Set<StreamSpec> sinkStreams = getStreamSpecs(appDesc.getOutputStreamIds(), streamConfig);
Set<StreamSpec> intermediateStreams = Sets.intersection(sourceStreams, sinkStreams);
Set<StreamSpec> inputStreams = Sets.difference(sourceStreams, intermediateStreams);
Set<StreamSpec> outputStreams = Sets.difference(sinkStreams, intermediateStreams);
- Set<TableSpec> tables = opSpecGraph.getTables().keySet();
+ Set<TableSpec> tables = appDesc.getTableDescriptors().stream()
+ .map(tableDescriptor -> ((BaseTableDescriptor) tableDescriptor).getTableSpec()).collect(Collectors.toSet());
// For this phase, we have a single job node for the whole dag
String jobName = config.get(JobConfig.JOB_NAME());
@@ -136,15 +123,20 @@ public class ExecutionPlanner {
// Add tables
tables.forEach(spec -> jobGraph.addTable(spec, node));
- jobGraph.validate();
+ if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
+ // skip the validation when input streamIds are empty. This is only possible for LegacyTaskApplication
+ jobGraph.validate();
+ }
return jobGraph;
}
/**
- * Fetches the partitions of input/output streams and update the corresponding StreamEdges.
+ * Fetch the partitions of source/sink streams and update the StreamEdges.
+ * @param jobGraph {@link JobGraph}
+ * @param streamManager the {@link StreamManager} to interface with the streams.
*/
- /* package private */ void fetchInputAndOutputStreamPartitions(JobGraph jobGraph) {
+ /* package private */ static void setInputAndOutputStreamPartitionCount(JobGraph jobGraph, StreamManager streamManager) {
Set<StreamEdge> existingStreams = new HashSet<>();
existingStreams.addAll(jobGraph.getInputStreams());
existingStreams.addAll(jobGraph.getOutputStreams());
@@ -182,224 +174,4 @@ public class ExecutionPlanner {
}
}
- /**
- * Validates agreement in partition count between input/intermediate streams participating in join operations.
- */
- private void validateJoinInputStreamPartitions(JobGraph jobGraph) {
- // Group input operator specs (input/intermediate streams) by the joins they participate in.
- Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs =
- OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(jobGraph.getSpecGraph());
-
- // Convert every group of input operator specs into a group of corresponding stream edges.
- List<StreamEdgeSet> streamEdgeSets = new ArrayList<>();
- for (JoinOperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) {
- Collection<InputOperatorSpec> joinedInputOpSpecs = joinOpSpecToInputOpSpecs.get(joinOpSpec);
- StreamEdgeSet streamEdgeSet = getStreamEdgeSet(joinOpSpec.getOpId(), joinedInputOpSpecs, jobGraph);
- streamEdgeSets.add(streamEdgeSet);
- }
-
- /*
- * Sort the stream edge groups by their category so they appear in this order:
- * 1. groups composed exclusively of stream edges with set partition counts
- * 2. groups composed of a mix of stream edges with set/unset partition counts
- * 3. groups composed exclusively of stream edges with unset partition counts
- *
- * This guarantees that we process the most constrained stream edge groups first,
- * which is crucial for intermediate stream edges that are members of multiple
- * stream edge groups. For instance, if we have the following groups of stream
- * edges (partition counts in parentheses, question marks for intermediate streams):
- *
- * a. e1 (16), e2 (16)
- * b. e2 (16), e3 (?)
- * c. e3 (?), e4 (?)
- *
- * processing them in the above order (most constrained first) is guaranteed to
- * yield correct assignment of partition counts of e3 and e4 in a single scan.
- */
- Collections.sort(streamEdgeSets, Comparator.comparingInt(e -> e.getCategory().getSortOrder()));
-
- // Verify agreement between joined input/intermediate streams.
- // This may involve setting partition counts of intermediate stream edges.
- streamEdgeSets.forEach(ExecutionPlanner::validateAndAssignStreamEdgeSetPartitions);
- }
-
- /**
- * Creates a {@link StreamEdgeSet} whose Id is {@code setId}, and {@link StreamEdge}s
- * correspond to the provided {@code inputOpSpecs}.
- */
- private StreamEdgeSet getStreamEdgeSet(String setId, Iterable<InputOperatorSpec> inputOpSpecs,
- JobGraph jobGraph) {
-
- int countStreamEdgeWithSetPartitions = 0;
- Set<StreamEdge> streamEdges = new HashSet<>();
-
- for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
- StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(inputOpSpec.getStreamId(), streamConfig));
- if (streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) {
- ++countStreamEdgeWithSetPartitions;
- }
- streamEdges.add(streamEdge);
- }
-
- // Determine category of stream group based on stream partition counts.
- StreamEdgeSetCategory category;
- if (countStreamEdgeWithSetPartitions == 0) {
- category = StreamEdgeSetCategory.NO_PARTITION_COUNT_SET;
- } else if (countStreamEdgeWithSetPartitions == streamEdges.size()) {
- category = StreamEdgeSetCategory.ALL_PARTITION_COUNT_SET;
- } else {
- category = StreamEdgeSetCategory.SOME_PARTITION_COUNT_SET;
- }
-
- return new StreamEdgeSet(setId, streamEdges, category);
- }
-
- /**
- * Sets partition count of intermediate streams which have not been assigned partition counts.
- */
- private void setIntermediateStreamPartitions(JobGraph jobGraph) {
- final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS();
- int partitions = config.getInt(defaultPartitionsConfigProperty, StreamEdge.PARTITIONS_UNKNOWN);
- if (partitions == StreamEdge.PARTITIONS_UNKNOWN) {
- // use the following simple algo to figure out the partitions
- // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
- // partition will be further bounded by MAX_INFERRED_PARTITIONS.
- // This is important when running in hadoop where an HDFS input can have lots of files (partitions).
- int maxInPartitions = maxPartitions(jobGraph.getInputStreams());
- int maxOutPartitions = maxPartitions(jobGraph.getOutputStreams());
- partitions = Math.max(maxInPartitions, maxOutPartitions);
-
- if (partitions > MAX_INFERRED_PARTITIONS) {
- partitions = MAX_INFERRED_PARTITIONS;
- log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.",
- partitions, MAX_INFERRED_PARTITIONS));
- }
- } else {
- // Reject any zero or other negative values explicitly specified in config.
- if (partitions <= 0) {
- throw new SamzaException(String.format("Invalid value %d specified for config property %s", partitions,
- defaultPartitionsConfigProperty));
- }
-
- log.info("Using partition count value {} specified for config property {}", partitions,
- defaultPartitionsConfigProperty);
- }
-
- for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
- if (edge.getPartitionCount() <= 0) {
- log.info("Set the partition count for intermediate stream {} to {}.", edge.getName(), partitions);
- edge.setPartitionCount(partitions);
- }
- }
- }
-
- /**
- * Ensures all intermediate streams have been assigned partition counts.
- */
- private static void validateIntermediateStreamPartitions(JobGraph jobGraph) {
- for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
- if (edge.getPartitionCount() <= 0) {
- throw new SamzaException(String.format("Failed to assign valid partition count to Stream %s", edge.getName()));
- }
- }
- }
-
- /**
- * Ensures that all streams in the supplied {@link StreamEdgeSet} agree in partition count.
- * This may include setting partition counts of intermediate streams in this set that do not
- * have their partition counts set.
- */
- private static void validateAndAssignStreamEdgeSetPartitions(StreamEdgeSet streamEdgeSet) {
- Set<StreamEdge> streamEdges = streamEdgeSet.getStreamEdges();
- StreamEdge firstStreamEdgeWithSetPartitions =
- streamEdges.stream()
- .filter(streamEdge -> streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN)
- .findFirst()
- .orElse(null);
-
- // This group consists exclusively of intermediate streams with unknown partition counts.
- // We cannot do any validation/computation of partition counts of such streams right here,
- // but they are tackled later in the ExecutionPlanner.
- if (firstStreamEdgeWithSetPartitions == null) {
- return;
- }
-
- // Make sure all other stream edges in this group have the same partition count.
- int partitions = firstStreamEdgeWithSetPartitions.getPartitionCount();
- for (StreamEdge streamEdge : streamEdges) {
- int streamPartitions = streamEdge.getPartitionCount();
- if (streamPartitions == StreamEdge.PARTITIONS_UNKNOWN) {
- streamEdge.setPartitionCount(partitions);
- log.info("Inferred the partition count {} for the join operator {} from {}."
- , new Object[] {partitions, streamEdgeSet.getSetId(), firstStreamEdgeWithSetPartitions.getName()});
- } else if (streamPartitions != partitions) {
- throw new SamzaException(String.format(
- "Unable to resolve input partitions of stream %s for the join %s. Expected: %d, Actual: %d",
- streamEdge.getName(), streamEdgeSet.getSetId(), partitions, streamPartitions));
- }
- }
- }
-
- /* package private */ static int maxPartitions(Collection<StreamEdge> edges) {
- return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN);
- }
-
- /**
- * Represents a set of {@link StreamEdge}s.
- */
- /* package private */ static class StreamEdgeSet {
-
- /**
- * Indicates whether all stream edges in this group have their partition counts assigned.
- */
- public enum StreamEdgeSetCategory {
- /**
- * All stream edges in this group have their partition counts assigned.
- */
- ALL_PARTITION_COUNT_SET(0),
-
- /**
- * Only some stream edges in this group have their partition counts assigned.
- */
- SOME_PARTITION_COUNT_SET(1),
-
- /**
- * No stream edge in this group is assigned a partition count.
- */
- NO_PARTITION_COUNT_SET(2);
-
-
- private final int sortOrder;
-
- StreamEdgeSetCategory(int sortOrder) {
- this.sortOrder = sortOrder;
- }
-
- public int getSortOrder() {
- return sortOrder;
- }
- }
-
- private final String setId;
- private final Set<StreamEdge> streamEdges;
- private final StreamEdgeSetCategory category;
-
- public StreamEdgeSet(String setId, Set<StreamEdge> streamEdges, StreamEdgeSetCategory category) {
- this.setId = setId;
- this.streamEdges = streamEdges;
- this.category = category;
- }
-
- public Set<StreamEdge> getStreamEdges() {
- return streamEdges;
- }
-
- public String getSetId() {
- return setId;
- }
-
- public StreamEdgeSetCategory getCategory() {
- return category;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
new file mode 100644
index 0000000..66cbe6a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link IntermediateStreamManager} calculates intermediate stream partitions based on the high-level application graph.
+ */
+class IntermediateStreamManager {
+
+  private static final Logger log = LoggerFactory.getLogger(IntermediateStreamManager.class);
+
+  private final Config config;
+  private final Map<String, InputOperatorSpec> inputOperators;
+
+  @VisibleForTesting
+  static final int MAX_INFERRED_PARTITIONS = 256;
+
+  IntermediateStreamManager(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+    this.config = config;
+    this.inputOperators = appDesc.getInputOperators();
+  }
+
+  /**
+   * Validates partition agreement of joined streams, then assigns and validates intermediate stream partition counts.
+   */
+  /* package private */ void calculatePartitions(JobGraph jobGraph) {
+
+    // Verify agreement in partition count between all joined input/intermediate streams
+    validateJoinInputStreamPartitions(jobGraph);
+
+    if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
+      // Set partition count of intermediate streams not participating in joins
+      setIntermediateStreamPartitions(jobGraph);
+
+      // Validate partition counts were assigned for all intermediate streams
+      validateIntermediateStreamPartitions(jobGraph);
+    }
+  }
+
+  /**
+   * Validates agreement in partition count between input/intermediate streams participating in join operations.
+   */
+  private void validateJoinInputStreamPartitions(JobGraph jobGraph) {
+    // Group input operator specs (input/intermediate streams) by the joins they participate in.
+    Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs =
+        OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(inputOperators.values());
+
+    // Convert every group of input operator specs into a group of corresponding stream edges.
+    List<StreamEdgeSet> streamEdgeSets = new ArrayList<>();
+    for (JoinOperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) {
+      Collection<InputOperatorSpec> joinedInputOpSpecs = joinOpSpecToInputOpSpecs.get(joinOpSpec);
+      StreamEdgeSet streamEdgeSet = getStreamEdgeSet(joinOpSpec.getOpId(), joinedInputOpSpecs, jobGraph);
+      streamEdgeSets.add(streamEdgeSet);
+    }
+
+    /*
+     * Sort the stream edge groups by their category so they appear in this order:
+     *   1. groups composed exclusively of stream edges with set partition counts
+     *   2. groups composed of a mix of stream edges with set/unset partition counts
+     *   3. groups composed exclusively of stream edges with unset partition counts
+     *
+     *   This guarantees that we process the most constrained stream edge groups first,
+     *   which is crucial for intermediate stream edges that are members of multiple
+     *   stream edge groups. For instance, if we have the following groups of stream
+     *   edges (partition counts in parentheses, question marks for intermediate streams):
+     *
+     *      a. e1 (16), e2 (16)
+     *      b. e2 (16), e3 (?)
+     *      c. e3 (?), e4 (?)
+     *
+     *   processing them in the above order (most constrained first) is guaranteed to
+     *   yield correct assignment of partition counts of e3 and e4 in a single scan.
+     */
+    Collections.sort(streamEdgeSets, Comparator.comparingInt(e -> e.getCategory().getSortOrder()));
+
+    // Verify agreement between joined input/intermediate streams.
+    // This may involve setting partition counts of intermediate stream edges.
+    streamEdgeSets.forEach(IntermediateStreamManager::validateAndAssignStreamEdgeSetPartitions);
+  }
+
+  /**
+   * Creates a {@link StreamEdgeSet} whose Id is {@code setId}, and {@link StreamEdge}s
+   * correspond to the provided {@code inputOpSpecs}.
+   */
+  private StreamEdgeSet getStreamEdgeSet(String setId, Iterable<InputOperatorSpec> inputOpSpecs,
+      JobGraph jobGraph) {
+
+    int countStreamEdgeWithSetPartitions = 0;
+    Set<StreamEdge> streamEdges = new HashSet<>();
+
+    for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
+      StreamEdge streamEdge = jobGraph.getStreamEdge(inputOpSpec.getStreamId());
+      if (streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) {
+        ++countStreamEdgeWithSetPartitions;
+      }
+      streamEdges.add(streamEdge);
+    }
+
+    // Determine category of stream group based on stream partition counts.
+    StreamEdgeSet.StreamEdgeSetCategory category;
+    if (countStreamEdgeWithSetPartitions == 0) {
+      category = StreamEdgeSet.StreamEdgeSetCategory.NO_PARTITION_COUNT_SET;
+    } else if (countStreamEdgeWithSetPartitions == streamEdges.size()) {
+      category = StreamEdgeSet.StreamEdgeSetCategory.ALL_PARTITION_COUNT_SET;
+    } else {
+      category = StreamEdgeSet.StreamEdgeSetCategory.SOME_PARTITION_COUNT_SET;
+    }
+
+    return new StreamEdgeSet(setId, streamEdges, category);
+  }
+
+  /**
+   * Sets partition count of intermediate streams which have not been assigned partition counts.
+   */
+  private void setIntermediateStreamPartitions(JobGraph jobGraph) {
+    final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS();
+    int partitions = config.getInt(defaultPartitionsConfigProperty, StreamEdge.PARTITIONS_UNKNOWN);
+    if (partitions == StreamEdge.PARTITIONS_UNKNOWN) {
+      // use the following simple algo to figure out the partitions
+      // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
+      // partition will be further bounded by MAX_INFERRED_PARTITIONS.
+      // This is important when running in hadoop where an HDFS input can have lots of files (partitions).
+      int maxInPartitions = maxPartitions(jobGraph.getInputStreams());
+      int maxOutPartitions = maxPartitions(jobGraph.getOutputStreams());
+      partitions = Math.max(maxInPartitions, maxOutPartitions);
+
+      if (partitions > MAX_INFERRED_PARTITIONS) {
+        log.warn("Inferred intermediate stream partition count {} is greater than the max {}. Using the max.",
+            partitions, MAX_INFERRED_PARTITIONS); // log the inferred value before it is clamped
+        partitions = MAX_INFERRED_PARTITIONS;
+      }
+    } else {
+      // Reject any zero or other negative values explicitly specified in config.
+      if (partitions <= 0) {
+        throw new SamzaException(String.format("Invalid value %d specified for config property %s", partitions,
+            defaultPartitionsConfigProperty));
+      }
+
+      log.info("Using partition count value {} specified for config property {}", partitions,
+          defaultPartitionsConfigProperty);
+    }
+
+    for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
+      if (edge.getPartitionCount() <= 0) {
+        log.info("Set the partition count for intermediate stream {} to {}.", edge.getName(), partitions);
+        edge.setPartitionCount(partitions);
+      }
+    }
+  }
+
+  /**
+   * Ensures all intermediate streams have been assigned partition counts.
+   */
+  private static void validateIntermediateStreamPartitions(JobGraph jobGraph) {
+    for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
+      if (edge.getPartitionCount() <= 0) {
+        throw new SamzaException(String.format("Failed to assign valid partition count to Stream %s", edge.getName()));
+      }
+    }
+  }
+
+  /**
+   * Ensures that all streams in the supplied {@link StreamEdgeSet} agree in partition count.
+   * This may include setting partition counts of intermediate streams in this set that do not
+   * have their partition counts set.
+   */
+  private static void validateAndAssignStreamEdgeSetPartitions(StreamEdgeSet streamEdgeSet) {
+    Set<StreamEdge> streamEdges = streamEdgeSet.getStreamEdges();
+    StreamEdge firstStreamEdgeWithSetPartitions =
+        streamEdges.stream()
+            .filter(streamEdge -> streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN)
+            .findFirst()
+            .orElse(null);
+
+    // This group consists exclusively of intermediate streams with unknown partition counts.
+    // We cannot do any validation/computation of partition counts of such streams right here,
+    // but they are assigned later by setIntermediateStreamPartitions() (see calculatePartitions()).
+    if (firstStreamEdgeWithSetPartitions == null) {
+      return;
+    }
+
+    // Make sure all other stream edges in this group have the same partition count.
+    int partitions = firstStreamEdgeWithSetPartitions.getPartitionCount();
+    for (StreamEdge streamEdge : streamEdges) {
+      int streamPartitions = streamEdge.getPartitionCount();
+      if (streamPartitions == StreamEdge.PARTITIONS_UNKNOWN) {
+        streamEdge.setPartitionCount(partitions);
+        log.info("Inferred the partition count {} for the join operator {} from {}.",
+            partitions, streamEdgeSet.getSetId(), firstStreamEdgeWithSetPartitions.getName());
+      } else if (streamPartitions != partitions) {
+        throw new SamzaException(String.format(
+            "Unable to resolve input partitions of stream %s for the join %s. Expected: %d, Actual: %d",
+            streamEdge.getName(), streamEdgeSet.getSetId(), partitions, streamPartitions));
+      }
+    }
+  }
+
+  /* package private */ static int maxPartitions(Collection<StreamEdge> edges) {
+    return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN);
+  }
+
+  /**
+   * Represents a set of {@link StreamEdge}s.
+   */
+  /* package private */ static class StreamEdgeSet {
+
+    /**
+     * Indicates whether all stream edges in this group have their partition counts assigned.
+     */
+    public enum StreamEdgeSetCategory {
+      /**
+       * All stream edges in this group have their partition counts assigned.
+       */
+      ALL_PARTITION_COUNT_SET(0),
+
+      /**
+       * Only some stream edges in this group have their partition counts assigned.
+       */
+      SOME_PARTITION_COUNT_SET(1),
+
+      /**
+       * No stream edge in this group is assigned a partition count.
+       */
+      NO_PARTITION_COUNT_SET(2);
+
+
+      private final int sortOrder;
+
+      StreamEdgeSetCategory(int sortOrder) {
+        this.sortOrder = sortOrder;
+      }
+
+      public int getSortOrder() {
+        return sortOrder;
+      }
+    }
+
+    private final String setId;
+    private final Set<StreamEdge> streamEdges;
+    private final StreamEdgeSetCategory category;
+
+    StreamEdgeSet(String setId, Set<StreamEdge> streamEdges, StreamEdgeSetCategory category) {
+      this.setId = setId;
+      this.streamEdges = streamEdges;
+      this.category = category;
+    }
+
+    Set<StreamEdge> getStreamEdges() {
+      return streamEdges;
+    }
+
+    String getSetId() {
+      return setId;
+    }
+
+    StreamEdgeSetCategory getCategory() {
+      return category;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 5b19095..d975188 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -31,10 +31,11 @@ import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.table.TableSpec;
import org.slf4j.Logger;
@@ -59,16 +60,21 @@ import org.slf4j.LoggerFactory;
private final Set<StreamEdge> intermediateStreams = new HashSet<>();
private final Set<TableSpec> tables = new HashSet<>();
private final Config config;
- private final JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator();
- private final OperatorSpecGraph specGraph;
+ private final JobGraphJsonGenerator jsonGenerator;
+ private final JobNodeConfigurationGenerator configGenerator;
+ private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
/**
* The JobGraph is only constructed by the {@link ExecutionPlanner}.
- * @param config Config
+ *
+ * @param config configuration for the application
+ * @param appDesc {@link ApplicationDescriptorImpl} describing the application
*/
- JobGraph(Config config, OperatorSpecGraph specGraph) {
+  JobGraph(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
     this.config = config;
-    this.specGraph = specGraph;
+    this.appDesc = appDesc;
+    this.jsonGenerator = new JobGraphJsonGenerator();
+    this.configGenerator = new JobNodeConfigurationGenerator();
   }
@Override
@@ -91,11 +97,6 @@ import org.slf4j.LoggerFactory;
.collect(Collectors.toList());
}
- void addTable(TableSpec tableSpec, JobNode node) {
- tables.add(tableSpec);
- node.addTable(tableSpec);
- }
-
@Override
public String getPlanAsJson() throws Exception {
return jsonGenerator.toJson(this);
@@ -105,14 +106,11 @@ import org.slf4j.LoggerFactory;
* Returns the config for this application
* @return {@link ApplicationConfig}
*/
+ @Override
public ApplicationConfig getApplicationConfig() {
return new ApplicationConfig(config);
}
- public OperatorSpecGraph getSpecGraph() {
- return specGraph;
- }
-
/**
* Add a source stream to a {@link JobNode}
* @param streamSpec input stream
@@ -152,20 +150,20 @@ import org.slf4j.LoggerFactory;
intermediateStreams.add(edge);
}
+ void addTable(TableSpec tableSpec, JobNode node) {
+ tables.add(tableSpec);
+ node.addTable(tableSpec);
+ }
+
/**
* Get the {@link JobNode}. Create one if it does not exist.
* @param jobName name of the job
* @param jobId id of the job
- * @return
+ * @return {@link JobNode} created with {@code jobName} and {@code jobId}
*/
JobNode getOrCreateJobNode(String jobName, String jobId) {
- String nodeId = JobNode.createId(jobName, jobId);
- JobNode node = nodes.get(nodeId);
- if (node == null) {
- node = new JobNode(jobName, jobId, specGraph, config);
- nodes.put(nodeId, node);
- }
- return node;
+ String nodeId = JobNode.createJobNameAndId(jobName, jobId);
+ return nodes.computeIfAbsent(nodeId, k -> new JobNode(jobName, jobId, config, appDesc, configGenerator));
}
/**
@@ -178,20 +176,13 @@ import org.slf4j.LoggerFactory;
}
/**
- * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
- * @param streamSpec spec of the StreamEdge
- * @param isIntermediate boolean flag indicating whether it's an intermediate stream
+ * Get the {@link StreamEdge} for {@code streamId}.
+ *
+ * @param streamId the streamId for the {@link StreamEdge}
* @return stream edge
*/
- StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec, boolean isIntermediate) {
- String streamId = streamSpec.getId();
- StreamEdge edge = edges.get(streamId);
- if (edge == null) {
- boolean isBroadcast = specGraph.getBroadcastStreams().contains(streamId);
- edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config);
- edges.put(streamId, edge);
- }
- return edge;
+ StreamEdge getStreamEdge(String streamId) {
+ return edges.get(streamId);
}
/**
@@ -248,6 +239,23 @@ import org.slf4j.LoggerFactory;
}
/**
+ * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
+ * @param streamSpec spec of the StreamEdge
+ * @param isIntermediate boolean flag indicating whether it's an intermediate stream
+ * @return stream edge
+ */
+ private StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec, boolean isIntermediate) {
+ String streamId = streamSpec.getId();
+ StreamEdge edge = edges.get(streamId);
+ if (edge == null) {
+ boolean isBroadcast = appDesc.getBroadcastStreams().contains(streamId);
+ edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config);
+ edges.put(streamId, edge);
+ }
+ return edge;
+ }
+
+ /**
* Validate the input streams should have indegree being 0 and outdegree greater than 0
*/
private void validateInputStreams() {
@@ -305,7 +313,7 @@ import org.slf4j.LoggerFactory;
Set<JobNode> unreachable = new HashSet<>(nodes.values());
unreachable.removeAll(reachable);
throw new IllegalArgumentException(String.format("Jobs %s cannot be reached from Sources.",
- String.join(", ", unreachable.stream().map(JobNode::getId).collect(Collectors.toList()))));
+ String.join(", ", unreachable.stream().map(JobNode::getJobNameAndId).collect(Collectors.toList()))));
}
}
@@ -325,7 +333,7 @@ import org.slf4j.LoggerFactory;
while (!queue.isEmpty()) {
JobNode node = queue.poll();
- node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
+ node.getOutEdges().values().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
if (!visited.contains(target)) {
visited.add(target);
queue.offer(target);
@@ -351,9 +359,9 @@ import org.slf4j.LoggerFactory;
Map<String, Long> indegree = new HashMap<>();
Set<JobNode> visited = new HashSet<>();
pnodes.forEach(node -> {
- String nid = node.getId();
+ String nid = node.getJobNameAndId();
//only count the degrees of intermediate streams
- long degree = node.getInEdges().stream().filter(e -> !inputStreams.contains(e)).count();
+ long degree = node.getInEdges().values().stream().filter(e -> !inputStreams.contains(e)).count();
indegree.put(nid, degree);
if (degree == 0L) {
@@ -378,8 +386,8 @@ import org.slf4j.LoggerFactory;
while (!q.isEmpty()) {
JobNode node = q.poll();
sortedNodes.add(node);
- node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
- String nid = n.getId();
+ node.getOutEdges().values().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
+ String nid = n.getJobNameAndId();
Long degree = indegree.get(nid) - 1;
indegree.put(nid, degree);
if (degree == 0L && !visited.contains(n)) {
@@ -400,7 +408,7 @@ import org.slf4j.LoggerFactory;
long min = Long.MAX_VALUE;
JobNode minNode = null;
for (JobNode node : reachable) {
- Long degree = indegree.get(node.getId());
+ Long degree = indegree.get(node.getJobNameAndId());
if (degree < min) {
min = degree;
minNode = node;
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 91453d2..18705e4 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -32,7 +32,6 @@ import java.util.stream.Collectors;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.PartitionByOperatorSpec;
@@ -140,7 +139,7 @@ import org.codehaus.jackson.map.ObjectMapper;
jobGraph.getTables().forEach(t -> buildTableJson(t, jobGraphJson.tables));
jobGraphJson.jobs = jobGraph.getJobNodes().stream()
- .map(jobNode -> buildJobNodeJson(jobNode))
+ .map(this::buildJobNodeJson)
.collect(Collectors.toList());
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -149,54 +148,12 @@ import org.codehaus.jackson.map.ObjectMapper;
return new String(out.toByteArray());
}
- /**
- * Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.operators.StreamGraph} for this job
- * @param jobNode job node in the {@link JobGraph}
- * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson}
- */
- private JobNodeJson buildJobNodeJson(JobNode jobNode) {
- JobNodeJson job = new JobNodeJson();
- job.jobName = jobNode.getJobName();
- job.jobId = jobNode.getJobId();
- job.operatorGraph = buildOperatorGraphJson(jobNode);
- return job;
- }
-
- /**
- * Traverse the {@link OperatorSpec} graph and build the operator graph JSON POJO.
- * @param jobNode job node in the {@link JobGraph}
- * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
- */
- private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
- OperatorGraphJson opGraph = new OperatorGraphJson();
- opGraph.inputStreams = new ArrayList<>();
- jobNode.getSpecGraph().getInputOperators().forEach((streamId, operatorSpec) -> {
- StreamJson inputJson = new StreamJson();
- opGraph.inputStreams.add(inputJson);
- inputJson.streamId = streamId;
- inputJson.nextOperatorIds = operatorSpec.getRegisteredOperatorSpecs().stream()
- .map(OperatorSpec::getOpId).collect(Collectors.toSet());
-
- updateOperatorGraphJson(operatorSpec, opGraph);
- });
-
- opGraph.outputStreams = new ArrayList<>();
- jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamId -> {
- StreamJson outputJson = new StreamJson();
- outputJson.streamId = streamId;
- opGraph.outputStreams.add(outputJson);
- });
- return opGraph;
- }
-
- /**
- * Traverse the {@link OperatorSpec} graph recursively and update the operator graph JSON POJO.
- * @param operatorSpec input
- * @param opGraph operator graph to build
- */
private void updateOperatorGraphJson(OperatorSpec operatorSpec, OperatorGraphJson opGraph) {
- // TODO xiliu: render input operators instead of input streams
- if (operatorSpec.getOpCode() != OpCode.INPUT) {
+ if (operatorSpec == null) {
+ // task application may not have any defined OperatorSpec
+ return;
+ }
+ if (operatorSpec.getOpCode() != OperatorSpec.OpCode.INPUT) {
opGraph.operators.put(operatorSpec.getOpId(), operatorToMap(operatorSpec));
}
Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs();
@@ -243,6 +200,46 @@ import org.codehaus.jackson.map.ObjectMapper;
}
/**
+ * Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.application.ApplicationDescriptorImpl}
+ * for this job
+ *
+ * @param jobNode job node in the {@link JobGraph}
+ * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson}
+ */
+ private JobNodeJson buildJobNodeJson(JobNode jobNode) {
+ JobNodeJson job = new JobNodeJson();
+ job.jobName = jobNode.getJobName();
+ job.jobId = jobNode.getJobId();
+ job.operatorGraph = buildOperatorGraphJson(jobNode);
+ return job;
+ }
+
+ /**
+ * Traverse the {@link OperatorSpec} graph and build the operator graph JSON POJO.
+ * @param jobNode job node in the {@link JobGraph}
+ * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
+ */
+ private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
+ OperatorGraphJson opGraph = new OperatorGraphJson();
+ opGraph.inputStreams = new ArrayList<>();
+ jobNode.getInEdges().values().forEach(inStream -> {
+ StreamJson inputJson = new StreamJson();
+ opGraph.inputStreams.add(inputJson);
+ inputJson.streamId = inStream.getStreamSpec().getId();
+ inputJson.nextOperatorIds = jobNode.getNextOperatorIds(inputJson.streamId);
+ updateOperatorGraphJson(jobNode.getInputOperator(inputJson.streamId), opGraph);
+ });
+
+ opGraph.outputStreams = new ArrayList<>();
+ jobNode.getOutEdges().values().forEach(outStream -> {
+ StreamJson outputJson = new StreamJson();
+ outputJson.streamId = outStream.getStreamSpec().getId();
+ opGraph.outputStreams.add(outputJson);
+ });
+ return opGraph;
+ }
+
+ /**
* Get or create the JSON POJO for a {@link StreamEdge}
* @param edge {@link StreamEdge}
* @param streamEdges map of streamId to {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
@@ -261,15 +258,11 @@ import org.codehaus.jackson.map.ObjectMapper;
edgeJson.streamSpec = streamSpecJson;
List<String> sourceJobs = new ArrayList<>();
- edge.getSourceNodes().forEach(jobNode -> {
- sourceJobs.add(jobNode.getJobName());
- });
+ edge.getSourceNodes().forEach(jobNode -> sourceJobs.add(jobNode.getJobName()));
edgeJson.sourceJobs = sourceJobs;
List<String> targetJobs = new ArrayList<>();
- edge.getTargetNodes().forEach(jobNode -> {
- targetJobs.add(jobNode.getJobName());
- });
+ edge.getTargetNodes().forEach(jobNode -> targetJobs.add(jobNode.getJobName()));
edgeJson.targetJobs = targetJobs;
streamEdges.put(streamId, edgeJson);
@@ -285,12 +278,7 @@ import org.codehaus.jackson.map.ObjectMapper;
*/
private TableSpecJson buildTableJson(TableSpec tableSpec, Map<String, TableSpecJson> tableSpecs) {
String tableId = tableSpec.getId();
- TableSpecJson tableSpecJson = tableSpecs.get(tableId);
- if (tableSpecJson == null) {
- tableSpecJson = buildTableJson(tableSpec);
- tableSpecs.put(tableId, tableSpecJson);
- }
- return tableSpecJson;
+ return tableSpecs.computeIfAbsent(tableId, k -> buildTableJson(tableSpec));
}
/**