You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/12/13 09:23:46 UTC

[nifi] branch main updated: NIFI-8085 Use poll(Duration) in ConsumeKafka_2_x processors

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 91f6b42  NIFI-8085 Use poll(Duration) in ConsumeKafka_2_x processors
91f6b42 is described below

commit 91f6b42985c20ee9b1ef177d16c70c820a5cdf0e
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Sat Dec 12 13:54:56 2020 +0100

    NIFI-8085 Use poll(Duration) in ConsumeKafka_2_x processors
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4725.
---
 .../nifi/processors/kafka/pubsub/ConsumerLease.java       |  3 ++-
 .../nifi/processors/kafka/pubsub/ConsumerPoolTest.java    | 15 +++++++--------
 .../nifi/processors/kafka/pubsub/ConsumerLease.java       |  3 ++-
 .../nifi/processors/kafka/pubsub/ConsumerPoolTest.java    | 13 +++++++------
 4 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 3ecec49..729c801 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -48,6 +48,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -178,7 +179,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
          * This behavior has been fixed via Kafka KIP-62 and available from Kafka client 0.10.1.0.
          */
         try {
-            final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+            final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(10));
             lastPollEmpty = records.count() == 0;
             processRecords(records);
         } catch (final ProcessException pe) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 3414420..195d2cb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -24,8 +24,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.kafka.pubsub.ConsumerLease;
-import org.apache.nifi.processors.kafka.pubsub.ConsumerPool;
 import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.junit.Before;
@@ -33,6 +31,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,7 +42,7 @@ import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -112,7 +111,7 @@ public class ConsumerPoolTest {
     @Test
     public void validatePoolSimpleCreateClose() throws Exception {
 
-        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
         }
@@ -144,7 +143,7 @@ public class ConsumerPoolTest {
         };
         final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
             lease.commit();
@@ -160,7 +159,7 @@ public class ConsumerPoolTest {
 
     @Test
     public void validatePoolSimpleBatchCreateClose() throws Exception {
-        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         for (int i = 0; i < 100; i++) {
             try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
                 for (int j = 0; j < 100; j++) {
@@ -187,7 +186,7 @@ public class ConsumerPoolTest {
         };
         final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
             lease.commit();
@@ -204,7 +203,7 @@ public class ConsumerPoolTest {
     @Test
     public void validatePoolConsumerFails() throws Exception {
 
-        when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+        when(consumer.poll(any(Duration.class))).thenThrow(new KafkaException("oops"));
         try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             try {
                 lease.poll();
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index c3846a2..e3e6124 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -48,6 +48,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -178,7 +179,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
          * This behavior has been fixed via Kafka KIP-62 and available from Kafka client 0.10.1.0.
          */
         try {
-            final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+            final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(10));
             lastPollEmpty = records.count() == 0;
             processRecords(records);
         } catch (final ProcessException pe) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 18e188c..195d2cb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -31,6 +31,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,7 +42,7 @@ import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -110,7 +111,7 @@ public class ConsumerPoolTest {
     @Test
     public void validatePoolSimpleCreateClose() throws Exception {
 
-        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
         }
@@ -142,7 +143,7 @@ public class ConsumerPoolTest {
         };
         final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
             lease.commit();
@@ -158,7 +159,7 @@ public class ConsumerPoolTest {
 
     @Test
     public void validatePoolSimpleBatchCreateClose() throws Exception {
-        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         for (int i = 0; i < 100; i++) {
             try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
                 for (int j = 0; j < 100; j++) {
@@ -185,7 +186,7 @@ public class ConsumerPoolTest {
         };
         final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
             lease.commit();
@@ -202,7 +203,7 @@ public class ConsumerPoolTest {
     @Test
     public void validatePoolConsumerFails() throws Exception {
 
-        when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+        when(consumer.poll(any(Duration.class))).thenThrow(new KafkaException("oops"));
         try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             try {
                 lease.poll();