You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 14:56:46 UTC
[1/3] flink git commit: test refactoring fix
Repository: flink
Updated Branches:
refs/heads/master ac888b239 -> 22e00655f
test refactoring fix
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6f183ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6f183ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6f183ac
Branch: refs/heads/master
Commit: b6f183ac5a03286febead090e9a1d4611c2e283d
Parents: ac888b2
Author: Paris Carbone <se...@gmail.com>
Authored: Wed Mar 18 19:44:42 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 12:43:23 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/api/scala/operators/GroupCombineITCase.scala | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b6f183ac/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 0aa361c..ef484df 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.test.javaApiOperators.GroupCombineITCase
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -28,7 +28,6 @@ import org.apache.flink.util.Collector
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.apache.flink.api.scala._
/**
* Java interoperability tests. Main tests are in GroupCombineITCase Java.
[3/3] flink git commit: [FLINK-1757] [streaming] Fixed type cast bug
in SumFunction
Posted by gy...@apache.org.
[FLINK-1757] [streaming] Fixed type cast bug in SumFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22e00655
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22e00655
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22e00655
Branch: refs/heads/master
Commit: 22e00655fa70a4986b87b1cde6f998b2baaf0507
Parents: a6da9f2
Author: szape <ne...@gmail.com>
Authored: Fri Mar 20 13:58:07 2015 +0100
Committer: szape <ne...@gmail.com>
Committed: Fri Mar 20 14:01:50 2015 +0100
----------------------------------------------------------------------
.../flink/streaming/api/function/aggregation/SumFunction.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/22e00655/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
index 2aef19c..bc851e7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
@@ -78,7 +78,7 @@ public abstract class SumFunction implements Serializable{
@Override
public Object add(Object value1, Object value2) {
- return (Short) value1 + (Short) value2;
+ return (short) ((Short) value1 + (Short) value2);
}
}
@@ -96,7 +96,7 @@ public abstract class SumFunction implements Serializable{
@Override
public Object add(Object value1, Object value2) {
- return (Byte) value1 + (Byte) value2;
+ return (byte) ((Byte) value1 + (Byte) value2);
}
}
}
[2/3] flink git commit: [streaming] [FLINK-1740] Pass config param
for numberOfExecutionRetries to the JobGraph for streaming jobs
Posted by gy...@apache.org.
[streaming] [FLINK-1740] Pass config param for numberOfExecutionRetries to the JobGraph for streaming jobs
Closes #501
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6da9f26
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6da9f26
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6da9f26
Branch: refs/heads/master
Commit: a6da9f26d7128e3db697b183a53ade26afb40801
Parents: b6f183a
Author: Paris Carbone <se...@gmail.com>
Authored: Wed Mar 18 19:54:36 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 13:42:35 2015 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/streaming/api/StreamGraph.java | 4 ++++
.../flink/streaming/api/StreamingJobGraphGenerator.java | 11 +++++++++--
2 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a6da9f26/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 948ea5e..71706bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -555,6 +555,10 @@ public class StreamGraph extends StreamingPlan {
return operatorNames.get(vertexID);
}
+ public ExecutionConfig getExecutionConfig() {
+ return executionConfig;
+ }
+
public void setMonitoringEnabled(boolean monitoringEnabled) {
this.monitoringEnabled = monitoringEnabled;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6da9f26/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 0146448..544ccc6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -80,8 +80,15 @@ public class StreamingJobGraphGenerator {
jobGraph.setJobType(JobGraph.JobType.STREAMING);
jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled());
jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval());
- if (jobGraph.isMonitoringEnabled()) {
- jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
+
+ if(jobGraph.isMonitoringEnabled()) {
+ int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
+ if(executionRetries != -1) {
+ jobGraph.setNumberOfExecutionRetries(executionRetries);
+ }
+ else {
+ jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
+ }
}
init();