You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/02 20:11:33 UTC
nifi git commit: NIFI-2834 Better Configs for Fetch and Timeout
Repository: nifi
Updated Branches:
refs/heads/master 769530bea -> b5550ffcf
NIFI-2834 Better Configs for Fetch and Timeout
NIFI-2834 Better Configs for Fetch and Timeout
NIFI-2834 Better Configs for Fetch and Timeout
NIFI-2834 Changed Duration.ofSeconds to Duration.ofMillis
This closes #1167
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b5550ffc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b5550ffc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b5550ffc
Branch: refs/heads/master
Commit: b5550ffcf5d3b968551b91a41f146b18dcc0a9ec
Parents: 769530b
Author: Joe N <jo...@gmail.com>
Authored: Sat Oct 29 11:48:20 2016 -0400
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Wed Nov 2 16:10:37 2016 -0400
----------------------------------------------------------------------
.../azure/eventhub/GetAzureEventHub.java | 33 +++++++++++++++++++-
.../azure/eventhub/GetAzureEventHubTest.java | 4 +++
2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/b5550ffc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 0455fe9..12ea1ba 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -40,6 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
+import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
@@ -121,6 +122,20 @@ public class GetAzureEventHub extends AbstractProcessor {
.expressionLanguageSupported(false)
.required(false)
.build();
+ static final PropertyDescriptor RECEIVER_FETCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Partition Receiver Fetch Size")
+ .description("The number of events that a receiver should fetch from an EventHubs partition before returning. Default(100)")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .required(false) // optional: when unset, onScheduled falls back to a fetch size of 100
+ .build();
+ static final PropertyDescriptor RECEIVER_FETCH_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Partition Receiver Timeout (milliseconds)")
+ .description("The amount of time a Partition Receiver should wait to receive the Fetch Size before returning. Default(60000)")
+ .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .required(false) // optional: when unset, the receiver is given a 60000 ms timeout
+ .build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -130,6 +145,8 @@ public class GetAzureEventHub extends AbstractProcessor {
private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<>();
private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
private volatile Instant configuredEnqueueTime;
+ private volatile int receiverFetchSize;
+ private volatile Duration receiverFetchTimeout;
private EventHubClient eventHubClient;
private final static List<PropertyDescriptor> propertyDescriptors;
@@ -148,6 +165,9 @@ public class GetAzureEventHub extends AbstractProcessor {
_propertyDescriptors.add(NUM_PARTITIONS);
_propertyDescriptors.add(CONSUMER_GROUP);
_propertyDescriptors.add(ENQUEUE_TIME);
+ _propertyDescriptors.add(RECEIVER_FETCH_SIZE);
+ _propertyDescriptors.add(RECEIVER_FETCH_TIMEOUT);
+
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
@@ -201,6 +221,7 @@ public class GetAzureEventHub extends AbstractProcessor {
partitionId,
configuredEnqueueTime == null ? Instant.now() : configuredEnqueueTime).get();
+ receiver.setReceiveTimeout(receiverFetchTimeout == null ? Duration.ofMillis(60000) : receiverFetchTimeout);
partitionToReceiverMap.put(partitionId, receiver);
return receiver;
@@ -222,7 +243,7 @@ public class GetAzureEventHub extends AbstractProcessor {
final PartitionReceiver receiver;
try {
receiver = getReceiver(context, partitionId);
- return receiver.receive(100).get();
+ return receiver.receive(receiverFetchSize).get();
} catch (final IOException | ServiceBusException | ExecutionException | InterruptedException e) {
throw new ProcessException(e);
}
@@ -264,6 +285,16 @@ public class GetAzureEventHub extends AbstractProcessor {
} else {
configuredEnqueueTime = null;
}
+ if(context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
+ receiverFetchSize = context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
+ } else {
+ receiverFetchSize = 100;
+ }
+ if(context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
+ receiverFetchTimeout = Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong());
+ } else {
+ receiverFetchTimeout = null;
+ }
final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString();
setupReceiver(connectionString);
http://git-wip-us.apache.org/repos/asf/nifi/blob/b5550ffc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index a63458f..951384a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -64,6 +64,10 @@ public class GetAzureEventHubTest {
testRunner.assertValid();
testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME,"2015-12-22T21:55:10.000Z");
testRunner.assertValid();
+ testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_SIZE, "5");
+ testRunner.assertValid();
+ testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
+ testRunner.assertValid();
}
@Test