You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/12/13 13:06:09 UTC
[flink] 01/02: [FLINK-11122][core] Change signature of
WrappingProxyUtil#stripProxy(T)
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e546069c300a51de47e6f752ff1b7491683b2356
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Mon Dec 10 16:58:21 2018 +0100
[FLINK-11122][core] Change signature of WrappingProxyUtil#stripProxy(T)
Changed signature so that the method expects a WrappingProxy<T>. This fixes a
type inference ambiguity by the compiler.
Make the method throw a runtime exception of it needs to strip more than 128
proxies.
Change AbstractCloseableRegistryTest so that no mocks are used.
This closes #7273.
---
.../flink/core/fs/SafetyNetCloseableRegistry.java | 4 +-
.../org/apache/flink/util/WrappingProxyUtil.java | 38 ++++++-
.../core/fs/AbstractCloseableRegistryTest.java | 123 +++++++++++++--------
.../flink/core/fs/CloseableRegistryTest.java | 9 +-
.../org/apache/flink/core/fs/FileSystemTest.java | 24 +++-
.../core/fs/SafetyNetCloseableRegistryTest.java | 11 +-
.../apache/flink/util/WrappingProxyUtilTest.java | 76 +++++++++++++
7 files changed, 216 insertions(+), 69 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index ccf944e..870dcbf 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -87,7 +87,7 @@ public class SafetyNetCloseableRegistry extends
assert Thread.holdsLock(getSynchronizationLock());
- Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate());
+ Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable);
if (null == innerCloseable) {
return;
@@ -108,7 +108,7 @@ public class SafetyNetCloseableRegistry extends
assert Thread.holdsLock(getSynchronizationLock());
- Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
+ Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable);
return null != innerCloseable && closeableMap.remove(innerCloseable) != null;
}
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
index 3fbd6df..7b274d7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
@@ -19,6 +19,11 @@
package org.apache.flink.util;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nullable;
+
+import static java.lang.String.format;
/**
* Utilits for working with {@link WrappingProxy}.
@@ -26,20 +31,41 @@ import org.apache.flink.annotation.Internal;
@Internal
public final class WrappingProxyUtil {
+ @VisibleForTesting
+ static final int SAFETY_NET_MAX_ITERATIONS = 128;
+
private WrappingProxyUtil() {
throw new AssertionError();
}
+ /**
+ * Expects a proxy, and returns the unproxied delegate.
+ *
+ * @param wrappingProxy The initial proxy.
+ * @param <T> The type of the delegate. Note that all proxies in the chain must be assignable to T.
+ * @return The unproxied delegate.
+ */
@SuppressWarnings("unchecked")
- public static <T> T stripProxy(T object) {
+ public static <T> T stripProxy(@Nullable final WrappingProxy<T> wrappingProxy) {
+ if (wrappingProxy == null) {
+ return null;
+ }
- T previous = null;
+ T delegate = wrappingProxy.getWrappedDelegate();
- while (object instanceof WrappingProxy && previous != object) {
- previous = object;
- object = ((WrappingProxy<T>) object).getWrappedDelegate();
+ int numProxiesStripped = 0;
+ while (delegate instanceof WrappingProxy) {
+ throwIfSafetyNetExceeded(++numProxiesStripped);
+ delegate = ((WrappingProxy<T>) delegate).getWrappedDelegate();
}
- return object;
+ return delegate;
+ }
+
+ private static void throwIfSafetyNetExceeded(final int numProxiesStripped) {
+ if (numProxiesStripped >= SAFETY_NET_MAX_ITERATIONS) {
+ throw new IllegalArgumentException(format("Already stripped %d proxies. " +
+ "Are there loops in the object graph?", SAFETY_NET_MAX_ITERATIONS));
+ }
}
}
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
index eb07378..d8d639e 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.core.fs;
-import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.AbstractCloseableRegistry;
import org.junit.Assert;
@@ -26,22 +25,27 @@ import org.junit.Test;
import java.io.Closeable;
import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for the {@link AbstractCloseableRegistry}.
*/
public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
+ private static final int TEST_TIMEOUT_SECONDS = 10;
+
protected ProducerThread[] streamOpenThreads;
protected AbstractCloseableRegistry<C, T> closeableRegistry;
protected AtomicInteger unclosedCounter;
- protected abstract C createCloseable();
+ protected abstract void registerCloseable(Closeable closeable) throws IOException;
protected abstract AbstractCloseableRegistry<C, T> createRegistry();
@@ -74,7 +78,6 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
@Test
public void testClose() throws Exception {
-
setup(Integer.MAX_VALUE);
startThreads();
@@ -87,45 +90,29 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
joinThreads();
- Assert.assertEquals(0, unclosedCounter.get());
- Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
+ assertEquals(0, unclosedCounter.get());
+ assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
- final C testCloseable = spy(createCloseable());
+ final TestCloseable testCloseable = new TestCloseable();
try {
+ registerCloseable(testCloseable);
+ fail("Closed registry should not accept closeables!");
+ } catch (IOException expected) {}
- closeableRegistry.registerCloseable(testCloseable);
-
- Assert.fail("Closed registry should not accept closeables!");
-
- } catch (IOException expected) {
- //expected
- }
-
- Assert.assertEquals(0, unclosedCounter.get());
- Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
- verify(testCloseable).close();
+ assertTrue(testCloseable.isClosed());
+ assertEquals(0, unclosedCounter.get());
+ assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
}
@Test
public void testNonBlockingClose() throws Exception {
setup(Integer.MAX_VALUE);
- final OneShotLatch waitRegistryClosedLatch = new OneShotLatch();
- final OneShotLatch blockCloseLatch = new OneShotLatch();
-
- final C spyCloseable = spy(createCloseable());
-
- doAnswer(invocationOnMock -> {
- invocationOnMock.callRealMethod();
- waitRegistryClosedLatch.trigger();
- blockCloseLatch.await();
- return null;
- }).when(spyCloseable).close();
-
- closeableRegistry.registerCloseable(spyCloseable);
+ final BlockingTestCloseable blockingCloseable = new BlockingTestCloseable();
+ registerCloseable(blockingCloseable);
- Assert.assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables());
+ assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables());
Thread closer = new Thread(() -> {
try {
@@ -134,23 +121,20 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
}
});
-
closer.start();
- waitRegistryClosedLatch.await();
-
- final C testCloseable = spy(createCloseable());
+ blockingCloseable.awaitClose(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ final TestCloseable testCloseable = new TestCloseable();
try {
- closeableRegistry.registerCloseable(testCloseable);
- Assert.fail("Closed registry should not accept closeables!");
+ registerCloseable(testCloseable);
+ fail("Closed registry should not accept closeables!");
} catch (IOException ignored) {}
- blockCloseLatch.trigger();
+ blockingCloseable.unblockClose();
closer.join();
- verify(spyCloseable).close();
- verify(testCloseable).close();
- Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
+ assertTrue(testCloseable.isClosed());
+ assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
}
/**
@@ -225,4 +209,55 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
refCount.decrementAndGet();
}
}
+
+ /**
+ * A noop {@link Closeable} implementation that blocks inside {@link #close()}.
+ */
+ private static class BlockingTestCloseable implements Closeable {
+
+ private final CountDownLatch closeCalledLatch = new CountDownLatch(1);
+
+ private final CountDownLatch blockCloseLatch = new CountDownLatch(1);
+
+ @Override
+ public void close() throws IOException {
+ closeCalledLatch.countDown();
+ try {
+ blockCloseLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Unblocks {@link #close()}.
+ */
+ public void unblockClose() {
+ blockCloseLatch.countDown();
+ }
+
+ /**
+ * Causes the current thread to wait until {@link #close()} is called.
+ */
+ public void awaitClose(final long timeout, final TimeUnit timeUnit) throws InterruptedException {
+ closeCalledLatch.await(timeout, timeUnit);
+ }
+ }
+
+ /**
+ * A noop {@link Closeable} implementation that tracks whether it was closed.
+ */
+ private static class TestCloseable implements Closeable {
+
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ @Override
+ public void close() throws IOException {
+ assertTrue("TestCloseable was already closed", closed.compareAndSet(false, true));
+ }
+
+ public boolean isClosed() {
+ return closed.get();
+ }
+ }
}
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
index 8a0fb96..63d08d5 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
@@ -30,13 +30,8 @@ import java.util.concurrent.atomic.AtomicInteger;
public class CloseableRegistryTest extends AbstractCloseableRegistryTest<Closeable, Object> {
@Override
- protected Closeable createCloseable() {
- return new Closeable() {
- @Override
- public void close() throws IOException {
-
- }
- };
+ protected void registerCloseable(final Closeable closeable) throws IOException {
+ closeableRegistry.registerCloseable(closeable);
}
@Override
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
index 598b1e1..f6d3f45 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.core.fs;
import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.util.WrappingProxy;
import org.apache.flink.util.WrappingProxyUtil;
import org.junit.Test;
@@ -38,21 +39,32 @@ public class FileSystemTest {
public void testGet() throws URISyntaxException, IOException {
String scheme = "file";
- assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":///test/test"))) instanceof LocalFileSystem);
+ assertTrue(getFileSystemWithoutSafetyNet(scheme + ":///test/test") instanceof LocalFileSystem);
try {
- FileSystem.get(new URI(scheme + "://test/test"));
+ getFileSystemWithoutSafetyNet(scheme + "://test/test");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().startsWith("Found local file path with authority '"));
}
- assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":/test/test"))) instanceof LocalFileSystem);
+ assertTrue(getFileSystemWithoutSafetyNet(scheme + ":/test/test") instanceof LocalFileSystem);
- assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":test/test"))) instanceof LocalFileSystem);
+ assertTrue(getFileSystemWithoutSafetyNet(scheme + ":test/test") instanceof LocalFileSystem);
- assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("/test/test"))) instanceof LocalFileSystem);
+ assertTrue(getFileSystemWithoutSafetyNet("/test/test") instanceof LocalFileSystem);
- assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("test/test"))) instanceof LocalFileSystem);
+ assertTrue(getFileSystemWithoutSafetyNet("test/test") instanceof LocalFileSystem);
+ }
+
+ private static FileSystem getFileSystemWithoutSafetyNet(final String uri) throws URISyntaxException, IOException {
+ final FileSystem fileSystem = FileSystem.get(new URI(uri));
+
+ if (fileSystem instanceof WrappingProxy) {
+ //noinspection unchecked
+ return WrappingProxyUtil.stripProxy((WrappingProxy<FileSystem>) fileSystem);
+ }
+
+ return fileSystem;
}
}
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 5474f99..44461a3 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -43,17 +43,20 @@ public class SafetyNetCloseableRegistryTest
public final TemporaryFolder tmpFolder = new TemporaryFolder();
@Override
- protected WrappingProxyCloseable<? extends Closeable> createCloseable() {
- return new WrappingProxyCloseable<Closeable>() {
+ protected void registerCloseable(final Closeable closeable) throws IOException {
+ final WrappingProxyCloseable<Closeable> wrappingProxyCloseable = new WrappingProxyCloseable<Closeable>() {
@Override
- public void close() throws IOException {}
+ public void close() throws IOException {
+ closeable.close();
+ }
@Override
public Closeable getWrappedDelegate() {
- return this;
+ return closeable;
}
};
+ closeableRegistry.registerCloseable(wrappingProxyCloseable);
}
@Override
diff --git a/flink-core/src/test/java/org/apache/flink/util/WrappingProxyUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/WrappingProxyUtilTest.java
new file mode 100644
index 0000000..b557b09
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/WrappingProxyUtilTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link WrappingProxyUtil}.
+ */
+public class WrappingProxyUtilTest {
+
+ @Test
+ public void testThrowsExceptionIfTooManyProxies() {
+ try {
+ WrappingProxyUtil.stripProxy(new SelfWrappingProxy(WrappingProxyUtil.SAFETY_NET_MAX_ITERATIONS));
+ fail("Expected exception not thrown");
+ } catch (final IllegalArgumentException e) {
+ assertThat(e.getMessage(), containsString("Are there loops in the object graph?"));
+ }
+ }
+
+ @Test
+ public void testStripsAllProxies() {
+ final SelfWrappingProxy wrappingProxy = new SelfWrappingProxy(WrappingProxyUtil.SAFETY_NET_MAX_ITERATIONS - 1);
+ assertThat(WrappingProxyUtil.stripProxy(wrappingProxy), is(not(instanceOf(SelfWrappingProxy.class))));
+ }
+
+ private static class Wrapped {
+ }
+
+ /**
+ * Wraps around {@link Wrapped} a specified number of times.
+ */
+ private static class SelfWrappingProxy extends Wrapped implements WrappingProxy<Wrapped> {
+
+ private int levels;
+
+ private SelfWrappingProxy(final int levels) {
+ this.levels = levels;
+ }
+
+ @Override
+ public Wrapped getWrappedDelegate() {
+ if (levels-- == 0) {
+ return new Wrapped();
+ } else {
+ return this;
+ }
+ }
+ }
+
+}