You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 05:51:57 UTC

[01/10] flink git commit: [hotfix] Let NetworkBufferPoolTest extend TestLogger and make TestIOException static class

Repository: flink
Updated Branches:
  refs/heads/master a25cd3fed -> 382dd0644


[hotfix] Let NetworkBufferPoolTest extend TestLogger and make TestIOException static class


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/382dd064
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/382dd064
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/382dd064

Branch: refs/heads/master
Commit: 382dd0644f17f99bd189cd5a4a1faa1532b913e3
Parents: 390e451
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 3 16:44:28 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/network/buffer/NetworkBufferPoolTest.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/382dd064/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 64c7fad..8d6be0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.buffer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -47,7 +48,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for {@link NetworkBufferPool}.
  */
-public class NetworkBufferPoolTest {
+public class NetworkBufferPoolTest extends TestLogger {
 
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
@@ -400,7 +401,7 @@ public class NetworkBufferPoolTest {
 		}
 	}
 
-	private final class TestIOException extends IOException {
+	private static final class TestIOException extends IOException {
 		private static final long serialVersionUID = -814705441998024472L;
 	}
 
@@ -483,7 +484,6 @@ public class NetworkBufferPoolTest {
 			globalPool.createBufferPool(10, 10);
 		} finally {
 			globalPool.destroy();
-
 		}
 	}
 }


[06/10] flink git commit: [hotfix][network] add a few more checks and tags

Posted by tr...@apache.org.
[hotfix][network] add a few more checks and tags


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3c47961
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3c47961
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3c47961

Branch: refs/heads/master
Commit: b3c47961adaa809b0b60a308f1621c60299b2590
Parents: 918b16a
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Jul 2 12:42:40 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/network/buffer/NetworkBufferPool.java     | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b3c47961/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 7a5832a..d5846ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -27,6 +27,8 @@ import org.apache.flink.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -37,6 +39,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The NetworkBufferPool is a fixed size pool of {@link MemorySegment} instances
@@ -112,6 +115,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 				allocatedMb, availableMemorySegments.size(), segmentSize);
 	}
 
+	@Nullable
 	public MemorySegment requestMemorySegment() {
 		return availableMemorySegments.poll();
 	}
@@ -120,7 +124,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 		// Adds the segment back to the queue, which does not immediately free the memory
 		// however, since this happens when references to the global pool are also released,
 		// making the availableMemorySegments queue and its contained object reclaimable
-		availableMemorySegments.add(segment);
+		availableMemorySegments.add(checkNotNull(segment));
 	}
 
 	public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {


[10/10] flink git commit: [hotfix][network] always destroy buffer pool in NetworkBufferPoolTest#testRequestMemorySegmentsInterruptable()

Posted by tr...@apache.org.
[hotfix][network] always destroy buffer pool in NetworkBufferPoolTest#testRequestMemorySegmentsInterruptable()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93c7a9b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93c7a9b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93c7a9b2

Branch: refs/heads/master
Commit: 93c7a9b224e8eb498d683fedd2eea2ef31e91a98
Parents: 63730b6
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jul 3 09:24:45 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/network/buffer/NetworkBufferPoolTest.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93c7a9b2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index a8ab124..18663f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -390,6 +390,10 @@ public class NetworkBufferPoolTest {
 
 		expectedException.expect(IllegalStateException.class);
 		expectedException.expectMessage("destroyed");
-		asyncRequest.sync();
+		try {
+			asyncRequest.sync();
+		} finally {
+			globalPool.destroy();
+		}
 	}
 }


[08/10] flink git commit: [FLINK-9708][network] fix inconsistency with failed buffer redistribution

Posted by tr...@apache.org.
[FLINK-9708][network] fix inconsistency with failed buffer redistribution


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63730b61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63730b61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63730b61

Branch: refs/heads/master
Commit: 63730b61de3342d3ee4c0d0e3c543d55ab966773
Parents: b3c4796
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Jul 2 11:51:09 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../io/network/buffer/NetworkBufferPool.java    |  8 +++-
 .../network/buffer/NetworkBufferPoolTest.java   | 50 ++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63730b61/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index d5846ce..a20b25e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -151,7 +151,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 			this.numTotalRequiredBuffers += numRequiredBuffers;
 
-			redistributeBuffers();
+			try {
+				redistributeBuffers();
+			} catch (Throwable t) {
+				this.numTotalRequiredBuffers -= numRequiredBuffers;
+				ExceptionUtils.rethrowIOException(t);
+			}
 		}
 
 		final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
@@ -180,6 +185,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 			availableMemorySegments.addAll(segments);
 
