You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/12/13 13:06:09 UTC

[flink] 01/02: [FLINK-11122][core] Change signature of WrappingProxyUtil#stripProxy(T)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e546069c300a51de47e6f752ff1b7491683b2356
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Mon Dec 10 16:58:21 2018 +0100

    [FLINK-11122][core] Change signature of WrappingProxyUtil#stripProxy(T)
    
    Changed signature so that the method expects a WrappingProxy<T>. This fixes a
    type inference ambiguity by the compiler.
    
    Make the method throw a runtime exception if it needs to strip more than 128
    proxies.
    
    Change AbstractCloseableRegistryTest so that no mocks are used.
    
    This closes #7273.
---
 .../flink/core/fs/SafetyNetCloseableRegistry.java  |   4 +-
 .../org/apache/flink/util/WrappingProxyUtil.java   |  38 ++++++-
 .../core/fs/AbstractCloseableRegistryTest.java     | 123 +++++++++++++--------
 .../flink/core/fs/CloseableRegistryTest.java       |   9 +-
 .../org/apache/flink/core/fs/FileSystemTest.java   |  24 +++-
 .../core/fs/SafetyNetCloseableRegistryTest.java    |  11 +-
 .../apache/flink/util/WrappingProxyUtilTest.java   |  76 +++++++++++++
 7 files changed, 216 insertions(+), 69 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index ccf944e..870dcbf 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -87,7 +87,7 @@ public class SafetyNetCloseableRegistry extends
 
 		assert Thread.holdsLock(getSynchronizationLock());
 
-		Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate());
+		Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable);
 
 		if (null == innerCloseable) {
 			return;
@@ -108,7 +108,7 @@ public class SafetyNetCloseableRegistry extends
 
 		assert Thread.holdsLock(getSynchronizationLock());
 
-		Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
+		Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable);
 
 		return null != innerCloseable && closeableMap.remove(innerCloseable) != null;
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
index 3fbd6df..7b274d7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
@@ -19,6 +19,11 @@
 package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nullable;
+
+import static java.lang.String.format;
 
 /**
  * Utilits for working with {@link WrappingProxy}.
@@ -26,20 +31,41 @@ import org.apache.flink.annotation.Internal;
 @Internal
 public final class WrappingProxyUtil {
 
+	@VisibleForTesting
+	static final int SAFETY_NET_MAX_ITERATIONS = 128;
+
 	private WrappingProxyUtil() {
 		throw new AssertionError();
 	}
 
+	/**
+	 * Expects a proxy, and returns the unproxied delegate.
+	 *
+	 * @param wrappingProxy The initial proxy.
+	 * @param <T> The type of the delegate. Note that all proxies in the chain must be assignable to T.
+	 * @return The unproxied delegate.
+	 */
 	@SuppressWarnings("unchecked")
