You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:54 UTC

[11/18] git commit: [streaming] Added checked exception to execution

[streaming] Added checked exception to execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/48d8ed70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/48d8ed70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/48d8ed70

Branch: refs/heads/master
Commit: 48d8ed703e9d8a25f2143514e8f9cad66dc2593d
Parents: 03a28cb
Author: ghermann <re...@gmail.com>
Authored: Tue Sep 16 13:06:49 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:44:12 2014 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTopology.java         |  2 +-
 .../connectors/twitter/TwitterLocal.java        |  2 +-
 .../connectors/twitter/TwitterStreaming.java    |  2 +-
 .../api/environment/LocalStreamEnvironment.java |  4 +--
 .../environment/StreamExecutionEnvironment.java |  3 ++-
 .../flink/streaming/util/ClusterUtil.java       | 11 ++++-----
 .../streamcomponent/StreamComponentTest.java    | 26 +++++++++-----------
 .../examples/basictopology/BasicTopology.java   |  2 +-
 .../examples/cellinfo/CellInfoLocal.java        |  2 +-
 .../streaming/examples/join/JoinLocal.java      |  2 +-
 .../ml/IncrementalLearningSkeleton.java         |  2 +-
 .../examples/window/join/WindowJoinLocal.java   |  2 +-
 .../examples/wordcount/WordCountLocal.java      |  2 +-
 13 files changed, 29 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 64ea810..998a395 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -90,7 +90,7 @@ public class KafkaTopology {
 
 	private static final int SOURCE_PARALELISM = 1;
 
-	public static void main(String[] args) {
+	public static void main(String[] args) throws Exception {
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index 465a500..6f739b3 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -58,7 +58,7 @@ public class TwitterLocal {
 
 	}
 
-	public static void main(String[] args) {
+	public static void main(String[] args) throws Exception {
 
 		String path = new String();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index 5927ce4..540895c 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -72,7 +72,7 @@ public class TwitterStreaming {
 		}
 	}
 
-	public static void main(String[] args) {
+	public static void main(String[] args) throws Exception {
 
 		String path = new String();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 828a566..f39a20a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -26,11 +26,11 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 * 
 	 */
 	@Override
-	public void execute() {
+	public void execute() throws Exception {
 		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism());
 	}
 
-	public void executeTest(long memorySize) {
+	public void executeTest(long memorySize) throws Exception {
 		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism(),
 				memorySize);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 6187c99..ad379b3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -454,8 +454,9 @@ public abstract class StreamExecutionEnvironment {
 	 * <p>
 	 * The program execution will be logged and displayed with a generated
 	 * default name.
+	 * @throws Exception 
 	 **/
-	public abstract void execute();
+	public abstract void execute() throws Exception;
 
 	/**
 	 * Getter of the {@link JobGraphBuilder} of the streaming job.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 0853ec7..ec73aef 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -42,7 +42,7 @@ public class ClusterUtil {
 	 * @param memorySize
 	 *            memorySize
 	 */
-	public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers, long memorySize) {
+	public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers, long memorySize) throws Exception  {
 
 		Configuration configuration = jobGraph.getJobConfiguration();
 
@@ -59,15 +59,14 @@ public class ClusterUtil {
 			Client client = new Client(new InetSocketAddress("localhost",
 					exec.getJobManagerRpcPort()), configuration, ClusterUtil.class.getClassLoader());
 			client.run(jobGraph, true);
-
 		} catch (ProgramInvocationException e) {
 			if (e.getMessage().contains("GraphConversionException")) {
-				throw new RuntimeException(CANNOT_EXECUTE_EMPTY_JOB, e);
+				throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
 			} else {
-				throw new RuntimeException(e.getMessage(), e);
+				throw e;
 			}
 		} catch (Exception e) {
-			throw new RuntimeException(e.getMessage(), e);
+			throw e;
 		} finally {
 			try {
 				exec.stop();
@@ -76,7 +75,7 @@ public class ClusterUtil {
 		}
 	}
 
-	public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers) {
+	public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers) throws Exception {
 		runOnMiniCluster(jobGraph, numberOfTaskTrackers, -1);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
index efb1e94..9586253 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
@@ -34,9 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.util.ClusterUtil;
 import org.apache.flink.util.Collector;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class StreamComponentTest {
@@ -83,7 +81,6 @@ public class StreamComponentTest {
 	private static final int SOURCE_PARALELISM = 1;
 	private static final long MEMORYSIZE = 32;
 
-	@Ignore
 	@Test
 	public void wrongJobGraph() {
 		LocalStreamEnvironment env = StreamExecutionEnvironment
@@ -92,8 +89,7 @@ public class StreamComponentTest {
 		try {
 			env.execute();
 			fail();
-		} catch (RuntimeException e) {
-			assertEquals(e.getMessage(), ClusterUtil.CANNOT_EXECUTE_EMPTY_JOB);
+		} catch (Exception e) {
 		}
 
 		env.fromCollection(Arrays.asList("a", "b"));
@@ -101,8 +97,7 @@ public class StreamComponentTest {
 		try {
 			env.execute();
 			fail();
-		} catch (RuntimeException e) {
-			System.out.println(e.getMessage());
+		} catch (Exception e) {
 		}
 
 		try {
@@ -157,6 +152,7 @@ public class StreamComponentTest {
 	}
 
 	static HashSet<String> resultSet;
+
 	private static class SetSink implements SinkFunction<String> {
 		private static final long serialVersionUID = 1L;
 
@@ -165,33 +161,33 @@ public class StreamComponentTest {
 			resultSet.add(value);
 		}
 	}
-	
+
 	@Test
-	public void coTest() {
+	public void coTest() throws Exception {
 		LocalStreamEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(SOURCE_PARALELISM);
 
 		DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
 		DataStream<Long> generatedSequence = env.generateSequence(0, 3);
-		
+
 		fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
-		
+
 		resultSet = new HashSet<String>();
 		env.execute();
-		
-		HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1", "2", "3"));
+
+		HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
+				"2", "3"));
 		assertEquals(expectedSet, resultSet);
 	}
 
 	@Test
-	public void runStream() {
+	public void runStream() throws Exception {
 		LocalStreamEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(SOURCE_PARALELISM);
 
 		env.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
 
 		env.executeTest(MEMORYSIZE);
-
 		assertEquals(10, data.keySet().size());
 
 		for (Integer k : data.keySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
index 7d8a49c..8f26be5 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
@@ -52,7 +52,7 @@ public class BasicTopology {
 	private static final int PARALLELISM = 1;
 	private static final int SOURCE_PARALLELISM = 1;
 
-	public static void main(String[] args) {
+	public static void main(String[] args) throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(PARALLELISM);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
index 69b933c..5338e9d 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
@@ -107,7 +107,7 @@ public class CellInfoLocal {
 	}
 
 	// Example for connecting data streams
-	public static void main(String[] args) {
+	public static void main(String[] args) throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
 				PARALLELISM).setBufferTimeout(100);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
index 472c8c3..05398f4 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
@@ -30,7 +30,7 @@ public class JoinLocal {
 	// This example will join two streams. One which emits people's grades and
 	// one which emits people's salaries.
 
-	public static void main(String[] args) {
+	public static void main(String[] args) throws Exception {
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
 				PARALLELISM).setBufferTimeout(100);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index a433fd0..3218c47 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -117,7 +117,7 @@ public class IncrementalLearningSkeleton {
 	private static final int PARALLELISM = 1;
 	private static final int SOURCE_PARALLELISM = 1;
 
-	public static void main(String[] args) {
+	public static void main(String[] args) throws Exception {
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
 				PARALLELISM).setBufferTimeout(1000);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index b298f3f..cd8f71c 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -29,7 +29,7 @@ public class WindowJoinLocal {
 	// This example will join two streams with a sliding window. One which emits
 	// people's grades and one which emits people's salaries.
 
-	public static void main(String[] args) {
+	public static void main(String[] args) throws Exception {
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
 				PARALLELISM).setBufferTimeout(100);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
index f78cd1a..532ec53 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
@@ -43,7 +43,7 @@ public class WordCountLocal {
 		}
 	}
 
-	public static void main(String[] args) {
+	public static void main(String[] args) throws Exception {
 
 		TestDataUtil.downloadIfNotExists("hamlet.txt");
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);