You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/12/13 13:06:08 UTC

[flink] branch master updated (cec17d0 -> ca28085)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from cec17d0  [hotfix] [docs] Add notice about buggy dateFormat() function
     new e546069  [FLINK-11122][core] Change signature of WrappingProxyUtil#stripProxy(T)
     new ca28085  [hotfix][core] Fix typo in WrappingProxyUtil Javadoc

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/core/fs/SafetyNetCloseableRegistry.java  |   4 +-
 .../org/apache/flink/util/WrappingProxyUtil.java   |  40 +++++--
 .../core/fs/AbstractCloseableRegistryTest.java     | 123 +++++++++++++--------
 .../flink/core/fs/CloseableRegistryTest.java       |   9 +-
 .../org/apache/flink/core/fs/FileSystemTest.java   |  24 +++-
 .../core/fs/SafetyNetCloseableRegistryTest.java    |  11 +-
 .../apache/flink/util/WrappingProxyUtilTest.java   |  76 +++++++++++++
 7 files changed, 217 insertions(+), 70 deletions(-)
 create mode 100644 flink-core/src/test/java/org/apache/flink/util/WrappingProxyUtilTest.java


[flink] 02/02: [hotfix][core] Fix typo in WrappingProxyUtil Javadoc

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ca28085336681bdd10895f2e252716c9151dd7c8
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Wed Dec 12 09:50:59 2018 +0100

    [hotfix][core] Fix typo in WrappingProxyUtil Javadoc
---
 flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
index 7b274d7..dee9508 100644
--- a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
@@ -26,7 +26,7 @@ import javax.annotation.Nullable;
 import static java.lang.String.format;
 
 /**
- * Utilits for working with {@link WrappingProxy}.
+ * Utilities for working with {@link WrappingProxy}.
  */
 @Internal
 public final class WrappingProxyUtil {


[flink] 01/02: [FLINK-11122][core] Change signature of WrappingProxyUtil#stripProxy(T)

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e546069c300a51de47e6f752ff1b7491683b2356
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Mon Dec 10 16:58:21 2018 +0100

    [FLINK-11122][core] Change signature of WrappingProxyUtil#stripProxy(T)
    
    Changed signature so that the method expects a WrappingProxy<T>. This fixes a
    type inference ambiguity by the compiler.
    
    Make the method throw a runtime exception if it needs to strip more than 128
    proxies.
    
    Change AbstractCloseableRegistryTest so that no mocks are used.
    
    This closes #7273.
---
 .../flink/core/fs/SafetyNetCloseableRegistry.java  |   4 +-
 .../org/apache/flink/util/WrappingProxyUtil.java   |  38 ++++++-
 .../core/fs/AbstractCloseableRegistryTest.java     | 123 +++++++++++++--------
 .../flink/core/fs/CloseableRegistryTest.java       |   9 +-
 .../org/apache/flink/core/fs/FileSystemTest.java   |  24 +++-
 .../core/fs/SafetyNetCloseableRegistryTest.java    |  11 +-
 .../apache/flink/util/WrappingProxyUtilTest.java   |  76 +++++++++++++
 7 files changed, 216 insertions(+), 69 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index ccf944e..870dcbf 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -87,7 +87,7 @@ public class SafetyNetCloseableRegistry extends
 
 		assert Thread.holdsLock(getSynchronizationLock());
 
-		Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate());
+		Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable);
 
 		if (null == innerCloseable) {
 			return;
@@ -108,7 +108,7 @@ public class SafetyNetCloseableRegistry extends
 
 		assert Thread.holdsLock(getSynchronizationLock());
 
-		Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
+		Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable);
 
 		return null != innerCloseable && closeableMap.remove(innerCloseable) != null;
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
index 3fbd6df..7b274d7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
@@ -19,6 +19,11 @@
 package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nullable;
