You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/12/05 11:32:06 UTC

[1/2] flink git commit: [hotfix] [core] Fix remaining checkstyle issues for 'core.fs'

Repository: flink
Updated Branches:
  refs/heads/master b051a4c88 -> 2d4762c6c


[hotfix] [core] Fix remaining checkstyle issues for 'core.fs'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23ea197b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23ea197b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23ea197b

Branch: refs/heads/master
Commit: 23ea197b751a98d10a1bd549175f2566f3a7c227
Parents: b051a4c
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 24 13:55:51 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 4 18:56:30 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    |  2 ++
 .../flink/core/fs/FileSystemSafetyNet.java      | 20 +++++++--------
 .../core/fs/SafetyNetCloseableRegistry.java     | 26 +++++++++++---------
 .../core/fs/SafetyNetWrapperFileSystem.java     |  2 ++
 .../flink/core/fs/local/LocalBlockLocation.java |  4 +--
 .../core/fs/local/LocalDataInputStream.java     | 11 +++++----
 .../core/fs/local/LocalDataOutputStream.java    |  9 +++----
 .../flink/core/fs/local/LocalFileStatus.java    |  7 +++---
 .../flink/core/fs/local/LocalFileSystem.java    | 20 +++++++--------
 .../core/fs/AbstractCloseableRegistryTest.java  | 14 ++++++++---
 .../flink/core/fs/CloseableRegistryTest.java    |  3 +++
 .../apache/flink/core/fs/FileSystemTest.java    |  5 ++++
 .../flink/core/fs/InitOutputPathTest.java       | 12 +++++----
 ...itedConnectionsFileSystemDelegationTest.java |  1 -
 .../fs/LimitedConnectionsFileSystemTest.java    |  2 +-
 .../java/org/apache/flink/core/fs/PathTest.java | 22 ++++++++++++-----
 .../core/fs/SafetyNetCloseableRegistryTest.java |  7 +++---
 .../core/fs/local/LocalFileSystemTest.java      | 26 ++++++++++----------
 tools/maven/suppressions-core.xml               |  8 ------
 19 files changed, 113 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 18baaa5..07a1e76 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -229,7 +229,9 @@ public abstract class FileSystem {
 
 	/** The default filesystem scheme to be used, configured during process-wide initialization.
 	 * This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
+	//CHECKSTYLE.OFF: StaticVariableName
 	private static URI DEFAULT_SCHEME;
+	//CHECKSTYLE.ON: StaticVariableName
 
 	// ------------------------------------------------------------------------
 	//  Initialization

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
index c06ccac..d72aec0 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -30,22 +30,22 @@ import static org.apache.flink.util.Preconditions.checkState;
  * When activated for a thread, it tracks all streams that are opened by FileSystems that the thread
  * obtains. The safety net has a global cleanup hook that will close all streams that were
  * not properly closed.
- * 
+ *
  * <p>The main thread of each Flink task, as well as the checkpointing thread are automatically guarded
  * by this safety net.
- * 
+ *
  * <p><b>Important:</b> This safety net works only for streams created by Flink's FileSystem abstraction,
  * i.e., for {@code FileSystem} instances obtained via {@link FileSystem#get(URI)} or through
  * {@link Path#getFileSystem()}.
- * 
+ *
  * <p><b>Important:</b> When a guarded thread obtains a {@code FileSystem} or a stream and passes them
  * to another thread, the safety net will close those resources once the former thread finishes.
- * 
+ *
  * <p>The safety net can be used as follows:
  * <pre>{@code
- * 
+ *
  * class GuardedThread extends Thread {
- * 
+ *
  *     public void run() {
  *         FileSystemSafetyNet.initializeSafetyNetForThread();
  *         try {
@@ -62,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 @Internal
 public class FileSystemSafetyNet {
 
-	/** The map from thread to the safety net registry for that thread */
+	/** The map from thread to the safety net registry for that thread. */
 	private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
 
 	// ------------------------------------------------------------------------
@@ -73,9 +73,9 @@ public class FileSystemSafetyNet {
 	 * Activates the safety net for a thread. {@link FileSystem} instances obtained by the thread
 	 * that called this method will be guarded, meaning that their created streams are tracked and can
 	 * be closed via the safety net closing hook.
-	 * 
+	 *
 	 * <p>This method should be called at the beginning of a thread that should be guarded.
-	 * 
+	 *
 	 * @throws IllegalStateException Thrown, if a safety net was already registered for the thread.
 	 */
 	@Internal
@@ -94,7 +94,7 @@ public class FileSystemSafetyNet {
 	 * Closes the safety net for a thread. This closes all remaining unclosed streams that were opened
 	 * by safety-net-guarded file systems. After this method was called, no streams can be opened any more
 	 * from any FileSystem instance that was obtained while the thread was guarded by the safety net.
-	 * 
+	 *
 	 * <p>This method should be called at the very end of a guarded thread.
 	 */
 	@Internal

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index 9c4272f..ccf944e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -39,13 +39,13 @@ import java.util.Map;
 /**
  * This implementation of an {@link AbstractCloseableRegistry} registers {@link WrappingProxyCloseable}. When
  * the proxy becomes subject to GC, this registry takes care of closing unclosed {@link Closeable}s.
- * <p>
- * Phantom references are used to track when {@link org.apache.flink.util.WrappingProxy}s of {@link Closeable} got
+ *
+ * <p>Phantom references are used to track when {@link org.apache.flink.util.WrappingProxy}s of {@link Closeable} got
  * GC'ed. We ensure that the wrapped {@link Closeable} is properly closed to avoid resource leaks.
- * <p>
- * Other than that, it works like a normal {@link CloseableRegistry}.
- * <p>
- * All methods in this class are thread-safe.
+ *
+ * <p>Other than that, it works like a normal {@link CloseableRegistry}.
+ *
+ * <p>All methods in this class are thread-safe.
  */
 @Internal
 public class SafetyNetCloseableRegistry extends
@@ -54,15 +54,19 @@ public class SafetyNetCloseableRegistry extends
 
 	private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
 
-	/** Lock for atomic modifications to reaper thread and registry count */
+	/** Lock for atomic modifications to reaper thread and registry count. */
 	private static final Object REAPER_THREAD_LOCK = new Object();
 
-	/** Singleton reaper thread takes care of all registries in VM */
+	//CHECKSTYLE.OFF: StaticVariableName
+
+	/** Singleton reaper thread takes care of all registries in VM. */
 	private static CloseableReaperThread REAPER_THREAD = null;
 
-	/** Global count of all instances of SafetyNetCloseableRegistry */
+	/** Global count of all instances of SafetyNetCloseableRegistry. */
 	private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;
 
+	//CHECKSTYLE.ON: StaticVariableName
+
 	SafetyNetCloseableRegistry() {
 		super(new IdentityHashMap<>());
 
@@ -166,7 +170,7 @@ public class SafetyNetCloseableRegistry extends
 	}
 
 	/**
-	 * Reaper runnable collects and closes leaking resources
+	 * Reaper runnable collects and closes leaking resources.
 	 */
 	static final class CloseableReaperThread extends Thread {
 
@@ -187,7 +191,7 @@ public class SafetyNetCloseableRegistry extends
 			try {
 				while (running) {
 					final PhantomDelegatingCloseableRef toClose = (PhantomDelegatingCloseableRef) referenceQueue.remove();
-					
+
 					if (toClose != null) {
 						try {
 							LOG.warn("Closing unclosed resource via safety-net: {}", toClose.getDebugString());

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index e7f43a4..92b3a74c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -82,6 +82,7 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
 	}
 
 	@Override
+	@SuppressWarnings("deprecation")
 	public long getDefaultBlockSize() {
 		return unsafeFileSystem.getDefaultBlockSize();
 	}
@@ -107,6 +108,7 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
 	}
 
 	@Override
+	@SuppressWarnings("deprecation")
 	public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
 			throws IOException {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
index 9825781..25dd92d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.core.fs.local;
 
-import java.io.IOException;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.BlockLocation;
 
+import java.io.IOException;
+
 /**
  * Implementation of the {@link BlockLocation} interface for a local file system.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
index 172da79..63017e3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FSDataInputStream;
 
 import javax.annotation.Nonnull;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -40,9 +41,9 @@ public class LocalDataInputStream extends FSDataInputStream {
 
 	/**
 	 * Constructs a new <code>LocalDataInputStream</code> object from a given {@link File} object.
-	 * 
+	 *
 	 * @param file The File the data stream is read from
-	 * 
+	 *
 	 * @throws IOException Thrown if the data input stream cannot be created.
 	 */
 	public LocalDataInputStream(File file) throws IOException {
@@ -71,18 +72,18 @@ public class LocalDataInputStream extends FSDataInputStream {
 	public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
 		return this.fis.read(buffer, offset, length);
 	}
-	
+
 	@Override
 	public void close() throws IOException {
 		// Accoring to javadoc, this also closes the channel
 		this.fis.close();
 	}
-	
+
 	@Override
 	public int available() throws IOException {
 		return this.fis.available();
 	}
-	
+
 	@Override
 	public long skip(final long n) throws IOException {
 		return this.fis.skip(n);

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
index 5cc011b..aa64593 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.core.fs.local;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.FSDataOutputStream;
-
 /**
  * The <code>LocalDataOutputStream</code> class is a wrapper class for a data
  * output stream to the local file system.
@@ -37,7 +37,7 @@ public class LocalDataOutputStream extends FSDataOutputStream {
 
 	/**
 	 * Constructs a new <code>LocalDataOutputStream</code> object from a given {@link File} object.
-	 * 
+	 *
 	 * @param file
 	 *        the {@link File} object the data stream is read from
 	 * @throws IOException
@@ -62,7 +62,6 @@ public class LocalDataOutputStream extends FSDataOutputStream {
 		fos.close();
 	}
 
-
 	@Override
 	public void flush() throws IOException {
 		fos.flush();

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
index 63e999d..781e0d3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.core.fs.local;
 
-import java.io.File;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
+import java.io.File;
+
 /**
  * The class <code>LocalFileStatus</code> provides an implementation of the {@link FileStatus} interface
  * for the local file system.
@@ -45,7 +44,7 @@ public class LocalFileStatus implements FileStatus {
 
 	/**
 	 * Creates a <code>LocalFileStatus</code> object from a given {@link File} object.
-	 * 
+	 *
 	 * @param f
 	 *        the {@link File} object this <code>LocalFileStatus</code> refers to
 	 * @param fs

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index a96f221..c3e5a2f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+/*
+ * Parts of earlier versions of this file were based on source code from the
+ * Hadoop Project (http://hadoop.apache.org/), licensed by the Apache Software Foundation (ASF)
+ * under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
  * additional information regarding copyright ownership.
  */
 
@@ -65,7 +65,7 @@ public class LocalFileSystem extends FileSystem {
 	/** The URI representing the local file system. */
 	private static final URI LOCAL_URI = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
 
-	/** The shared instance of the local file system */
+	/** The shared instance of the local file system. */
 	private static final LocalFileSystem INSTANCE = new LocalFileSystem();
 
 	/** Path pointing to the current working directory.
@@ -73,10 +73,10 @@ public class LocalFileSystem extends FileSystem {
 	private final String workingDir;
 
 	/** Path pointing to the current working directory.
-	 * Because Paths are not immutable, we cannot cache the proper path here */
+	 * Because Paths are not immutable, we cannot cache the proper path here. */
 	private final String homeDir;
 
-	/** The host name of this machine */
+	/** The host name of this machine. */
 	private final String hostName;
 
 	/**
@@ -112,7 +112,7 @@ public class LocalFileSystem extends FileSystem {
 		}
 		else {
 			throw new FileNotFoundException("File " + f + " does not exist or the user running "
-					+ "Flink ('"+System.getProperty("user.name")+"') has insufficient permissions to access it.");
+					+ "Flink ('" + System.getProperty("user.name") + "') has insufficient permissions to access it.");
 		}
 	}
 
@@ -149,7 +149,6 @@ public class LocalFileSystem extends FileSystem {
 		return new File(path.toUri().getPath());
 	}
 
-
 	@Override
 	public FileStatus[] listStatus(final Path f) throws IOException {
 
@@ -175,7 +174,6 @@ public class LocalFileSystem extends FileSystem {
 		return results;
 	}
 
-
 	@Override
 	public boolean delete(final Path f, final boolean recursive) throws IOException {
 
@@ -233,7 +231,7 @@ public class LocalFileSystem extends FileSystem {
 	public boolean mkdirs(final Path f) throws IOException {
 		final File p2f = pathToFile(f);
 
-		if(p2f.isDirectory()) {
+		if (p2f.isDirectory()) {
 			return true;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
index f9425f3..eb07378 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -32,6 +32,9 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
 
+/**
+ * Tests for the {@link AbstractCloseableRegistry}.
+ */
 public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 
 	protected ProducerThread[] streamOpenThreads;
@@ -140,8 +143,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 		try {
 			closeableRegistry.registerCloseable(testCloseable);
 			Assert.fail("Closed registry should not accept closeables!");
-		}catch (IOException ignore) {
-		}
+		} catch (IOException ignored) {}
 
 		blockCloseLatch.trigger();
 		closer.join();
@@ -151,7 +153,10 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
 	}
 
-	protected static abstract class ProducerThread<C extends Closeable, T> extends Thread {
+	/**
+	 * A testing producer.
+	 */
+	protected abstract static class ProducerThread<C extends Closeable, T> extends Thread {
 
 		protected final AbstractCloseableRegistry<C, T> registry;
 		protected final AtomicInteger refCount;
@@ -188,6 +193,9 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 		}
 	}
 
+	/**
+	 * Testing stream which adds itself to a reference counter while not closed.
+	 */
 	protected static final class TestStream extends FSDataInputStream {
 
 		protected AtomicInteger refCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
index c3bf6e6..8a0fb96 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
@@ -24,6 +24,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * Tests for the {@link CloseableRegistry}.
+ */
 public class CloseableRegistryTest extends AbstractCloseableRegistryTest<Closeable, Object> {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
index 1bde2fb..598b1e1 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
@@ -15,10 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.core.fs;
 
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.WrappingProxyUtil;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -27,6 +29,9 @@ import java.net.URISyntaxException;
 
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link FileSystem} base class.
+ */
 public class FileSystemTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
index 9b67388..d6de9b6 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
@@ -28,7 +28,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -40,9 +39,12 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.powermock.api.mockito.PowerMockito.*;
-import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
 
+/**
+ * A test validating that the initialization of local output paths is properly synchronized.
+ */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(LocalFileSystem.class)
 public class InitOutputPathTest {
@@ -79,7 +81,7 @@ public class InitOutputPathTest {
 
 	@Test
 	public void testProperSynchronized() throws Exception {
-		// in the synchronized variant, we cannot use the "await latches" because not 
+		// in the synchronized variant, we cannot use the "await latches" because not
 		// both threads can make process interleaved (due to the synchronization)
 		// the test uses sleeps (rather than latches) to produce the same interleaving.
 		// while that is not guaranteed to produce the pathological interleaving,
@@ -121,7 +123,7 @@ public class InitOutputPathTest {
 		});
 
 		final LocalFileSystem fs1 = new SyncedFileSystem(
-				deleteAwaitLatch1, mkdirsAwaitLatch1, 
+				deleteAwaitLatch1, mkdirsAwaitLatch1,
 				deleteTriggerLatch1, mkdirsTriggerLatch1);
 
 		final LocalFileSystem fs2 = new SyncedFileSystem(

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
index b133677..2e04648 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
@@ -28,7 +28,6 @@ import java.io.IOException;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
index 509b4ae..7391877 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
@@ -627,7 +627,7 @@ public class LimitedConnectionsFileSystemTest {
 		}
 	}
 
-	private static abstract class BlockingThread extends CheckedThread {
+	private abstract static class BlockingThread extends CheckedThread {
 
 		private final OneShotLatch waiter = new OneShotLatch();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
index 66816ad..b4da2dc 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
@@ -15,13 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.core.fs;
 
+import org.junit.Test;
+
 import java.io.IOException;
 import java.net.URI;
-import org.junit.Test;
-import static org.junit.Assert.*;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link Path} class.
+ */
 public class PathTest {
 
 	@Test
@@ -70,23 +80,23 @@ public class PathTest {
 		assertEquals("/C:/my/windows/path", p.toUri().getPath());
 
 		try {
-			new Path((String)null);
+			new Path((String) null);
 			fail();
-		} catch(Exception e) {
+		} catch (Exception e) {
 			// exception expected
 		}
 
 		try {
 			new Path("");
 			fail();
-		} catch(Exception e) {
+		} catch (Exception e) {
 			// exception expected
 		}
 
 		try {
 			new Path(" ");
 			fail();
-		} catch(Exception e) {
+		} catch (Exception e) {
 			// exception expected
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 4ceda50..5474f99 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -32,6 +32,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * Tests for the {@link SafetyNetCloseableRegistry}.
+ */
 public class SafetyNetCloseableRegistryTest
 	extends AbstractCloseableRegistryTest<WrappingProxyCloseable<? extends Closeable>,
 	SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
@@ -44,9 +47,7 @@ public class SafetyNetCloseableRegistryTest
 		return new WrappingProxyCloseable<Closeable>() {
 
 			@Override
-			public void close() throws IOException {
-
-			}
+			public void close() throws IOException {}
 
 			@Override
 			public Closeable getWrappedDelegate() {

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index 96c5269..2352404 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -18,25 +18,13 @@
 
 package org.apache.flink.core.fs.local;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.UUID;
-
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.util.FileUtils;
 
 import org.junit.Assume;
@@ -44,6 +32,18 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.UUID;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * This class tests the functionality of the {@link LocalFileSystem} class in its components. In particular,
  * file/directory access, creation, deletion, read, write is tested.

http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/tools/maven/suppressions-core.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-core.xml b/tools/maven/suppressions-core.xml
index 066b30a..78341c9 100644
--- a/tools/maven/suppressions-core.xml
+++ b/tools/maven/suppressions-core.xml
@@ -112,14 +112,6 @@ under the License.
 		checks="AvoidStarImport"/>
 
 	<suppress
-		files="(.*)core[/\\]fs[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
-	<suppress
-		files="(.*)test[/\\](.*)core[/\\]fs[/\\](.*)"
-		checks="AvoidStarImport"/>
-
-	<suppress
 		files="(.*)core[/\\]io[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
 	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->


[2/2] flink git commit: [FLINK-8198] [core] Fix condition for parsing ConnectionLimitingSettings

Posted by se...@apache.org.
[FLINK-8198] [core] Fix condition for parsing ConnectionLimitingSettings


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d4762c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d4762c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d4762c6

Branch: refs/heads/master
Commit: 2d4762c6c0bc73845549575f21fd3f8dbc466aa9
Parents: 23ea197
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 5 12:21:08 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 5 12:21:08 2017 +0100

----------------------------------------------------------------------
 .../core/fs/LimitedConnectionsFileSystem.java   |  2 +-
 .../fs/LimitedConnectionsConfigurationTest.java | 56 ++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d4762c6/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
index 5353563..fdf54e0 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
@@ -1074,7 +1074,7 @@ public class LimitedConnectionsFileSystem extends FileSystem {
 			checkLimit(limitOut, limitOutOption);
 
 			// create the settings only, if at least one limit is configured
-			if (totalLimit <= 0 || limitIn <= 0 || limitOut <= 0) {
+			if (totalLimit <= 0 && limitIn <= 0 && limitOut <= 0) {
 				// no limit configured
 				return null;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d4762c6/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
index 4742a7e..2c30ce8 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem.ConnectionLimitingSettings;
 import org.apache.flink.testutils.TestFileSystem;
 
 import org.junit.Rule;
@@ -29,6 +31,8 @@ import java.net.URI;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -40,6 +44,10 @@ public class LimitedConnectionsConfigurationTest {
 	@Rule
 	public final TemporaryFolder tempDir = new TemporaryFolder();
 
+	/**
+	 * This test validates that the File System is correctly wrapped by the
+	 * file system factories when the corresponding entries are in the configuration.
+	 */
 	@Test
 	public void testConfiguration() throws Exception {
 		final String fsScheme = TestFileSystem.SCHEME;
@@ -81,4 +89,52 @@ public class LimitedConnectionsConfigurationTest {
 			FileSystem.initialize(new Configuration());
 		}
 	}
+
+	/**
+	 * This test checks that the file system connection limiting configuration object
+	 * is properly created.
+	 */
+	@Test
+	public void testConnectionLimitingSettings() {
+		final String scheme = "testscheme";
+
+		// empty config
+		assertNull(ConnectionLimitingSettings.fromConfig(new Configuration(), scheme));
+
+		// only total limit set
+		{
+			Configuration conf = new Configuration();
+			conf.setInteger(CoreOptions.fileSystemConnectionLimit(scheme), 10);
+
+			ConnectionLimitingSettings settings = ConnectionLimitingSettings.fromConfig(conf, scheme);
+			assertNotNull(settings);
+			assertEquals(10, settings.limitTotal);
+			assertEquals(0, settings.limitInput);
+			assertEquals(0, settings.limitOutput);
+		}
+
+		// only input limit set
+		{
+			Configuration conf = new Configuration();
+			conf.setInteger(CoreOptions.fileSystemConnectionLimitIn(scheme), 10);
+
+			ConnectionLimitingSettings settings = ConnectionLimitingSettings.fromConfig(conf, scheme);
+			assertNotNull(settings);
+			assertEquals(0, settings.limitTotal);
+			assertEquals(10, settings.limitInput);
+			assertEquals(0, settings.limitOutput);
+		}
+
+		// only output limit set
+		{
+			Configuration conf = new Configuration();
+			conf.setInteger(CoreOptions.fileSystemConnectionLimitOut(scheme), 10);
+
+			ConnectionLimitingSettings settings = ConnectionLimitingSettings.fromConfig(conf, scheme);
+			assertNotNull(settings);
+			assertEquals(0, settings.limitTotal);
+			assertEquals(0, settings.limitInput);
+			assertEquals(10, settings.limitOutput);
+		}
+	}
 }