You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/23 10:14:54 UTC

[1/4] flink git commit: [FLINK-9888][release] Remove unsafe defaults from release scripts

Repository: flink
Updated Branches:
  refs/heads/release-1.5 8193d5dc6 -> 3041bd3b8


[FLINK-9888][release] Remove unsafe defaults from release scripts

This closes #6362.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9baca1b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9baca1b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9baca1b8

Branch: refs/heads/release-1.5
Commit: 9baca1b8095cde8d05771f0800df587987a52ef6
Parents: 8193d5d
Author: zentol <ch...@apache.org>
Authored: Wed Jul 18 13:42:06 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 23 09:13:08 2018 +0200

----------------------------------------------------------------------
 tools/releasing/create_binary_release.sh |  6 +++++-
 tools/releasing/create_release_branch.sh | 12 ++++++++++--
 tools/releasing/create_source_release.sh |  6 +++++-
 tools/releasing/update_branch_version.sh | 12 ++++++++++--
 4 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9baca1b8/tools/releasing/create_binary_release.sh
----------------------------------------------------------------------
diff --git a/tools/releasing/create_binary_release.sh b/tools/releasing/create_binary_release.sh
index 0bdff60..4dbc6fa 100755
--- a/tools/releasing/create_binary_release.sh
+++ b/tools/releasing/create_binary_release.sh
@@ -20,12 +20,16 @@
 ##
 ## Variables with defaults (if not overwritten by environment)
 ##
-RELEASE_VERSION=${RELEASE_VERSION:-1.3-SNAPSHOT}
 SCALA_VERSION=none
 HADOOP_VERSION=none
 SKIP_GPG=${SKIP_GPG:-false}
 MVN=${MVN:-mvn}
 
+if [ -z "${RELEASE_VERSION}" ]; then
+    echo "RELEASE_VERSION was not set."
+    exit 1
+fi
+
 # fail immediately
 set -o errexit
 set -o nounset

http://git-wip-us.apache.org/repos/asf/flink/blob/9baca1b8/tools/releasing/create_release_branch.sh
----------------------------------------------------------------------
diff --git a/tools/releasing/create_release_branch.sh b/tools/releasing/create_release_branch.sh
index 9853252..7e16483 100755
--- a/tools/releasing/create_release_branch.sh
+++ b/tools/releasing/create_release_branch.sh
@@ -20,11 +20,19 @@
 ##
 ## Variables with defaults (if not overwritten by environment)
 ##
-OLD_VERSION=${OLD_VERSION:-1.2-SNAPSHOT}
-NEW_VERSION=${NEW_VERSION:-1.3-SNAPSHOT}
 RELEASE_CANDIDATE=${RELEASE_CANDIDATE:-none}
 MVN=${MVN:-mvn}
 
+if [ -z "${OLD_VERSION}" ]; then
+    echo "OLD_VERSION was not set."
+    exit 1
+fi
+
+if [ -z "${NEW_VERSION}" ]; then
+    echo "NEW_VERSION was not set."
+    exit 1
+fi
+
 # fail immediately
 set -o errexit
 set -o nounset

http://git-wip-us.apache.org/repos/asf/flink/blob/9baca1b8/tools/releasing/create_source_release.sh
----------------------------------------------------------------------
diff --git a/tools/releasing/create_source_release.sh b/tools/releasing/create_source_release.sh
index 0cc32fe..9329802 100755
--- a/tools/releasing/create_source_release.sh
+++ b/tools/releasing/create_source_release.sh
@@ -20,9 +20,13 @@
 ##
 ## Variables with defaults (if not overwritten by environment)
 ##
-RELEASE_VERSION=${RELEASE_VERSION:-1.3-SNAPSHOT}
 MVN=${MVN:-mvn}
 
+if [ -z "${RELEASE_VERSION}" ]; then
+    echo "RELEASE_VERSION was not set."
+    exit 1
+fi
+
 # fail immediately
 set -o errexit
 set -o nounset

http://git-wip-us.apache.org/repos/asf/flink/blob/9baca1b8/tools/releasing/update_branch_version.sh
----------------------------------------------------------------------
diff --git a/tools/releasing/update_branch_version.sh b/tools/releasing/update_branch_version.sh
index e4dc710..951ea71 100755
--- a/tools/releasing/update_branch_version.sh
+++ b/tools/releasing/update_branch_version.sh
@@ -20,10 +20,18 @@
 ##
 ## Variables with defaults (if not overwritten by environment)
 ##
-OLD_VERSION=${OLD_VERSION:-1.4-SNAPSHOT}
-NEW_VERSION=${NEW_VERSION:-1.5-SNAPSHOT}
 MVN=${MVN:-mvn}
 
