You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/11/24 12:02:52 UTC
[2/3] flink git commit: [FLINK-8125] [core] Introduce limiting of
outgoing file system connections
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
new file mode 100644
index 0000000..509b4ae
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
@@ -0,0 +1,742 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link LimitedConnectionsFileSystem}.
+ */
+public class LimitedConnectionsFileSystemTest {
+
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testConstructionNumericOverflow() {
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ Integer.MAX_VALUE, // unlimited total
+ Integer.MAX_VALUE, // limited outgoing
+ Integer.MAX_VALUE, // unlimited incoming
+ Long.MAX_VALUE - 1, // long timeout, close to overflow
+ Long.MAX_VALUE - 1); // long timeout, close to overflow
+
+ assertEquals(Integer.MAX_VALUE, limitedFs.getMaxNumOpenStreamsTotal());
+ assertEquals(Integer.MAX_VALUE, limitedFs.getMaxNumOpenOutputStreams());
+ assertEquals(Integer.MAX_VALUE, limitedFs.getMaxNumOpenInputStreams());
+
+ assertTrue(limitedFs.getStreamOpenTimeout() > 0);
+ assertTrue(limitedFs.getStreamInactivityTimeout() > 0);
+ }
+
+ @Test
+ public void testLimitingOutputStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 61;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ Integer.MAX_VALUE, // unlimited total
+ maxConcurrentOpen, // limited outgoing
+ Integer.MAX_VALUE, // unlimited incoming
+ 0,
+ 0);
+
+ final WriterThread[] threads = new WriterThread[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ Path path = new Path(tempFolder.newFile().toURI());
+ threads[i] = new WriterThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+
+ for (WriterThread t : threads) {
+ t.start();
+ }
+
+ for (WriterThread t : threads) {
+ t.sync();
+ }
+ }
+
+ @Test
+ public void testLimitingInputStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 61;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ Integer.MAX_VALUE, // unlimited total
+ Integer.MAX_VALUE, // unlimited outgoing
+ maxConcurrentOpen, // limited incoming
+ 0,
+ 0);
+
+ final Random rnd = new Random();
+
+ final ReaderThread[] threads = new ReaderThread[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ File file = tempFolder.newFile();
+ createRandomContents(file, rnd);
+ Path path = new Path(file.toURI());
+ threads[i] = new ReaderThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+
+ for (ReaderThread t : threads) {
+ t.start();
+ }
+
+ for (ReaderThread t : threads) {
+ t.sync();
+ }
+ }
+
+ @Test
+ public void testLimitingMixedStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 61;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ maxConcurrentOpen); // limited total
+
+ final Random rnd = new Random();
+
+ final CheckedThread[] threads = new CheckedThread[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ File file = tempFolder.newFile();
+ Path path = new Path(file.toURI());
+
+ if (rnd.nextBoolean()) {
+ // reader thread
+ createRandomContents(file, rnd);
+ threads[i] = new ReaderThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
+ }
+ else {
+ threads[i] = new WriterThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
+ }
+ }
+
+ for (CheckedThread t : threads) {
+ t.start();
+ }
+
+ for (CheckedThread t : threads) {
+ t.sync();
+ }
+ }
+
+ @Test
+ public void testOpenTimeoutOutputStreams() throws Exception {
+ final long openTimeout = 50L;
+ final int maxConcurrentOpen = 2;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ maxConcurrentOpen, // limited total
+ openTimeout, // small opening timeout
+ 0L); // infinite inactivity timeout
+
+ // create the threads that block all streams
+ final BlockingWriterThread[] threads = new BlockingWriterThread[maxConcurrentOpen];
+ for (int i = 0; i < maxConcurrentOpen; i++) {
+ Path path = new Path(tempFolder.newFile().toURI());
+ threads[i] = new BlockingWriterThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
+ threads[i].start();
+ }
+
+ // wait until all are open
+ while (limitedFs.getTotalNumberOfOpenStreams() < maxConcurrentOpen) {
+ Thread.sleep(1);
+ }
+
+ // try to open another stream, which should time out
+ try {
+ limitedFs.create(new Path(tempFolder.newFile().toURI()), WriteMode.OVERWRITE);
+ fail("this should have timed out");
+ }
+ catch (IOException e) {
+ // expected
+ }
+
+ // clean shutdown
+ for (BlockingWriterThread t : threads) {
+ t.wakeup();
+ t.sync();
+ }
+ }
+
+ @Test
+ public void testOpenTimeoutInputStreams() throws Exception {
+ final long openTimeout = 50L;
+ final int maxConcurrentOpen = 2;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ maxConcurrentOpen, // limited total
+ openTimeout, // small opening timeout
+ 0L); // infinite inactivity timeout
+
+ // create the threads that block all streams
+ final Random rnd = new Random();
+ final BlockingReaderThread[] threads = new BlockingReaderThread[maxConcurrentOpen];
+ for (int i = 0; i < maxConcurrentOpen; i++) {
+ File file = tempFolder.newFile();
+ createRandomContents(file, rnd);
+ Path path = new Path(file.toURI());
+ threads[i] = new BlockingReaderThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
+ threads[i].start();
+ }
+
+ // wait until all are open
+ while (limitedFs.getTotalNumberOfOpenStreams() < maxConcurrentOpen) {
+ Thread.sleep(1);
+ }
+
+ // try to open another stream, which should time out
+ File file = tempFolder.newFile();
+ createRandomContents(file, rnd);
+ try {
+ limitedFs.open(new Path(file.toURI()));
+ fail("this should have timed out");
+ }
+ catch (IOException e) {
+ // expected
+ }
+
+ // clean shutdown
+ for (BlockingReaderThread t : threads) {
+ t.wakeup();
+ t.sync();
+ }
+ }
+
+ @Test
+ public void testTerminateStalledOutputStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 20;
+
+ // this testing file system has a 50 ms stream inactivity timeout
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ Integer.MAX_VALUE, // no limit on total streams
+ maxConcurrentOpen, // limit on output streams
+ Integer.MAX_VALUE, // no limit on input streams
+ 0,
+ 50); // timeout of 50 ms
+
+ final WriterThread[] threads = new WriterThread[numThreads];
+ final BlockingWriterThread[] blockers = new BlockingWriterThread[numThreads];
+
+ for (int i = 0; i < numThreads; i++) {
+ Path path1 = new Path(tempFolder.newFile().toURI());
+ Path path2 = new Path(tempFolder.newFile().toURI());
+
+ threads[i] = new WriterThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+ blockers[i] = new BlockingWriterThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+
+ // start normal and blocker threads
+ for (int i = 0; i < numThreads; i++) {
+ blockers[i].start();
+ threads[i].start();
+ }
+
+ // all normal threads need to be able to finish because
+ // the blockers eventually time out
+ for (WriterThread t : threads) {
+ try {
+ t.sync();
+ } catch (LimitedConnectionsFileSystem.StreamTimeoutException e) {
+ // also the regular threads may occasionally get a timeout on
+ // slower test machines because we set very aggressive timeouts
+ // to reduce the test time
+ }
+ }
+
+ // unblock all the blocking threads
+ for (BlockingThread t : blockers) {
+ t.wakeup();
+ }
+ for (BlockingThread t : blockers) {
+ try {
+ t.sync();
+ }
+ catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {}
+ }
+ }
+
+ @Test
+ public void testTerminateStalledInputStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 20;
+
+ // this testing file system has a 50 ms stream inactivity timeout
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ Integer.MAX_VALUE, // no limit on total streams
+ Integer.MAX_VALUE, // no limit on output streams
+ maxConcurrentOpen, // limit on input streams
+ 0,
+ 50); // timeout of 50 ms
+
+ final Random rnd = new Random();
+
+ final ReaderThread[] threads = new ReaderThread[numThreads];
+ final BlockingReaderThread[] blockers = new BlockingReaderThread[numThreads];
+
+ for (int i = 0; i < numThreads; i++) {
+ File file1 = tempFolder.newFile();
+ File file2 = tempFolder.newFile();
+
+ createRandomContents(file1, rnd);
+ createRandomContents(file2, rnd);
+
+ Path path1 = new Path(file1.toURI());
+ Path path2 = new Path(file2.toURI());
+
+ threads[i] = new ReaderThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+ blockers[i] = new BlockingReaderThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+
+ // start normal and blocker threads
+ for (int i = 0; i < numThreads; i++) {
+ blockers[i].start();
+ threads[i].start();
+ }
+
+ // all normal threads need to be able to finish because
+ // the blockers eventually time out
+ for (ReaderThread t : threads) {
+ try {
+ t.sync();
+ } catch (LimitedConnectionsFileSystem.StreamTimeoutException e) {
+ // also the regular threads may occasionally get a timeout on
+ // slower test machines because we set very aggressive timeouts
+ // to reduce the test time
+ }
+ }
+
+ // unblock all the blocking threads
+ for (BlockingThread t : blockers) {
+ t.wakeup();
+ }
+ for (BlockingThread t : blockers) {
+ try {
+ t.sync();
+ }
+ catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {}
+ }
+ }
+
+ @Test
+ public void testTerminateStalledMixedStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 20;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ maxConcurrentOpen, // limited total
+ 0L, // no opening timeout
+ 50L); // inactivity timeout of 50 ms
+
+ final Random rnd = new Random();
+
+ final CheckedThread[] threads = new CheckedThread[numThreads];
+ final BlockingThread[] blockers = new BlockingThread[numThreads];
+
+ for (int i = 0; i < numThreads; i++) {
+ File file1 = tempFolder.newFile();
+ File file2 = tempFolder.newFile();
+ Path path1 = new Path(file1.toURI());
+ Path path2 = new Path(file2.toURI());
+
+ if (rnd.nextBoolean()) {
+ createRandomContents(file1, rnd);
+ createRandomContents(file2, rnd);
+ threads[i] = new ReaderThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+ blockers[i] = new BlockingReaderThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+ else {
+ threads[i] = new WriterThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+ blockers[i] = new BlockingWriterThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+ }
+
+ // start normal and blocker threads
+ for (int i = 0; i < numThreads; i++) {
+ blockers[i].start();
+ threads[i].start();
+ }
+
+ // all normal threads need to be able to finish because
+ // the blockers eventually time out
+ for (CheckedThread t : threads) {
+ try {
+ t.sync();
+ } catch (LimitedConnectionsFileSystem.StreamTimeoutException e) {
+ // also the regular threads may occasionally get a timeout on
+ // slower test machines because we set very aggressive timeouts
+ // to reduce the test time
+ }
+ }
+
+ // unblock all the blocking threads
+ for (BlockingThread t : blockers) {
+ t.wakeup();
+ }
+ for (BlockingThread t : blockers) {
+ try {
+ t.sync();
+ }
+ catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {}
+ }
+ }
+
+ @Test
+ public void testFailingStreamsUnregister() throws Exception {
+ final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(new FailFs(), 1);
+
+ assertEquals(0, fs.getNumberOfOpenInputStreams());
+ assertEquals(0, fs.getNumberOfOpenOutputStreams());
+ assertEquals(0, fs.getTotalNumberOfOpenStreams());
+
+ try {
+ fs.open(new Path(tempFolder.newFile().toURI()));
+ fail("this is expected to fail with an exception");
+ } catch (IOException e) {
+ // expected
+ }
+
+ try {
+ fs.create(new Path(tempFolder.newFile().toURI()), WriteMode.NO_OVERWRITE);
+ fail("this is expected to fail with an exception");
+ } catch (IOException e) {
+ // expected
+ }
+
+ assertEquals(0, fs.getNumberOfOpenInputStreams());
+ assertEquals(0, fs.getNumberOfOpenOutputStreams());
+ assertEquals(0, fs.getTotalNumberOfOpenStreams());
+ }
+
+ /**
+ * Tests that a slowly written output stream is not accidentally closed too aggressively, due to
+ * a wrong initialization of the timestamps or bytes written that mark when the last progress was checked.
+ */
+ @Test
+ public void testSlowOutputStreamNotClosed() throws Exception {
+ final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(), 1, 0L, 1000L);
+
+ // some competing threads
+ final Random rnd = new Random();
+ final ReaderThread[] threads = new ReaderThread[10];
+ for (int i = 0; i < threads.length; i++) {
+ File file = tempFolder.newFile();
+ createRandomContents(file, rnd);
+ Path path = new Path(file.toURI());
+ threads[i] = new ReaderThread(fs, path, 1, Integer.MAX_VALUE);
+ }
+
+ // open the stream we test
+ try (FSDataOutputStream out = fs.create(new Path(tempFolder.newFile().toURI()), WriteMode.OVERWRITE)) {
+
+ // start the other threads that will try to shoot this stream down
+ for (ReaderThread t : threads) {
+ t.start();
+ }
+
+ // read the stream slowly.
+ Thread.sleep(5);
+ for (int bytesLeft = 50; bytesLeft > 0; bytesLeft--) {
+ out.write(bytesLeft);
+ Thread.sleep(5);
+ }
+ }
+
+ // wait for clean shutdown
+ for (ReaderThread t : threads) {
+ t.sync();
+ }
+ }
+
+ /**
+ * Tests that a slowly read stream is not accidentally closed too aggressively, due to
+ * a wrong initialization of the timestamps or bytes written that mark when the last progress was checked.
+ */
+ @Test
+ public void testSlowInputStreamNotClosed() throws Exception {
+ final File file = tempFolder.newFile();
+ createRandomContents(file, new Random(), 50);
+
+ final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(), 1, 0L, 1000L);
+
+ // some competing threads
+ final WriterThread[] threads = new WriterThread[10];
+ for (int i = 0; i < threads.length; i++) {
+ Path path = new Path(tempFolder.newFile().toURI());
+ threads[i] = new WriterThread(fs, path, 1, Integer.MAX_VALUE);
+ }
+
+ // open the stream we test
+ try (FSDataInputStream in = fs.open(new Path(file.toURI()))) {
+
+ // start the other threads that will try to shoot this stream down
+ for (WriterThread t : threads) {
+ t.start();
+ }
+
+ // read the stream slowly.
+ Thread.sleep(5);
+ while (in.read() != -1) {
+ Thread.sleep(5);
+ }
+ }
+
+ // wait for clean shutdown
+ for (WriterThread t : threads) {
+ t.sync();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utils
+ // ------------------------------------------------------------------------
+
+ private void createRandomContents(File file, Random rnd) throws IOException {
+ createRandomContents(file, rnd, rnd.nextInt(10000) + 1);
+ }
+
+ private void createRandomContents(File file, Random rnd, int size) throws IOException {
+ final byte[] data = new byte[size];
+ rnd.nextBytes(data);
+
+ try (FileOutputStream fos = new FileOutputStream(file)) {
+ fos.write(data);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Testing threads
+ // ------------------------------------------------------------------------
+
+ private static final class WriterThread extends CheckedThread {
+
+ private final LimitedConnectionsFileSystem fs;
+
+ private final Path path;
+
+ private final int maxConcurrentOutputStreams;
+
+ private final int maxConcurrentStreamsTotal;
+
+ WriterThread(
+ LimitedConnectionsFileSystem fs,
+ Path path,
+ int maxConcurrentOutputStreams,
+ int maxConcurrentStreamsTotal) {
+
+ this.fs = fs;
+ this.path = path;
+ this.maxConcurrentOutputStreams = maxConcurrentOutputStreams;
+ this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+ }
+
+ @Override
+ public void go() throws Exception {
+
+ try (FSDataOutputStream stream = fs.create(path, WriteMode.OVERWRITE)) {
+ assertTrue(fs.getNumberOfOpenOutputStreams() <= maxConcurrentOutputStreams);
+ assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+ final Random rnd = new Random();
+ final byte[] data = new byte[rnd.nextInt(10000) + 1];
+ rnd.nextBytes(data);
+ stream.write(data);
+ }
+ }
+ }
+
+ private static final class ReaderThread extends CheckedThread {
+
+ private final LimitedConnectionsFileSystem fs;
+
+ private final Path path;
+
+ private final int maxConcurrentInputStreams;
+
+ private final int maxConcurrentStreamsTotal;
+
+ ReaderThread(
+ LimitedConnectionsFileSystem fs,
+ Path path,
+ int maxConcurrentInputStreams,
+ int maxConcurrentStreamsTotal) {
+
+ this.fs = fs;
+ this.path = path;
+ this.maxConcurrentInputStreams = maxConcurrentInputStreams;
+ this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+ }
+
+ @Override
+ public void go() throws Exception {
+
+ try (FSDataInputStream stream = fs.open(path)) {
+ assertTrue(fs.getNumberOfOpenInputStreams() <= maxConcurrentInputStreams);
+ assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+ final byte[] readBuffer = new byte[4096];
+
+ //noinspection StatementWithEmptyBody
+ while (stream.read(readBuffer) != -1) {}
+ }
+ }
+ }
+
+ private static abstract class BlockingThread extends CheckedThread {
+
+ private final OneShotLatch waiter = new OneShotLatch();
+
+ public void waitTillWokenUp() throws InterruptedException {
+ waiter.await();
+ }
+
+ public void wakeup() {
+ waiter.trigger();
+ }
+ }
+
+ private static final class BlockingWriterThread extends BlockingThread {
+
+ private final LimitedConnectionsFileSystem fs;
+
+ private final Path path;
+
+ private final int maxConcurrentOutputStreams;
+
+ private final int maxConcurrentStreamsTotal;
+
+ BlockingWriterThread(
+ LimitedConnectionsFileSystem fs,
+ Path path,
+ int maxConcurrentOutputStreams,
+ int maxConcurrentStreamsTotal) {
+
+ this.fs = fs;
+ this.path = path;
+ this.maxConcurrentOutputStreams = maxConcurrentOutputStreams;
+ this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+ }
+
+ @Override
+ public void go() throws Exception {
+
+ try (FSDataOutputStream stream = fs.create(path, WriteMode.OVERWRITE)) {
+ assertTrue(fs.getNumberOfOpenOutputStreams() <= maxConcurrentOutputStreams);
+ assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+ final Random rnd = new Random();
+ final byte[] data = new byte[rnd.nextInt(10000) + 1];
+ rnd.nextBytes(data);
+ stream.write(data);
+
+ waitTillWokenUp();
+
+ // try to write one more thing, which might/should fail with an I/O exception
+ stream.write(rnd.nextInt());
+ }
+ }
+ }
+
+ private static final class BlockingReaderThread extends BlockingThread {
+
+ private final LimitedConnectionsFileSystem fs;
+
+ private final Path path;
+
+ private final int maxConcurrentInputStreams;
+
+ private final int maxConcurrentStreamsTotal;
+
+ BlockingReaderThread(
+ LimitedConnectionsFileSystem fs,
+ Path path,
+ int maxConcurrentInputStreams,
+ int maxConcurrentStreamsTotal) {
+
+ this.fs = fs;
+ this.path = path;
+ this.maxConcurrentInputStreams = maxConcurrentInputStreams;
+ this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+ }
+
+ @Override
+ public void go() throws Exception {
+
+ try (FSDataInputStream stream = fs.open(path)) {
+ assertTrue(fs.getNumberOfOpenInputStreams() <= maxConcurrentInputStreams);
+ assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+ final byte[] readBuffer = new byte[(int) fs.getFileStatus(path).getLen() - 1];
+ assertTrue(stream.read(readBuffer) != -1);
+
+ waitTillWokenUp();
+
+ // try to read one more thing, which might/should fail with an I/O exception
+ //noinspection ResultOfMethodCallIgnored
+ stream.read();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // failing file system
+ // ------------------------------------------------------------------------
+
+ private static class FailFs extends LocalFileSystem {
+
+ @Override
+ public FSDataOutputStream create(Path filePath, WriteMode overwrite) throws IOException {
+ throw new IOException("test exception");
+ }
+
+ @Override
+ public FSDataInputStream open(Path f) throws IOException {
+ throw new IOException("test exception");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
index 50e64e1..2444c65 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
@@ -19,7 +19,10 @@
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem.ConnectionLimitingSettings;
import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
import org.apache.flink.runtime.util.HadoopUtils;
@@ -63,7 +66,7 @@ public class HadoopFsFactory implements FileSystemFactory {
}
@Override
- public HadoopFileSystem create(URI fsUri) throws IOException {
+ public FileSystem create(URI fsUri) throws IOException {
checkNotNull(fsUri, "fsUri");
final String scheme = fsUri.getScheme();
@@ -162,8 +165,15 @@ public class HadoopFsFactory implements FileSystemFactory {
throw new IOException(message, e);
}
- // all good, return the file system
- return new HadoopFileSystem(hadoopFs);
+ HadoopFileSystem fs = new HadoopFileSystem(hadoopFs);
+
+ // create the Flink file system, optionally limiting the open connections
+ if (flinkConfig != null) {
+ return limitIfConfigured(fs, scheme, flinkConfig);
+ }
+ else {
+ return fs;
+ }
}
catch (ReflectiveOperationException | LinkageError e) {
throw new UnsupportedFileSystemSchemeException("Cannot support file system for '" + fsUri.getScheme() +
@@ -183,4 +193,23 @@ public class HadoopFsFactory implements FileSystemFactory {
"(like for example HDFS NameNode address/port or S3 host). " +
"The attempt to use a configured default authority failed: ";
}
+
+ private static FileSystem limitIfConfigured(HadoopFileSystem fs, String scheme, Configuration config) {
+ final ConnectionLimitingSettings limitSettings = ConnectionLimitingSettings.fromConfig(config, scheme);
+
+ // decorate only if any limit is configured
+ if (limitSettings == null) {
+ // no limit configured
+ return fs;
+ }
+ else {
+ return new LimitedConnectionsFileSystem(
+ fs,
+ limitSettings.limitTotal,
+ limitSettings.limitOutput,
+ limitSettings.limitInput,
+ limitSettings.streamOpenTimeout,
+ limitSettings.streamInactivityTimeout);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
index 1f5c932..4b7592d 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.fs.hdfs;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -39,7 +40,7 @@ public class HadoopFsFactoryTest extends TestLogger {
final URI uri = URI.create("hdfs://localhost:12345/");
HadoopFsFactory factory = new HadoopFsFactory();
- HadoopFileSystem fs = factory.create(uri);
+ FileSystem fs = factory.create(uri);
assertEquals(uri.getScheme(), fs.getUri().getScheme());
assertEquals(uri.getAuthority(), fs.getUri().getAuthority());
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
new file mode 100644
index 0000000..8ab5419
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test that the Hadoop file system wrapper correctly picks up connection limiting
+ * settings for the correct file systems.
+ */
+public class LimitedConnectionsConfigurationTest {
+
+ @Rule
+ public final TemporaryFolder tempDir = new TemporaryFolder();
+
+ @Test
+ public void testConfiguration() throws Exception {
+
+ // nothing configured, we should get a regular file system
+ FileSystem hdfs = FileSystem.get(URI.create("hdfs://localhost:12345/a/b/c"));
+ FileSystem ftpfs = FileSystem.get(URI.create("ftp://localhost:12345/a/b/c"));
+
+ assertFalse(hdfs instanceof LimitedConnectionsFileSystem);
+ assertFalse(ftpfs instanceof LimitedConnectionsFileSystem);
+
+ // configure some limits, which should cause the "hdfs" scheme to be limited
+
+ final Configuration config = new Configuration();
+ config.setInteger("fs.hdfs.limit.total", 40);
+ config.setInteger("fs.hdfs.limit.input", 39);
+ config.setInteger("fs.hdfs.limit.output", 38);
+ config.setInteger("fs.hdfs.limit.timeout", 23456);
+ config.setInteger("fs.hdfs.limit.stream-timeout", 34567);
+
+ try {
+ FileSystem.initialize(config);
+
+ hdfs = FileSystem.get(URI.create("hdfs://localhost:12345/a/b/c"));
+ ftpfs = FileSystem.get(URI.create("ftp://localhost:12345/a/b/c"));
+
+ assertTrue(hdfs instanceof LimitedConnectionsFileSystem);
+ assertFalse(ftpfs instanceof LimitedConnectionsFileSystem);
+
+ LimitedConnectionsFileSystem limitedFs = (LimitedConnectionsFileSystem) hdfs;
+ assertEquals(40, limitedFs.getMaxNumOpenStreamsTotal());
+ assertEquals(39, limitedFs.getMaxNumOpenInputStreams());
+ assertEquals(38, limitedFs.getMaxNumOpenOutputStreams());
+ assertEquals(23456, limitedFs.getStreamOpenTimeout());
+ assertEquals(34567, limitedFs.getStreamInactivityTimeout());
+ }
+ finally {
+ // clear all settings
+ FileSystem.initialize(new Configuration());
+ }
+ }
+}