You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/27 17:47:16 UTC

[1/2] flink git commit: [FLINK-4543] [network] Fix potential deadlock in SpilledSubpartitionViewAsyncIO.

Repository: flink
Updated Branches:
  refs/heads/master f1b5b35f5 -> 90902914a


[FLINK-4543] [network] Fix potential deadlock in SpilledSubpartitionViewAsyncIO.

The deadlock could occur when the SpilledSubpartitionViewAsyncIO tried to release a buffer while,
at the same time, another thread encountered an error.

The point of contention was the listener field, which is now held in an AtomicReference, removing
the need to acquire the lock when reporting the error.

This closes #2444


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90902914
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90902914
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90902914

Branch: refs/heads/master
Commit: 90902914ac4b11f9554b67ad49e0d697a0d02f93
Parents: b928935
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 31 16:22:34 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 27 19:46:40 2016 +0200

----------------------------------------------------------------------
 .../SpilledSubpartitionViewAsyncIO.java         | 26 ++++++++------------
 .../checkpoint/CheckpointIDCounterTest.java     |  4 +--
 2 files changed, 11 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90902914/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
index daccd28..ca25536 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.util.event.NotificationListener;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -70,7 +71,7 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
 	private final ConcurrentLinkedQueue<Buffer> returnedBuffers = new ConcurrentLinkedQueue<Buffer>();
 
 	/** A data availability listener. */
-	private NotificationListener registeredListener;
+	private final AtomicReference<NotificationListener> registeredListener;
 
 	/** Error, which has occurred in the I/O thread. */
 	private volatile IOException errorInIOThread;
@@ -108,7 +109,8 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
 		this.parent = checkNotNull(parent);
 		this.bufferProvider = checkNotNull(bufferProvider);
 		this.bufferAvailabilityListener = new BufferProviderCallback(this);
-
+		this.registeredListener = new AtomicReference<>();
+		
 		this.asyncFileReader = ioManager.createBufferFileReader(channelId, new IOThreadCallback(this));
 
 		if (initialSeekPosition > 0) {
@@ -154,14 +156,12 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
 				return false;
 			}
 
-			if (registeredListener == null) {
-				registeredListener = listener;
-
+			if (registeredListener.compareAndSet(null, listener)) {
 				return true;
+			} else {
+				throw new IllegalStateException("already registered listener");
 			}
 		}
-
-		throw new IllegalStateException("Already registered listener.");
 	}
 
 	@Override
@@ -279,8 +279,8 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
 
 			returnedBuffers.add(buffer);
 
-			listener = registeredListener;
-			registeredListener = null;
+			// after this, the listener should be null
+			listener = registeredListener.getAndSet(null);
 
 			// If this was the last buffer before we reached EOF, set the corresponding flag to
 			// ensure that further buffers are correctly recycled and eventually no further reads
@@ -303,13 +303,7 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
 			errorInIOThread = error;
 		}
 
-		final NotificationListener listener;
-
-		synchronized (lock) {
-			listener = registeredListener;
-			registeredListener = null;
-		}
-
+		final NotificationListener listener = registeredListener.getAndSet(null);
 		if (listener != null) {
 			listener.onNotification();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/90902914/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
index dc43b47..49b5fe7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
@@ -57,9 +57,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger {
 
 		@AfterClass
 		public static void tearDown() throws Exception {
-			if (ZooKeeper != null) {
-				ZooKeeper.shutdown();
-			}
+			ZooKeeper.shutdown();
 		}
 
 		@Before


[2/2] flink git commit: [FLINK-4560] [build] Enforce Java version >= 1.7 via Maven enforcer plugin

Posted by se...@apache.org.
[FLINK-4560] [build] Enforce Java version >= 1.7 via Maven enforcer plugin

This closes #2458


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b928935b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b928935b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b928935b

Branch: refs/heads/master
Commit: b928935b8c5be02b23dd2cb87144ae1ea001278c
Parents: f1b5b35
Author: shijinkui <sh...@huawei.com>
Authored: Fri Sep 2 10:46:45 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 27 19:46:40 2016 +0200

----------------------------------------------------------------------
 pom.xml | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b928935b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7e517e9..b2229fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,6 +96,7 @@ under the License.
 		<slf4j.version>1.7.7</slf4j.version>
 		<guava.version>18.0</guava.version>
 		<akka.version>2.3.7</akka.version>
+		<java.version>1.7</java.version>
 		<scala.macros.version>2.0.1</scala.macros.version>
 		<!-- Default scala versions, may be overwritten by build profiles -->
 		<scala.version>2.10.4</scala.version>
@@ -929,8 +930,8 @@ under the License.
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>3.1</version><!--$NO-MVN-MAN-VER$-->
 				<configuration>
-					<source>1.7</source>
-					<target>1.7</target>
+					<source>${java.version}</source>
+					<target>${java.version}</target>
 					<!-- The output of Xlint is not shown by default, but we activate it for the QA bot
 					to be able to get more warnings -->
 					<compilerArgument>-Xlint:all</compilerArgument>
@@ -999,7 +1000,7 @@ under the License.
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-enforcer-plugin</artifactId>
-				<version>1.3.1</version><!--$NO-MVN-MAN-VER$-->
+				<version>1.4.1</version><!--$NO-MVN-MAN-VER$-->
 				<executions>
 					<execution>
 						<id>enforce-maven</id>
@@ -1012,6 +1013,9 @@ under the License.
 									<!-- enforce at least mvn version 3.0.3 -->
 									<version>[3.0.3,)</version>
 								</requireMavenVersion>
+								<requireJavaVersion>
+									<version>${java.version}</version>
+								</requireJavaVersion>
 							</rules>
 						</configuration>
 					</execution>