+
+import static java.lang.String.format;
 
 /**
  * Utilits for working with {@link WrappingProxy}.
@@ -26,20 +31,41 @@ import org.apache.flink.annotation.Internal;
 @Internal
 public final class WrappingProxyUtil {
 
+	@VisibleForTesting
+	static final int SAFETY_NET_MAX_ITERATIONS = 128;
+
 	private WrappingProxyUtil() {
 		throw new AssertionError();
 	}
 
+	/**
+	 * Expects a proxy, and returns the unproxied delegate.
+	 *
+	 * @param wrappingProxy The initial proxy.
+	 * @param <T> The type of the delegate. Note that all proxies in the chain must be assignable to T.
+	 * @return The unproxied delegate.
+	 */
 	@SuppressWarnings("unchecked")
-	public static <T> T stripProxy(T object) {
+	public static <T> T stripProxy(@Nullable final WrappingProxy<T> wrappingProxy) {
+		if (wrappingProxy == null) {
+			return null;
+		}
 
-		T previous = null;
+		T delegate = wrappingProxy.getWrappedDelegate();
 
-		while (object instanceof WrappingProxy && previous != object) {
-			previous = object;
-			object = ((WrappingProxy<T>) object).getWrappedDelegate();
+		int numProxiesStripped = 0;
+		while (delegate instanceof WrappingProxy) {
+			throwIfSafetyNetExceeded(++numProxiesStripped);
+			delegate = ((WrappingProxy<T>) delegate).getWrappedDelegate();
 		}
 
-		return object;
+		return delegate;
+	}
+
+	private static void throwIfSafetyNetExceeded(final int numProxiesStripped) {
+		if (numProxiesStripped >= SAFETY_NET_MAX_ITERATIONS) {
+			throw new IllegalArgumentException(format("Already stripped %d proxies. " +
+				"Are there loops in the object graph?", SAFETY_NET_MAX_ITERATIONS));
+		}
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
index eb07378..d8d639e 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.core.fs;
 
-import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.util.AbstractCloseableRegistry;
 
 import org.junit.Assert;
@@ -26,22 +25,27 @@ import org.junit.Test;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link AbstractCloseableRegistry}.
  */
 public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 
+	private static final int TEST_TIMEOUT_SECONDS = 10;
+
 	protected ProducerThread[] streamOpenThreads;
 	protected AbstractCloseableRegistry<C, T> closeableRegistry;
 	protected AtomicInteger unclosedCounter;
 
-	protected abstract C createCloseable();
+	protected abstract void registerCloseable(Closeable closeable) throws IOException;
 
 	protected abstract AbstractCloseableRegistry<C, T> createRegistry();
 
@@ -74,7 +78,6 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 
 	@Test
 	public void testClose() throws Exception {
-
 		setup(Integer.MAX_VALUE);
 		startThreads();
 
@@ -87,45 +90,29 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 
 		joinThreads();
 
-		Assert.assertEquals(0, unclosedCounter.get());
-		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
+		assertEquals(0, unclosedCounter.get());
+		assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
 
-		final C testCloseable = spy(createCloseable());
+		final TestCloseable testCloseable = new TestCloseable();
 
 		try {
+			registerCloseable(testCloseable);
+			fail("Closed registry should not accept closeables!");
+		} catch (IOException expected) {}
 
-			closeableRegistry.registerCloseable(testCloseable);
-
-			Assert.fail("Closed registry should not accept closeables!");
-
-		} catch (IOException expected) {
-			//expected
-		}
-
-		Assert.assertEquals(0, unclosedCounter.get());
-		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
-		verify(testCloseable).close();
+		assertTrue(testCloseable.isClosed());
+		assertEquals(0, unclosedCounter.get());
+		assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
 	}
 
 	@Test
 	public void testNonBlockingClose() throws Exception {
 		setup(Integer.MAX_VALUE);
 
-		final OneShotLatch waitRegistryClosedLatch = new OneShotLatch();
-		final OneShotLatch blockCloseLatch = new OneShotLatch();
-
-		final C spyCloseable = spy(createCloseable());
-
-		doAnswer(invocationOnMock -> {
-			invocationOnMock.callRealMethod();
-			waitRegistryClosedLatch.trigger();
-			blockCloseLatch.await();
-			return null;
-		}).when(spyCloseable).close();
-
-		closeableRegistry.registerCloseable(spyCloseable);
+		final BlockingTestCloseable blockingCloseable = new BlockingTestCloseable();
+		registerCloseable(blockingCloseable);
 
-		Assert.assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables());
+		assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables());
 
 		Thread closer = new Thread(() -> {
 			try {
@@ -134,23 +121,20 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 
 			}
 		});
