You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/07/06 07:22:36 UTC
[flink] branch master updated: [FLINK-17761][connector/common] Add
a constructor taking capacity as a parameter for
FutureCompletingBlockingQueue
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 245632f [FLINK-17761][connector/common] Add a constructor taking capacity as a parameter for FutureCompletingBlockingQueue
245632f is described below
commit 245632f79f9d8df10b8ebb03e1ffdb6d400292fa
Author: fangliang <56...@qq.com>
AuthorDate: Mon Jul 6 15:21:22 2020 +0800
[FLINK-17761][connector/common] Add a constructor taking capacity as a parameter for FutureCompletingBlockingQueue
This closes #12566
---
.../FutureCompletingBlockingQueue.java | 10 +++++
.../FutureCompletingBlockingQueueTest.java | 44 ++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index 6a6dfac..de51af1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -36,9 +36,19 @@ import java.util.concurrent.TimeUnit;
* @param <T> the type of the elements in the queue.
*/
public class FutureCompletingBlockingQueue<T> extends LinkedBlockingQueue<T> {
+
private final FutureNotifier futureNotifier;
+ /**
+ * The default capacity passed to the underlying {@link LinkedBlockingQueue} when the caller does not specify one.
+ */
+ private static final Integer DEFAULT_CAPACITY = 10000;
public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) {
+ this(futureNotifier, DEFAULT_CAPACITY);
+ }
+
+ public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int capacity) {
+ super(capacity);
this.futureNotifier = futureNotifier;
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
new file mode 100644
index 0000000..ad74f2a
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.synchronization;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * The unit test for {@link FutureCompletingBlockingQueue}.
+ */
+public class FutureCompletingBlockingQueueTest {
+
+    /** Expected capacity of a queue built without an explicit capacity; must match the queue's own default. */
+    private static final int DEFAULT_CAPACITY = 10000;
+    private static final int SPECIFIED_CAPACITY = 20000;
+
+    @Test
+    public void testFutureCompletingBlockingQueueConstructor() {
+        FutureNotifier notifier = new FutureNotifier();
+        FutureCompletingBlockingQueue<Object> defaultCapacityQueue = new FutureCompletingBlockingQueue<>(notifier);
+        FutureCompletingBlockingQueue<Object> specifiedCapacityQueue = new FutureCompletingBlockingQueue<>(notifier, SPECIFIED_CAPACITY);
+        // Nothing has been enqueued, so the remaining capacity equals the total capacity (expected value first).
+        assertEquals(DEFAULT_CAPACITY, defaultCapacityQueue.remainingCapacity());
+        // Same check for the capacity handed explicitly to the two-argument constructor.
+        assertEquals(SPECIFIED_CAPACITY, specifiedCapacityQueue.remainingCapacity());
+    }
+}