-	public static <T> T stripProxy(T object) {
+	public static <T> T stripProxy(@Nullable final WrappingProxy<T> wrappingProxy) {
+		if (wrappingProxy == null) {
+			return null;
+		}
 
-		T previous = null;
+		T delegate = wrappingProxy.getWrappedDelegate();
 
-		while (object instanceof WrappingProxy && previous != object) {
-			previous = object;
-			object = ((WrappingProxy<T>) object).getWrappedDelegate();
+		int numProxiesStripped = 0;
+		while (delegate instanceof WrappingProxy) {
+			throwIfSafetyNetExceeded(++numProxiesStripped);
+			delegate = ((WrappingProxy<T>) delegate).getWrappedDelegate();
 		}
 
-		return object;
+		return delegate;
+	}
+
+	private static void throwIfSafetyNetExceeded(final int numProxiesStripped) {
+		if (numProxiesStripped >= SAFETY_NET_MAX_ITERATIONS) {
+			throw new IllegalArgumentException(format("Already stripped %d proxies. " +
+				"Are there loops in the object graph?", SAFETY_NET_MAX_ITERATIONS));
+		}
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
index eb07378..d8d639e 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.core.fs;
 
-import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.util.AbstractCloseableRegistry;
 
 import org.junit.Assert;
@@ -26,22 +25,27 @@ import org.junit.Test;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link AbstractCloseableRegistry}.
  */
 public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 
+	private static final int TEST_TIMEOUT_SECONDS = 10;
+
 	protected ProducerThread[] streamOpenThreads;
 	protected AbstractCloseableRegistry<C, T> closeableRegistry;
 	protected AtomicInteger unclosedCounter;
 
-	protected abstract C createCloseable();
+	protected abstract void registerCloseable(Closeable closeable) throws IOException;
 
 	protected abstract AbstractCloseableRegistry<C, T> createRegistry();
 
@@ -74,7 +78,6 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 
 	@Test
 	public void testClose() throws Exception {
-
 		setup(Integer.MAX_VALUE);
 		startThreads();
 
@@ -87,45 +90,29 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 
 		joinThreads();
 
-		Assert.assertEquals(0, unclosedCounter.get());
-		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
+		assertEquals(0, unclosedCounter.get());
+		assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
 
-		final C testCloseable = spy(createCloseable());
+		final TestCloseable testCloseable = new TestCloseable();
 
 		try {
+			registerCloseable(testCloseable);
+			fail("Closed registry should not accept closeables!");
+		} catch (IOException expected) {}
 
-			closeableRegistry.registerCloseable(testCloseable);
-
-			Assert.fail("Closed registry should not accept closeables!");
-
-		} catch (IOException expected) {
-			//expected
-		}
-
-		Assert.assertEquals(0, unclosedCounter.get());
-		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
-		verify(testCloseable).close();
+		assertTrue(testCloseable.isClosed());
+		assertEquals(0, unclosedCounter.get());
+		assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
 	}
 
 	@Test
 	public void testNonBlockingClose() throws Exception {
 		setup(Integer.MAX_VALUE);
 
-		final OneShotLatch waitRegistryClosedLatch = new OneShotLatch();
-		final OneShotLatch blockCloseLatch = new OneShotLatch();
-
-		final C spyCloseable = spy(createCloseable());
-
-		doAnswer(invocationOnMock -> {
-			invocationOnMock.callRealMethod();
-			waitRegistryClosedLatch.trigger();
-			blockCloseLatch.await();
-			return null;
-		}).when(spyCloseable).close();
-
-		closeableRegistry.registerCloseable(spyCloseable);
+		final BlockingTestCloseable blockingCloseable = new BlockingTestCloseable();
+		registerCloseable(blockingCloseable);
 
-		Assert.assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables());
+		assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables());
 
 		Thread closer = new Thread(() -> {
 			try {
@@ -134,23 +121,20 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 
 			}
 		});
-
 		closer.start();
-		waitRegistryClosedLatch.await();
-
-		final C testCloseable = spy(createCloseable());
+		blockingCloseable.awaitClose(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
+		final TestCloseable testCloseable = new TestCloseable();
 		try {
-			closeableRegistry.registerCloseable(testCloseable);
-			Assert.fail("Closed registry should not accept closeables!");
+			registerCloseable(testCloseable);
+			fail("Closed registry should not accept closeables!");
 		} catch (IOException ignored) {}
 
-		blockCloseLatch.trigger();
+		blockingCloseable.unblockClose();
 		closer.join();
 
-		verify(spyCloseable).close();
-		verify(testCloseable).close();
-		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
+		assertTrue(testCloseable.isClosed());
+		assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
 	}
 
 	/**
@@ -225,4 +209,55 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 			refCount.decrementAndGet();
 		}
 	}
+
+	/**
+	 * A noop {@link Closeable} implementation that blocks inside {@link #close()}.
+	 */
+	private static class BlockingTestCloseable implements Closeable {
+
+		private final CountDownLatch closeCalledLatch = new CountDownLatch(1);
+
+		private final CountDownLatch blockCloseLatch = new CountDownLatch(1);
+
+		@Override
+		public void close() throws IOException {
+			closeCalledLatch.countDown();
+			try {
+				blockCloseLatch.await();
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+			}
+		}
+
+		/**
+		 * Unblocks {@link #close()}.
+		 */
+		public void unblockClose() {
+			blockCloseLatch.countDown();
+		}
+
+		/**
+		 * Causes the current thread to wait until {@link #close()} is called.
+		 */
+		public void awaitClose(final long timeout, final TimeUnit timeUnit) throws InterruptedException {
+			closeCalledLatch.await(timeout, timeUnit);
+		}
+	}
+
+	/**
+	 * A noop {@link Closeable} implementation that tracks whether it was closed.
+	 */
+	private static class TestCloseable implements Closeable {
+
+		private final AtomicBoolean closed = new AtomicBoolean();
+
+		@Override
+		public void close() throws IOException {
+			assertTrue("TestCloseable was already closed", closed.compareAndSet(false, true));
+		}
+
+		public boolean isClosed() {
+			return closed.get();
+		}
+	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
index 8a0fb96..63d08d5 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
@@ -30,13 +30,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class CloseableRegistryTest extends AbstractCloseableRegistryTest<Closeable, Object> {
 
 	@Override
-	protected Closeable createCloseable() {
-		return new Closeable() {
-			@Override
-			public void close() throws IOException {
-
-			}
-		};
+	protected void registerCloseable(final Closeable closeable) throws IOException {
+		closeableRegistry.registerCloseable(closeable);
 	}
 
 	@Override
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
index 598b1e1..f6d3f45 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.util.WrappingProxy;
 import org.apache.flink.util.WrappingProxyUtil;
 
 import org.junit.Test;
@@ -38,21 +39,32 @@ public class FileSystemTest {
 	public void testGet() throws URISyntaxException, IOException {
 		String scheme = "file";
 
-		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":///test/test"))) instanceof LocalFileSystem);
+		assertTrue(getFileSystemWithoutSafetyNet(scheme + ":///test/test") instanceof LocalFileSystem);
 
 		try {
-			FileSystem.get(new URI(scheme + "://test/test"));
+			getFileSystemWithoutSafetyNet(scheme + "://test/test");
 		} catch (IOException ioe) {
 			assertTrue(ioe.getMessage().startsWith("Found local file path with authority '"));
 		}
 
-		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":/test/test"))) instanceof LocalFileSystem);
+		assertTrue(getFileSystemWithoutSafetyNet(scheme + ":/test/test") instanceof LocalFileSystem);
 
