You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/12/13 09:23:46 UTC
[nifi] branch main updated: NIFI-8085 Use poll(Duration) in ConsumeKafka_2_x processors
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 91f6b42 NIFI-8085 Use poll(Duration) in ConsumeKafka_2_x processors
91f6b42 is described below
commit 91f6b42985c20ee9b1ef177d16c70c820a5cdf0e
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Sat Dec 12 13:54:56 2020 +0100
NIFI-8085 Use poll(Duration) in ConsumeKafka_2_x processors
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4725.
---
.../nifi/processors/kafka/pubsub/ConsumerLease.java | 3 ++-
.../nifi/processors/kafka/pubsub/ConsumerPoolTest.java | 15 +++++++--------
.../nifi/processors/kafka/pubsub/ConsumerLease.java | 3 ++-
.../nifi/processors/kafka/pubsub/ConsumerPoolTest.java | 13 +++++++------
4 files changed, 18 insertions(+), 16 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 3ecec49..729c801 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -48,6 +48,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -178,7 +179,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
* This behavior has been fixed via Kafka KIP-62 and available from Kafka client 0.10.1.0.
*/
try {
- final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+ final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(10));
lastPollEmpty = records.count() == 0;
processRecords(records);
} catch (final ProcessException pe) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 3414420..195d2cb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -24,8 +24,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.kafka.pubsub.ConsumerLease;
-import org.apache.nifi.processors.kafka.pubsub.ConsumerPool;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.junit.Before;
@@ -33,6 +31,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -43,7 +42,7 @@ import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -112,7 +111,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleCreateClose() throws Exception {
- when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
}
@@ -144,7 +143,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
@@ -160,7 +159,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleBatchCreateClose() throws Exception {
- when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
for (int j = 0; j < 100; j++) {
@@ -187,7 +186,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
@@ -204,7 +203,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolConsumerFails() throws Exception {
- when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+ when(consumer.poll(any(Duration.class))).thenThrow(new KafkaException("oops"));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
try {
lease.poll();
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index c3846a2..e3e6124 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -48,6 +48,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -178,7 +179,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
* This behavior has been fixed via Kafka KIP-62 and available from Kafka client 0.10.1.0.
*/
try {
- final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+ final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(10));
lastPollEmpty = records.count() == 0;
processRecords(records);
} catch (final ProcessException pe) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 18e188c..195d2cb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -31,6 +31,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -41,7 +42,7 @@ import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -110,7 +111,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleCreateClose() throws Exception {
- when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
}
@@ -142,7 +143,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
@@ -158,7 +159,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolSimpleBatchCreateClose() throws Exception {
- when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
for (int j = 0; j < 100; j++) {
@@ -185,7 +186,7 @@ public class ConsumerPoolTest {
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
lease.poll();
lease.commit();
@@ -202,7 +203,7 @@ public class ConsumerPoolTest {
@Test
public void validatePoolConsumerFails() throws Exception {
- when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+ when(consumer.poll(any(Duration.class))).thenThrow(new KafkaException("oops"));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
try {
lease.poll();