You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:44 UTC

[11/12] git commit: [FLINK-1122] [streaming] Job Execution with user specified name

[FLINK-1122] [streaming] Job Execution with user specified name


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/076223cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/076223cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/076223cb

Branch: refs/heads/master
Commit: 076223cb9540c973eaba7d24e50c0e1f3eb80308
Parents: 70464bb
Author: mbalassi <ba...@gmail.com>
Authored: Wed Sep 24 21:05:58 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Wed Sep 24 21:07:13 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 53 +++++++++++---------
 .../api/environment/LocalStreamEnvironment.java | 18 ++++++-
 .../environment/RemoteStreamEnvironment.java    | 20 +++++++-
 .../environment/StreamExecutionEnvironment.java | 30 ++++++++---
 .../streaming/examples/wordcount/WordCount.java |  2 +-
 5 files changed, 86 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/076223cb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 3377ee0..e06fde3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -49,7 +49,8 @@ import org.slf4j.LoggerFactory;
 public class JobGraphBuilder {
 
 	private static final Logger LOG = LoggerFactory.getLogger(JobGraphBuilder.class);
-	private final JobGraph jobGraph;
+	private final static String DEAFULT_JOB_NAME = "Streaming Job";
+	private JobGraph jobGraph;
 
 	// Graph attributes
 	private Map<String, AbstractJobVertex> streamVertices;
@@ -87,9 +88,7 @@ public class JobGraphBuilder {
 	 * @param jobGraphName
 	 *            Name of the JobGraph
 	 */
-	public JobGraphBuilder(String jobGraphName) {
-
-		jobGraph = new JobGraph(jobGraphName);
+	public JobGraphBuilder() {
 
 		streamVertices = new HashMap<String, AbstractJobVertex>();
 		vertexParallelism = new HashMap<String, Integer>();
@@ -157,8 +156,8 @@ public class JobGraphBuilder {
 	 */
 	public <IN, OUT> void addStreamVertex(String vertexName,
 			StreamInvokable<IN, OUT> invokableObject, TypeWrapper<?> inTypeWrapper,
-			TypeWrapper<?> outTypeWrapper, String operatorName,
-			byte[] serializedFunction, int parallelism) {
+			TypeWrapper<?> outTypeWrapper, String operatorName, byte[] serializedFunction,
+			int parallelism) {
 
 		addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
 				serializedFunction, parallelism);
@@ -240,9 +239,8 @@ public class JobGraphBuilder {
 	}
 
 	public <IN1, IN2, OUT> void addCoTask(String vertexName,
-			CoInvokable<IN1, IN2, OUT> taskInvokableObject,
-			TypeWrapper<?> in1TypeWrapper, TypeWrapper<?> in2TypeWrapper,
-			TypeWrapper<?> outTypeWrapper, String operatorName,
+			CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeWrapper<?> in1TypeWrapper,
+			TypeWrapper<?> in2TypeWrapper, TypeWrapper<?> outTypeWrapper, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
 		addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName,
@@ -290,9 +288,8 @@ public class JobGraphBuilder {
 		iterationTailCount.put(vertexName, 0);
 	}
 
-	private void addTypeWrappers(String vertexName, TypeWrapper<?> in1,
-			TypeWrapper<?> in2, TypeWrapper<?> out1,
-			TypeWrapper<?> out2) {
+	private void addTypeWrappers(String vertexName, TypeWrapper<?> in1, TypeWrapper<?> in2,
+			TypeWrapper<?> out1, TypeWrapper<?> out2) {
 		typeWrapperIn1.put(vertexName, in1);
 		typeWrapperIn2.put(vertexName, in2);
 		typeWrapperOut1.put(vertexName, out1);
@@ -539,11 +536,29 @@ public class JobGraphBuilder {
 	}
 
 	/**
+	 * Gets the assembled {@link JobGraph} and adds a default name for it.
+	 */
+	public JobGraph getJobGraph() {
+		return getJobGraph(DEAFULT_JOB_NAME);
+	}
+
+	/**
+	 * Gets the assembled {@link JobGraph} and adds a user specified name for
+	 * it.
+	 * 
+	 * @param jobGraphName name of the jobGraph
+	 */
+	public JobGraph getJobGraph(String jobGraphName) {
+		jobGraph = new JobGraph(jobGraphName);
+		buildJobGraph();
+		return jobGraph;
+	}
+
+	/**
 	 * Builds the {@link JobGraph} from the vertices with the edges and settings
 	 * provided.
 	 */
-	private void buildGraph() {
-
+	private void buildJobGraph() {
 		for (String vertexName : outEdgeList.keySet()) {
 			createVertex(vertexName);
 		}
@@ -573,14 +588,4 @@ public class JobGraphBuilder {
 		setNumberOfJobOutputs();
 	}
 
-	/**
-	 * Builds and returns the JobGraph
-	 * 
-	 * @return JobGraph object
-	 */
-	public JobGraph getJobGraph() {
-		buildGraph();
-		return jobGraph;
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/076223cb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 4f259d4..94e0891 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -22,16 +22,30 @@ import org.apache.flink.streaming.util.ClusterUtil;
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 	/**
-	 * Executes the JobGraph of the on a mini cluster of CLusterUtil.
-	 * 
+	 * Executes the JobGraph on a mini cluster of ClusterUtil with a
+	 * default name.
 	 */
 	@Override
 	public void execute() throws Exception {
 		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism());
 	}
 
+	/**
+	 * Executes the JobGraph on a mini cluster of ClusterUtil with a user
+	 * specified name.
+	 * 
+	 * @param jobName
+	 *            name of the job
+	 */
+	@Override
+	public void execute(String jobName) throws Exception {
+		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
+				getExecutionParallelism());
+	}
+
 	public void executeTest(long memorySize) throws Exception {
 		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism(),
 				memorySize);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/076223cb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 0582668..864e18d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -70,12 +70,28 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 
 	@Override
 	public void execute() {
+		
+		JobGraph jobGraph = jobGraphBuilder.getJobGraph();
+		executeRemotely(jobGraph);
+	}
+	
+	@Override
+	public void execute(String jobName) {
+		
+		JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
+		executeRemotely(jobGraph);
+	}
+
+	/**
+	 * Executes the remote job.
+	 * 
+	 * @param jobGraph jobGraph to execute
+	 */
+	private void executeRemotely(JobGraph jobGraph) {
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
 
-		JobGraph jobGraph = jobGraphBuilder.getJobGraph();
-
 		for (int i = 0; i < jarFiles.length; i++) {
 			File file = new File(jarFiles[i]);
 			try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/076223cb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e7a68d3..4d34217 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -71,7 +71,7 @@ public abstract class StreamExecutionEnvironment {
 	 * Constructor for creating StreamExecutionEnvironment
 	 */
 	protected StreamExecutionEnvironment() {
-		jobGraphBuilder = new JobGraphBuilder("jobGraph");
+		jobGraphBuilder = new JobGraphBuilder();
 	}
 
 	public int getExecutionParallelism() {
@@ -230,8 +230,9 @@ public abstract class StreamExecutionEnvironment {
 
 		try {
 			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
-			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
-					null, outTypeWrapper, "source", SerializationUtils.serialize(function), 1);
+			jobGraphBuilder.addStreamVertex(returnStream.getId(),
+					new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
+					SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize elements");
 		}
@@ -259,8 +260,7 @@ public abstract class StreamExecutionEnvironment {
 			throw new IllegalArgumentException("Collection must not be empty");
 		}
 
-		TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator()
-				.next());
+		TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator().next());
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
 				outTypeWrapper);
 
@@ -311,9 +311,9 @@ public abstract class StreamExecutionEnvironment {
 				outTypeWrapper);
 
 		try {
-			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
-					null, outTypeWrapper, "source", SerializationUtils.serialize(function),
-					parallelism);
+			jobGraphBuilder.addStreamVertex(returnStream.getId(),
+					new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
+					SerializationUtils.serialize(function), parallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SourceFunction");
 		}
@@ -461,6 +461,20 @@ public abstract class StreamExecutionEnvironment {
 	public abstract void execute() throws Exception;
 
 	/**
+	 * Triggers the program execution. The environment will execute all parts of
+	 * the program that have resulted in a "sink" operation. Sink operations are
+	 * for example printing results or forwarding them to a message queue.
+	 * <p>
+	 * The program execution will be logged and displayed with the provided
+	 * name.
+	 * 
+	 * @param jobName Desired name of the job
+	 * 
+	 * @throws Exception
+	 **/
+	public abstract void execute(String jobName) throws Exception;
+
+	/**
 	 * Getter of the {@link JobGraphBuilder} of the streaming job.
 	 * 
 	 * @return jobgraph

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/076223cb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index 3be0c89..e07dfe5 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -80,7 +80,7 @@ public class WordCount {
 		}
 
 		// execute program
-		env.execute();
+		env.execute("Streaming WordCount");
 	}
 
 	// *************************************************************************