You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/05/02 17:09:36 UTC
[4/4] samza git commit: SAMZA-1204: Visualize StreamGraph and
ExecutionPlan
SAMZA-1204: Visualize StreamGraph and ExecutionPlan
Once a Samza application (using the fluent API) is deployed, an execution plan is generated by the ExecutionPlanner. The plan JSON is written to a file (plan.json) under the ./plan directory, which also contains plan.html and the supporting JavaScript files (in the js folder).
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Author: xinyuiscool <xi...@gmail.com>
Reviewers: Jake Maes <jm...@apache.org>
Closes #127 from xinyuiscool/SAMZA-1204
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b71b253d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b71b253d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b71b253d
Branch: refs/heads/master
Commit: b71b253d2ad6bf1dc68c7bc6bb2e30782d86c7ff
Parents: ad1f161
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Tue May 2 10:09:22 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue May 2 10:09:22 2017 -0700
----------------------------------------------------------------------
build.gradle | 3 +
.../apache/samza/execution/ExecutionPlan.java | 11 +-
.../samza/execution/ExecutionPlanner.java | 10 +-
.../org/apache/samza/execution/JobGraph.java | 45 +++--
.../samza/execution/JobGraphJsonGenerator.java | 199 ++++++++++---------
.../org/apache/samza/execution/JobNode.java | 10 +-
.../samza/operators/MessageStreamImpl.java | 30 +--
.../samza/operators/impl/RootOperatorImpl.java | 5 +
.../samza/operators/spec/OperatorSpec.java | 6 +
.../operators/spec/PartialJoinOperatorSpec.java | 8 +
.../samza/operators/spec/SinkOperatorSpec.java | 8 +
.../operators/spec/StreamOperatorSpec.java | 8 +
.../operators/spec/WindowOperatorSpec.java | 8 +
.../samza/operators/util/OperatorJsonUtils.java | 89 +++++++++
.../runtime/AbstractApplicationRunner.java | 27 +++
.../samza/runtime/LocalApplicationRunner.java | 1 +
.../samza/runtime/RemoteApplicationRunner.java | 1 +
.../samza/config/ShellCommandConfig.scala | 5 +
.../samza/execution/TestExecutionPlanner.java | 15 +-
.../apache/samza/execution/TestJobGraph.java | 82 ++++----
.../execution/TestJobGraphJsonGenerator.java | 4 +-
.../samza/operators/impl/TestOperatorImpl.java | 5 +
.../samza/operators/spec/TestOperatorSpecs.java | 6 +-
samza-shell/src/main/assembly/src.xml | 8 +
samza-shell/src/main/bash/run-app.sh | 9 +
samza-shell/src/main/visualizer/js/d3.v3.min.js | 5 +
.../src/main/visualizer/js/dagre-d3.min.js | 28 +++
.../src/main/visualizer/js/planToDagre.js | 91 +++++++++
samza-shell/src/main/visualizer/plan.html | 118 +++++++++++
29 files changed, 651 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index dc56077..74e5161 100644
--- a/build.gradle
+++ b/build.gradle
@@ -71,6 +71,8 @@ rat {
'**/non-responsive.less',
'**/ropa-sans.css',
'**/syntax.css',
+ '**/d3.v3.min.js',
+ '**/dagre-d3.min.js',
'.idea/**',
'.reviewboardrc',
'docs/_site/**',
@@ -396,6 +398,7 @@ project(":samza-shell") {
classifier = 'dist'
from 'src/main/bash'
from 'src/main/resources'
+ from 'src/main/visualizer'
}
artifacts {
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
index 6e2b4c6..bde9bfb 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
@@ -20,6 +20,7 @@
package org.apache.samza.execution;
import java.util.List;
+import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.JobConfig;
import org.apache.samza.system.StreamSpec;
@@ -28,10 +29,11 @@ import org.apache.samza.system.StreamSpec;
* This interface represents Samza {@link org.apache.samza.application.StreamApplication}
* plans for physical execution.
*/
+@InterfaceStability.Unstable
public interface ExecutionPlan {
/**
- * Returns the configs for single stage job, in the order of topologically sort.
+ * Returns the configs for single stage job, in topological sort order.
* @return list of job configs
*/
List<JobConfig> getJobConfigs();
@@ -43,9 +45,10 @@ public interface ExecutionPlan {
List<StreamSpec> getIntermediateStreams();
/**
- * Returns the JSON representation of the plan for visualization
- * @return json string
- * @throws Exception exception
+ * Returns the JSON representation of the plan.
+ * @return JSON string
+ * @throws Exception exception during JSON serialization, including {@link java.io.IOException}
+ * and {@link org.codehaus.jackson.JsonGenerationException}
*/
String getPlanAsJson() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index ac39eb8..d763d84 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -61,6 +61,9 @@ public class ExecutionPlanner {
// create physical job graph based on stream graph
JobGraph jobGraph = createJobGraph(streamGraph);
+ // fetch the external streams partition info
+ updateExistingPartitions(jobGraph, streamManager);
+
if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
// figure out the partitions for internal streams
calculatePartitions(streamGraph, jobGraph);
@@ -84,7 +87,7 @@ public class ExecutionPlanner {
// For this phase, we have a single job node for the whole dag
String jobName = config.get(JobConfig.JOB_NAME());
String jobId = config.get(JobConfig.JOB_ID(), "1");
- JobNode node = jobGraph.getOrCreateNode(jobName, jobId, streamGraph);
+ JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId, streamGraph);
// add sources
sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
@@ -104,9 +107,6 @@ public class ExecutionPlanner {
* Figure out the number of partitions of all streams
*/
/* package private */ void calculatePartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) {
- // fetch the external streams partition info
- updateExistingPartitions(jobGraph, streamManager);
-
// calculate the partitions for the input streams of join operators
calculateJoinInputPartitions(streamGraph, jobGraph);
@@ -167,7 +167,7 @@ public class ExecutionPlanner {
Set<OperatorSpec> visited = new HashSet<>();
streamGraph.getInputStreams().entrySet().forEach(entry -> {
- StreamEdge streamEdge = jobGraph.getOrCreateEdge(entry.getKey());
+ StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey());
// Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
outputStreamToJoinSpec, joinQ, visited);
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index ff5fbdf..35f27ab 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.operators.StreamGraphImpl;
@@ -65,40 +66,46 @@ import org.slf4j.LoggerFactory;
this.config = config;
}
- /**
- * Returns the configs for single stage job, in the order of topologically sort.
- * @return list of job configs
- */
+ @Override
public List<JobConfig> getJobConfigs() {
- return getJobNodes().stream().map(JobNode::generateConfig).collect(Collectors.toList());
+ String json = "";
+ try {
+ json = getPlanAsJson();
+ } catch (Exception e) {
+ log.warn("Failed to generate plan JSON", e);
+ }
+
+ final String planJson = json;
+ return getJobNodes().stream().map(n -> n.generateConfig(planJson)).collect(Collectors.toList());
}
- /**
- * Returns the intermediate streams that need to be created.
- * @return intermediate {@link StreamSpec}s
- */
+ @Override
public List<StreamSpec> getIntermediateStreams() {
return getIntermediateStreamEdges().stream()
.map(streamEdge -> streamEdge.getStreamSpec())
.collect(Collectors.toList());
}
- /**
- * Returns the JSON representation of the plan for visualization
- * @return json string
- * @throws Exception
- */
+ @Override
public String getPlanAsJson() throws Exception {
return jsonGenerator.toJson(this);
}
/**
+ * Returns the config for this application
+ * @return {@link ApplicationConfig}
+ */
+ public ApplicationConfig getApplicationConfig() {
+ return new ApplicationConfig(config);
+ }
+
+ /**
* Add a source stream to a {@link JobNode}
* @param input source stream
* @param node the job node that consumes from the source
*/
void addSource(StreamSpec input, JobNode node) {
- StreamEdge edge = getOrCreateEdge(input);
+ StreamEdge edge = getOrCreateStreamEdge(input);
edge.addTargetNode(node);
node.addInEdge(edge);
sources.add(edge);
@@ -110,7 +117,7 @@ import org.slf4j.LoggerFactory;
* @param node the job node that outputs to the sink
*/
void addSink(StreamSpec output, JobNode node) {
- StreamEdge edge = getOrCreateEdge(output);
+ StreamEdge edge = getOrCreateStreamEdge(output);
edge.addSourceNode(node);
node.addOutEdge(edge);
sinks.add(edge);
@@ -123,7 +130,7 @@ import org.slf4j.LoggerFactory;
* @param to the target node
*/
void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
- StreamEdge edge = getOrCreateEdge(streamSpec);
+ StreamEdge edge = getOrCreateStreamEdge(streamSpec);
edge.addSourceNode(from);
edge.addTargetNode(to);
from.addOutEdge(edge);
@@ -137,7 +144,7 @@ import org.slf4j.LoggerFactory;
* @param jobId id of the job
* @return
*/
- JobNode getOrCreateNode(String jobName, String jobId, StreamGraphImpl streamGraph) {
+ JobNode getOrCreateJobNode(String jobName, String jobId, StreamGraphImpl streamGraph) {
String nodeId = JobNode.createId(jobName, jobId);
JobNode node = nodes.get(nodeId);
if (node == null) {
@@ -152,7 +159,7 @@ import org.slf4j.LoggerFactory;
* @param streamSpec spec of the StreamEdge
* @return stream edge
*/
- StreamEdge getOrCreateEdge(StreamSpec streamSpec) {
+ StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec) {
String streamId = streamSpec.getId();
StreamEdge edge = edges.get(streamId);
if (edge == null) {
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 317616c..96c0538 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -19,95 +19,99 @@
package org.apache.samza.execution;
+import com.google.common.base.Joiner;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.util.OperatorJsonUtils;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
+
/**
* This class generates the JSON representation of the {@link JobGraph}.
*/
-public class JobGraphJsonGenerator {
-
- /**
- * This class provides the necessary connection of operators for traversal.
- */
- static abstract class Traversable {
- @JsonProperty("NextOperatorIds")
- Set<Integer> nextOperatorIds = new HashSet<>();
- }
-
- static final class OperatorJson extends Traversable {
- @JsonProperty("OpCode")
- String opCode;
- @JsonProperty("OpId")
- int opId;
- @JsonProperty("OutputStreamId")
- String outputStreamId;
- @JsonProperty("PairedOpId")
- int pairedOpId = -1; //for join operator, we will have a pair nodes for two partial joins
- }
+/* package private */ class JobGraphJsonGenerator {
static final class StreamSpecJson {
- @JsonProperty("Id")
+ @JsonProperty("id")
String id;
- @JsonProperty("SystemName")
+ @JsonProperty("systemName")
String systemName;
- @JsonProperty("PhysicalName")
+ @JsonProperty("physicalName")
String physicalName;
- @JsonProperty("PartitionCount")
+ @JsonProperty("partitionCount")
int partitionCount;
}
static final class StreamEdgeJson {
- @JsonProperty("StreamSpec")
+ @JsonProperty("streamSpec")
StreamSpecJson streamSpec;
+ @JsonProperty("sourceJobs")
+ List<String> sourceJobs;
+ @JsonProperty("targetJobs")
+ List<String> targetJobs;
}
static final class OperatorGraphJson {
- @JsonProperty("InputStreams")
- List<InputStreamJson> inputStreams;
- @JsonProperty("Operators")
- Map<Integer, OperatorJson> operators = new HashMap<>();
+ @JsonProperty("inputStreams")
+ List<StreamJson> inputStreams;
+ @JsonProperty("outputStreams")
+ List<StreamJson> outputStreams;
+ @JsonProperty("operators")
+ Map<Integer, Map<String, Object>> operators = new HashMap<>();
+ @JsonProperty("canonicalOpIds")
+ Map<Integer, String> canonicalOpIds = new HashMap<>();
}
- static final class InputStreamJson extends Traversable {
- @JsonProperty("StreamId")
+ static final class StreamJson {
+ @JsonProperty("streamId")
String streamId;
+ @JsonProperty("nextOperatorIds")
+ Set<Integer> nextOperatorIds = new HashSet<>();
}
static final class JobNodeJson {
- @JsonProperty("JobName")
+ @JsonProperty("jobName")
String jobName;
- @JsonProperty("JobId")
+ @JsonProperty("jobId")
String jobId;
- @JsonProperty("OperatorGraph")
+ @JsonProperty("operatorGraph")
OperatorGraphJson operatorGraph;
}
static final class JobGraphJson {
- @JsonProperty("Jobs")
+ @JsonProperty("jobs")
List<JobNodeJson> jobs;
- @JsonProperty("Streams")
- Map<String, StreamEdgeJson> streams;
+ @JsonProperty("sourceStreams")
+ Map<String, StreamEdgeJson> sourceStreams;
+ @JsonProperty("sinkStreams")
+ Map<String, StreamEdgeJson> sinkStreams;
+ @JsonProperty("intermediateStreams")
+ Map<String, StreamEdgeJson> intermediateStreams;
+ @JsonProperty("applicationName")
+ String applicationName;
+ @JsonProperty("applicationId")
+ String applicationId;
}
- // Mapping from the output stream to the join spec. Since StreamGraph creates two partial join operators for a join and they
- // will have the same output stream, this mapping is used to choose one of them as the unique join spec representing this join
- // (who register first in the map wins).
- Map<MessageStream, OperatorSpec> outputStreamToJoinSpec = new HashMap<>();
+ // Mapping from the output stream to the ids.
+ // Logically they belong to the same operator, but in code we generate one operator for each input.
+ // This is to associate the operators that output to the same MessageStream.
+ Multimap<MessageStream, Integer> outputStreamToOpIds = HashMultimap.create();
/**
* Returns the JSON representation of a {@link JobGraph}
@@ -119,13 +123,18 @@ public class JobGraphJsonGenerator {
JobGraphJson jobGraphJson = new JobGraphJson();
// build StreamEdge JSON
- jobGraphJson.streams = new HashMap<>();
- jobGraph.getSources().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams));
- jobGraph.getSinks().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams));
- jobGraph.getIntermediateStreamEdges().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams));
+ ApplicationConfig appConfig = jobGraph.getApplicationConfig();
+ jobGraphJson.applicationName = appConfig.getAppName();
+ jobGraphJson.applicationId = appConfig.getAppId();
+ jobGraphJson.sourceStreams = new HashMap<>();
+ jobGraphJson.sinkStreams = new HashMap<>();
+ jobGraphJson.intermediateStreams = new HashMap<>();
+ jobGraph.getSources().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams));
+ jobGraph.getSinks().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams));
+ jobGraph.getIntermediateStreamEdges().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.intermediateStreams));
jobGraphJson.jobs = jobGraph.getJobNodes().stream()
- .map(jobNode -> buildJobNodeJson(jobNode, jobGraphJson.streams))
+ .map(jobNode -> buildJobNodeJson(jobNode))
.collect(Collectors.toList());
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -137,10 +146,9 @@ public class JobGraphJsonGenerator {
/**
* Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.operators.StreamGraph} for this job
* @param jobNode job node in the {@link JobGraph}
- * @param streamEdges map of {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
* @return {@link org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson}
*/
- private JobNodeJson buildJobNodeJson(JobNode jobNode, Map<String, StreamEdgeJson> streamEdges) {
+ private JobNodeJson buildJobNodeJson(JobNode jobNode) {
JobNodeJson job = new JobNodeJson();
job.jobName = jobNode.getJobName();
job.jobId = jobNode.getJobId();
@@ -157,10 +165,27 @@ public class JobGraphJsonGenerator {
OperatorGraphJson opGraph = new OperatorGraphJson();
opGraph.inputStreams = new ArrayList<>();
jobNode.getStreamGraph().getInputStreams().forEach((streamSpec, stream) -> {
- InputStreamJson inputJson = new InputStreamJson();
- inputJson.streamId = streamSpec.getId();
+ StreamJson inputJson = new StreamJson();
opGraph.inputStreams.add(inputJson);
- updateOperatorGraphJson((MessageStreamImpl) stream, inputJson, opGraph);
+ inputJson.streamId = streamSpec.getId();
+ Collection<OperatorSpec> specs = ((MessageStreamImpl) stream).getRegisteredOperatorSpecs();
+ inputJson.nextOperatorIds = specs.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet());
+
+ updateOperatorGraphJson((MessageStreamImpl) stream, opGraph);
+
+ for (Map.Entry<MessageStream, Collection<Integer>> entry : outputStreamToOpIds.asMap().entrySet()) {
+ List<Integer> sortedIds = new ArrayList<>(entry.getValue());
+ Collections.sort(sortedIds);
+ String canonicalId = Joiner.on(',').join(sortedIds);
+ sortedIds.stream().forEach(id -> opGraph.canonicalOpIds.put(id, canonicalId));
+ }
+ });
+
+ opGraph.outputStreams = new ArrayList<>();
+ jobNode.getStreamGraph().getOutputStreams().keySet().forEach(streamSpec -> {
+ StreamJson outputJson = new StreamJson();
+ outputJson.streamId = streamSpec.getId();
+ opGraph.outputStreams.add(outputJson);
});
return opGraph;
}
@@ -168,65 +193,30 @@ public class JobGraphJsonGenerator {
/**
* Traverse the {@StreamGraph} recursively and update the operator graph JSON POJO.
* @param messageStream input
- * @param parent parent node in the traveral
* @param opGraph operator graph to build
*/
- private void updateOperatorGraphJson(MessageStreamImpl messageStream, Traversable parent, OperatorGraphJson opGraph) {
+ private void updateOperatorGraphJson(MessageStreamImpl messageStream, OperatorGraphJson opGraph) {
Collection<OperatorSpec> specs = messageStream.getRegisteredOperatorSpecs();
specs.forEach(opSpec -> {
- parent.nextOperatorIds.add(opSpec.getOpId());
+ opGraph.operators.put(opSpec.getOpId(), OperatorJsonUtils.operatorToMap(opSpec));
- OperatorJson opJson = getOrCreateOperatorJson(opSpec, opGraph);
- if (opSpec instanceof SinkOperatorSpec) {
- opJson.outputStreamId = ((SinkOperatorSpec) opSpec).getOutputStream().getStreamSpec().getId();
- } else if (opSpec.getNextStream() != null) {
- updateOperatorGraphJson(opSpec.getNextStream(), opJson, opGraph);
+ if (opSpec.getOpCode() == OperatorSpec.OpCode.JOIN || opSpec.getOpCode() == OperatorSpec.OpCode.MERGE) {
+ outputStreamToOpIds.put(opSpec.getNextStream(), opSpec.getOpId());
}
- });
- }
-
- /**
- * Get or create the JSON POJO for an operator.
- * @param opSpec {@link OperatorSpec}
- * @param opGraph {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
- * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorJson}
- */
- private OperatorJson getOrCreateOperatorJson(OperatorSpec opSpec, OperatorGraphJson opGraph) {
- Map<Integer, OperatorJson> operators = opGraph.operators;
- OperatorJson opJson = operators.get(opSpec.getOpId());
- if (opJson == null) {
- opJson = new OperatorJson();
- opJson.opCode = opSpec.getOpCode().name();
- opJson.opId = opSpec.getOpId();
- operators.put(opSpec.getOpId(), opJson);
- }
- if (opSpec instanceof PartialJoinOperatorSpec) {
- // every join will have two partial join operators
- // we will choose one of them in order to consolidate the inputs
- // the first one who registered with the outputStreamToJoinSpec will win
- MessageStream output = opSpec.getNextStream();
- OperatorSpec joinSpec = outputStreamToJoinSpec.get(output);
- if (joinSpec == null) {
- joinSpec = opSpec;
- outputStreamToJoinSpec.put(output, joinSpec);
- } else if (joinSpec != opSpec) {
- OperatorJson joinNode = operators.get(joinSpec.getOpId());
- joinNode.pairedOpId = opJson.opId;
- opJson.pairedOpId = joinNode.opId;
- }
- }
-
- return opJson;
+ if (opSpec.getNextStream() != null) {
+ updateOperatorGraphJson(opSpec.getNextStream(), opGraph);
+ }
+ });
}
/**
* Get or create the JSON POJO for a {@link StreamEdge}
* @param edge {@link StreamEdge}
* @param streamEdges map of streamId to {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
- * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
+ * @return JSON representation of the {@link StreamEdge}
*/
- private StreamEdgeJson getOrCreateStreamEdgeJson(StreamEdge edge, Map<String, StreamEdgeJson> streamEdges) {
+ private StreamEdgeJson buildStreamEdgeJson(StreamEdge edge, Map<String, StreamEdgeJson> streamEdges) {
String streamId = edge.getStreamSpec().getId();
StreamEdgeJson edgeJson = streamEdges.get(streamId);
if (edgeJson == null) {
@@ -237,6 +227,19 @@ public class JobGraphJsonGenerator {
streamSpecJson.physicalName = edge.getStreamSpec().getPhysicalName();
streamSpecJson.partitionCount = edge.getPartitionCount();
edgeJson.streamSpec = streamSpecJson;
+
+ List<String> sourceJobs = new ArrayList<>();
+ edge.getSourceNodes().forEach(jobNode -> {
+ sourceJobs.add(jobNode.getJobName());
+ });
+ edgeJson.sourceJobs = sourceJobs;
+
+ List<String> targetJobs = new ArrayList<>();
+ edge.getTargetNodes().forEach(jobNode -> {
+ targetJobs.add(jobNode.getJobName());
+ });
+ edgeJson.targetJobs = targetJobs;
+
streamEdges.put(streamId, edgeJson);
}
return edgeJson;
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index e19c9ca..0484cf9 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
public class JobNode {
private static final Logger log = LoggerFactory.getLogger(JobNode.class);
private static final String CONFIG_JOB_PREFIX = "jobs.%s.";
+ private static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan";
private final String jobName;
private final String jobId;
@@ -92,7 +93,12 @@ public class JobNode {
return outEdges;
}
- public JobConfig generateConfig() {
+ /**
+ * Generate the configs for a job
+ * @param executionPlanJson JSON representation of the execution plan
+ * @return config of the job
+ */
+ public JobConfig generateConfig(String executionPlanJson) {
Map<String, String> configs = new HashMap<>();
configs.put(JobConfig.JOB_NAME(), jobName);
@@ -100,6 +106,8 @@ public class JobNode {
configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
log.info("Job {} has generated configs {}", jobName, configs);
+ configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
+
String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
// TODO: Disallow user specifying job inputs/outputs. This info comes strictly from the pipeline.
return new JobConfig(Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix)));
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 69a41db..b9adeed 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -81,16 +81,16 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
- OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec(
- filterFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
+ OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, new MessageStreamImpl<>(this.graph),
+ this.graph.getNextOpId());
this.registeredOperatorSpecs.add(op);
return op.getNextStream();
}
@Override
public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
- OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(
- flatMapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
+ OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, new MessageStreamImpl<>(this.graph),
+ this.graph.getNextOpId());
this.registeredOperatorSpecs.add(op);
return op.getNextStream();
}
@@ -103,15 +103,15 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public <K, V> void sendTo(OutputStream<K, V, M> outputStream) {
- SinkOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
- (OutputStreamInternal<K, V, M>) outputStream, this.graph.getNextOpId());
+ SinkOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec((OutputStreamInternal<K, V, M>) outputStream,
+ this.graph.getNextOpId());
this.registeredOperatorSpecs.add(op);
}
@Override
public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
- OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec(
- (WindowInternal<M, K, WV>) window, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
+ OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
+ new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
this.registeredOperatorSpecs.add(wndOp);
return wndOp.getNextStream();
}
@@ -175,9 +175,9 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(
thisPartialJoinFn, otherPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));
- ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs
- .add(OperatorSpecs.createPartialJoinOperatorSpec(
- otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));
+ ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add(OperatorSpecs
+ .createPartialJoinOperatorSpec(otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), nextStream,
+ this.graph.getNextOpId()));
return nextStream;
}
@@ -187,8 +187,11 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
MessageStreamImpl<M> nextStream = new MessageStreamImpl<>(this.graph);
otherStreams.add(this);
- otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).registeredOperatorSpecs.
- add(OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId())));
+ otherStreams.forEach(other -> {
+ OperatorSpec mergeOperatorSepc =
+ OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId());
+ ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(mergeOperatorSepc);
+ });
return nextStream;
}
@@ -213,5 +216,4 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
return Collections.unmodifiableSet(this.registeredOperatorSpecs);
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
index 0f18e97..059b567 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
@@ -62,6 +62,11 @@ public final class RootOperatorImpl<M> extends OperatorImpl<M, M> {
public int getOpId() {
return -1;
}
+
+ @Override
+ public String getSourceLocation() {
+ return "";
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index cc3c4ab..3ea52ca 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -63,6 +63,12 @@ public interface OperatorSpec<OM> {
int getOpId();
/**
+ * Return the user source code location that creates the operator
+ * @return source location
+ */
+ String getSourceLocation();
+
+ /**
* Get the name for this operator based on its opCode and opId.
* @return the name for this operator
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
index e85626f..92b4170 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -20,6 +20,7 @@ package org.apache.samza.operators.spec;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.util.OperatorJsonUtils;
/**
@@ -38,6 +39,7 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
private final long ttlMs;
private final MessageStreamImpl<RM> nextStream;
private final int opId;
+ private final String sourceLocation; // location in user code that created this operator
/**
* Default constructor for a {@link PartialJoinOperatorSpec}.
@@ -58,6 +60,7 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
this.ttlMs = ttlMs;
this.nextStream = nextStream;
this.opId = opId;
+ this.sourceLocation = OperatorJsonUtils.getSourceLocation(); // inspects the current call stack; meant to be called from the constructor
}
@Override
@@ -86,4 +89,9 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
public int getOpId() {
return this.opId;
}
+
+ @Override
+ public String getSourceLocation() {
+ return sourceLocation;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 0d135d3..afdd6b9 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators.spec;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.stream.OutputStreamInternal;
+import org.apache.samza.operators.util.OperatorJsonUtils;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
@@ -39,6 +40,7 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
private OutputStreamInternal<?, ?, M> outputStream; // may be null
private final OperatorSpec.OpCode opCode;
private final int opId;
+ private final String sourceLocation; // location in user code that created this operator
/**
* Constructs a {@link SinkOperatorSpec} with a user defined {@link SinkFunction}.
@@ -54,6 +56,7 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
this.sinkFn = sinkFn;
this.opCode = opCode;
this.opId = opId;
+ this.sourceLocation = OperatorJsonUtils.getSourceLocation(); // inspects the current call stack; meant to be called from the constructor
}
/**
@@ -99,6 +102,11 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
return this.opId;
}
+ @Override
+ public String getSourceLocation() {
+ return sourceLocation;
+ }
+
/**
* Creates a {@link SinkFunction} to send messages to the provided {@code output}.
* @param outputStream the {@link OutputStreamInternal} to send messages to
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index 204e566..c53efae 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -20,6 +20,7 @@ package org.apache.samza.operators.spec;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.util.OperatorJsonUtils;
/**
@@ -34,6 +35,7 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
private final MessageStreamImpl<OM> nextStream;
private final OperatorSpec.OpCode opCode;
private final int opId;
+ private final String sourceLocation; // location in user code that created this operator
/**
* Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
@@ -49,6 +51,7 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
this.nextStream = nextStream;
this.opCode = opCode;
this.opId = opId;
+ this.sourceLocation = OperatorJsonUtils.getSourceLocation(); // inspects the current call stack; meant to be called from the constructor
}
@Override
@@ -69,4 +72,9 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
public int getOpId() {
return this.opId;
}
+
+ @Override
+ public String getSourceLocation() {
+ return sourceLocation;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 73b17b5..7ea07f6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -20,6 +20,7 @@
package org.apache.samza.operators.spec;
import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.util.OperatorJsonUtils;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
@@ -36,6 +37,7 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
private final WindowInternal<M, WK, WV> window;
private final MessageStreamImpl<WindowPane<WK, WV>> nextStream;
private final int opId;
+ private final String sourceLocation; // location in user code that created this operator
/**
* Constructor for {@link WindowOperatorSpec}.
@@ -48,6 +50,7 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
this.nextStream = nextStream;
this.window = window;
this.opId = opId;
+ this.sourceLocation = OperatorJsonUtils.getSourceLocation(); // inspects the current call stack; meant to be called from the constructor
}
@Override
@@ -68,4 +71,9 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
public int getOpId() {
return this.opId;
}
+
+ @Override
+ public String getSourceLocation() {
+ return sourceLocation;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
new file mode 100644
index 0000000..b52fbc3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helpers for describing operators in the execution plan JSON.
+ */
+public class OperatorJsonUtils {
+  private static final Logger log = LoggerFactory.getLogger(OperatorJsonUtils.class);
+
+  private static final String OP_CODE = "opCode";
+  private static final String OP_ID = "opId";
+  private static final String SOURCE_LOCATION = "sourceLocation";
+  private static final String NEXT_OPERATOR_IDS = "nextOperatorIds";
+  private static final String OUTPUT_STREAM_ID = "outputStreamId";
+  private static final String TTL_MS = "ttlMs";
+
+  // Stack frames of these classes/packages belong to the operator construction code, not to user code.
+  private static final String THREAD_CLASS = Thread.class.getName();
+  private static final String UTILS_CLASS = OperatorJsonUtils.class.getName();
+  private static final String SPEC_PACKAGE_PREFIX =
+      OperatorSpec.class.getName().substring(0, OperatorSpec.class.getName().lastIndexOf('.') + 1);
+  private static final String MESSAGE_STREAM_CLASS = MessageStreamImpl.class.getName();
+
+  private OperatorJsonUtils() {
+    // static utility class, not meant to be instantiated
+  }
+
+  /**
+   * Format the operator properties into a map.
+   * @param spec a {@link OperatorSpec} instance
+   * @return map of the operator properties: opCode, opId, sourceLocation and nextOperatorIds, plus
+   *         outputStreamId for sink operators that have an output stream and ttlMs for partial joins
+   */
+  public static Map<String, Object> operatorToMap(OperatorSpec spec) {
+    Map<String, Object> map = new HashMap<>();
+    map.put(OP_CODE, spec.getOpCode().name());
+    map.put(OP_ID, spec.getOpId());
+    map.put(SOURCE_LOCATION, spec.getSourceLocation());
+
+    if (spec.getNextStream() != null) {
+      Collection<OperatorSpec> nextOperators = spec.getNextStream().getRegisteredOperatorSpecs();
+      map.put(NEXT_OPERATOR_IDS, nextOperators.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet()));
+    } else {
+      map.put(NEXT_OPERATOR_IDS, Collections.emptySet());
+    }
+
+    if (spec instanceof SinkOperatorSpec) {
+      SinkOperatorSpec sinkSpec = (SinkOperatorSpec) spec;
+      // The output stream may be null (e.g. for a sink built from a user SinkFunction);
+      // dereferencing it unconditionally would throw a NullPointerException.
+      if (sinkSpec.getOutputStream() != null) {
+        map.put(OUTPUT_STREAM_ID, sinkSpec.getOutputStream().getStreamSpec().getId());
+      }
+    }
+
+    if (spec instanceof PartialJoinOperatorSpec) {
+      map.put(TTL_MS, ((PartialJoinOperatorSpec) spec).getTtlMs());
+    }
+
+    return map;
+  }
+
+  /**
+   * Return the location of source code that creates the operator.
+   * This function is invoked in the constructor of each operator.
+   * @return formatted source location including file and line number ({@code fileName:lineNumber}),
+   *         or an empty string if no frame outside the operator construction code is found
+   */
+  public static String getSourceLocation() {
+    // The stack trace typically looks like:
+    // [0] Thread.getStackTrace()
+    // [1] OperatorJsonUtils.getSourceLocation()
+    // [2] SomeOperatorSpec.<init>()
+    // [3] OperatorSpecs.createSomeOperatorSpec()
+    // [4] MessageStreamImpl.someOperator()
+    // [5] User code that calls [4]
+    // The number of framework frames is not fixed (e.g. a spec can be created through OperatorSpecs without
+    // going through MessageStreamImpl), and a hard-coded index can run past the end of a short stack.
+    // So return the first frame that is not part of the operator construction code instead.
+    for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
+      if (!isFrameworkFrame(frame)) {
+        return String.format("%s:%s", frame.getFileName(), frame.getLineNumber());
+      }
+    }
+    log.debug("No user code frame found for the operator source location");
+    return "";
+  }
+
+  /**
+   * Whether the stack frame belongs to the operator construction code rather than to user code.
+   * @param frame a stack frame of the current thread
+   * @return true if the frame should be skipped when looking for the user source location
+   */
+  private static boolean isFrameworkFrame(StackTraceElement frame) {
+    String className = frame.getClassName();
+    return className.equals(THREAD_CLASS)
+        || className.equals(UTILS_CLASS)
+        || className.startsWith(SPEC_PACKAGE_PREFIX)
+        || className.equals(MESSAGE_STREAM_CLASS)
+        || className.startsWith(MESSAGE_STREAM_CLASS + "$");
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index 692dc38..3c7c83d 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -18,22 +18,28 @@
*/
package org.apache.samza.runtime;
+import java.io.File;
+import java.io.PrintWriter;
import java.util.Map;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.execution.ExecutionPlan;
import org.apache.samza.execution.ExecutionPlanner;
import org.apache.samza.execution.StreamManager;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Defines common, core behavior for implementations of the {@link ApplicationRunner} API
*/
public abstract class AbstractApplicationRunner extends ApplicationRunner {
+ private static final Logger log = LoggerFactory.getLogger(AbstractApplicationRunner.class);
private final StreamManager streamManager;
private final ExecutionPlanner planner;
@@ -106,4 +112,65 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
final StreamManager getStreamManager() {
return streamManager;
}
+
+  /**
+   * Write the execution plan JSON to a file so that the plan visualizer can load it.
+   * The file is {@code plan.json} under the directory named by the EXECUTION_PLAN_DIR environment
+   * variable; nothing is written when that variable is unset or empty. The file holds a JavaScript
+   * assignment {@code plan='...'} whose string literal is escaped so that it evaluates to exactly
+   * {@code planJson}.
+   * Writing the plan is best-effort: failures are logged as warnings and never propagated.
+   * @param planJson JSON representation of the plan
+   */
+  final void writePlanJsonFile(String planJson) {
+    try {
+      String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR());
+      if (planPath == null || planPath.isEmpty()) {
+        return;
+      }
+      // Write the plan json to plan path
+      File file = new File(planPath, "plan.json");
+      try (PrintWriter writer = new PrintWriter(file, "UTF-8")) {
+        writer.println("plan='" + escapeJsString(String.valueOf(planJson)) + "'");
+        // PrintWriter swallows I/O errors, so they have to be checked for explicitly
+        if (writer.checkError()) {
+          log.warn("Failed to write execution plan json to file {}", file);
+        }
+      }
+      // setReadable has no effect on a file that does not exist yet, so it must follow the write
+      file.setReadable(true, false);
+    } catch (Exception e) {
+      log.warn("Failed to write execution plan json to file", e);
+    }
+  }
+
+  /**
+   * Escape a string for embedding in a single-quoted JavaScript string literal, so that the
+   * literal evaluates back to exactly the original string.
+   * @param s the string to escape
+   * @return the escaped string
+   */
+  private static String escapeJsString(String s) {
+    StringBuilder sb = new StringBuilder(s.length());
+    for (int i = 0; i < s.length(); i++) {
+      char c = s.charAt(i);
+      switch (c) {
+        case '\\':
+          sb.append("\\\\");
+          break;
+        case '\'':
+          sb.append("\\'");
+          break;
+        case '\n':
+          sb.append("\\n");
+          break;
+        case '\r':
+          sb.append("\\r");
+          break;
+        default:
+          sb.append(c);
+      }
+    }
+    return sb.toString();
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 5e83c3c..bff0f1c 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -110,6 +110,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
try {
// 1. initialize and plan
ExecutionPlan plan = getExecutionPlan(app);
+ writePlanJsonFile(plan.getPlanAsJson()); // write the plan for the visualizer; the file write itself only logs on failure
// 2. create the necessary streams
createStreams(plan.getIntermediateStreams());
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 38eb195..d5f6e21 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -50,6 +50,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
try {
// 1. initialize and plan
ExecutionPlan plan = getExecutionPlan(app);
+ writePlanJsonFile(plan.getPlanAsJson()); // write the plan for the visualizer; the file write itself only logs on failure
// 2. create the necessary streams
getStreamManager().createStreams(plan.getIntermediateStreams());
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index 1397ed5..3c0f320 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -53,6 +53,11 @@ object ShellCommandConfig {
*/
val ENV_LOGGED_STORE_BASE_DIR = "LOGGED_STORE_BASE_DIR"
+ /**
+ * Name of the environment variable that holds the directory path where the execution plan is written
+ */
+ val EXECUTION_PLAN_DIR = "EXECUTION_PLAN_DIR"
+
val COMMAND_SHELL_EXECUTE = "task.execute"
val TASK_JVM_OPTS = "task.opts"
val TASK_JAVA_HOME = "task.java.home"
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index b7f952a..5366dc3 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -220,11 +220,11 @@ public class TestExecutionPlanner {
JobGraph jobGraph = planner.createJobGraph(streamGraph);
ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
- assertTrue(jobGraph.getOrCreateEdge(input1).getPartitionCount() == 64);
- assertTrue(jobGraph.getOrCreateEdge(input2).getPartitionCount() == 16);
- assertTrue(jobGraph.getOrCreateEdge(input3).getPartitionCount() == 32);
- assertTrue(jobGraph.getOrCreateEdge(output1).getPartitionCount() == 8);
- assertTrue(jobGraph.getOrCreateEdge(output2).getPartitionCount() == 16);
+ assertTrue(jobGraph.getOrCreateStreamEdge(input1).getPartitionCount() == 64);
+ assertTrue(jobGraph.getOrCreateStreamEdge(input2).getPartitionCount() == 16);
+ assertTrue(jobGraph.getOrCreateStreamEdge(input3).getPartitionCount() == 32);
+ assertTrue(jobGraph.getOrCreateStreamEdge(output1).getPartitionCount() == 8);
+ assertTrue(jobGraph.getOrCreateStreamEdge(output2).getPartitionCount() == 16);
jobGraph.getIntermediateStreamEdges().forEach(edge -> {
assertTrue(edge.getPartitionCount() == -1);
@@ -264,11 +264,10 @@ public class TestExecutionPlanner {
}
@Test
- public void testCalculateIntStreamPartitions() {
+ public void testCalculateIntStreamPartitions() throws Exception {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
StreamGraphImpl streamGraph = createSimpleGraph();
- JobGraph jobGraph = planner.createJobGraph(streamGraph);
- planner.calculatePartitions(streamGraph, jobGraph);
+ JobGraph jobGraph = (JobGraph) planner.plan(streamGraph); // plan() is expected to also calculate the intermediate stream partitions
// the partitions should be the same as input1
jobGraph.getIntermediateStreams().forEach(edge -> {
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
index 4a4498c..bf131ce 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
@@ -59,14 +59,14 @@ public class TestJobGraph {
private void createGraph1() {
graph1 = new JobGraph(null);
- JobNode n2 = graph1.getOrCreateNode("2", "1", null);
- JobNode n3 = graph1.getOrCreateNode("3", "1", null);
- JobNode n5 = graph1.getOrCreateNode("5", "1", null);
- JobNode n7 = graph1.getOrCreateNode("7", "1", null);
- JobNode n8 = graph1.getOrCreateNode("8", "1", null);
- JobNode n9 = graph1.getOrCreateNode("9", "1", null);
- JobNode n10 = graph1.getOrCreateNode("10", "1", null);
- JobNode n11 = graph1.getOrCreateNode("11", "1", null);
+ JobNode n2 = graph1.getOrCreateJobNode("2", "1", null);
+ JobNode n3 = graph1.getOrCreateJobNode("3", "1", null);
+ JobNode n5 = graph1.getOrCreateJobNode("5", "1", null);
+ JobNode n7 = graph1.getOrCreateJobNode("7", "1", null);
+ JobNode n8 = graph1.getOrCreateJobNode("8", "1", null);
+ JobNode n9 = graph1.getOrCreateJobNode("9", "1", null);
+ JobNode n10 = graph1.getOrCreateJobNode("10", "1", null);
+ JobNode n11 = graph1.getOrCreateJobNode("11", "1", null);
graph1.addSource(genStream(), n5);
graph1.addSource(genStream(), n7);
@@ -92,13 +92,13 @@ public class TestJobGraph {
private void createGraph2() {
graph2 = new JobGraph(null);
- JobNode n1 = graph2.getOrCreateNode("1", "1", null);
- JobNode n2 = graph2.getOrCreateNode("2", "1", null);
- JobNode n3 = graph2.getOrCreateNode("3", "1", null);
- JobNode n4 = graph2.getOrCreateNode("4", "1", null);
- JobNode n5 = graph2.getOrCreateNode("5", "1", null);
- JobNode n6 = graph2.getOrCreateNode("6", "1", null);
- JobNode n7 = graph2.getOrCreateNode("7", "1", null);
+ JobNode n1 = graph2.getOrCreateJobNode("1", "1", null);
+ JobNode n2 = graph2.getOrCreateJobNode("2", "1", null);
+ JobNode n3 = graph2.getOrCreateJobNode("3", "1", null);
+ JobNode n4 = graph2.getOrCreateJobNode("4", "1", null);
+ JobNode n5 = graph2.getOrCreateJobNode("5", "1", null);
+ JobNode n6 = graph2.getOrCreateJobNode("6", "1", null);
+ JobNode n7 = graph2.getOrCreateJobNode("7", "1", null);
graph2.addSource(genStream(), n1);
graph2.addIntermediateStream(genStream(), n1, n2);
@@ -119,8 +119,8 @@ public class TestJobGraph {
private void createGraph3() {
graph3 = new JobGraph(null);
- JobNode n1 = graph3.getOrCreateNode("1", "1", null);
- JobNode n2 = graph3.getOrCreateNode("2", "1", null);
+ JobNode n1 = graph3.getOrCreateJobNode("1", "1", null);
+ JobNode n2 = graph3.getOrCreateJobNode("2", "1", null);
graph3.addSource(genStream(), n1);
graph3.addIntermediateStream(genStream(), n1, n1);
@@ -135,7 +135,7 @@ public class TestJobGraph {
private void createGraph4() {
graph4 = new JobGraph(null);
- JobNode n1 = graph4.getOrCreateNode("1", "1", null);
+ JobNode n1 = graph4.getOrCreateJobNode("1", "1", null);
graph4.addSource(genStream(), n1);
graph4.addIntermediateStream(genStream(), n1, n1);
@@ -160,9 +160,9 @@ public class TestJobGraph {
* s3 -> 2
* |-> 3
*/
- JobNode n1 = graph.getOrCreateNode("1", "1", null);
- JobNode n2 = graph.getOrCreateNode("2", "1", null);
- JobNode n3 = graph.getOrCreateNode("3", "1", null);
+ JobNode n1 = graph.getOrCreateJobNode("1", "1", null);
+ JobNode n2 = graph.getOrCreateJobNode("2", "1", null);
+ JobNode n3 = graph.getOrCreateJobNode("3", "1", null);
StreamSpec s1 = genStream();
StreamSpec s2 = genStream();
StreamSpec s3 = genStream();
@@ -173,16 +173,16 @@ public class TestJobGraph {
assertTrue(graph.getSources().size() == 3);
- assertTrue(graph.getOrCreateNode("1", "1", null).getInEdges().size() == 2);
- assertTrue(graph.getOrCreateNode("2", "1", null).getInEdges().size() == 1);
- assertTrue(graph.getOrCreateNode("3", "1", null).getInEdges().size() == 1);
+ assertTrue(graph.getOrCreateJobNode("1", "1", null).getInEdges().size() == 2);
+ assertTrue(graph.getOrCreateJobNode("2", "1", null).getInEdges().size() == 1);
+ assertTrue(graph.getOrCreateJobNode("3", "1", null).getInEdges().size() == 1);
- assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 0);
- assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 1);
- assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 0);
- assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 1);
- assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 0);
- assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 2);
+ assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 0); // source streams have no producing job node
+ assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 1);
+ assertTrue(graph.getOrCreateStreamEdge(s2).getSourceNodes().size() == 0);
+ assertTrue(graph.getOrCreateStreamEdge(s2).getTargetNodes().size() == 1);
+ assertTrue(graph.getOrCreateStreamEdge(s3).getSourceNodes().size() == 0);
+ assertTrue(graph.getOrCreateStreamEdge(s3).getTargetNodes().size() == 2);
}
@Test
@@ -193,8 +193,8 @@ public class TestJobGraph {
* 2 -> s3
*/
JobGraph graph = new JobGraph(null);
- JobNode n1 = graph.getOrCreateNode("1", "1", null);
- JobNode n2 = graph.getOrCreateNode("2", "1", null);
+ JobNode n1 = graph.getOrCreateJobNode("1", "1", null);
+ JobNode n2 = graph.getOrCreateJobNode("2", "1", null);
StreamSpec s1 = genStream();
StreamSpec s2 = genStream();
StreamSpec s3 = genStream();
@@ -203,15 +203,15 @@ public class TestJobGraph {
graph.addSink(s3, n2);
assertTrue(graph.getSinks().size() == 3);
- assertTrue(graph.getOrCreateNode("1", "1", null).getOutEdges().size() == 1);
- assertTrue(graph.getOrCreateNode("2", "1", null).getOutEdges().size() == 2);
-
- assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 1);
- assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 0);
- assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 1);
- assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 0);
- assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 1);
- assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 0);
+ assertTrue(graph.getOrCreateJobNode("1", "1", null).getOutEdges().size() == 1);
+ assertTrue(graph.getOrCreateJobNode("2", "1", null).getOutEdges().size() == 2);
+
+ assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 1); // each sink stream has one producing job node and no consumer
+ assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 0);
+ assertTrue(graph.getOrCreateStreamEdge(s2).getSourceNodes().size() == 1);
+ assertTrue(graph.getOrCreateStreamEdge(s2).getTargetNodes().size() == 0);
+ assertTrue(graph.getOrCreateStreamEdge(s3).getSourceNodes().size() == 1);
+ assertTrue(graph.getOrCreateStreamEdge(s3).getTargetNodes().size() == 0);
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index c4ab922..2681f9c 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -125,6 +125,8 @@ public class TestJobGraphJsonGenerator {
JobGraphJsonGenerator.JobGraphJson nodes = mapper.readValue(json, JobGraphJsonGenerator.JobGraphJson.class);
assertTrue(nodes.jobs.get(0).operatorGraph.inputStreams.size() == 5);
assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 12);
- assertTrue(nodes.streams.size() == 7);
+ assertTrue(nodes.sourceStreams.size() == 3); // 3 + 2 + 2: the 7 streams formerly counted together, now split by kind
+ assertTrue(nodes.sinkStreams.size() == 2);
+ assertTrue(nodes.intermediateStreams.size() == 2);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index bd18f0b..99bf854 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -217,6 +217,11 @@ public class TestOperatorImpl {
public int getOpId() {
return -1;
}
+
+ @Override
+ public String getSourceLocation() {
+ return ""; // test operator spec: no user source location
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
index d227206..cccafaf 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -57,8 +57,8 @@ public class TestOperatorSpecs {
@Test
public void testCreateStreamOperator() {
FlatMapFunction<Object, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
- this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L));
- } };
+ this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L));
+ } }; // double-brace initialization: anonymous ArrayList subclass with an instance initializer
MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
StreamOperatorSpec<Object, TestMessageEnvelope> streamOp =
OperatorSpecs.createStreamOperatorSpec(transformFn, mockOutput, 1);
@@ -78,7 +78,7 @@ public class TestOperatorSpecs {
public void testCreateSinkOperator() {
SystemStream testStream = new SystemStream("test-sys", "test-stream");
SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
- TaskCoordinator taskCoordinator) -> {
+ TaskCoordinator taskCoordinator) -> {
messageCollector.send(new OutgoingMessageEnvelope(testStream, message.getKey(), message.getMessage()));
};
SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, 1);
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-shell/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/assembly/src.xml b/samza-shell/src/main/assembly/src.xml
index 5173fdf..cc15420 100644
--- a/samza-shell/src/main/assembly/src.xml
+++ b/samza-shell/src/main/assembly/src.xml
@@ -27,5 +27,13 @@
<include>*</include>
</includes>
</fileSet>
+ <fileSet> <!-- package the files of src/main/visualizer into the visualizer/ output directory -->
+ <outputDirectory>visualizer</outputDirectory>
+ <directory>${basedir}/src/main/visualizer</directory>
+ <fileMode>0644</fileMode>
+ <includes>
+ <include>*</include> <!-- NOTE(review): confirm this pattern also picks up files in subdirectories such as js/ -->
+ </includes>
+ </fileSet>
</fileSets>
</assembly>
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-shell/src/main/bash/run-app.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-app.sh b/samza-shell/src/main/bash/run-app.sh
index 3e43463..3880e3c 100644
--- a/samza-shell/src/main/bash/run-app.sh
+++ b/samza-shell/src/main/bash/run-app.sh
@@ -16,6 +16,13 @@
# specific language governing permissions and limitations
# under the License.
+# Absolute path of the parent of this script's directory. Resolved in a subshell so the current
+# directory is left unchanged, and quoted so that paths containing spaces work.
+base_dir=$(cd "$(dirname "$0")/.." && pwd)
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p "$EXECUTION_PLAN_DIR"
+
[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
exec $(dirname $0)/run-class.sh org.apache.samza.runtime.ApplicationRunnerMain "$@"