You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/02 20:11:33 UTC

nifi git commit: NIFI-2834 Better Configs for Fetch and Timeout

Repository: nifi
Updated Branches:
  refs/heads/master 769530bea -> b5550ffcf


NIFI-2834 Better Configs for Fetch and Timeout

NIFI-2834 Better Configs for Fetch and Timeout

NIFI-2834 Better Configs for Fetch and Timeout

NIFI-2834 Change Duration.ofSeconds to Duration.ofMillis

This closes #1167


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b5550ffc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b5550ffc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b5550ffc

Branch: refs/heads/master
Commit: b5550ffcf5d3b968551b91a41f146b18dcc0a9ec
Parents: 769530b
Author: Joe N <jo...@gmail.com>
Authored: Sat Oct 29 11:48:20 2016 -0400
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Wed Nov 2 16:10:37 2016 -0400

----------------------------------------------------------------------
 .../azure/eventhub/GetAzureEventHub.java        | 33 +++++++++++++++++++-
 .../azure/eventhub/GetAzureEventHubTest.java    |  4 +++
 2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b5550ffc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 0455fe9..12ea1ba 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -40,6 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -121,6 +122,20 @@ public class GetAzureEventHub extends AbstractProcessor {
             .expressionLanguageSupported(false)
             .required(false)
             .build();
+    static final PropertyDescriptor RECEIVER_FETCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Partition Recivier Fetch Size")
+            .description("The number of events that a receiver should fetch from an EventHubs partition before returning. Default(100)")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .required(false)
+            .build();
+    static final PropertyDescriptor RECEIVER_FETCH_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Partiton Receiver Timeout (millseconds)")
+            .description("The amount of time a Partition Receiver should wait to receive the Fetch Size before returning. Default(60000)")
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .required(false)
+            .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -130,6 +145,8 @@ public class GetAzureEventHub extends AbstractProcessor {
     private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<>();
     private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
     private volatile Instant configuredEnqueueTime;
+    private volatile int receiverFetchSize;
+    private volatile Duration receiverFetchTimeout;
     private EventHubClient eventHubClient;
 
     private final static List<PropertyDescriptor> propertyDescriptors;
@@ -148,6 +165,9 @@ public class GetAzureEventHub extends AbstractProcessor {
         _propertyDescriptors.add(NUM_PARTITIONS);
         _propertyDescriptors.add(CONSUMER_GROUP);
         _propertyDescriptors.add(ENQUEUE_TIME);
+        _propertyDescriptors.add(RECEIVER_FETCH_SIZE);
+        _propertyDescriptors.add(RECEIVER_FETCH_TIMEOUT);
+
         propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
 
         Set<Relationship> _relationships = new HashSet<>();
@@ -201,6 +221,7 @@ public class GetAzureEventHub extends AbstractProcessor {
                     partitionId,
                     configuredEnqueueTime == null ? Instant.now() : configuredEnqueueTime).get();
 
+            receiver.setReceiveTimeout(receiverFetchTimeout == null ? Duration.ofMillis(60000) : receiverFetchTimeout);
             partitionToReceiverMap.put(partitionId, receiver);
             return receiver;
 
@@ -222,7 +243,7 @@ public class GetAzureEventHub extends AbstractProcessor {
         final PartitionReceiver receiver;
         try {
             receiver = getReceiver(context, partitionId);
-            return receiver.receive(100).get();
+            return receiver.receive(receiverFetchSize).get();
         } catch (final IOException | ServiceBusException | ExecutionException | InterruptedException e) {
             throw new ProcessException(e);
         }
@@ -264,6 +285,16 @@ public class GetAzureEventHub extends AbstractProcessor {
         } else {
             configuredEnqueueTime = null;
         }
+        if(context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
+            receiverFetchSize = context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
+        } else {
+            receiverFetchSize = 100;
+        }
+        if(context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
+            receiverFetchTimeout = Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong());
+        } else {
+            receiverFetchTimeout = null;
+        }
 
         final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString();
         setupReceiver(connectionString);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5550ffc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index a63458f..951384a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -64,6 +64,10 @@ public class GetAzureEventHubTest {
         testRunner.assertValid();
         testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME,"2015-12-22T21:55:10.000Z");
         testRunner.assertValid();
+        testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_SIZE, "5");
+        testRunner.assertValid();
+        testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
+        testRunner.assertValid();
     }
     
     @Test