+			// note: if this fails, we're fine for the buffer pool since we already recycled the segments
 			redistributeBuffers();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/63730b61/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index f1c5d0b..a8ab124 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -309,6 +309,56 @@ public class NetworkBufferPoolTest {
 	}
 
 	/**
+	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with an exception occurring during
+	 * the call to {@link NetworkBufferPool#redistributeBuffers()}.
+	 */
+	@Test
+	public void testRequestMemorySegmentsExceptionDuringBufferRedistribution() throws IOException {
+		final int numBuffers = 3;
+
+		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+
+		final List<Buffer> buffers = new ArrayList<>(numBuffers);
+		List<MemorySegment> memorySegments = Collections.emptyList();
+		BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers);
+		// make releaseMemory calls always fail:
+		bufferPool.setBufferPoolOwner(numBuffersToRecycle -> {
+			throw new TestIOException();
+		});
+
+		try {
+			// take all but one buffer
+			for (int i = 0; i < numBuffers - 1; ++i) {
+				Buffer buffer = bufferPool.requestBuffer();
+				buffers.add(buffer);
+				assertNotNull(buffer);
+			}
+
+			// this will ask the buffer pool to release its excess buffers which should fail
+			memorySegments = networkBufferPool.requestMemorySegments(2);
+			fail("Requesting memory segments should have thrown during buffer pool redistribution.");
+		} catch (TestIOException e) {
+			// test indirectly for NetworkBufferPool#numTotalRequiredBuffers being correct:
+			// -> creating a new buffer pool should not fail with "insufficient number of network
+			//    buffers" and instead only with the TestIOException from redistributing buffers in
+			//    bufferPool
+			expectedException.expect(TestIOException.class);
+			networkBufferPool.createBufferPool(2, 2);
+		} finally {
+			for (Buffer buffer : buffers) {
+				buffer.recycleBuffer();
+			}
+			bufferPool.lazyDestroy();
+			networkBufferPool.recycleMemorySegments(memorySegments);
+			networkBufferPool.destroy();
+		}
+	}
+
+	private final class TestIOException extends IOException {
+		private static final long serialVersionUID = -814705441998024472L;
+	}
+
+	/**
 	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
 	 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 	 */


[07/10] flink git commit: [FLINK-9633] Use Savepoint path's file system to create checkpoint output stream

Posted by tr...@apache.org.
[FLINK-9633] Use Savepoint path's file system to create checkpoint output stream

This commit changes Flink such that it uses the savepoint path's file system to
generate the output stream instead of the checkpoint path's file system.

This closes #6194.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64bc4b34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64bc4b34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64bc4b34

Branch: refs/heads/master
Commit: 64bc4b348ddd1f0c63e806442ffd3aad5c367a28
Parents: b230bf0
Author: sihuazhou <su...@163.com>
Authored: Thu Jun 21 17:42:54 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../state/filesystem/FsCheckpointStorage.java   |  2 +-
 .../filesystem/FsCheckpointStorageLocation.java |  6 +++
 .../filesystem/FsCheckpointStorageTest.java     | 45 ++++++++++++++++++++
 3 files changed, 52 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64bc4b34/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index e148dfb..af80af7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -126,7 +126,7 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
 			final Path path = decodePathFromReference(reference);
 
 			return new FsCheckpointStorageLocation(
-					fileSystem,
+					path.getFileSystem(),
 					path,
 					path,
 					path,

http://git-wip-us.apache.org/repos/asf/flink/blob/64bc4b34/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
index 469507a..5637b40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
@@ -127,4 +128,9 @@ public class FsCheckpointStorageLocation extends FsCheckpointStreamFactory imple
 				", fileStateSizeThreshold=" + fileStateSizeThreshold +
 				'}';
 	}
+
+	@VisibleForTesting
+	FileSystem getFileSystem() {
+		return fileSystem;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/64bc4b34/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
index e391a5d..1a9cf6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
@@ -31,7 +31,10 @@ import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsChe
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.Arrays;
@@ -41,6 +44,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the {@link FsCheckpointStorage}, which implements the checkpoint storage
@@ -204,6 +208,28 @@ public class FsCheckpointStorageTest extends AbstractFileCheckpointStorageTestBa
 		assertFalse(checkpointDir.exists());
 	}
 
+	@Test
+	public void testResolveCheckpointStorageLocation() throws Exception {
+		final FileSystem checkpointFileSystem = mock(FileSystem.class);
+		final FsCheckpointStorage storage = new FsCheckpointStorage(
+			new TestingPath("hdfs:///checkpoint/", checkpointFileSystem),
+			null,
+			new JobID(),
+			FILE_SIZE_THRESHOLD);
+
+		final FsCheckpointStorageLocation checkpointStreamFactory =
+			(FsCheckpointStorageLocation) storage.resolveCheckpointStorageLocation(1L, CheckpointStorageLocationReference.getDefault());
+		assertEquals(checkpointFileSystem, checkpointStreamFactory.getFileSystem());
+
+		final CheckpointStorageLocationReference savepointLocationReference =
+			AbstractFsCheckpointStorage.encodePathAsReference(new Path("file:///savepoint/"));
+
+		final FsCheckpointStorageLocation savepointStreamFactory =
+			(FsCheckpointStorageLocation) storage.resolveCheckpointStorageLocation(2L, savepointLocationReference);
+		final FileSystem fileSystem = savepointStreamFactory.getFileSystem();
+		assertTrue(fileSystem instanceof LocalFileSystem);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -212,4 +238,23 @@ public class FsCheckpointStorageTest extends AbstractFileCheckpointStorageTestBa
 		Path path = new Path(parent, child.getName());
 		assertEquals(path, child);
 	}
