You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:54 UTC
[11/18] git commit: [streaming] Added checked exception to execution
[streaming] Added checked exception to execution
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/48d8ed70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/48d8ed70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/48d8ed70
Branch: refs/heads/master
Commit: 48d8ed703e9d8a25f2143514e8f9cad66dc2593d
Parents: 03a28cb
Author: ghermann <re...@gmail.com>
Authored: Tue Sep 16 13:06:49 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:44:12 2014 +0200
----------------------------------------------------------------------
.../connectors/kafka/KafkaTopology.java | 2 +-
.../connectors/twitter/TwitterLocal.java | 2 +-
.../connectors/twitter/TwitterStreaming.java | 2 +-
.../api/environment/LocalStreamEnvironment.java | 4 +--
.../environment/StreamExecutionEnvironment.java | 3 ++-
.../flink/streaming/util/ClusterUtil.java | 11 ++++-----
.../streamcomponent/StreamComponentTest.java | 26 +++++++++-----------
.../examples/basictopology/BasicTopology.java | 2 +-
.../examples/cellinfo/CellInfoLocal.java | 2 +-
.../streaming/examples/join/JoinLocal.java | 2 +-
.../ml/IncrementalLearningSkeleton.java | 2 +-
.../examples/window/join/WindowJoinLocal.java | 2 +-
.../examples/wordcount/WordCountLocal.java | 2 +-
13 files changed, 29 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 64ea810..998a395 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -90,7 +90,7 @@ public class KafkaTopology {
private static final int SOURCE_PARALELISM = 1;
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index 465a500..6f739b3 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -58,7 +58,7 @@ public class TwitterLocal {
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
String path = new String();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index 5927ce4..540895c 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -72,7 +72,7 @@ public class TwitterStreaming {
}
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
String path = new String();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 828a566..f39a20a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -26,11 +26,11 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*
*/
@Override
- public void execute() {
+ public void execute() throws Exception {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism());
}
- public void executeTest(long memorySize) {
+ public void executeTest(long memorySize) throws Exception {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism(),
memorySize);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 6187c99..ad379b3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -454,8 +454,9 @@ public abstract class StreamExecutionEnvironment {
* <p>
* The program execution will be logged and displayed with a generated
* default name.
+ * @throws Exception
**/
- public abstract void execute();
+ public abstract void execute() throws Exception;
/**
* Getter of the {@link JobGraphBuilder} of the streaming job.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 0853ec7..ec73aef 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -42,7 +42,7 @@ public class ClusterUtil {
* @param memorySize
* memorySize
*/
- public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers, long memorySize) {
+ public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers, long memorySize) throws Exception {
Configuration configuration = jobGraph.getJobConfiguration();
@@ -59,15 +59,14 @@ public class ClusterUtil {
Client client = new Client(new InetSocketAddress("localhost",
exec.getJobManagerRpcPort()), configuration, ClusterUtil.class.getClassLoader());
client.run(jobGraph, true);
-
} catch (ProgramInvocationException e) {
if (e.getMessage().contains("GraphConversionException")) {
- throw new RuntimeException(CANNOT_EXECUTE_EMPTY_JOB, e);
+ throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
} else {
- throw new RuntimeException(e.getMessage(), e);
+ throw e;
}
} catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
+ throw e;
} finally {
try {
exec.stop();
@@ -76,7 +75,7 @@ public class ClusterUtil {
}
}
- public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers) {
+ public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers) throws Exception {
runOnMiniCluster(jobGraph, numberOfTaskTrackers, -1);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
index efb1e94..9586253 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
@@ -34,9 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.util.ClusterUtil;
import org.apache.flink.util.Collector;
-import org.junit.Ignore;
import org.junit.Test;
public class StreamComponentTest {
@@ -83,7 +81,6 @@ public class StreamComponentTest {
private static final int SOURCE_PARALELISM = 1;
private static final long MEMORYSIZE = 32;
- @Ignore
@Test
public void wrongJobGraph() {
LocalStreamEnvironment env = StreamExecutionEnvironment
@@ -92,8 +89,7 @@ public class StreamComponentTest {
try {
env.execute();
fail();
- } catch (RuntimeException e) {
- assertEquals(e.getMessage(), ClusterUtil.CANNOT_EXECUTE_EMPTY_JOB);
+ } catch (Exception e) {
}
env.fromCollection(Arrays.asList("a", "b"));
@@ -101,8 +97,7 @@ public class StreamComponentTest {
try {
env.execute();
fail();
- } catch (RuntimeException e) {
- System.out.println(e.getMessage());
+ } catch (Exception e) {
}
try {
@@ -157,6 +152,7 @@ public class StreamComponentTest {
}
static HashSet<String> resultSet;
+
private static class SetSink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
@@ -165,33 +161,33 @@ public class StreamComponentTest {
resultSet.add(value);
}
}
-
+
@Test
- public void coTest() {
+ public void coTest() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(SOURCE_PARALELISM);
DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
DataStream<Long> generatedSequence = env.generateSequence(0, 3);
-
+
fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
-
+
resultSet = new HashSet<String>();
env.execute();
-
- HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1", "2", "3"));
+
+ HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
+ "2", "3"));
assertEquals(expectedSet, resultSet);
}
@Test
- public void runStream() {
+ public void runStream() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(SOURCE_PARALELISM);
env.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
env.executeTest(MEMORYSIZE);
-
assertEquals(10, data.keySet().size());
for (Integer k : data.keySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
index 7d8a49c..8f26be5 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
@@ -52,7 +52,7 @@ public class BasicTopology {
private static final int PARALLELISM = 1;
private static final int SOURCE_PARALLELISM = 1;
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(PARALLELISM);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
index 69b933c..5338e9d 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
@@ -107,7 +107,7 @@ public class CellInfoLocal {
}
// Example for connecting data streams
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
PARALLELISM).setBufferTimeout(100);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
index 472c8c3..05398f4 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
@@ -30,7 +30,7 @@ public class JoinLocal {
// This example will join two streams. One which emits people's grades and
// one which emits people's salaries.
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
PARALLELISM).setBufferTimeout(100);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index a433fd0..3218c47 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -117,7 +117,7 @@ public class IncrementalLearningSkeleton {
private static final int PARALLELISM = 1;
private static final int SOURCE_PARALLELISM = 1;
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
PARALLELISM).setBufferTimeout(1000);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index b298f3f..cd8f71c 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -29,7 +29,7 @@ public class WindowJoinLocal {
// This example will join two streams with a sliding window. One which emits
// people's grades and one which emits people's salaries.
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
PARALLELISM).setBufferTimeout(100);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/48d8ed70/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
index f78cd1a..532ec53 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
@@ -43,7 +43,7 @@ public class WordCountLocal {
}
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
TestDataUtil.downloadIfNotExists("hamlet.txt");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);