-
 		closer.start();
-		waitRegistryClosedLatch.await();
-
-		final C testCloseable = spy(createCloseable());
+		blockingCloseable.awaitClose(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
+		final TestCloseable testCloseable = new TestCloseable();
 		try {
-			closeableRegistry.registerCloseable(testCloseable);
-			Assert.fail("Closed registry should not accept closeables!");
+			registerCloseable(testCloseable);
+			fail("Closed registry should not accept closeables!");
 		} catch (IOException ignored) {}
 
-		blockCloseLatch.trigger();
+		blockingCloseable.unblockClose();
 		closer.join();
 
-		verify(spyCloseable).close();
-		verify(testCloseable).close();
-		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
+		assertTrue(testCloseable.isClosed());
+		assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
 	}
 
 	/**
@@ -225,4 +209,55 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 			refCount.decrementAndGet();
 		}
 	}
+
+	/**
+	 * A noop {@link Closeable} implementation that blocks inside {@link #close()}.
+	 */
+	private static class BlockingTestCloseable implements Closeable {
+
+		private final CountDownLatch closeCalledLatch = new CountDownLatch(1);
+
+		private final CountDownLatch blockCloseLatch = new CountDownLatch(1);
+
+		@Override
+		public void close() throws IOException {
+			closeCalledLatch.countDown();
+			try {
+				blockCloseLatch.await();
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+			}
+		}
+
+		/**
+		 * Unblocks {@link #close()}.
+		 */
+		public void unblockClose() {
+			blockCloseLatch.countDown();
+		}
+
+		/**
+		 * Causes the current thread to wait until {@link #close()} is called.
+		 */
+		public void awaitClose(final long timeout, final TimeUnit timeUnit) throws InterruptedException {
+			closeCalledLatch.await(timeout, timeUnit);
+		}
+	}
+
+	/**
+	 * A noop {@link Closeable} implementation that tracks whether it was closed.
+	 */
+	private static class TestCloseable implements Closeable {
+
+		private final AtomicBoolean closed = new AtomicBoolean();
+
+		@Override
+		public void close() throws IOException {
+			assertTrue("TestCloseable was already closed", closed.compareAndSet(false, true));
+		}
+
+		public boolean isClosed() {
+			return closed.get();
+		}
+	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
index 8a0fb96..63d08d5 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
@@ -30,13 +30,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class CloseableRegistryTest extends AbstractCloseableRegistryTest<Closeable, Object> {
 
 	@Override
-	protected Closeable createCloseable() {
-		return new Closeable() {
-			@Override
-			public void close() throws IOException {
-
-			}
-		};
+	protected void registerCloseable(final Closeable closeable) throws IOException {
+		closeableRegistry.registerCloseable(closeable);
 	}
 
 	@Override
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
index 598b1e1..f6d3f45 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.util.WrappingProxy;
 import org.apache.flink.util.WrappingProxyUtil;
 
 import org.junit.Test;
@@ -38,21 +39,32 @@ public class FileSystemTest {
 	public void testGet() throws URISyntaxException, IOException {
 		String scheme = "file";
 
-		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":///test/test"))) instanceof LocalFileSystem);
+		assertTrue(getFileSystemWithoutSafetyNet(scheme + ":///test/test") instanceof LocalFileSystem);
 
 		try {
-			FileSystem.get(new URI(scheme + "://test/test"));
+			getFileSystemWithoutSafetyNet(scheme + "://test/test");
 		} catch (IOException ioe) {
 			assertTrue(ioe.getMessage().startsWith("Found local file path with authority '"));
 		}
 
-		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":/test/test"))) instanceof LocalFileSystem);
+		assertTrue(getFileSystemWithoutSafetyNet(scheme + ":/test/test") instanceof LocalFileSystem);
 