+
+	private static final class TestingPath extends Path {
+
+		private static final long serialVersionUID = 2560119808844230488L;
+
+		@SuppressWarnings("TransientFieldNotInitialized")
+		@Nonnull
+		private final transient FileSystem fileSystem;
+
+		TestingPath(String pathString, @Nonnull FileSystem fileSystem) {
+			super(pathString);
+			this.fileSystem = fileSystem;
+		}
+
+		@Override
+		public FileSystem getFileSystem() throws IOException {
+			return fileSystem;
+		}
+	}
 }


[09/10] flink git commit: [FLINK-9708] Clean up LocalBufferPool if NetworkBufferPool#createBufferPool fails

Posted by tr...@apache.org.
[FLINK-9708] Clean up LocalBufferPool if NetworkBufferPool#createBufferPool fails


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/390e451f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/390e451f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/390e451f

Branch: refs/heads/master
Commit: 390e451f77d874b3255b20e0ea164d6743190aa2
Parents: efc8708
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 3 16:43:23 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../io/network/buffer/LocalBufferPool.java      |  7 +++-
 .../io/network/buffer/NetworkBufferPool.java    | 31 ++++++++++----
 .../network/buffer/NetworkBufferPoolTest.java   | 44 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/390e451f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 92a8e94..7d9aa21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -298,7 +299,11 @@ class LocalBufferPool implements BufferPool {
 			}
 		}
 
-		networkBufferPool.destroyBufferPool(this);
+		try {
+			networkBufferPool.destroyBufferPool(this);
+		} catch (IOException e) {
+			ExceptionUtils.rethrow(e);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/390e451f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 419f6f3..a369ce5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -155,6 +155,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
 				redistributeBuffers();
 			} catch (Throwable t) {
 				this.numTotalRequiredBuffers -= numRequiredBuffers;
+
+				try {
+					redistributeBuffers();
+				} catch (IOException inner) {
+					t.addSuppressed(inner);
+				}
 				ExceptionUtils.rethrowIOException(t);
 			}
 		}
@@ -172,7 +178,11 @@ public class NetworkBufferPool implements BufferPoolFactory {
 				}
 			}
 		} catch (Throwable e) {
-			recycleMemorySegments(segments, numRequiredBuffers);
+			try {
+				recycleMemorySegments(segments, numRequiredBuffers);
+			} catch (IOException inner) {
+				e.addSuppressed(inner);
+			}
 			ExceptionUtils.rethrowIOException(e);
 		}
 
@@ -277,14 +287,23 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 			allBufferPools.add(localBufferPool);
 
-			redistributeBuffers();
+			try {
+				redistributeBuffers();
+			} catch (IOException e) {
+				try {
+					destroyBufferPool(localBufferPool);
+				} catch (IOException inner) {
+					e.addSuppressed(inner);
+				}
+				ExceptionUtils.rethrowIOException(e);
+			}
 
 			return localBufferPool;
 		}
 	}
 
 	@Override
