You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:58 UTC

[flink] 10/11: [FLINK-19245][connectors] Set default capacity for FutureCompletingBlockingQueue.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cef8a587d7fd2fe64cc644da5ed095d82e46f631
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 16:05:37 2020 +0200

    [FLINK-19245][connectors] Set default capacity for FutureCompletingBlockingQueue.
---
 .../apache/flink/connector/base/source/reader/SourceReaderOptions.java  | 2 +-
 .../source/reader/synchronization/FutureCompletingBlockingQueue.java    | 2 +-
 .../reader/synchronization/FutureCompletingBlockingQueueTest.java       | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
index 508b347..dae1a40 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
@@ -38,7 +38,7 @@ public class SourceReaderOptions {
 		ConfigOptions
 				.key("source.reader.element.queue.capacity")
 				.intType()
-				.defaultValue(1)
+				.defaultValue(2)
 				.withDescription("The capacity of the element queue in the source reader.");
 
 	// --------------- final fields ----------------------
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index c89b682..1fe1985 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -80,7 +80,7 @@ public class FutureCompletingBlockingQueue<T> {
 	/**
 	 * The default capacity for the queue.
 	 */
-	private static final int DEFAULT_CAPACITY = 1;
+	private static final int DEFAULT_CAPACITY = 2;
 
 	// ------------------------------------------------------------------------
 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index ef056d9e..2a191d2 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -40,7 +40,7 @@ import static org.junit.Assert.fail;
  * The unit test for {@link FutureCompletingBlockingQueue}.
  */
 public class FutureCompletingBlockingQueueTest {
-	private static final Integer DEFAULT_CAPACITY = 1;
+	private static final Integer DEFAULT_CAPACITY = 2;
 	private static final Integer SPECIFIED_CAPACITY = 20000;
 
 	@Test