-		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":test/test"))) instanceof LocalFileSystem);
+		assertTrue(getFileSystemWithoutSafetyNet(scheme + ":test/test") instanceof LocalFileSystem);
 
-		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("/test/test"))) instanceof LocalFileSystem);
+		assertTrue(getFileSystemWithoutSafetyNet("/test/test") instanceof LocalFileSystem);
 
-		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("test/test"))) instanceof LocalFileSystem);
+		assertTrue(getFileSystemWithoutSafetyNet("test/test") instanceof LocalFileSystem);
+	}
+
+	private static FileSystem getFileSystemWithoutSafetyNet(final String uri) throws URISyntaxException, IOException {
+		final FileSystem fileSystem = FileSystem.get(new URI(uri));
+
+		if (fileSystem instanceof WrappingProxy) {
+			//noinspection unchecked
+			return WrappingProxyUtil.stripProxy((WrappingProxy<FileSystem>) fileSystem);
+		}
+
+		return fileSystem;
 	}
 
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 5474f99..44461a3 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -43,17 +43,20 @@ public class SafetyNetCloseableRegistryTest
 	public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
 	@Override
-	protected WrappingProxyCloseable<? extends Closeable> createCloseable() {
-		return new WrappingProxyCloseable<Closeable>() {
+	protected void registerCloseable(final Closeable closeable) throws IOException {
+		final WrappingProxyCloseable<Closeable> wrappingProxyCloseable = new WrappingProxyCloseable<Closeable>() {
 
 			@Override
-			public void close() throws IOException {}
+			public void close() throws IOException {
+				closeable.close();
+			}
 
 			@Override
 			public Closeable getWrappedDelegate() {
-				return this;
+				return closeable;
 			}
 		};
+		closeableRegistry.registerCloseable(wrappingProxyCloseable);
 	}
 
 	@Override
diff --git a/flink-core/src/test/java/org/apache/flink/util/WrappingProxyUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/WrappingProxyUtilTest.java
new file mode 100644
index 0000000..b557b09
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/WrappingProxyUtilTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link WrappingProxyUtil}.
+ */
+public class WrappingProxyUtilTest {
+
+	@Test
+	public void testThrowsExceptionIfTooManyProxies() {
+		try {
+			WrappingProxyUtil.stripProxy(new SelfWrappingProxy(WrappingProxyUtil.SAFETY_NET_MAX_ITERATIONS));
+			fail("Expected exception not thrown");
+		} catch (final IllegalArgumentException e) {
+			assertThat(e.getMessage(), containsString("Are there loops in the object graph?"));
+		}
+	}
+
+	@Test
+	public void testStripsAllProxies() {
+		final SelfWrappingProxy wrappingProxy = new SelfWrappingProxy(WrappingProxyUtil.SAFETY_NET_MAX_ITERATIONS - 1);
+		assertThat(WrappingProxyUtil.stripProxy(wrappingProxy), is(not(instanceOf(SelfWrappingProxy.class))));
+	}
+
+	private static class Wrapped {
+	}
+
+	/**
+	 * Wraps around {@link Wrapped} a specified number of times.
+	 */
+	private static class SelfWrappingProxy extends Wrapped implements WrappingProxy<Wrapped> {
+
+		private int levels;
+
+		private SelfWrappingProxy(final int levels) {
+			this.levels = levels;
+		}
+
+		@Override
+		public Wrapped getWrappedDelegate() {
+			if (levels-- == 0) {
+				return new Wrapped();
+			} else {
+				return this;
+			}
+		}
+	}
+
+}