+if [ -z "${OLD_VERSION}" ]; then
+    echo "OLD_VERSION was not set."
+    exit 1
+fi
+
+if [ -z "${NEW_VERSION}" ]; then
+    echo "NEW_VERSION was not set."
+    exit 1
+fi
+
 # fail immediately
 set -o errexit
 set -o nounset


[4/4] flink git commit: [FLINK-9805][rest] Catch JsonProcessingException in RestClient

Posted by ch...@apache.org.
[FLINK-9805][rest] Catch JsonProcessingException in RestClient

This closes #6307.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3041bd3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3041bd3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3041bd3b

Branch: refs/heads/release-1.5
Commit: 3041bd3b8658a004803617eb0603bf8e459b5173
Parents: 1ec8930
Author: zentol <ch...@apache.org>
Authored: Wed Jul 11 15:41:24 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 23 09:20:18 2018 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/runtime/rest/RestClient.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3041bd3b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index b75ffbf..3c48135 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
@@ -409,7 +408,7 @@ public class RestClient {
 			try (InputStream in = new ByteBufInputStream(content)) {
 				rawResponse = objectMapper.readTree(in);
 				LOG.debug("Received response {}.", rawResponse);
-			} catch (JsonParseException je) {
+			} catch (JsonProcessingException je) {
 				LOG.error("Response was not valid JSON.", je);
 				// let's see if it was a plain-text message instead
 				content.readerIndex(0);


[2/4] flink git commit: [FLINK-9873][runtime] Log task state when aborting checkpoint

Posted by ch...@apache.org.
[FLINK-9873][runtime] Log task state when aborting checkpoint

This closes #6350.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa770ba6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa770ba6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa770ba6

Branch: refs/heads/release-1.5
Commit: aa770ba6f325b85c7242e535d45a6080d2703232
Parents: 9baca1b
Author: zentol <ch...@apache.org>
Authored: Tue Jul 17 09:34:45 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 23 09:15:21 2018 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointCoordinator.java      | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa770ba6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 55e1ffe..82227cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -456,13 +456,20 @@ public class CheckpointCoordinator {
 		Execution[] executions = new Execution[tasksToTrigger.length];
 		for (int i = 0; i < tasksToTrigger.length; i++) {
 			Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
-			if (ee != null && ee.getState() == ExecutionState.RUNNING) {
-				executions[i] = ee;
-			} else {
+			if (ee == null) {
 				LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
 						tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
 						job);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+			} else if (ee.getState() == ExecutionState.RUNNING) {
+				executions[i] = ee;
+			} else {
+				LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
+						tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
+						job,
+						ExecutionState.RUNNING,
+						ee.getState());
+				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 			}
 		}
 


[3/4] flink git commit: [FLINK-9841][rest] Close log file channel after response was fully written

Posted by ch...@apache.org.
[FLINK-9841][rest] Close log file channel after response was fully written

This closes #6329.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ec89302
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ec89302
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ec89302

Branch: refs/heads/release-1.5
Commit: 1ec893027b0b24545509ff9715039fc3580130a0
Parents: aa770ba
Author: yanghua <ya...@gmail.com>
Authored: Fri Jul 13 16:48:04 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 23 09:19:24 2018 +0200

----------------------------------------------------------------------
 .../AbstractTaskManagerFileHandler.java         | 25 +++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ec89302/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 265813f..303f7d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -63,6 +63,7 @@ import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureList
 import javax.annotation.Nonnull;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
@@ -208,11 +209,20 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 	}
 
 	private void transferFile(ChannelHandlerContext ctx, File file, HttpRequest httpRequest) throws FlinkException {
-		try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
-			final long fileLength = randomAccessFile.length();
+		final RandomAccessFile randomAccessFile;
 
-			try (final FileChannel fileChannel = randomAccessFile.getChannel()) {
+		try {
+			randomAccessFile = new RandomAccessFile(file, "r");
+		} catch (FileNotFoundException e) {
+			throw new FlinkException("Can not find file " + file + ".", e);
+		}
+
+		try {
 
+			final long fileLength = randomAccessFile.length();
+			final FileChannel fileChannel = randomAccessFile.getChannel();
+
+			try {
 				HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
 				response.headers().set(CONTENT_TYPE, "text/plain");
 
@@ -251,8 +261,17 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 				if (!HttpHeaders.isKeepAlive(httpRequest)) {
 					lastContentFuture.addListener(ChannelFutureListener.CLOSE);
 				}
+			} catch (IOException ex) {
+				fileChannel.close();
+				throw ex;
 			}
 		} catch (IOException ioe) {
+			try {
+				randomAccessFile.close();
+			} catch (IOException e) {
+				throw new FlinkException("Close file or channel error.", e);
+			}
+
 			throw new FlinkException("Could not transfer file " + file + " to the client.", ioe);
 		}
 	}