You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 14:56:46 UTC

[1/3] flink git commit: test refactoring fix

Repository: flink
Updated Branches:
  refs/heads/master ac888b239 -> 22e00655f


test refactoring fix


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6f183ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6f183ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6f183ac

Branch: refs/heads/master
Commit: b6f183ac5a03286febead090e9a1d4611c2e283d
Parents: ac888b2
Author: Paris Carbone <se...@gmail.com>
Authored: Wed Mar 18 19:44:42 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 12:43:23 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/operators/GroupCombineITCase.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b6f183ac/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 0aa361c..ef484df 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.test.javaApiOperators.GroupCombineITCase
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -28,7 +28,6 @@ import org.apache.flink.util.Collector
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.apache.flink.api.scala._
 
 /**
  * Java interoperability tests. Main tests are in GroupCombineITCase Java.


[3/3] flink git commit: [FLINK-1757] [streaming] Fixed type cast bug in SumFunction

Posted by gy...@apache.org.
[FLINK-1757] [streaming] Fixed type cast bug in SumFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22e00655
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22e00655
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22e00655

Branch: refs/heads/master
Commit: 22e00655fa70a4986b87b1cde6f998b2baaf0507
Parents: a6da9f2
Author: szape <ne...@gmail.com>
Authored: Fri Mar 20 13:58:07 2015 +0100
Committer: szape <ne...@gmail.com>
Committed: Fri Mar 20 14:01:50 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/function/aggregation/SumFunction.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/22e00655/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
index 2aef19c..bc851e7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java
@@ -78,7 +78,7 @@ public abstract class SumFunction implements Serializable{
 
 		@Override
 		public Object add(Object value1, Object value2) {
-			return (Short) value1 + (Short) value2;
+			return (short) ((Short) value1 + (Short) value2);
 		}
 	}
 
@@ -96,7 +96,7 @@ public abstract class SumFunction implements Serializable{
 
 		@Override
 		public Object add(Object value1, Object value2) {
-			return (Byte) value1 + (Byte) value2;
+			return (byte) ((Byte) value1 + (Byte) value2);
 		}
 	}
 }


[2/3] flink git commit: [streaming] [FLINK-1740] Pass config param for numberOfExecutionRetries to the JobGraph for streaming jobs

Posted by gy...@apache.org.
[streaming] [FLINK-1740] Pass config param for numberOfExecutionRetries to the JobGraph for streaming jobs

Closes #501


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6da9f26
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6da9f26
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6da9f26

Branch: refs/heads/master
Commit: a6da9f26d7128e3db697b183a53ade26afb40801
Parents: b6f183a
Author: Paris Carbone <se...@gmail.com>
Authored: Wed Mar 18 19:54:36 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 13:42:35 2015 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/streaming/api/StreamGraph.java |  4 ++++
 .../flink/streaming/api/StreamingJobGraphGenerator.java  | 11 +++++++++--
 2 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a6da9f26/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 948ea5e..71706bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -555,6 +555,10 @@ public class StreamGraph extends StreamingPlan {
 		return operatorNames.get(vertexID);
 	}
 
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
 	public void setMonitoringEnabled(boolean monitoringEnabled) {
 		this.monitoringEnabled = monitoringEnabled;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6da9f26/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 0146448..544ccc6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -80,8 +80,15 @@ public class StreamingJobGraphGenerator {
 		jobGraph.setJobType(JobGraph.JobType.STREAMING);
 		jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled());
 		jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval());
-		if (jobGraph.isMonitoringEnabled()) {
-			jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
+
+		if(jobGraph.isMonitoringEnabled()) {
+			int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
+			if(executionRetries != -1) {
+				jobGraph.setNumberOfExecutionRetries(executionRetries);
+			}
+			else {
+				jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);	
+			}
 		}
 		init();