-	public void destroyBufferPool(BufferPool bufferPool) {
+	public void destroyBufferPool(BufferPool bufferPool) throws IOException {
 		if (!(bufferPool instanceof LocalBufferPool)) {
 			throw new IllegalArgumentException("bufferPool is no LocalBufferPool");
 		}
@@ -293,11 +312,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			if (allBufferPools.remove(bufferPool)) {
 				numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();
 
-				try {
-					redistributeBuffers();
-				} catch (IOException e) {
-					throw new RuntimeException(e);
-				}
+				redistributeBuffers();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/390e451f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 40dc4f3..64c7fad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -356,6 +356,50 @@ public class NetworkBufferPoolTest {
 		}
 	}
 
+	@Test
+	public void testCreateBufferPoolExceptionDuringBufferRedistribution() throws IOException {
+		final int numBuffers = 3;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+
+		final List<Buffer> buffers = new ArrayList<>(numBuffers);
+		BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers);
+		bufferPool.setBufferPoolOwner(
+			numBuffersToRecycle -> {
+				throw new TestIOException();
+			});
+
+		try {
+
+			for (int i = 0; i < numBuffers; i++) {
+				Buffer buffer = bufferPool.requestBuffer();
+				buffers.add(buffer);
+				assertNotNull(buffer);
+			}
+
+			try {
+				networkBufferPool.createBufferPool(1, numBuffers);
+				fail("Should have failed because the other buffer pool does not support memory release.");
+			} catch (TestIOException expected) {
+			}
+
+			// destroy the faulty buffer pool
+			for (Buffer buffer : buffers) {
+				buffer.recycleBuffer();
+			}
+			buffers.clear();
+			bufferPool.lazyDestroy();
+
+			// now we should be able to create a new buffer pool
+			bufferPool = networkBufferPool.createBufferPool(numBuffers, numBuffers);
+		} finally {
+			for (Buffer buffer : buffers) {
+				buffer.recycleBuffer();
+			}
+			bufferPool.lazyDestroy();
+			networkBufferPool.destroy();
+		}
+	}
+
 	private final class TestIOException extends IOException {
 		private static final long serialVersionUID = -814705441998024472L;
 	}


[02/10] flink git commit: [FLINK-9636][network] fix inconsistency with interrupted buffer polling

Posted by tr...@apache.org.
[FLINK-9636][network] fix inconsistency with interrupted buffer polling

This closes #6238.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efc87083
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efc87083
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efc87083

Branch: refs/heads/master
Commit: efc87083e371eb00e801ef29c65ff49dfb170a4d
Parents: 93c7a9b
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jul 3 09:26:10 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../io/network/buffer/NetworkBufferPool.java    |  8 +++-
 .../network/buffer/NetworkBufferPoolTest.java   | 46 ++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efc87083/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a20b25e..419f6f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -172,7 +172,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 				}
 			}
 		} catch (Throwable e) {
-			recycleMemorySegments(segments);
+			recycleMemorySegments(segments, numRequiredBuffers);
 			ExceptionUtils.rethrowIOException(e);
 		}
 
@@ -180,8 +180,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
 	}
 
 	public void recycleMemorySegments(List<MemorySegment> segments) throws IOException {
+		recycleMemorySegments(segments, segments.size());
+	}
+
+	private void recycleMemorySegments(List<MemorySegment> segments, int size) throws IOException {
 		synchronized (factoryLock) {
-			numTotalRequiredBuffers -= segments.size();
+			numTotalRequiredBuffers -= size;
 
 			availableMemorySegments.addAll(segments);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/efc87083/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 18663f7..40dc4f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -31,7 +31,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
@@ -396,4 +398,48 @@ public class NetworkBufferPoolTest {
 			globalPool.destroy();
 		}
 	}
+
+	/**
+	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted and
+	 * remains in a defined state even if the waiting is interrupted.
+	 */
+	@Test
+	public void testRequestMemorySegmentsInterruptable2() throws Exception {
+		final int numBuffers = 10;
+
+		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+		MemorySegment segment = globalPool.requestMemorySegment();
+		assertNotNull(segment);
+
+		final OneShotLatch isRunning = new OneShotLatch();
+		CheckedThread asyncRequest = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				isRunning.trigger();
+				globalPool.requestMemorySegments(10);
+			}
+		};
+		asyncRequest.start();
+
+		// We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
+		// call above. We cannot guarantee this though but make it highly probable:
+		isRunning.await();
+		Thread.sleep(10);
+		asyncRequest.interrupt();
+
+		globalPool.recycle(segment);
+
+		try {
+			asyncRequest.sync();
+		} catch (IOException e) {
+			assertThat(e, hasProperty("cause", instanceOf(InterruptedException.class)));
+
+			// test indirectly for NetworkBufferPool#numTotalRequiredBuffers being correct:
+			// -> creating a new buffer pool should not fail
+			globalPool.createBufferPool(10, 10);
+		} finally {
+			globalPool.destroy();
+
+		}
+	}
 }


[05/10] flink git commit: [FLINK-9695] Added mesos.resourcemanager.tasks.container.docker.force-pull-image option

Posted by tr...@apache.org.
[FLINK-9695] Added mesos.resourcemanager.tasks.container.docker.force-pull-image option

This closes #6232.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b230bf0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b230bf0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b230bf0e

Branch: refs/heads/master
Commit: b230bf0e3883ee2dba47e22e781009aa9d0f000e
Parents: 229ed77
Author: Leonid Ishimnikov <li...@fastmail.com>
Authored: Mon Jul 2 11:29:57 2018 -0400
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../mesos_task_manager_configuration.html       |  5 ++++
 docs/ops/deployment/mesos.md                    |  2 ++
 .../clusterframework/LaunchableMesosWorker.java |  3 ++-
 .../clusterframework/MesosResourceManager.java  |  1 +
 .../MesosTaskManagerParameters.java             | 21 ++++++++++++++++
 .../MesosFlinkResourceManagerTest.java          |  1 +
 .../MesosResourceManagerTest.java               |  2 +-
 .../MesosTaskManagerParametersTest.java         | 25 ++++++++++++++++++++
 8 files changed, 58 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b230bf0e/docs/_includes/generated/mesos_task_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/mesos_task_manager_configuration.html b/docs/_includes/generated/mesos_task_manager_configuration.html