-		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":test/test"))) instanceof LocalFileSystem);
+		assertTrue(getFileSystemWithoutSafetyNet(scheme + ":test/test") instanceof LocalFileSystem);
 
-		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("/test/test"))) instanceof LocalFileSystem);
+		assertTrue(getFileSystemWithoutSafetyNet("/test/test") instanceof LocalFileSystem);
 
-		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("test/test"))) instanceof LocalFileSystem);
+		assertTrue(getFileSystemWithoutSafetyNet("test/test") instanceof LocalFileSystem);
+	}
+
+	private static FileSystem getFileSystemWithoutSafetyNet(final String uri) throws URISyntaxException, IOException {
+		final FileSystem fileSystem = FileSystem.get(new URI(uri));
+
+		if (fileSystem instanceof WrappingProxy) {
+			//noinspection unchecked
+			return WrappingProxyUtil.stripProxy((WrappingProxy<FileSystem>) fileSystem);
+		}
+
+		return fileSystem;
 	}
 
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 5474f99..44461a3 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -43,17 +43,20 @@ public class SafetyNetCloseableRegistryTest
 	public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
 	@Override
-	protected WrappingProxyCloseable<? extends Closeable> createCloseable() {
-		return new WrappingProxyCloseable<Closeable>() {
+	protected void registerCloseable(final Closeable closeable) throws IOException {
+		final WrappingProxyCloseable<Closeable> wrappingProxyCloseable = new WrappingProxyCloseable<Closeable>() {
 
 			@Override
-			public void close() throws IOException {}
+			public void close() throws IOException {
+				closeable.close();
+			}
 
 			@Override
 			public Closeable getWrappedDelegate() {
-				return this;
+				return closeable;
 			}
 		};
+		closeableRegistry.registerCloseable(wrappingProxyCloseable);
 	}
 
 	@Override
diff --git a/flink-core/src/test/java/org/apache/flink/util/WrappingProxyUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/WrappingProxyUtilTest.java
new file mode 100644
index 0000000..b557b09
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/WrappingProxyUtilTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link WrappingProxyUtil}.
+ */
+public class WrappingProxyUtilTest {
+
+	@Test
+	public void testThrowsExceptionIfTooManyProxies() {
+		try {
+			WrappingProxyUtil.stripProxy(new SelfWrappingProxy(WrappingProxyUtil.SAFETY_NET_MAX_ITERATIONS));
+			fail("Expected exception not thrown");
+		} catch (final IllegalArgumentException e) {
+			assertThat(e.getMessage(), containsString("Are there loops in the object graph?"));
+		}
+	}
+
+	@Test
+	public void testStripsAllProxies() {
+		final SelfWrappingProxy wrappingProxy = new SelfWrappingProxy(WrappingProxyUtil.SAFETY_NET_MAX_ITERATIONS - 1);
+		assertThat(WrappingProxyUtil.stripProxy(wrappingProxy), is(not(instanceOf(SelfWrappingProxy.class))));
+	}
+
+	private static class Wrapped {
+	}
+
+	/**
+	 * Wraps around {@link Wrapped} a specified number of times.
+	 */
+	private static class SelfWrappingProxy extends Wrapped implements WrappingProxy<Wrapped> {
+
+		private int levels;
+
+		private SelfWrappingProxy(final int levels) {
+			this.levels = levels;
+		}
+
+		@Override
+		public Wrapped getWrappedDelegate() {
+			if (levels-- == 0) {
+				return new Wrapped();
+			} else {
+				return this;
+			}
+		}
+	}
+
+}