Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2017/09/04 15:06:35 UTC

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/4639

    [FLINK-7524] Remove potentially blocking behaviour from AbstractCloseableRegistry
    
    ## What is the purpose of the change
    
    This PR removes the potential for blocking behaviour in all CloseableRegistries.
    
    ## Verifying this change
    
    Change is tested in `AbstractCloseableRegistryTest::testNonBlockingClose`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    
    ## Documentation
    
      - Does this pull request introduce a new feature?  (no)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink improve-closeable-registry

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4639
    
----
commit a11bc242031134504d80c66e3349772e315fa2cf
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-09-04T10:30:41Z

    [FLINK-7524] Remove potentially blocking behaviour from AbstractCloseableRegistry

----


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r140455425
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---
    @@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws IOException {
     		}
     
     		synchronized (getSynchronizationLock()) {
    -			if (closed) {
    -				IOUtils.closeQuietly(closeable);
    -				throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
    +			if (!closed) {
    +				doRegister(closeable, closeableToRef);
    +				return;
     			}
    -
    -			doRegister(closeable, closeableToRef);
     		}
    +
    +		IOUtils.closeQuietly(closeable);
    --- End diff --
    
    And for the second part, I think it should be closed automatically, because the caller decides to hand responsibility to the registry and tie the object to the registry's status. So the registry should take care that its closed status is propagated to newly incoming objects.


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r136877817
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---
    @@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws IOException {
     		}
     
     		synchronized (getSynchronizationLock()) {
    -			if (closed) {
    -				IOUtils.closeQuietly(closeable);
    -				throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
    +			if (!closed) {
    +				doRegister(closeable, closeableToRef);
    +				return;
     			}
    -
    -			doRegister(closeable, closeableToRef);
     		}
    +
    +		IOUtils.closeQuietly(closeable);
    --- End diff --
    
    In general, the idea of the change is to move every call that could block, or that could acquire other locks and thus potentially deadlock, outside of the synchronized blocks, so that this class never blocks other threads.
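
A minimal sketch of the register path described in this comment. `RegisterSketch`, `markClosed` and `size` are hypothetical names used only for illustration; this is not Flink's `AbstractCloseableRegistry`. Only the flag check and the set update happen under the lock, while the possibly blocking `close()` runs after the lock is released.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for illustration, not Flink's AbstractCloseableRegistry. */
final class RegisterSketch {

    private final Object lock = new Object();
    private final Set<Closeable> registered = new HashSet<>();
    private boolean closed;

    /** Registers the closeable, or closes it and throws if the registry is already closed. */
    void register(Closeable closeable) throws IOException {
        synchronized (lock) {
            // Under the lock: only the flag check and the set update.
            if (!closed) {
                registered.add(closeable);
                return;
            }
        }
        // Outside the lock: close() may block or take other locks.
        try {
            closeable.close();
        } catch (IOException ignored) {
            // Quiet close, in the spirit of IOUtils.closeQuietly in the diff.
        }
        throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
    }

    /** Flips the closed flag; closing already registered objects is left out of this sketch. */
    void markClosed() {
        synchronized (lock) {
            closed = true;
        }
    }

    /** Number of currently registered closeables. */
    int size() {
        synchronized (lock) {
            return registered.size();
        }
    }
}
```

With this shape, a thread stuck inside `closeable.close()` never holds the registry lock, so other threads calling `register` or `size` are not held up by it.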


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r140454362
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java ---
    @@ -27,10 +27,16 @@ private WrappingProxyUtil() {
     		throw new AssertionError();
     	}
     
    +	@SuppressWarnings("unchecked")
     	public static <T> T stripProxy(T object) {
    -		while (object instanceof WrappingProxy) {
    +
    +		T previous = null;
    +
    +		while (object instanceof WrappingProxy && previous != object) {
    --- End diff --
    
    Exactly, somebody could come up with the stupid idea to return ``this`` as the inner object. The check is just to be on the safe side.
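
A small sketch of the guard discussed here. The interface `Wrapper` and its method `getDelegate` are hypothetical stand-ins, since the thread does not show the body of the loop; only the loop condition is taken from the diff.

```java
/** Hypothetical stand-in for a wrapping-proxy interface. */
interface Wrapper<T> {
    T getDelegate();
}

final class StripSketch {

    private StripSketch() {
        throw new AssertionError();
    }

    /** Unwraps nested wrappers until a non-wrapper is found or unwrapping makes no progress. */
    @SuppressWarnings("unchecked")
    static <T> T strip(T object) {
        T previous = null;
        // The second condition ends the loop when a wrapper returned itself in the previous step.
        while (object instanceof Wrapper && previous != object) {
            previous = object;
            object = ((Wrapper<T>) object).getDelegate();
        }
        return object;
    }
}
```

Note that comparing against the previous object only catches a wrapper that returns itself directly. A longer cycle, such as two wrappers that point at each other, would still loop forever in this sketch.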


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r140516796
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---
    @@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws IOException {
     		}
     
     		synchronized (getSynchronizationLock()) {
    -			if (closed) {
    -				IOUtils.closeQuietly(closeable);
    -				throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
    +			if (!closed) {
    +				doRegister(closeable, closeableToRef);
    +				return;
     			}
    -
    -			doRegister(closeable, closeableToRef);
     		}
    +
    +		IOUtils.closeQuietly(closeable);
    --- End diff --
    
    Makes sense! 👌 



---

[GitHub] flink issue #4639: [FLINK-7524] Remove potentially blocking behaviour from A...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4639
  
    CC @aljoscha 


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r140452104
  
    --- Diff: flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.util.AbstractCloseableRegistry;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.mockito.Mockito.doAnswer;
    +import static org.mockito.Mockito.verify;
    +import static org.powermock.api.mockito.PowerMockito.spy;
    +
    +public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
    +
    +	protected ProducerThread[] streamOpenThreads;
    +	protected AbstractCloseableRegistry<C, T> closeableRegistry;
    +	protected AtomicInteger unclosedCounter;
    +
    +	protected abstract C createCloseable();
    +
    +	protected abstract AbstractCloseableRegistry<C, T> createRegistry();
    +
    +	protected abstract ProducerThread<C, T> createProducerThread(
    +		AbstractCloseableRegistry<C, T> registry,
    +		AtomicInteger unclosedCounter,
    +		int maxStreams);
    +
    +	public void setup() {
    +		Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
    +		this.closeableRegistry = createRegistry();
    +		this.unclosedCounter = new AtomicInteger(0);
    +		this.streamOpenThreads = new ProducerThread[10];
    +		for (int i = 0; i < streamOpenThreads.length; ++i) {
    +			streamOpenThreads[i] = createProducerThread(closeableRegistry, unclosedCounter, Integer.MAX_VALUE);
    +		}
    +	}
    +
    +	protected void startThreads(int maxStreams) {
    +		for (ProducerThread t : streamOpenThreads) {
    +			t.setMaxStreams(maxStreams);
    +			t.start();
    +		}
    +	}
    +
    +	protected void joinThreads() throws InterruptedException {
    +		for (Thread t : streamOpenThreads) {
    +			t.join();
    +		}
    +	}
    +
    +	@Test
    +	public void testClose() throws Exception {
    +
    +		setup();
    +		startThreads(Integer.MAX_VALUE);
    +
    +		for (int i = 0; i < 5; ++i) {
    +			System.gc();
    +			Thread.sleep(40);
    +		}
    +
    +		closeableRegistry.close();
    +
    +		joinThreads();
    +
    +		Assert.assertEquals(0, unclosedCounter.get());
    +		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
    +
    +		final C testCloseable = spy(createCloseable());
    +
    +		try {
    +
    +			closeableRegistry.registerClosable(testCloseable);
    +
    +			Assert.fail("Closed registry should not accept closeables!");
    +
    +		} catch (IOException expected) {
    +			//expected
    +		}
    +
    +		Assert.assertEquals(0, unclosedCounter.get());
    +		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
    +		verify(testCloseable).close();
    +	}
    +
    +	@Test
    +	public void testNonBlockingClose() throws Exception {
    +		setup();
    +
    +		final OneShotLatch waitRegistryClosedLatch = new OneShotLatch();
    +		final OneShotLatch blockCloseLatch = new OneShotLatch();
    +
    +		final C spyCloseable = spy(createCloseable());
    +
    +		doAnswer(invocationOnMock -> {
    +			invocationOnMock.callRealMethod();
    +			waitRegistryClosedLatch.trigger();
    +			blockCloseLatch.await();
    +			return null;
    +		}).when(spyCloseable).close();
    +
    +		closeableRegistry.registerClosable(spyCloseable);
    +
    +		Assert.assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables());
    +
    +		Thread closer = new Thread(() -> {
    +			try {
    +				closeableRegistry.close();
    +			} catch (IOException ignore) {
    +
    +			}
    +		});
    +
    +		closer.start();
    +		waitRegistryClosedLatch.await();
    +
    +		final C testCloseable = spy(createCloseable());
    +
    +		try {
    +			closeableRegistry.registerClosable(testCloseable);
    +			Assert.fail("Closed registry should not accept closeables!");
    +		}catch (IOException ignore) {
    +		}
    +
    +		blockCloseLatch.trigger();
    +		closer.join();
    +
    +		verify(spyCloseable).close();
    +		verify(testCloseable).close();
    +		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
    +	}
    +
    +	protected static abstract class ProducerThread<C extends Closeable, T> extends Thread {
    +
    +		protected final AbstractCloseableRegistry<C, T> registry;
    +		protected final AtomicInteger refCount;
    +		protected int maxStreams;
    +
    +		public ProducerThread(AbstractCloseableRegistry<C, T> registry, AtomicInteger refCount, int maxStreams) {
    +			this.registry = registry;
    +			this.refCount = refCount;
    +			this.maxStreams = maxStreams;
    +		}
    +
    +		public int getMaxStreams() {
    +			return maxStreams;
    +		}
    +
    +		public void setMaxStreams(int maxStreams) {
    +			this.maxStreams = maxStreams;
    +		}
    +
    +		protected abstract void createAndRegisterStream() throws IOException;
    +
    +		@Override
    +		public void run() {
    +			try {
    +				while (maxStreams > 0) {
    +
    +					createAndRegisterStream();
    +
    +					try {
    +						Thread.sleep(2);
    +					} catch (InterruptedException ignored) {}
    +
    +					if (maxStreams != Integer.MAX_VALUE) {
    +						--maxStreams;
    --- End diff --
    
    nit: I always find it strange when a field that should be final is also used as a "counter". In those cases there should probably be a separate field `numStreams`, and you loop until `numStreams == maxStreams`.
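
One possible shape of this suggestion, as a sketch only. `ProducerSketch` and the `openStream` callback are hypothetical, and the sketch leaves out the `setMaxStreams` setter and the `Integer.MAX_VALUE` "unbounded" special case from the diff.

```java
/** Hypothetical producer that keeps the limit final and counts in a separate field. */
final class ProducerSketch implements Runnable {

    private final int maxStreams;      // fixed limit, never mutated
    private final Runnable openStream; // stands in for createAndRegisterStream()
    private int numStreams;            // streams opened so far

    ProducerSketch(int maxStreams, Runnable openStream) {
        this.maxStreams = maxStreams;
        this.openStream = openStream;
    }

    @Override
    public void run() {
        // Loop until the counter reaches the limit; the limit itself stays untouched.
        while (numStreams < maxStreams) {
            openStream.run();
            ++numStreams;
        }
    }

    int getNumStreams() {
        return numStreams;
    }
}
```

Keeping the limit immutable means `getMaxStreams()`-style accessors would report the configured value even after the thread has finished.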


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r136879648
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---
    @@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws IOException {
     		}
     
     		synchronized (getSynchronizationLock()) {
    -			if (closed) {
    -				IOUtils.closeQuietly(closeable);
    -				throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
    +			if (!closed) {
    +				doRegister(closeable, closeableToRef);
    +				return;
     			}
    -
    -			doRegister(closeable, closeableToRef);
     		}
    +
    +		IOUtils.closeQuietly(closeable);
    --- End diff --
    
    I see


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r136877616
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---
    @@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws IOException {
     		}
     
     		synchronized (getSynchronizationLock()) {
    -			if (closed) {
    -				IOUtils.closeQuietly(closeable);
    -				throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
    +			if (!closed) {
    +				doRegister(closeable, closeableToRef);
    +				return;
     			}
    -
    -			doRegister(closeable, closeableToRef);
     		}
    +
    +		IOUtils.closeQuietly(closeable);
    --- End diff --
    
    I think it should not. For some distributed filesystems a call to `close()` could block and would keep the registry blocked. We only need to synchronize the access to the internal map and the closed flag.
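
A sketch of how the close path could follow the same rule, assuming a hypothetical `CloseSketch` class rather than the PR's actual code: the closed flag and the internal collection are updated under the lock, and the individual `close()` calls happen afterwards.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for illustration, not Flink's AbstractCloseableRegistry. */
final class CloseSketch {

    private final Object lock = new Object();
    private final List<Closeable> registered = new ArrayList<>();
    private boolean closed;

    /** Simplified registration: returns false instead of closing and throwing. */
    boolean add(Closeable closeable) {
        synchronized (lock) {
            if (closed) {
                return false;
            }
            registered.add(closeable);
            return true;
        }
    }

    boolean isClosed() {
        synchronized (lock) {
            return closed;
        }
    }

    void close() {
        List<Closeable> toClose;
        synchronized (lock) {
            if (closed) {
                return;
            }
            closed = true;
            // Take a snapshot and clear the collection while holding the lock.
            toClose = new ArrayList<>(registered);
            registered.clear();
        }
        // Potentially slow or blocking close() calls run without the lock.
        for (Closeable closeable : toClose) {
            try {
                closeable.close();
            } catch (IOException ignored) {
                // Best-effort close in this sketch.
            }
        }
    }
}
```

If one registered object blocks in `close()`, for example on a slow distributed filesystem, other threads can still call `add` or `isClosed` and see the registry as closed, which is the behaviour `testNonBlockingClose` in the diff checks for.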


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r140448606
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---
    @@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws IOException {
     		}
     
     		synchronized (getSynchronizationLock()) {
    -			if (closed) {
    -				IOUtils.closeQuietly(closeable);
    -				throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
    +			if (!closed) {
    +				doRegister(closeable, closeableToRef);
    +				return;
     			}
    -
    -			doRegister(closeable, closeableToRef);
     		}
    +
    +		IOUtils.closeQuietly(closeable);
    --- End diff --
    
    I'm wondering, should it actually be the responsibility of the registry to close the `closeable` if the registry is already closed, or should it be the responsibility of whoever wants to register that closeable?


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r140449401
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java ---
    @@ -27,10 +27,16 @@ private WrappingProxyUtil() {
     		throw new AssertionError();
     	}
     
    +	@SuppressWarnings("unchecked")
     	public static <T> T stripProxy(T object) {
    -		while (object instanceof WrappingProxy) {
    +
    +		T previous = null;
    +
    +		while (object instanceof WrappingProxy && previous != object) {
    --- End diff --
    
    Guarding against infinite looping in case the bottom-most object is also a `WrappingProxy`?


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter closed the pull request at:

    https://github.com/apache/flink/pull/4639


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r136877433
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---
    @@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws IOException {
     		}
     
     		synchronized (getSynchronizationLock()) {
    -			if (closed) {
    -				IOUtils.closeQuietly(closeable);
    -				throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
    +			if (!closed) {
    +				doRegister(closeable, closeableToRef);
    +				return;
     			}
    -
    -			doRegister(closeable, closeableToRef);
     		}
    +
    +		IOUtils.closeQuietly(closeable);
    --- End diff --
    
    Should this also be inside the synchronized block?


---

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r140455067
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---
    @@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws IOException {
     		}
     
     		synchronized (getSynchronizationLock()) {
    -			if (closed) {
    -				IOUtils.closeQuietly(closeable);
    -				throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
    +			if (!closed) {
    +				doRegister(closeable, closeableToRef);
    +				return;
     			}
    -
    -			doRegister(closeable, closeableToRef);
     		}
    +
    +		IOUtils.closeQuietly(closeable);
    --- End diff --
    
    I did this on purpose. For as long as a ``Closeable`` is registered, it is managed by the registry, but I do not want to stop somebody from removing a ``Closeable`` again and reclaiming responsibility for the resource. I was, however, considering a convenience method that does the remove-and-close in one call.
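
A sketch of what such a convenience method might look like. Everything here is hypothetical: `OwnershipSketch`, `unregister` and `unregisterAndClose` are illustrative names, and closing only when the object was actually registered is an assumption, not something the thread specifies.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for illustration, not Flink's AbstractCloseableRegistry. */
final class OwnershipSketch {

    private final Object lock = new Object();
    private final Set<Closeable> registered = new HashSet<>();

    void register(Closeable closeable) {
        synchronized (lock) {
            registered.add(closeable);
        }
    }

    /** Hands responsibility back to the caller; returns whether the object was registered. */
    boolean unregister(Closeable closeable) {
        synchronized (lock) {
            return registered.remove(closeable);
        }
    }

    /** Hypothetical convenience method: remove and close in one call. */
    boolean unregisterAndClose(Closeable closeable) throws IOException {
        boolean wasRegistered = unregister(closeable);
        if (wasRegistered) {
            // Assumption: only close objects the registry was still managing.
            // The close happens after the lock has been released.
            closeable.close();
        }
        return wasRegistered;
    }
}
```

A caller that wants to keep the resource open would use `unregister` and take over the close itself, while `unregisterAndClose` covers the common case of releasing the resource early.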


---