You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:44 UTC
[11/12] git commit: [FLINK-1122] [streaming] Job Execution with user
specified name
[FLINK-1122] [streaming] Job Execution with user specified name
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/076223cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/076223cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/076223cb
Branch: refs/heads/master
Commit: 076223cb9540c973eaba7d24e50c0e1f3eb80308
Parents: 70464bb
Author: mbalassi <ba...@gmail.com>
Authored: Wed Sep 24 21:05:58 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Wed Sep 24 21:07:13 2014 +0200
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 53 +++++++++++---------
.../api/environment/LocalStreamEnvironment.java | 18 ++++++-
.../environment/RemoteStreamEnvironment.java | 20 +++++++-
.../environment/StreamExecutionEnvironment.java | 30 ++++++++---
.../streaming/examples/wordcount/WordCount.java | 2 +-
5 files changed, 86 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/076223cb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 3377ee0..e06fde3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -49,7 +49,8 @@ import org.slf4j.LoggerFactory;
public class JobGraphBuilder {
private static final Logger LOG = LoggerFactory.getLogger(JobGraphBuilder.class);
- private final JobGraph jobGraph;
+ private final static String DEAFULT_JOB_NAME = "Streaming Job";
+ private JobGraph jobGraph;
// Graph attributes
private Map<String, AbstractJobVertex> streamVertices;
@@ -87,9 +88,7 @@ public class JobGraphBuilder {
* @param jobGraphName
* Name of the JobGraph
*/
- public JobGraphBuilder(String jobGraphName) {
-
- jobGraph = new JobGraph(jobGraphName);
+ public JobGraphBuilder() {
streamVertices = new HashMap<String, AbstractJobVertex>();
vertexParallelism = new HashMap<String, Integer>();
@@ -157,8 +156,8 @@ public class JobGraphBuilder {
*/
public <IN, OUT> void addStreamVertex(String vertexName,
StreamInvokable<IN, OUT> invokableObject, TypeWrapper<?> inTypeWrapper,
- TypeWrapper<?> outTypeWrapper, String operatorName,
- byte[] serializedFunction, int parallelism) {
+ TypeWrapper<?> outTypeWrapper, String operatorName, byte[] serializedFunction,
+ int parallelism) {
addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
serializedFunction, parallelism);
@@ -240,9 +239,8 @@ public class JobGraphBuilder {
}
public <IN1, IN2, OUT> void addCoTask(String vertexName,
- CoInvokable<IN1, IN2, OUT> taskInvokableObject,
- TypeWrapper<?> in1TypeWrapper, TypeWrapper<?> in2TypeWrapper,
- TypeWrapper<?> outTypeWrapper, String operatorName,
+ CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeWrapper<?> in1TypeWrapper,
+ TypeWrapper<?> in2TypeWrapper, TypeWrapper<?> outTypeWrapper, String operatorName,
byte[] serializedFunction, int parallelism) {
addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName,
@@ -290,9 +288,8 @@ public class JobGraphBuilder {
iterationTailCount.put(vertexName, 0);
}
- private void addTypeWrappers(String vertexName, TypeWrapper<?> in1,
- TypeWrapper<?> in2, TypeWrapper<?> out1,
- TypeWrapper<?> out2) {
+ private void addTypeWrappers(String vertexName, TypeWrapper<?> in1, TypeWrapper<?> in2,
+ TypeWrapper<?> out1, TypeWrapper<?> out2) {
typeWrapperIn1.put(vertexName, in1);
typeWrapperIn2.put(vertexName, in2);
typeWrapperOut1.put(vertexName, out1);
@@ -539,11 +536,29 @@ public class JobGraphBuilder {
}
/**
+ * Gets the assembled {@link JobGraph} and adds a default name for it.
+ */
+ public JobGraph getJobGraph() {
+ return getJobGraph(DEAFULT_JOB_NAME);
+ }
+
+ /**
+ * Gets the assembled {@link JobGraph} and adds a user specified name for
+ * it.
+ *
+ * @param jobGraphName name of the jobGraph
+ */
+ public JobGraph getJobGraph(String jobGraphName) {
+ jobGraph = new JobGraph(jobGraphName);
+ buildJobGraph();
+ return jobGraph;
+ }
+
+ /**
* Builds the {@link JobGraph} from the vertices with the edges and settings
* provided.
*/
- private void buildGraph() {
-
+ private void buildJobGraph() {
for (String vertexName : outEdgeList.keySet()) {
createVertex(vertexName);
}
@@ -573,14 +588,4 @@ public class JobGraphBuilder {
setNumberOfJobOutputs();
}
- /**
- * Builds and returns the JobGraph
- *
- * @return JobGraph object
- */
- public JobGraph getJobGraph() {
- buildGraph();
- return jobGraph;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/076223cb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 4f259d4..94e0891 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -22,16 +22,30 @@ import org.apache.flink.streaming.util.ClusterUtil;
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
/**
- * Executes the JobGraph of the on a mini cluster of CLusterUtil.
- *
+ * Executes the JobGraph on a mini cluster using ClusterUtil with a
+ * default name.
*/
@Override
public void execute() throws Exception {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism());
}
+ /**
+ * Executes the JobGraph on a mini cluster using ClusterUtil with a
+ * user-specified name.
+ *
+ * @param jobName
+ * name of the job
+ */
+ @Override
+ public void execute(String jobName) throws Exception {
+ ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
+ getExecutionParallelism());
+ }
+
public void executeTest(long memorySize) throws Exception {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism(),
memorySize);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/076223cb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 0582668..864e18d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -70,12 +70,28 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
@Override
public void execute() {
+
+ JobGraph jobGraph = jobGraphBuilder.getJobGraph();
+ executeRemotely(jobGraph);
+ }
+
+ @Override
+ public void execute(String jobName) {
+
+ JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
+ executeRemotely(jobGraph);
+ }
+
+ /**
+ * Executes the remote job.
+ *
+ * @param jobGraph jobGraph to execute
+ */
+ private void executeRemotely(JobGraph jobGraph) {
if (LOG.isInfoEnabled()) {
LOG.info("Running remotely at {}:{}", host, port);
}
- JobGraph jobGraph = jobGraphBuilder.getJobGraph();
-
for (int i = 0; i < jarFiles.length; i++) {
File file = new File(jarFiles[i]);
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/076223cb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e7a68d3..4d34217 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -71,7 +71,7 @@ public abstract class StreamExecutionEnvironment {
* Constructor for creating StreamExecutionEnvironment
*/
protected StreamExecutionEnvironment() {
- jobGraphBuilder = new JobGraphBuilder("jobGraph");
+ jobGraphBuilder = new JobGraphBuilder();
}
public int getExecutionParallelism() {
@@ -230,8 +230,9 @@ public abstract class StreamExecutionEnvironment {
try {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
- jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
- null, outTypeWrapper, "source", SerializationUtils.serialize(function), 1);
+ jobGraphBuilder.addStreamVertex(returnStream.getId(),
+ new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
+ SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize elements");
}
@@ -259,8 +260,7 @@ public abstract class StreamExecutionEnvironment {
throw new IllegalArgumentException("Collection must not be empty");
}
- TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator()
- .next());
+ TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator().next());
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
outTypeWrapper);
@@ -311,9 +311,9 @@ public abstract class StreamExecutionEnvironment {
outTypeWrapper);
try {
- jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
- null, outTypeWrapper, "source", SerializationUtils.serialize(function),
- parallelism);
+ jobGraphBuilder.addStreamVertex(returnStream.getId(),
+ new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
+ SerializationUtils.serialize(function), parallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SourceFunction");
}
@@ -461,6 +461,20 @@ public abstract class StreamExecutionEnvironment {
public abstract void execute() throws Exception;
/**
+ * Triggers the program execution. The environment will execute all parts of
+ * the program that have resulted in a "sink" operation. Sink operations are
+ * for example printing results or forwarding them to a message queue.
+ * <p>
+ * The program execution will be logged and displayed with the provided
+ * name.
+ *
+ * @param jobName Desired name of the job
+ *
+ * @throws Exception
+ **/
+ public abstract void execute(String jobName) throws Exception;
+
+ /**
* Getter of the {@link JobGraphBuilder} of the streaming job.
*
* @return jobgraph
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/076223cb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index 3be0c89..e07dfe5 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -80,7 +80,7 @@ public class WordCount {
}
// execute program
- env.execute();
+ env.execute("Streaming WordCount");
}
// *************************************************************************