You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/07/06 07:22:36 UTC

[flink] branch master updated: [FLINK-17761][connector/common] Add a constructor taking capacity as a parameter for FutureCompletingBlockingQueue

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 245632f  [FLINK-17761][connector/common] Add a constructor taking capacity as a parameter for FutureCompletingBlockingQueue
245632f is described below

commit 245632f79f9d8df10b8ebb03e1ffdb6d400292fa
Author: fangliang <56...@qq.com>
AuthorDate: Mon Jul 6 15:21:22 2020 +0800

    [FLINK-17761][connector/common] Add a constructor taking capacity as a parameter for FutureCompletingBlockingQueue
    
    This closes #12566
---
 .../FutureCompletingBlockingQueue.java             | 10 +++++
 .../FutureCompletingBlockingQueueTest.java         | 44 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index 6a6dfac..de51af1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -36,9 +36,19 @@ import java.util.concurrent.TimeUnit;
  * @param <T> the type of the elements in the queue.
  */
 public class FutureCompletingBlockingQueue<T> extends LinkedBlockingQueue<T> {
+
 	private final FutureNotifier futureNotifier;
+	/**
+	 * The default capacity for {@link LinkedBlockingQueue}.
+	 */
+	private static final Integer DEFAULT_CAPACITY = 10000;
 
 	public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) {
+		this(futureNotifier, DEFAULT_CAPACITY);
+	}
+
+	public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int capacity) {
+		super(capacity);
 		this.futureNotifier = futureNotifier;
 	}
 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
new file mode 100644
index 0000000..ad74f2a
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.synchronization;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * The unit test for {@link FutureCompletingBlockingQueue}.
+ */
+public class FutureCompletingBlockingQueueTest {
+
+
+	private static final Integer DEFAULT_CAPACITY = 10000;
+	private static final Integer SPECIFIED_CAPACITY = 20000;
+
+	@Test
+	public void testFutureCompletingBlockingQueueConstructor() {
+		FutureNotifier notifier = new FutureNotifier();
+		FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(notifier);
+		FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(notifier, SPECIFIED_CAPACITY);
+		// The capacity of the queue needs to be equal to 10000
+		assertEquals(defaultCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) DEFAULT_CAPACITY);
+		// The capacity of the queue needs to be equal to SPECIFIED_CAPACITY
+		assertEquals(specifiedCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) SPECIFIED_CAPACITY);
+	}
+}