You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/12/05 11:32:06 UTC
[1/2] flink git commit: [hotfix] [core] Fix remaining checkstyle
issues for 'core.fs'
Repository: flink
Updated Branches:
refs/heads/master b051a4c88 -> 2d4762c6c
[hotfix] [core] Fix remaining checkstyle issues for 'core.fs'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23ea197b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23ea197b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23ea197b
Branch: refs/heads/master
Commit: 23ea197b751a98d10a1bd549175f2566f3a7c227
Parents: b051a4c
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 24 13:55:51 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 4 18:56:30 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/core/fs/FileSystem.java | 2 ++
.../flink/core/fs/FileSystemSafetyNet.java | 20 +++++++--------
.../core/fs/SafetyNetCloseableRegistry.java | 26 +++++++++++---------
.../core/fs/SafetyNetWrapperFileSystem.java | 2 ++
.../flink/core/fs/local/LocalBlockLocation.java | 4 +--
.../core/fs/local/LocalDataInputStream.java | 11 +++++----
.../core/fs/local/LocalDataOutputStream.java | 9 +++----
.../flink/core/fs/local/LocalFileStatus.java | 7 +++---
.../flink/core/fs/local/LocalFileSystem.java | 20 +++++++--------
.../core/fs/AbstractCloseableRegistryTest.java | 14 ++++++++---
.../flink/core/fs/CloseableRegistryTest.java | 3 +++
.../apache/flink/core/fs/FileSystemTest.java | 5 ++++
.../flink/core/fs/InitOutputPathTest.java | 12 +++++----
...itedConnectionsFileSystemDelegationTest.java | 1 -
.../fs/LimitedConnectionsFileSystemTest.java | 2 +-
.../java/org/apache/flink/core/fs/PathTest.java | 22 ++++++++++++-----
.../core/fs/SafetyNetCloseableRegistryTest.java | 7 +++---
.../core/fs/local/LocalFileSystemTest.java | 26 ++++++++++----------
tools/maven/suppressions-core.xml | 8 ------
19 files changed, 113 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 18baaa5..07a1e76 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -229,7 +229,9 @@ public abstract class FileSystem {
/** The default filesystem scheme to be used, configured during process-wide initialization.
* This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
+ //CHECKSTYLE.OFF: StaticVariableName
private static URI DEFAULT_SCHEME;
+ //CHECKSTYLE.ON: StaticVariableName
// ------------------------------------------------------------------------
// Initialization
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
index c06ccac..d72aec0 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -30,22 +30,22 @@ import static org.apache.flink.util.Preconditions.checkState;
* When activated for a thread, it tracks all streams that are opened by FileSystems that the thread
* obtains. The safety net has a global cleanup hook that will close all streams that were
* not properly closed.
- *
+ *
* <p>The main thread of each Flink task, as well as the checkpointing thread are automatically guarded
* by this safety net.
- *
+ *
* <p><b>Important:</b> This safety net works only for streams created by Flink's FileSystem abstraction,
* i.e., for {@code FileSystem} instances obtained via {@link FileSystem#get(URI)} or through
* {@link Path#getFileSystem()}.
- *
+ *
* <p><b>Important:</b> When a guarded thread obtains a {@code FileSystem} or a stream and passes them
* to another thread, the safety net will close those resources once the former thread finishes.
- *
+ *
* <p>The safety net can be used as follows:
* <pre>{@code
- *
+ *
* class GuardedThread extends Thread {
- *
+ *
* public void run() {
* FileSystemSafetyNet.initializeSafetyNetForThread();
* try {
@@ -62,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkState;
@Internal
public class FileSystemSafetyNet {
- /** The map from thread to the safety net registry for that thread */
+ /** The map from thread to the safety net registry for that thread. */
private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
// ------------------------------------------------------------------------
@@ -73,9 +73,9 @@ public class FileSystemSafetyNet {
* Activates the safety net for a thread. {@link FileSystem} instances obtained by the thread
* that called this method will be guarded, meaning that their created streams are tracked and can
* be closed via the safety net closing hook.
- *
+ *
* <p>This method should be called at the beginning of a thread that should be guarded.
- *
+ *
* @throws IllegalStateException Thrown, if a safety net was already registered for the thread.
*/
@Internal
@@ -94,7 +94,7 @@ public class FileSystemSafetyNet {
* Closes the safety net for a thread. This closes all remaining unclosed streams that were opened
* by safety-net-guarded file systems. After this method was called, no streams can be opened any more
* from any FileSystem instance that was obtained while the thread was guarded by the safety net.
- *
+ *
* <p>This method should be called at the very end of a guarded thread.
*/
@Internal
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index 9c4272f..ccf944e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -39,13 +39,13 @@ import java.util.Map;
/**
* This implementation of an {@link AbstractCloseableRegistry} registers {@link WrappingProxyCloseable}. When
* the proxy becomes subject to GC, this registry takes care of closing unclosed {@link Closeable}s.
- * <p>
- * Phantom references are used to track when {@link org.apache.flink.util.WrappingProxy}s of {@link Closeable} got
+ *
+ * <p>Phantom references are used to track when {@link org.apache.flink.util.WrappingProxy}s of {@link Closeable} got
* GC'ed. We ensure that the wrapped {@link Closeable} is properly closed to avoid resource leaks.
- * <p>
- * Other than that, it works like a normal {@link CloseableRegistry}.
- * <p>
- * All methods in this class are thread-safe.
+ *
+ * <p>Other than that, it works like a normal {@link CloseableRegistry}.
+ *
+ * <p>All methods in this class are thread-safe.
*/
@Internal
public class SafetyNetCloseableRegistry extends
@@ -54,15 +54,19 @@ public class SafetyNetCloseableRegistry extends
private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
- /** Lock for atomic modifications to reaper thread and registry count */
+ /** Lock for atomic modifications to reaper thread and registry count. */
private static final Object REAPER_THREAD_LOCK = new Object();
- /** Singleton reaper thread takes care of all registries in VM */
+ //CHECKSTYLE.OFF: StaticVariableName
+
+ /** Singleton reaper thread takes care of all registries in VM. */
private static CloseableReaperThread REAPER_THREAD = null;
- /** Global count of all instances of SafetyNetCloseableRegistry */
+ /** Global count of all instances of SafetyNetCloseableRegistry. */
private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;
+ //CHECKSTYLE.ON: StaticVariableName
+
SafetyNetCloseableRegistry() {
super(new IdentityHashMap<>());
@@ -166,7 +170,7 @@ public class SafetyNetCloseableRegistry extends
}
/**
- * Reaper runnable collects and closes leaking resources
+ * Reaper runnable collects and closes leaking resources.
*/
static final class CloseableReaperThread extends Thread {
@@ -187,7 +191,7 @@ public class SafetyNetCloseableRegistry extends
try {
while (running) {
final PhantomDelegatingCloseableRef toClose = (PhantomDelegatingCloseableRef) referenceQueue.remove();
-
+
if (toClose != null) {
try {
LOG.warn("Closing unclosed resource via safety-net: {}", toClose.getDebugString());
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index e7f43a4..92b3a74c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -82,6 +82,7 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
}
@Override
+ @SuppressWarnings("deprecation")
public long getDefaultBlockSize() {
return unsafeFileSystem.getDefaultBlockSize();
}
@@ -107,6 +108,7 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
}
@Override
+ @SuppressWarnings("deprecation")
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
index 9825781..25dd92d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
@@ -18,11 +18,11 @@
package org.apache.flink.core.fs.local;
-import java.io.IOException;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.BlockLocation;
+import java.io.IOException;
+
/**
* Implementation of the {@link BlockLocation} interface for a local file system.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
index 172da79..63017e3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataInputStream;
import javax.annotation.Nonnull;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -40,9 +41,9 @@ public class LocalDataInputStream extends FSDataInputStream {
/**
* Constructs a new <code>LocalDataInputStream</code> object from a given {@link File} object.
- *
+ *
* @param file The File the data stream is read from
- *
+ *
* @throws IOException Thrown if the data input stream cannot be created.
*/
public LocalDataInputStream(File file) throws IOException {
@@ -71,18 +72,18 @@ public class LocalDataInputStream extends FSDataInputStream {
public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
return this.fis.read(buffer, offset, length);
}
-
+
@Override
public void close() throws IOException {
// According to javadoc, this also closes the channel
this.fis.close();
}
-
+
@Override
public int available() throws IOException {
return this.fis.available();
}
-
+
@Override
public long skip(final long n) throws IOException {
return this.fis.skip(n);
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
index 5cc011b..aa64593 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
@@ -18,13 +18,13 @@
package org.apache.flink.core.fs.local;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.FSDataOutputStream;
-
/**
* The <code>LocalDataOutputStream</code> class is a wrapper class for a data
* output stream to the local file system.
@@ -37,7 +37,7 @@ public class LocalDataOutputStream extends FSDataOutputStream {
/**
* Constructs a new <code>LocalDataOutputStream</code> object from a given {@link File} object.
- *
+ *
* @param file
* the {@link File} object the data stream is read from
* @throws IOException
@@ -62,7 +62,6 @@ public class LocalDataOutputStream extends FSDataOutputStream {
fos.close();
}
-
@Override
public void flush() throws IOException {
fos.flush();
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
index 63e999d..781e0d3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
@@ -16,16 +16,15 @@
* limitations under the License.
*/
-
package org.apache.flink.core.fs.local;
-import java.io.File;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import java.io.File;
+
/**
* The class <code>LocalFileStatus</code> provides an implementation of the {@link FileStatus} interface
* for the local file system.
@@ -45,7 +44,7 @@ public class LocalFileStatus implements FileStatus {
/**
* Creates a <code>LocalFileStatus</code> object from a given {@link File} object.
- *
+ *
* @param f
* the {@link File} object this <code>LocalFileStatus</code> refers to
* @param fs
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index a96f221..c3e5a2f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -16,10 +16,10 @@
* limitations under the License.
*/
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+/*
+ * Parts of earlier versions of this file were based on source code from the
+ * Hadoop Project (http://hadoop.apache.org/), licensed by the Apache Software Foundation (ASF)
+ * under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/
@@ -65,7 +65,7 @@ public class LocalFileSystem extends FileSystem {
/** The URI representing the local file system. */
private static final URI LOCAL_URI = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
- /** The shared instance of the local file system */
+ /** The shared instance of the local file system. */
private static final LocalFileSystem INSTANCE = new LocalFileSystem();
/** Path pointing to the current working directory.
@@ -73,10 +73,10 @@ public class LocalFileSystem extends FileSystem {
private final String workingDir;
/** Path pointing to the current working directory.
- * Because Paths are not immutable, we cannot cache the proper path here */
+ * Because Paths are not immutable, we cannot cache the proper path here. */
private final String homeDir;
- /** The host name of this machine */
+ /** The host name of this machine. */
private final String hostName;
/**
@@ -112,7 +112,7 @@ public class LocalFileSystem extends FileSystem {
}
else {
throw new FileNotFoundException("File " + f + " does not exist or the user running "
- + "Flink ('"+System.getProperty("user.name")+"') has insufficient permissions to access it.");
+ + "Flink ('" + System.getProperty("user.name") + "') has insufficient permissions to access it.");
}
}
@@ -149,7 +149,6 @@ public class LocalFileSystem extends FileSystem {
return new File(path.toUri().getPath());
}
-
@Override
public FileStatus[] listStatus(final Path f) throws IOException {
@@ -175,7 +174,6 @@ public class LocalFileSystem extends FileSystem {
return results;
}
-
@Override
public boolean delete(final Path f, final boolean recursive) throws IOException {
@@ -233,7 +231,7 @@ public class LocalFileSystem extends FileSystem {
public boolean mkdirs(final Path f) throws IOException {
final File p2f = pathToFile(f);
- if(p2f.isDirectory()) {
+ if (p2f.isDirectory()) {
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
index f9425f3..eb07378 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -32,6 +32,9 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.spy;
+/**
+ * Tests for the {@link AbstractCloseableRegistry}.
+ */
public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
protected ProducerThread[] streamOpenThreads;
@@ -140,8 +143,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
try {
closeableRegistry.registerCloseable(testCloseable);
Assert.fail("Closed registry should not accept closeables!");
- }catch (IOException ignore) {
- }
+ } catch (IOException ignored) {}
blockCloseLatch.trigger();
closer.join();
@@ -151,7 +153,10 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
}
- protected static abstract class ProducerThread<C extends Closeable, T> extends Thread {
+ /**
+ * A testing producer.
+ */
+ protected abstract static class ProducerThread<C extends Closeable, T> extends Thread {
protected final AbstractCloseableRegistry<C, T> registry;
protected final AtomicInteger refCount;
@@ -188,6 +193,9 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
}
}
+ /**
+ * Testing stream which adds itself to a reference counter while not closed.
+ */
protected static final class TestStream extends FSDataInputStream {
protected AtomicInteger refCount;
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
index c3bf6e6..8a0fb96 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
@@ -24,6 +24,9 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Tests for the {@link CloseableRegistry}.
+ */
public class CloseableRegistryTest extends AbstractCloseableRegistryTest<Closeable, Object> {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
index 1bde2fb..598b1e1 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
@@ -15,10 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.core.fs;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.util.WrappingProxyUtil;
+
import org.junit.Test;
import java.io.IOException;
@@ -27,6 +29,9 @@ import java.net.URISyntaxException;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the {@link FileSystem} base class.
+ */
public class FileSystemTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
index 9b67388..d6de9b6 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
@@ -28,7 +28,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -40,9 +39,12 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.util.concurrent.locks.ReentrantLock;
-import static org.powermock.api.mockito.PowerMockito.*;
-import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+/**
+ * A test validating that the initialization of local output paths is properly synchronized.
+ */
@RunWith(PowerMockRunner.class)
@PrepareForTest(LocalFileSystem.class)
public class InitOutputPathTest {
@@ -79,7 +81,7 @@ public class InitOutputPathTest {
@Test
public void testProperSynchronized() throws Exception {
- // in the synchronized variant, we cannot use the "await latches" because not
+ // in the synchronized variant, we cannot use the "await latches" because not
// both threads can make progress interleaved (due to the synchronization)
// the test uses sleeps (rather than latches) to produce the same interleaving.
// while that is not guaranteed to produce the pathological interleaving,
@@ -121,7 +123,7 @@ public class InitOutputPathTest {
});
final LocalFileSystem fs1 = new SyncedFileSystem(
- deleteAwaitLatch1, mkdirsAwaitLatch1,
+ deleteAwaitLatch1, mkdirsAwaitLatch1,
deleteTriggerLatch1, mkdirsTriggerLatch1);
final LocalFileSystem fs2 = new SyncedFileSystem(
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
index b133677..2e04648 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
@@ -28,7 +28,6 @@ import java.io.IOException;
import java.util.Random;
import static org.junit.Assert.assertEquals;
-
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
index 509b4ae..7391877 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
@@ -627,7 +627,7 @@ public class LimitedConnectionsFileSystemTest {
}
}
- private static abstract class BlockingThread extends CheckedThread {
+ private abstract static class BlockingThread extends CheckedThread {
private final OneShotLatch waiter = new OneShotLatch();
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
index 66816ad..b4da2dc 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
@@ -15,13 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.core.fs;
+import org.junit.Test;
+
import java.io.IOException;
import java.net.URI;
-import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link Path} class.
+ */
public class PathTest {
@Test
@@ -70,23 +80,23 @@ public class PathTest {
assertEquals("/C:/my/windows/path", p.toUri().getPath());
try {
- new Path((String)null);
+ new Path((String) null);
fail();
- } catch(Exception e) {
+ } catch (Exception e) {
// exception expected
}
try {
new Path("");
fail();
- } catch(Exception e) {
+ } catch (Exception e) {
// exception expected
}
try {
new Path(" ");
fail();
- } catch(Exception e) {
+ } catch (Exception e) {
// exception expected
}
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 4ceda50..5474f99 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -32,6 +32,9 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Tests for the {@link SafetyNetCloseableRegistry}.
+ */
public class SafetyNetCloseableRegistryTest
extends AbstractCloseableRegistryTest<WrappingProxyCloseable<? extends Closeable>,
SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
@@ -44,9 +47,7 @@ public class SafetyNetCloseableRegistryTest
return new WrappingProxyCloseable<Closeable>() {
@Override
- public void close() throws IOException {
-
- }
+ public void close() throws IOException {}
@Override
public Closeable getWrappedDelegate() {
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index 96c5269..2352404 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -18,25 +18,13 @@
package org.apache.flink.core.fs.local;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.UUID;
-
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.FileUtils;
import org.junit.Assume;
@@ -44,6 +32,18 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.UUID;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
/**
* This class tests the functionality of the {@link LocalFileSystem} class in its components. In particular,
* file/directory access, creation, deletion, read, write is tested.
http://git-wip-us.apache.org/repos/asf/flink/blob/23ea197b/tools/maven/suppressions-core.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-core.xml b/tools/maven/suppressions-core.xml
index 066b30a..78341c9 100644
--- a/tools/maven/suppressions-core.xml
+++ b/tools/maven/suppressions-core.xml
@@ -112,14 +112,6 @@ under the License.
checks="AvoidStarImport"/>
<suppress
- files="(.*)core[/\\]fs[/\\](.*)"
- checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
- <!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
- <suppress
- files="(.*)test[/\\](.*)core[/\\]fs[/\\](.*)"
- checks="AvoidStarImport"/>
-
- <suppress
files="(.*)core[/\\]io[/\\](.*)"
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
[2/2] flink git commit: [FLINK-8198] [core] Fix condition for parsing ConnectionLimitingSettings
Posted by se...@apache.org.
[FLINK-8198] [core] Fix condition for parsing ConnectionLimitingSettings
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d4762c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d4762c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d4762c6
Branch: refs/heads/master
Commit: 2d4762c6c0bc73845549575f21fd3f8dbc466aa9
Parents: 23ea197
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 5 12:21:08 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 5 12:21:08 2017 +0100
----------------------------------------------------------------------
.../core/fs/LimitedConnectionsFileSystem.java | 2 +-
.../fs/LimitedConnectionsConfigurationTest.java | 56 ++++++++++++++++++++
2 files changed, 57 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d4762c6/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
index 5353563..fdf54e0 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
@@ -1074,7 +1074,7 @@ public class LimitedConnectionsFileSystem extends FileSystem {
checkLimit(limitOut, limitOutOption);
// create the settings only, if at least one limit is configured
- if (totalLimit <= 0 || limitIn <= 0 || limitOut <= 0) {
+ if (totalLimit <= 0 && limitIn <= 0 && limitOut <= 0) {
// no limit configured
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d4762c6/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
index 4742a7e..2c30ce8 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
@@ -19,6 +19,8 @@
package org.apache.flink.core.fs;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem.ConnectionLimitingSettings;
import org.apache.flink.testutils.TestFileSystem;
import org.junit.Rule;
@@ -29,6 +31,8 @@ import java.net.URI;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -40,6 +44,10 @@ public class LimitedConnectionsConfigurationTest {
@Rule
public final TemporaryFolder tempDir = new TemporaryFolder();
+ /**
+ * This test validates that the File System is correctly wrapped by the
+ * file system factories when the corresponding entries are in the configuration.
+ */
@Test
public void testConfiguration() throws Exception {
final String fsScheme = TestFileSystem.SCHEME;
@@ -81,4 +89,52 @@ public class LimitedConnectionsConfigurationTest {
FileSystem.initialize(new Configuration());
}
}
+
+ /**
+ * This test checks that the file system connection limiting configuration object
+ * is properly created.
+ */
+ @Test
+ public void testConnectionLimitingSettings() {
+ final String scheme = "testscheme";
+
+ // empty config
+ assertNull(ConnectionLimitingSettings.fromConfig(new Configuration(), scheme));
+
+ // only total limit set
+ {
+ Configuration conf = new Configuration();
+ conf.setInteger(CoreOptions.fileSystemConnectionLimit(scheme), 10);
+
+ ConnectionLimitingSettings settings = ConnectionLimitingSettings.fromConfig(conf, scheme);
+ assertNotNull(settings);
+ assertEquals(10, settings.limitTotal);
+ assertEquals(0, settings.limitInput);
+ assertEquals(0, settings.limitOutput);
+ }
+
+ // only input limit set
+ {
+ Configuration conf = new Configuration();
+ conf.setInteger(CoreOptions.fileSystemConnectionLimitIn(scheme), 10);
+
+ ConnectionLimitingSettings settings = ConnectionLimitingSettings.fromConfig(conf, scheme);
+ assertNotNull(settings);
+ assertEquals(0, settings.limitTotal);
+ assertEquals(10, settings.limitInput);
+ assertEquals(0, settings.limitOutput);
+ }
+
+ // only output limit set
+ {
+ Configuration conf = new Configuration();
+ conf.setInteger(CoreOptions.fileSystemConnectionLimitOut(scheme), 10);
+
+ ConnectionLimitingSettings settings = ConnectionLimitingSettings.fromConfig(conf, scheme);
+ assertNotNull(settings);
+ assertEquals(0, settings.limitTotal);
+ assertEquals(0, settings.limitInput);
+ assertEquals(10, settings.limitOutput);
+ }
+ }
}