Posted to commits@flink.apache.org by se...@apache.org on 2018/01/24 21:30:56 UTC

[1/5] flink git commit: [hotfix] [build] Fix diverging snappy versions.

Repository: flink
Updated Branches:
  refs/heads/release-1.4 c701a335b -> eb6380901


[hotfix] [build] Fix diverging snappy versions.

This removes the snappy dependency from flink-core, which is no longer needed since we do
not have an Avro dependency in flink-core any more.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da8446ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da8446ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da8446ee

Branch: refs/heads/release-1.4
Commit: da8446ee5d1f305ef633e95908d2ff6e14a31206
Parents: c701a33
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 23 21:01:36 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 24 18:06:13 2018 +0100

----------------------------------------------------------------------
 flink-core/pom.xml    | 6 ------
 flink-runtime/pom.xml | 1 -
 pom.xml               | 2 +-
 3 files changed, 1 insertion(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da8446ee/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index f82e1ed..674c8bd 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -80,12 +80,6 @@ under the License.
 			<!-- managed version -->
 		</dependency>
 
-		<!-- We explicitly depend on snappy since connectors that require it load it through the system class loader -->
-		<dependency>
-			<groupId>org.xerial.snappy</groupId>
-			<artifactId>snappy-java</artifactId>
-		</dependency>
-
 		<!-- ================== test dependencies ================== -->
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/da8446ee/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 7be858f..abbe47f 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -168,7 +168,6 @@ under the License.
 		<dependency>
 			<groupId>org.xerial.snappy</groupId>
 			<artifactId>snappy-java</artifactId>
-			<version>1.1.4</version>
 		</dependency>
 
 		<!--

http://git-wip-us.apache.org/repos/asf/flink/blob/da8446ee/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index abbd5db..f85f335 100644
--- a/pom.xml
+++ b/pom.xml
@@ -295,7 +295,7 @@ under the License.
 			<dependency>
 				<groupId>org.xerial.snappy</groupId>
 				<artifactId>snappy-java</artifactId>
-				<version>1.1.1.3</version>
+				<version>1.1.4</version>
 			</dependency>
 
 			<!-- Make sure we use a consistent avro version between Flink and Hadoop -->		


[4/5] flink git commit: [FLINK-8466] [runtime] Make sure ErrorInfo references no user-defined classes.

Posted by se...@apache.org.
[FLINK-8466] [runtime] Make sure ErrorInfo references no user-defined classes.

That way, holding on to the ErrorInfo does not prevent class unloading.

However, this implies that the ErrorInfo must not hold strong references to any Exception classes.
For that reason, the commit pulls the "ground truth" exception into a separate field, so that the
ExecutionGraph logic itself can always rely on having the proper ground-truth exception.
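
The idea described above can be sketched in isolation. This is a minimal illustration, not the
Flink implementation: the class names StringifiedErrorSketch and StringifiedError are hypothetical
stand-ins for ErrorInfo and SerializedThrowable. It assumes that keeping only the class name and
the stringified stack trace is enough for display, so the long-lived object references nothing
but JDK types.

```java
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;

public class StringifiedErrorSketch {

    /**
     * Hypothetical stand-in for ErrorInfo: it keeps only strings and a timestamp,
     * so holding on to it does not keep the exception's class (or its class loader) alive.
     */
    public static final class StringifiedError implements Serializable {

        private static final long serialVersionUID = 1L;

        private final String className;
        private final String stackTrace;
        private final long timestamp;

        public StringifiedError(Throwable t, long timestamp) {
            // Render the stack trace eagerly; after this point the Throwable is not referenced.
            StringWriter buffer = new StringWriter();
            t.printStackTrace(new PrintWriter(buffer, true));

            this.className = t.getClass().getName();
            this.stackTrace = buffer.toString();
            this.timestamp = timestamp;
        }

        public String getClassName() {
            return className;
        }

        public String getStackTrace() {
            return stackTrace;
        }

        public long getTimestamp() {
            return timestamp;
        }
    }

    public static void main(String[] args) {
        // The "ground truth" exception is kept separately by whoever still needs the real object.
        Throwable groundTruth = new IllegalStateException("tada");
        StringifiedError info = new StringifiedError(groundTruth, System.currentTimeMillis());

        System.out.println(info.getClassName());
        System.out.println(info.getStackTrace());
    }
}
```

In this sketch the caller decides how long to keep the original Throwable, which mirrors the split
between 'failureCause' and 'failureInfo' in the diff below.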


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d1ba45e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d1ba45e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d1ba45e

Branch: refs/heads/release-1.4
Commit: 4d1ba45e78fa4ed6cd5af0405c7988ffaa6dee13
Parents: a8ea169
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 23 22:00:06 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 24 18:57:02 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/util/SerializedThrowable.java  |  4 ++
 .../itcases/AbstractQueryableStateTestBase.java |  2 +-
 .../executiongraph/AccessExecutionGraph.java    |  2 +-
 .../executiongraph/ArchivedExecutionGraph.java  |  2 +-
 .../flink/runtime/executiongraph/ErrorInfo.java | 34 +++--------
 .../runtime/executiongraph/ExecutionGraph.java  | 64 ++++++++------------
 .../rest/handler/job/JobExceptionsHandler.java  |  2 +-
 .../handler/legacy/JobExceptionsHandler.java    |  2 +-
 .../ArchivedExecutionGraphTest.java             |  2 +-
 .../runtime/executiongraph/ErrorInfoTest.java   | 62 +++++++++++++++++++
 .../ExecutionGraphRestartTest.java              |  2 +-
 .../ExecutionGraphSuspendTest.java              |  2 +-
 .../legacy/JobExceptionsHandlerTest.java        |  4 +-
 13 files changed, 111 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
index dab7cda..6a721b3 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
@@ -136,6 +136,10 @@ public class SerializedThrowable extends Exception implements Serializable {
 		return originalErrorClassName;
 	}
 
+	public String getFullStringifiedStackTrace() {
+		return fullStringifiedStackTrace;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Override the behavior of Throwable
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 5a28367..9ca3bda 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -340,7 +340,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 						.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
 				.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-		String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
+		String failureCause = jobFound.executionGraph().getFailureInfo().getExceptionAsString();
 
 		assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState());
 		assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index ebc0768..ce490dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -70,7 +70,7 @@ public interface AccessExecutionGraph {
 	 * @return failure causing exception, or null
 	 */
 	@Nullable
-	ErrorInfo getFailureCause();
+	ErrorInfo getFailureInfo();
 
 	/**
 	 * Returns the job vertex for the given {@link JobVertexID}.

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 4481e1b..20c2c8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -143,7 +143,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	}
 
 	@Override
-	public ErrorInfo getFailureCause() {
+	public ErrorInfo getFailureInfo() {
 		return failureCause;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
index d919bfa..9fe569f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 
-import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 /**
@@ -32,26 +30,25 @@ public class ErrorInfo implements Serializable {
 
 	private static final long serialVersionUID = -6138942031953594202L;
 
-	private final transient Throwable exception;
-	private final long timestamp;
+	/** The exception that we keep holding forever. Has no strong reference to any user-defined code. */
+	private final SerializedThrowable exception;
 
-	private volatile String exceptionAsString;
+	private final long timestamp;
 
 	public ErrorInfo(Throwable exception, long timestamp) {
 		Preconditions.checkNotNull(exception);
 		Preconditions.checkArgument(timestamp > 0);
 
-		this.exception = exception;
+		this.exception = exception instanceof SerializedThrowable ?
+				(SerializedThrowable) exception : new SerializedThrowable(exception);
 		this.timestamp = timestamp;
 	}
 
 	/**
-	 * Returns the contained exception.
-	 *
-	 * @return contained exception, or {@code "(null)"} if either no exception was set or this object has been deserialized
+	 * Returns the serialized form of the original exception.
 	 */
-	Throwable getException() {
-		return exception;
+	public SerializedThrowable getException() {
+		return this.exception;
 	}
 
 	/**
@@ -60,10 +57,7 @@ public class ErrorInfo implements Serializable {
 	 * @return failure causing exception as a string, or {@code "(null)"}
 	 */
 	public String getExceptionAsString() {
-		if (exceptionAsString == null) {
-			exceptionAsString = ExceptionUtils.stringifyException(exception);
-		}
-		return exceptionAsString;
+		return exception.getFullStringifiedStackTrace();
 	}
 
 	/**
@@ -74,12 +68,4 @@ public class ErrorInfo implements Serializable {
 	public long getTimestamp() {
 		return timestamp;
 	}
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		// make sure that the exception was stringified so it isn't lost during serialization
-		if (exceptionAsString == null) {
-			exceptionAsString = ExceptionUtils.stringifyException(exception);
-		}
-		out.defaultWriteObject();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8a74001..7fa55be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -264,7 +264,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	/** The exception that caused the job to fail. This is set to the first root exception
 	 * that was not recoverable and triggered job failure */
-	private volatile ErrorInfo failureCause;
+	private volatile Throwable failureCause;
+
+	/** The extended failure cause information for the job. This exists in addition to 'failureCause',
+	 * to let 'failureCause' be a strong reference to the exception, while this info holds no
+	 * strong reference to any user-defined classes.*/
+	private volatile ErrorInfo failureInfo;
 
 	// ------ Fields that are relevant to the execution and need to be cleared before archiving  -------
 
@@ -620,10 +625,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return state;
 	}
 
-	public ErrorInfo getFailureCause() {
+	public Throwable getFailureCause() {
 		return failureCause;
 	}
 
+	public ErrorInfo getFailureInfo() {
+		return failureInfo;
+	}
+
 	/**
 	 * Gets the number of full restarts that the execution graph went through.
 	 * If a full restart recovery is currently pending, this recovery is included in the
@@ -1026,25 +1035,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * @param suspensionCause Cause of the suspension
 	 */
 	public void suspend(Throwable suspensionCause) {
-		suspend(new ErrorInfo(suspensionCause, System.currentTimeMillis()));
-	}
-
-	/**
-	 * Suspends the current ExecutionGraph.
-	 *
-	 * The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
-	 * state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
-	 *
-	 * The SUSPENDED state is a local terminal state which stops the execution of the job but does
-	 * not remove the job from the HA job store so that it can be recovered by another JobManager.
-	 *
-	 * @param errorInfo ErrorInfo containing the cause of the suspension
-	 */
-	public void suspend(ErrorInfo errorInfo) {
-		Throwable suspensionCause = errorInfo != null
-			? errorInfo.getException()
-			: null;
-
 		while (true) {
 			JobStatus currentState = state;
 
@@ -1052,7 +1042,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				// stay in a terminal state
 				return;
 			} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
-				this.failureCause = errorInfo;
+				initFailureCause(suspensionCause);
 
 				// make sure no concurrent local actions interfere with the cancellation
 				incrementGlobalModVersion();
@@ -1072,10 +1062,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	public void failGlobal(Throwable error) {
-		failGlobal(new ErrorInfo(error, System.currentTimeMillis()));
-	}
-
 	/**
 	 * Fails the execution graph globally. This failure will not be recovered by a specific
 	 * failover strategy, but results in a full restart of all tasks.
@@ -1085,13 +1071,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * exceptions that indicate a bug or an unexpected call race), and where a full restart is the
 	 * safe way to get consistency back.
 	 *
-	 * @param errorInfo ErrorInfo containing the exception that caused the failure.
+	 * @param t The exception that caused the failure.
 	 */
-	public void failGlobal(ErrorInfo errorInfo) {
-		Throwable t = errorInfo != null
-			? errorInfo.getException()
-			: null;
-
+	public void failGlobal(Throwable t) {
 		while (true) {
 			JobStatus current = state;
 			// stay in these states
@@ -1103,7 +1085,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			else if (current == JobStatus.RESTARTING) {
 				// we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something
 				// has gone wrong in 'RESTARTING' and we need to re-attempt the restarts
-				this.failureCause = errorInfo;
+				initFailureCause(t);
 
 				final long globalVersionForRestart = incrementGlobalModVersion();
 				if (tryRestartOrFail(globalVersionForRestart)) {
@@ -1111,7 +1093,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				}
 			}
 			else if (transitionState(current, JobStatus.FAILING, t)) {
-				this.failureCause = errorInfo;
+				initFailureCause(t);
 
 				// make sure no concurrent local or global actions interfere with the failover
 				final long globalVersionForRestart = incrementGlobalModVersion();
@@ -1304,6 +1286,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return GLOBAL_VERSION_UPDATER.incrementAndGet(this);
 	}
 
+	private void initFailureCause(Throwable t) {
+		this.failureCause = t;
+		this.failureInfo = new ErrorInfo(t, System.currentTimeMillis());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Job Status Progress
 	// ------------------------------------------------------------------------
@@ -1399,9 +1386,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		JobStatus currentState = state;
 
 		if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
-			Throwable failureCause = this.failureCause != null
-				? this.failureCause.getException()
-				: null;
+			final Throwable failureCause = this.failureCause;
+
 			synchronized (progressLock) {
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID(), failureCause);
@@ -1678,7 +1664,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				catch (Throwable t) {
 					// bug in the failover strategy - fall back to global failover
 					LOG.warn("Error in failover strategy - falling back to global restart", t);
-					failGlobal(new ErrorInfo(ex, timestamp));
+					failGlobal(ex);
 				}
 			}
 		}
@@ -1710,7 +1696,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			archivedVerticesInCreationOrder,
 			stateTimestamps,
 			getState(),
-			failureCause,
+			failureInfo,
 			getJsonPlan(),
 			getAccumulatorResultsStringified(),
 			serializedUserAccumulators,

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index feabbea..ea3fd51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -65,7 +65,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExcep
 
 	@Override
 	protected JobExceptionsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
-		ErrorInfo rootException = executionGraph.getFailureCause();
+		ErrorInfo rootException = executionGraph.getFailureInfo();
 		String rootExceptionMessage = null;
 		Long rootTimestamp = null;
 		if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
index 6a4cc0d..a6bae86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
@@ -92,7 +92,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 		gen.writeStartObject();
 
 		// most important is the root failure cause
-		ErrorInfo rootException = graph.getFailureCause();
+		ErrorInfo rootException = graph.getFailureInfo();
 		if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
 			gen.writeStringField("root-exception", rootException.getExceptionAsString());
 			gen.writeNumberField("timestamp", rootException.getTimestamp());

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 0d7c8e6..a96a03e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -171,7 +171,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 		assertEquals(runtimeGraph.getJobID(), archivedGraph.getJobID());
 		assertEquals(runtimeGraph.getJobName(), archivedGraph.getJobName());
 		assertEquals(runtimeGraph.getState(), archivedGraph.getState());
-		assertEquals(runtimeGraph.getFailureCause().getExceptionAsString(), archivedGraph.getFailureCause().getExceptionAsString());
+		assertEquals(runtimeGraph.getFailureInfo().getExceptionAsString(), archivedGraph.getFailureInfo().getExceptionAsString());
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CREATED), archivedGraph.getStatusTimestamp(JobStatus.CREATED));
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RUNNING), archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILING), archivedGraph.getStatusTimestamp(JobStatus.FAILING));

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java
new file mode 100644
index 0000000..4841365
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Simple test for the {@link ErrorInfo}.
+ */
+public class ErrorInfoTest {
+
+	@Test
+	public void testSerializationWithExceptionOutsideClassLoader() throws Exception {
+		final ErrorInfo error = new ErrorInfo(new ExceptionWithCustomClassLoader(), System.currentTimeMillis());
+		final ErrorInfo copy = CommonTestUtils.createCopySerializable(error);
+
+		assertEquals(error.getTimestamp(), copy.getTimestamp());
+		assertEquals(error.getExceptionAsString(), copy.getExceptionAsString());
+		assertEquals(error.getException().getMessage(), copy.getException().getMessage());
+
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class ExceptionWithCustomClassLoader extends Exception {
+
+		private static final long serialVersionUID = 42L;
+
+		private static final ClassLoader CUSTOM_LOADER = new URLClassLoader(new URL[0]);
+
+		@SuppressWarnings("unused")
+		private final Serializable outOfClassLoader = CommonTestUtils.createObjectForClassNotInClassPath(CUSTOM_LOADER);
+
+		public ExceptionWithCustomClassLoader() {
+			super("tada");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 8770b06..e788881 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -818,7 +818,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		waitUntilJobStatus(eg, JobStatus.FAILED, 1000);
 
-		final Throwable t = eg.getFailureCause().getException();
+		final Throwable t = eg.getFailureCause();
 		if (!(t instanceof NoResourceAvailableException)) {
 			ExceptionUtils.rethrowException(t, t.getMessage());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index f0adc32..852a7a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -252,7 +252,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 
 		assertEquals(JobStatus.SUSPENDED, eg.getState());
 
-		assertEquals(exception, eg.getFailureCause().getException());
+		assertEquals(exception, eg.getFailureCause());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
index 641bf96..0e96f36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
@@ -76,8 +76,8 @@ public class JobExceptionsHandlerTest {
 	private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
 
-		Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText());
-		Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong());
+		Assert.assertEquals(originalJob.getFailureInfo().getExceptionAsString(), result.get("root-exception").asText());
+		Assert.assertEquals(originalJob.getFailureInfo().getTimestamp(), result.get("timestamp").asLong());
 
 		ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");
 


[3/5] flink git commit: [FLINK-8499] [core] Force Kryo to be parent-first loaded.

Posted by se...@apache.org.
[FLINK-8499] [core] Force Kryo to be parent-first loaded.
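
As a rough illustration of what "parent-first" means for the pattern list changed in this commit,
the sketch below checks a class name against a semicolon-separated list of prefixes. The class
ParentFirstPatternSketch and the method isParentFirst are hypothetical, and prefix matching via
startsWith is an assumption; the actual resolution logic is not part of this diff.

```java
public class ParentFirstPatternSketch {

    /**
     * Returns true if the class name starts with any of the semicolon-separated prefixes.
     * Assumption: patterns are plain class-name prefixes, as the default value suggests.
     */
    public static boolean isParentFirst(String className, String patterns) {
        for (String prefix : patterns.split(";")) {
            if (!prefix.isEmpty() && className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // New default value of 'classloader.parent-first-patterns', copied from the diff.
        String patterns = "java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;"
                + "org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;"
                + "org.apache.logging.log4j;ch.qos.logback";

        System.out.println(isParentFirst("com.esotericsoftware.kryo.Kryo", patterns));
        System.out.println(isParentFirst("org.apache.avro.Schema", patterns));
    }
}
```

Under these assumptions, Kryo classes match the new "com.esotericsoftware.kryo" entry and would be
resolved through the parent class loader, while classes outside the list would not.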


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15cb057b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15cb057b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15cb057b

Branch: refs/heads/release-1.4
Commit: 15cb057bffd32ba8a853b46b207a5b7ea6bba430
Parents: da8446e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 23 19:58:10 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 24 18:06:14 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/CoreOptions.java |  2 +-
 .../formats/avro/AvroKryoClassloadingTest.java  | 89 ++++++++++++++++++++
 .../core/testutils/FilteredClassLoader.java     | 60 +++++++++++++
 3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/15cb057b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index c48e5ef..27f39a4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -36,7 +36,7 @@ public class CoreOptions {
 
 	public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER = ConfigOptions
 		.key("classloader.parent-first-patterns")
-		.defaultValue("java.;scala.;org.apache.flink.;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback");
+		.defaultValue("java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback");
 
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/15cb057b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
new file mode 100644
index 0000000..6eaca15
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.testutils.FilteredClassLoader;
+import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.LinkedHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * This test makes sure that reversed classloading works for the Avro/Kryo integration when
+ * Kryo is in the application jar file.
+ *
+ * <p>If Kryo is not loaded consistently through the same classloader (parent-first), the following
+ * error happens:
+ *
+ * <pre>
+ * java.lang.VerifyError: Bad type on operand stack
+ * Exception Details:
+ *   Location:
+ *  org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V @23: invokespecial
+ *   Reason:
+ *     Type 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' (current frame, stack[7]) is not assignable to 'com/esotericsoftware/kryo/Serializer'
+ *   Current Frame:
+ *     bci: @23
+ *     flags: { }
+ *     locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 'java/util/LinkedHashMap' }
+ *     stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' }
+ *   Bytecode:
+ *     0x0000000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
+ *     0x0000010: bb00 0659 b700 0eb7 000f b700 10b6 0011
+ *     0x0000020: 57b1
+ * </pre>
+ */
+public class AvroKryoClassloadingTest {
+
+	@Test
+	public void testKryoInChildClasspath() throws Exception {
+		final Class<?> avroClass = AvroKryoSerializerUtils.class;
+
+		final URL avroLocation = avroClass.getProtectionDomain().getCodeSource().getLocation();
+		final URL kryoLocation = Kryo.class.getProtectionDomain().getCodeSource().getLocation();
+
+		final ClassLoader parentClassLoader = new FilteredClassLoader(
+				avroClass.getClassLoader(), AvroKryoSerializerUtils.class.getName());
+
+		final ClassLoader userAppClassLoader = FlinkUserCodeClassLoaders.childFirst(
+				new URL[] { avroLocation, kryoLocation },
+				parentClassLoader,
+				CoreOptions.ALWAYS_PARENT_FIRST_LOADER.defaultValue().split(";"));
+
+		final Class<?> userLoadedAvroClass = Class.forName(avroClass.getName(), false, userAppClassLoader);
+		assertNotEquals(avroClass, userLoadedAvroClass);
+
+		// call the 'addAvroGenericDataArrayRegistration(...)' method
+		final Method m = userLoadedAvroClass.getMethod("addAvroGenericDataArrayRegistration", LinkedHashMap.class);
+
+		final LinkedHashMap<String, ?> map = new LinkedHashMap<>();
+		m.invoke(userLoadedAvroClass.newInstance(), map);
+
+		assertEquals(1, map.size());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/15cb057b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
new file mode 100644
index 0000000..f04393b
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+
+/**
+ * A ClassLoader that filters out certain classes (by name) and throws a ClassNotFoundException
+ * when they should be loaded.
+ *
+ * <p>This utility is useful when trying to eliminate certain classes from a class loader
+ * to force loading them through another class loader.
+ */
+public class FilteredClassLoader extends ClassLoader {
+
+	/** The set of class names for the filtered classes. */
+	private final HashSet<String> filteredClassNames;
+
+	/**
+	 * Creates a new filtered classloader.
+	 *
+	 * @param delegate The class loader that is filtered by this classloader.
+	 * @param filteredClassNames The class names to filter out.
+	 */
+	public FilteredClassLoader(ClassLoader delegate, String... filteredClassNames) {
+		super(Objects.requireNonNull(delegate));
+
+		this.filteredClassNames = new HashSet<>(Arrays.asList(filteredClassNames));
+	}
+
+	@Override
+	protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+		synchronized (this) {
+			if (filteredClassNames.contains(name)) {
+				throw new ClassNotFoundException(name);
+			}
+			else {
+				return super.loadClass(name, resolve);
+			}
+		}
+	}
+}
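
The filtering behavior of the class above can be tried in isolation. The following is a minimal standalone sketch using only the JDK; the class names `FilteredLoadingSketch` and `NameFilteringClassLoader` are hypothetical, and the loader is a simplified re-creation for illustration rather than the committed class (it omits the `synchronized` block, since the name set is never modified after construction).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Illustrative sketch only: a simplified stand-in for the FilteredClassLoader in the diff above.
final class FilteredLoadingSketch {

    /** Delegates to its parent, except for names in the filter set, which are reported as missing. */
    static final class NameFilteringClassLoader extends ClassLoader {

        private final Set<String> filteredClassNames;

        NameFilteringClassLoader(ClassLoader delegate, String... filteredClassNames) {
            super(Objects.requireNonNull(delegate));
            this.filteredClassNames = new HashSet<>(Arrays.asList(filteredClassNames));
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (filteredClassNames.contains(name)) {
                // Pretend the class does not exist so that another loader has to supply it.
                throw new ClassNotFoundException(name);
            }
            return super.loadClass(name, resolve);
        }
    }

    /** Returns true if the given loader can load the named class. */
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            loader.loadClass(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = new NameFilteringClassLoader(
            ClassLoader.getSystemClassLoader(), "java.util.ArrayList");

        System.out.println("HashMap loadable:   " + canLoad(loader, "java.util.HashMap"));
        System.out.println("ArrayList loadable: " + canLoad(loader, "java.util.ArrayList"));
    }
}
```

In the test above, the same idea hides `AvroKryoSerializerUtils` from the parent so that the child-first user-code classloader has to define that class itself from the supplied URLs.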


[5/5] flink git commit: [FLINK-8406] [bucketing sink] Fix proper access of Hadoop File Systems

Posted by se...@apache.org.
[FLINK-8406] [bucketing sink] Fix proper access of Hadoop File Systems


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eb638090
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eb638090
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eb638090

Branch: refs/heads/release-1.4
Commit: eb6380901e0c62c57eb300b730548c8ad9ec15e0
Parents: 4d1ba45
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 22 13:32:09 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 24 18:57:02 2018 +0100

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  |  3 +-
 .../fs/bucketing/BucketingSinkFsInitTest.java   | 83 ++++++++++++++++++++
 2 files changed, 85 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eb638090/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index db0a5d8..6293fe0 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -1122,7 +1122,8 @@ public class BucketingSink<T>
 		// try to get the Hadoop File System via the Flink File Systems
 		// that way we get the proper configuration
 
-		final org.apache.flink.core.fs.FileSystem flinkFs = org.apache.flink.core.fs.FileSystem.get(path.toUri());
+		final org.apache.flink.core.fs.FileSystem flinkFs =
+				org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(path.toUri());
 		final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
 				((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eb638090/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFsInitTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFsInitTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFsInitTest.java
new file mode 100644
index 0000000..f684ef0
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFsInitTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the file system initialization of the Bucketing sink.
+ *
+ * <p>NOTE: These tests can probably go away once the bucketing sink has been
+ * migrated to properly use Flink's file system abstraction.
+ */
+public class BucketingSinkFsInitTest {
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	// ------------------------------------------------------------------------
+
+	// to properly mimic what happens in the runtime task, we need to make sure that
+	// the file system safety net is in place
+
+	@Before
+	public void activateSafetyNet() {
+		FileSystemSafetyNet.initializeSafetyNetForThread();
+	}
+
+	@After
+	public void deactivateSafetyNet() {
+		FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testInitForLocalFileSystem() throws Exception {
+		final Path path = new Path(tempFolder.newFolder().toURI());
+		FileSystem fs = BucketingSink.createHadoopFileSystem(path, null);
+
+		assertEquals("file", fs.getUri().getScheme());
+	}
+
+	@Test
+	public void testInitForHadoopFileSystem() throws Exception {
+		final Path path = new Path("hdfs://localhost:51234/some/path/");
+		FileSystem fs = BucketingSink.createHadoopFileSystem(path, null);
+
+		assertEquals("hdfs", fs.getUri().getScheme());
+	}
+
+	@Test(expected = UnsupportedFileSystemSchemeException.class)
+	public void testInitForUnsupportedFileSystem() throws Exception {
+		final Path path = new Path("nofs://localhost:51234/some/path/");
+		BucketingSink.createHadoopFileSystem(path, null);
+	}
+}


[2/5] flink git commit: [hotfix] [build] Converge Kryo dependency

Posted by se...@apache.org.
[hotfix] [build] Converge Kryo dependency

Previously, the Kryo dependency was diverging between the flink-core dependency
and the chill dependency.

[INFO] +- org.apache.flink:flink-java:jar:1.4.0:compile
[INFO] |  +- org.apache.flink:flink-core:jar:1.4.0:compile
[INFO] |  |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
....
[INFO] +- org.apache.flink:flink-streaming-java_2.11:jar:1.4.0:compile
[INFO] |  +- (org.apache.flink:flink-core:jar:1.4.0:compile - omitted for duplicate)
[INFO] |  +- org.apache.flink:flink-runtime_2.11:jar:1.4.0:compile
[INFO] |  |  +- com.twitter:chill_2.11:jar:0.7.4:compile
[INFO] |  |  |  +- com.twitter:chill-java:jar:0.7.4:compile
[INFO] |  |  |  |  \- (com.esotericsoftware.kryo:kryo:jar:2.21:compile - omitted for conflict with 2.24.0)
[INFO] |  |  |  \- (com.esotericsoftware.kryo:kryo:jar:2.21:compile - omitted for conflict with 2.24.0)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8ea1698
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8ea1698
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8ea1698

Branch: refs/heads/release-1.4
Commit: a8ea1698f44614ec9f35d9307b3294e4c165fcf0
Parents: 15cb057
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jan 24 10:02:00 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 24 18:06:14 2018 +0100

----------------------------------------------------------------------
 flink-runtime/pom.xml | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a8ea1698/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index abbe47f..a8c583b 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -180,6 +180,13 @@ under the License.
 			<groupId>com.twitter</groupId>
 			<artifactId>chill_${scala.binary.version}</artifactId>
 			<version>${chill.version}</version>
+			<exclusions>
+				<!-- Remove Kryo dependency to force using Flink's own Kryo dependency -->
+				<exclusion>
+					<groupId>com.esotericsoftware.kryo</groupId>
+					<artifactId>kryo</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<!-- Curator and ZooKeeper - we explicitly add ZooKeeper here as