You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/10 07:15:15 UTC

[flink] branch master updated (c3eb3a3 -> 294fcb2)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c3eb3a3  [hotfix][python] Fix the documentation issue
     new 4ed73a5  [hotfix][test] Clean up NettyShuffleEnvironmentTest
     new 294fcb2  [hotfix][checkpointing] Extract MINIMAL_CHECKPOINT_TIME magic constant

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tasks/CheckpointCoordinatorConfiguration.java  |  4 +-
 .../io/network/NettyShuffleEnvironmentTest.java    | 60 +++++++++++-----------
 .../api/environment/CheckpointConfig.java          |  9 ++--
 .../api/graph/StreamingJobGraphGenerator.java      |  4 +-
 .../test/streaming/runtime/IterateITCase.java      | 11 +++-
 5 files changed, 49 insertions(+), 39 deletions(-)


[flink] 02/02: [hotfix][checkpointing] Extract MINIMAL_CHECKPOINT_TIME magic constant

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 294fcb243fd97e78184419ce6e536abf0e060a35
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jul 9 15:17:03 2019 +0200

    [hotfix][checkpointing] Extract MINIMAL_CHECKPOINT_TIME magic constant
---
 .../jobgraph/tasks/CheckpointCoordinatorConfiguration.java    |  4 +++-
 .../flink/streaming/api/environment/CheckpointConfig.java     |  9 +++++----
 .../flink/streaming/api/graph/StreamingJobGraphGenerator.java |  4 +++-
 .../apache/flink/test/streaming/runtime/IterateITCase.java    | 11 +++++++++--
 4 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index 74fcdf3..36fbe3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -32,6 +32,8 @@ import java.util.Objects;
  */
 public class CheckpointCoordinatorConfiguration implements Serializable {
 
+	public static final long MINIMAL_CHECKPOINT_TIME = 10;
+
 	private static final long serialVersionUID = 2L;
 
 	private final long checkpointInterval;
@@ -69,7 +71,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 			int tolerableCpFailureNumber) {
 
 		// sanity checks
-		if (checkpointInterval < 10 || checkpointTimeout < 10 ||
+		if (checkpointInterval < MINIMAL_CHECKPOINT_TIME || checkpointTimeout < MINIMAL_CHECKPOINT_TIME ||
 			minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1 ||
 			tolerableCpFailureNumber < 0) {
 			throw new IllegalArgumentException();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 033f55a..7a99a54 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER;
+import static org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -146,8 +147,8 @@ public class CheckpointConfig implements java.io.Serializable {
 	 * @param checkpointInterval The checkpoint interval, in milliseconds.
 	 */
 	public void setCheckpointInterval(long checkpointInterval) {
-		if (checkpointInterval < 10) {
-			throw new IllegalArgumentException("Checkpoint interval must be larger than or equal to 10ms");
+		if (checkpointInterval < MINIMAL_CHECKPOINT_TIME) {
+			throw new IllegalArgumentException(String.format("Checkpoint interval must be larger than or equal to %s ms", MINIMAL_CHECKPOINT_TIME));
 		}
 		this.checkpointInterval = checkpointInterval;
 	}
@@ -167,8 +168,8 @@ public class CheckpointConfig implements java.io.Serializable {
 	 * @param checkpointTimeout The checkpoint timeout, in milliseconds.
 	 */
 	public void setCheckpointTimeout(long checkpointTimeout) {
-		if (checkpointTimeout < 10) {
-			throw new IllegalArgumentException("Checkpoint timeout must be larger than or equal to 10ms");
+		if (checkpointTimeout < MINIMAL_CHECKPOINT_TIME) {
+			throw new IllegalArgumentException(String.format("Checkpoint timeout must be larger than or equal to %s ms", MINIMAL_CHECKPOINT_TIME));
 		}
 		this.checkpointTimeout = checkpointTimeout;
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8306da6..a0cb255 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -74,6 +74,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import static org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME;
+
 /**
  * The StreamingJobGraphGenerator converts a {@link StreamGraph} into a {@link JobGraph}.
  */
@@ -609,7 +611,7 @@ public class StreamingJobGraphGenerator {
 		CheckpointConfig cfg = streamGraph.getCheckpointConfig();
 
 		long interval = cfg.getCheckpointInterval();
-		if (interval < 10) {
+		if (interval < MINIMAL_CHECKPOINT_TIME) {
 			// interval of max value means disable periodic checkpoint
 			interval = Long.MAX_VALUE;
 		}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index bd9cd75..168f11d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -609,7 +610,10 @@ public class IterateITCase extends AbstractTestBase {
 				// Test force checkpointing
 
 				try {
-					env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE, false);
+					env.enableCheckpointing(
+						CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME,
+						CheckpointingMode.EXACTLY_ONCE,
+						false);
 					env.execute();
 
 					// this statement should never be reached
@@ -618,7 +622,10 @@ public class IterateITCase extends AbstractTestBase {
 					// expected behaviour
 				}
 
-				env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE, true);
+				env.enableCheckpointing(
+					CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME,
+					CheckpointingMode.EXACTLY_ONCE,
+					true);
 				env.getStreamGraph().getJobGraph();
 
 				break; // success


[flink] 01/02: [hotfix][test] Clean up NettyShuffleEnvironmentTest

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ed73a51174353dfff98e485798c5fc8dd9bd20b
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jul 9 15:10:07 2019 +0200

    [hotfix][test] Clean up NettyShuffleEnvironmentTest
---
 .../io/network/NettyShuffleEnvironmentTest.java    | 60 +++++++++++-----------
 1 file changed, 29 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index ba70a47..51f1c04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -85,15 +85,6 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 	/**
 	 * Verifies that {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
-	 * instances for various types of input and output channels.
-	 */
-	@Test
-	public void testRegisterTaskUsesBoundedBuffers() throws Exception {
-		testRegisterTaskWithLimitedBuffers(NettyShuffleEnvironmentBuilder.DEFAULT_NUM_NETWORK_BUFFERS, false);
-	}
-
-	/**
-	 * Verifies that {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
 	 * instances for various types of input and output channels working with the bare minimum of
 	 * required buffers.
 	 */
@@ -109,7 +100,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 			bufferCount = 10 + 10 * NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
 		}
 
-		testRegisterTaskWithLimitedBuffers(bufferCount, true);
+		testRegisterTaskWithLimitedBuffers(bufferCount);
 	}
 
 	/**
@@ -130,10 +121,10 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 		expectedException.expect(IOException.class);
 		expectedException.expectMessage("Insufficient number of network buffers");
-		testRegisterTaskWithLimitedBuffers(bufferCount, true);
+		testRegisterTaskWithLimitedBuffers(bufferCount);
 	}
 
-	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize, boolean assertNumBuffers) throws Exception {
+	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
 		final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
 			.setNumNetworkBuffers(bufferPoolSize)
 			.setIsCreditBased(enableCreditBasedFlowControl)
@@ -141,18 +132,27 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 		final ConnectionManager connManager = createDummyConnectionManager();
 
+		int channels = 2;
+		int rp4Channels = 4;
+		int floatingBuffers = network.getConfiguration().floatingNetworkBuffersPerGate();
+		int exclusiveBuffers = network.getConfiguration().networkBuffersPerChannel();
+
+		int expectedBuffers = channels * exclusiveBuffers + floatingBuffers;
+		int expectedRp4Buffers = rp4Channels * exclusiveBuffers + floatingBuffers;
+
 		// result partitions
-		ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
-		ResultPartition rp2 = createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, 2);
-		ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
-		ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
+		ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, channels);
+		ResultPartition rp2 = createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, channels);
+		ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, channels);
+		ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, rp4Channels);
+
 		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
 
 		// input gates
-		SingleInputGate ig1 = createSingleInputGate(network, ResultPartitionType.PIPELINED, 2);
-		SingleInputGate ig2 = createSingleInputGate(network, ResultPartitionType.BLOCKING, 2);
-		SingleInputGate ig3 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
-		SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
+		SingleInputGate ig1 = createSingleInputGate(network, ResultPartitionType.PIPELINED, channels);
+		SingleInputGate ig2 = createSingleInputGate(network, ResultPartitionType.BLOCKING, channels);
+		SingleInputGate ig3 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, channels);
+		SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, rp4Channels);
 		final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
 
 		createRemoteInputChannel(ig4, 0, rp1, connManager, network.getNetworkBufferPool());
@@ -174,27 +174,25 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 		// verify buffer pools for the result partitions
 		assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
 		assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(2 * 2 + 8, rp3.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(4 * 2 + 8, rp4.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(expectedBuffers, rp3.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(expectedRp4Buffers, rp4.getBufferPool().getMaxNumberOfMemorySegments());
 
 		for (ResultPartition rp : resultPartitions) {
 			assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumberOfRequiredMemorySegments());
-			if (assertNumBuffers) {
-				assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumBuffers());
-			}
+			assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumBuffers());
 		}
 
 		// verify buffer pools for the input gates (NOTE: credit-based uses minimum required buffers
 		// for exclusive buffers not managed by the buffer pool)
-		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 0 : 4, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : channels, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : channels, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : channels, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : rp4Channels, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
 
 		assertEquals(Integer.MAX_VALUE, ig1.getBufferPool().getMaxNumberOfMemorySegments());
 		assertEquals(Integer.MAX_VALUE, ig2.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 8 : 2 * 2 + 8, ig3.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 8 : 4 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? floatingBuffers : expectedBuffers, ig3.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? floatingBuffers : expectedRp4Buffers, ig4.getBufferPool().getMaxNumberOfMemorySegments());
 
 		int invokations = enableCreditBasedFlowControl ? 1 : 0;
 		verify(ig1, times(invokations)).assignExclusiveSegments();