index 2c2ed17..0af844d 100644
--- a/docs/_includes/generated/mesos_task_manager_configuration.html
+++ b/docs/_includes/generated/mesos_task_manager_configuration.html
@@ -18,6 +18,11 @@
             <td></td>
         </tr>
         <tr>
+            <td><h5>mesos.resourcemanager.tasks.container.docker.force-pull-image</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Instruct the docker containerizer to forcefully pull the image rather than reuse a cached version.</td>
+        </tr>
+        <tr>
             <td><h5>mesos.resourcemanager.tasks.container.docker.parameters</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Custom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of "key=value" pairs. The "value" may contain '='.</td>

http://git-wip-us.apache.org/repos/asf/flink/blob/b230bf0e/docs/ops/deployment/mesos.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md
index 567d9b7..49ee671 100644
--- a/docs/ops/deployment/mesos.md
+++ b/docs/ops/deployment/mesos.md
@@ -270,6 +270,8 @@ May be set to -1 to disable this feature.
 
 `mesos.resourcemanager.tasks.uris`: A comma separated list of URIs of custom artifacts to be downloaded into the sandbox of Mesos workers. (**NO DEFAULT**)
 
+`mesos.resourcemanager.tasks.container.docker.force-pull-image`: Instruct the docker containerizer to forcefully pull the image rather than reuse a cached version. (**DEFAULT**: false)
+
 `mesos.resourcemanager.tasks.hostname`: Optional value to define the TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**)
 
 `mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed before the TaskManager is started (**NO DEFAULT**).

http://git-wip-us.apache.org/repos/asf/flink/blob/b230bf0e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 0bf09f8..93c90b7 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -317,7 +317,8 @@ public class LaunchableMesosWorker implements LaunchableTask {
 					.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
 						.addAllParameters(params.dockerParameters())
 						.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
-						.setImage(params.containerImageName().get()));
+						.setImage(params.containerImageName().get())
+						.setForcePullImage(params.dockerForcePullImage()));
 				break;
 
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/b230bf0e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 4d62c04..ea10ff8 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -666,6 +666,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 				new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
 			taskManagerParameters.containerVolumes(),
 			taskManagerParameters.dockerParameters(),
+			taskManagerParameters.dockerForcePullImage(),
 			taskManagerParameters.constraints(),
 			taskManagerParameters.command(),
 			taskManagerParameters.bootstrapCommand(),

http://git-wip-us.apache.org/repos/asf/flink/blob/b230bf0e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 1c19cf2..d915b36 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -107,6 +107,12 @@ public class MesosTaskManagerParameters {
 		.withDescription("Custom parameters to be passed into docker run command when using the docker containerizer." +
 			" Comma separated list of \"key=value\" pairs. The \"value\" may contain '='.");
 
+	public static final ConfigOption<Boolean> MESOS_RM_CONTAINER_DOCKER_FORCE_PULL_IMAGE =
+		key("mesos.resourcemanager.tasks.container.docker.force-pull-image")
+		.defaultValue(false)
+		.withDescription("Instruct the docker containerizer to forcefully pull the image rather than" +
+			" reuse a cached version.");
+
 	public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
 		key("mesos.constraints.hard.hostattribute")
 		.noDefaultValue()
@@ -135,6 +141,8 @@ public class MesosTaskManagerParameters {
 
 	private final List<Protos.Parameter> dockerParameters;
 
+	private final boolean dockerForcePullImage;
+
 	private final List<ConstraintEvaluator> constraints;
 
 	private final String command;
@@ -153,6 +161,7 @@ public class MesosTaskManagerParameters {
 			ContaineredTaskManagerParameters containeredParameters,
 			List<Protos.Volume> containerVolumes,
 			List<Protos.Parameter> dockerParameters,
+			boolean dockerForcePullImage,
 			List<ConstraintEvaluator> constraints,
 			String command,
 			Option<String> bootstrapCommand,
@@ -166,6 +175,7 @@ public class MesosTaskManagerParameters {
 		this.containeredParameters = Preconditions.checkNotNull(containeredParameters);
 		this.containerVolumes = Preconditions.checkNotNull(containerVolumes);
 		this.dockerParameters = Preconditions.checkNotNull(dockerParameters);
+		this.dockerForcePullImage = dockerForcePullImage;
 		this.constraints = Preconditions.checkNotNull(constraints);
 		this.command = Preconditions.checkNotNull(command);
 		this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand);
@@ -225,6 +235,13 @@ public class MesosTaskManagerParameters {
 	}
 
 	/**
+	 * Get Docker option to force pull image.
+	 */
+	public boolean dockerForcePullImage() {
+		return dockerForcePullImage;
+	}
+
+	/**
 	 * Get the placement constraints.
 	 */
 	public List<ConstraintEvaluator> constraints() {
@@ -269,6 +286,7 @@ public class MesosTaskManagerParameters {
 			", containeredParameters=" + containeredParameters +
 			", containerVolumes=" + containerVolumes +
 			", dockerParameters=" + dockerParameters +
+			", dockerForcePullImage=" + dockerForcePullImage +
 			", constraints=" + constraints +
 			", taskManagerHostName=" + taskManagerHostname +
 			", command=" + command +
@@ -329,6 +347,8 @@ public class MesosTaskManagerParameters {
 
 		Option<String> uriParamsOpt = Option.<String>apply(flinkConfig.getString(MESOS_TM_URIS));
 
+		boolean dockerForcePullImage = flinkConfig.getBoolean(MESOS_RM_CONTAINER_DOCKER_FORCE_PULL_IMAGE);
+
 		List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
 
 		List<Protos.Parameter> dockerParameters = buildDockerParameters(dockerParamsOpt);
@@ -350,6 +370,7 @@ public class MesosTaskManagerParameters {
 			containeredParameters,
 			containerVolumes,
 			dockerParameters,
+			dockerForcePullImage,
 			constraints,
 			tmCommand,
 			tmBootstrapCommand,

http://git-wip-us.apache.org/repos/asf/flink/blob/b230bf0e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 7ee8808..31394b2 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -251,6 +251,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 				containeredParams,
 				Collections.<Protos.Volume>emptyList(),
 				Collections.<Protos.Parameter>emptyList(),
+				false,
 				Collections.<ConstraintEvaluator>emptyList(),
 				"",
 				Option.<String>empty(),

http://git-wip-us.apache.org/repos/asf/flink/blob/b230bf0e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 65576bb..9fa8c0e 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -279,7 +279,7 @@ public class MesosResourceManagerTest extends TestLogger {
 				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
 			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
 				1.0, 1, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
-				Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(),
+				Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(), false,
 				Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
 				Option.<String>empty(), Collections.<String>emptyList());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b230bf0e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index e002cb9..ead7a12 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -142,6 +142,31 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 		assertEquals(params.uris().size(), 0);
 	}
 
+	public void testForcePullImageTrue() {
+		Configuration config = new Configuration();
+		config.setBoolean(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_FORCE_PULL_IMAGE, true);
+
+		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		assertEquals(params.dockerForcePullImage(), true);
+	}
+
+	@Test
+	public void testForcePullImageFalse() {
+		Configuration config = new Configuration();
+		config.setBoolean(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_FORCE_PULL_IMAGE, false);
+
+		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		assertEquals(params.dockerForcePullImage(), false);
+	}
+
+	@Test
+	public void testForcePullImageDefault() {
+		Configuration config = new Configuration();
+
+		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		assertEquals(params.dockerForcePullImage(), false);
+	}
+
 	@Test
 	public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception {
 


[03/10] flink git commit: [hotfix][network] checkstyle

Posted by tr...@apache.org.
[hotfix][network] checkstyle


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/918b16a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/918b16a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/918b16a0

Branch: refs/heads/master
Commit: 918b16a018987822d591b6fd7e33fd937a862e79
Parents: 64bc4b3
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Jul 2 11:50:47 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/network/buffer/NetworkBufferPool.java       | 4 ++--
 .../flink/runtime/io/network/buffer/NetworkBufferPoolTest.java   | 3 +++
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/918b16a0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 7b817ca..7a5832a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * The NetworkBufferPool is a fixed size pool of {@link MemorySegment} instances
  * for the network stack.
  *
- * The NetworkBufferPool creates {@link LocalBufferPool}s from which the individual tasks draw
+ * <p>The NetworkBufferPool creates {@link LocalBufferPool}s from which the individual tasks draw
  * the buffers for the network data transfer. When new local buffer pools are created, the
  * NetworkBufferPool dynamically redistributes the buffers between the pools.
  */
@@ -70,7 +70,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 	 * Allocates all {@link MemorySegment} instances managed by this pool.
 	 */
 	public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
-		
+
 		this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
 		this.memorySegmentSize = segmentSize;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/918b16a0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 739a4d6..f1c5d0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -42,6 +42,9 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link NetworkBufferPool}.
+ */
 public class NetworkBufferPoolTest {
 
 	@Rule


[04/10] flink git commit: [FLINK-9686] [kinesis] Allow creating AWS credentials by assuming a role

Posted by tr...@apache.org.
[FLINK-9686] [kinesis] Allow creating AWS credentials by assuming a role

Config example:

```
aws.credentials.provider: ASSUME_ROLE
aws.credentials.provider.role.arn: <arn>
aws.credentials.provider.role.sessionName: session-name
aws.credentials.provider.role.provider: AUTO
```

[FLINK-9686] [kinesis] Housekeeping: Use early return instead of variable assignment and break

[FLINK-9686] [kinesis] Add dependency on aws-java-sdk-sts

Implicitly (via `Class.forName`) used by `STSProfileCredentialsServiceProvider`.
Due to shading, it is not possible to treat this as a "provided" dependency, as
Maven rewrites the class name with the shaded one, which would force clients to
provide aws-java-sdk-sts shaded in the same way.

[FLINK-9686] [kinesis] Mention new config option in docs

[FLINK-9686] [kinesis] Use `STSAssumeRoleSessionCredentialsProvider` instead

[FLINK-9686] [kinesis] Add constants for new config options

This closes #6221.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/229ed775
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/229ed775
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/229ed775

Branch: refs/heads/master
Commit: 229ed7755c5bddd9856233e019ff3fa8ddef29a7
Parents: a25cd3f
Author: Franz Thoma <fr...@tngtech.com>
Authored: Mon May 14 15:02:12 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  |  2 +-
 .../flink-connector-kinesis/pom.xml             |  6 ++
 .../kinesis/config/AWSConfigConstants.java      | 64 ++++++++++++++++--
 .../connectors/kinesis/util/AWSUtil.java        | 68 +++++++++++++-------
 .../kinesis/util/KinesisConfigUtilTest.java     |  7 +-
 5 files changed, 111 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 6a60125..834677d 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -113,7 +113,7 @@ The above is a simple example of using the consumer. Configuration for the consu
 instance, the configuration keys for which can be found in `ConsumerConfigConstants`. The example
 demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which
 the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting
-`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, and `AUTO`). Also, data is being consumed
+`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, `ASSUME_ROLE`, and `AUTO`). Also, data is being consumed
 from the newest position in the Kinesis stream (the other option will be setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
 to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 690c4f8..62b9539 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -98,6 +98,12 @@ under the License.
 
 		<dependency>
 			<groupId>com.amazonaws</groupId>
+			<artifactId>aws-java-sdk-sts</artifactId>
+			<version>${aws.sdk.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.amazonaws</groupId>
 			<artifactId>amazon-kinesis-producer</artifactId>
 			<version>${aws.kinesis-kpl.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
index f3ff52b..557e846 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
@@ -45,6 +45,9 @@ public class AWSConfigConstants {
 		/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */
 		BASIC,
 
+		/** Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. **/
+		ASSUME_ROLE,
+
 		/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/
 		AUTO,
 	}
@@ -52,22 +55,69 @@ public class AWSConfigConstants {
 	/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
 	public static final String AWS_REGION = "aws.region";
 
+	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
+	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+
 	/** The AWS access key ID to use when setting credentials provider type to BASIC. */
-	public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
+	public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER);
 
 	/** The AWS secret key to use when setting credentials provider type to BASIC. */
-	public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
-
-	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
-	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+	public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER);
 
 	/** Optional configuration for profile path if credential provider type is set to be PROFILE. */
-	public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
+	public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER);
 
 	/** Optional configuration for profile name if credential provider type is set to be PROFILE. */
-	public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
+	public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);
+
+	/** The role ARN to use when credential provider type is set to ASSUME_ROLE. */
+	public static final String AWS_ROLE_ARN = roleArn(AWS_CREDENTIALS_PROVIDER);
+
+	/** The role session name to use when credential provider type is set to ASSUME_ROLE. */
+	public static final String AWS_ROLE_SESSION_NAME = roleSessionName(AWS_CREDENTIALS_PROVIDER);
+
+	/** The external ID to use when credential provider type is set to ASSUME_ROLE. */
+	public static final String AWS_ROLE_EXTERNAL_ID = externalId(AWS_CREDENTIALS_PROVIDER);
+
+	/**
+	 * The credentials provider that provides credentials for assuming the role when credential
+	 * provider type is set to ASSUME_ROLE.
+	 * Roles can be nested, so AWS_ROLE_CREDENTIALS_PROVIDER can again be set to "ASSUME_ROLE"
+	 */
+	public static final String AWS_ROLE_CREDENTIALS_PROVIDER = roleCredentialsProvider(AWS_CREDENTIALS_PROVIDER);
 
 	/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
 	public static final String AWS_ENDPOINT = "aws.endpoint";
 
+	public static String accessKeyId(String prefix) {
+		return prefix + ".basic.accesskeyid";
+	}
+
+	public static String secretKey(String prefix) {
+		return prefix + ".basic.secretkey";
+	}
+
+	public static String profilePath(String prefix) {
+		return prefix + ".profile.path";
+	}
+
+	public static String profileName(String prefix) {
+		return prefix + ".profile.name";
+	}
+
+	public static String roleArn(String prefix) {
+		return prefix + ".role.arn";
+	}
+
+	public static String roleSessionName(String prefix) {
+		return prefix + ".role.sessionName";
+	}
+
+	public static String externalId(String prefix) {
+		return prefix + ".role.externalId";
+	}
+
+	public static String roleCredentialsProvider(String prefix) {
+		return prefix + ".role.provider";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 2678c90..9e5c6cb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -29,6 +29,7 @@ import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.amazonaws.client.builder.AwsClientBuilder;
@@ -41,6 +42,8 @@ import com.fasterxml.jackson.databind.deser.BeanDeserializerFactory;
 import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
 import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
 import com.fasterxml.jackson.databind.deser.DeserializerFactory;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -99,10 +102,24 @@ public class AWSUtil {
 	 * @return The corresponding AWS Credentials Provider instance
 	 */
 	public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) {
+		return getCredentialsProvider(configProps, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+	}
+
+	/**
+	 * If the provider is ASSUME_ROLE, then the credentials for assuming this role are determined
+	 * recursively.
+	 *
+	 * @param configProps the configuration properties
+	 * @param configPrefix the prefix of the config properties for this credentials provider,
+	 *                     e.g. aws.credentials.provider for the base credentials provider,
+	 *                     aws.credentials.provider.role.provider for the credentials provider
+	 *                     for assuming a role, and so on.
+	 */
+	private static AWSCredentialsProvider getCredentialsProvider(final Properties configProps, final String configPrefix) {
 		CredentialProvider credentialProviderType;
-		if (!configProps.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
-			if (configProps.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
-				&& configProps.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+		if (!configProps.containsKey(configPrefix)) {
+			if (configProps.containsKey(AWSConfigConstants.accessKeyId(configPrefix))
+				&& configProps.containsKey(AWSConfigConstants.secretKey(configPrefix))) {
 				// if the credential provider type is not specified, but the Access Key ID and Secret Key are given, it will default to BASIC
 				credentialProviderType = CredentialProvider.BASIC;
 			} else {
@@ -110,35 +127,32 @@ public class AWSUtil {
 				credentialProviderType = CredentialProvider.AUTO;
 			}
 		} else {
-			credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
-				AWSConfigConstants.AWS_CREDENTIALS_PROVIDER));
+			credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(configPrefix));
 		}
 
-		AWSCredentialsProvider credentialsProvider;
-
 		switch (credentialProviderType) {
 			case ENV_VAR:
-				credentialsProvider = new EnvironmentVariableCredentialsProvider();
-				break;
+				return new EnvironmentVariableCredentialsProvider();
+
 			case SYS_PROP:
-				credentialsProvider = new SystemPropertiesCredentialsProvider();
-				break;
+				return new SystemPropertiesCredentialsProvider();
+
 			case PROFILE:
 				String profileName = configProps.getProperty(
-					AWSConfigConstants.AWS_PROFILE_NAME, null);
+						AWSConfigConstants.profileName(configPrefix), null);
 				String profileConfigPath = configProps.getProperty(
-					AWSConfigConstants.AWS_PROFILE_PATH, null);
-				credentialsProvider = (profileConfigPath == null)
+						AWSConfigConstants.profilePath(configPrefix), null);
+				return (profileConfigPath == null)
 					? new ProfileCredentialsProvider(profileName)
 					: new ProfileCredentialsProvider(profileConfigPath, profileName);
-				break;
+
 			case BASIC:
-				credentialsProvider = new AWSCredentialsProvider() {
+				return new AWSCredentialsProvider() {
 					@Override
 					public AWSCredentials getCredentials() {
 						return new BasicAWSCredentials(
-							configProps.getProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID),
-							configProps.getProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY));
+							configProps.getProperty(AWSConfigConstants.accessKeyId(configPrefix)),
+							configProps.getProperty(AWSConfigConstants.secretKey(configPrefix)));
 					}
 
 					@Override
@@ -146,13 +160,23 @@ public class AWSUtil {
 						// do nothing
 					}
 				};
-				break;
+
+			case ASSUME_ROLE:
+				final AWSSecurityTokenService baseCredentials = AWSSecurityTokenServiceClientBuilder.standard()
+						.withCredentials(getCredentialsProvider(configProps, AWSConfigConstants.roleCredentialsProvider(configPrefix)))
+						.withRegion(configProps.getProperty(AWSConfigConstants.AWS_REGION))
+						.build();
+				return new STSAssumeRoleSessionCredentialsProvider.Builder(
+						configProps.getProperty(AWSConfigConstants.roleArn(configPrefix)),
+						configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix)))
+						.withExternalId(configProps.getProperty(AWSConfigConstants.externalId(configPrefix)))
+						.withStsClient(baseCredentials)
+						.build();
+
 			default:
 			case AUTO:
-				credentialsProvider = new DefaultAWSCredentialsProviderChain();
+				return new DefaultAWSCredentialsProviderChain();
 		}
-
-		return credentialsProvider;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index 7d05783..c4bfa17 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -324,12 +324,7 @@ public class KinesisConfigUtilTest {
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
 
-		try {
-			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail();
-		}
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test