You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/16 16:47:21 UTC
[28/50] [abbrv] incubator-nifi git commit: NIFI-733: Make use of
Client Name, Zookeeper Timeout,
Kafka Timeout properties and add new Group ID property
NIFI-733: Make use of Client Name, Zookeeper Timeout, Kafka Timeout properties and add new Group ID property
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/786bc1d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/786bc1d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/786bc1d6
Branch: refs/heads/master
Commit: 786bc1d61260e2d8558747ca206d360aebdd1994
Parents: 063afe2
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jul 3 09:04:52 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jul 3 09:04:52 2015 -0400
----------------------------------------------------------------------
.../apache/nifi/processors/kafka/GetKafka.java | 28 +++++++++++++++-----
1 file changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/786bc1d6/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 1b63a46..26590df 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -129,6 +129,7 @@ public class GetKafka extends AbstractProcessor {
.expressionLanguageSupported(false)
.defaultValue("\\n")
.build();
+
public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
.name("Client Name")
.description("Client Name to use when communicating with Kafka")
@@ -136,6 +137,13 @@ public class GetKafka extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
+ public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
+ .name("Group ID")
+ .description("A Group ID is used to identify consumers that are within the same consumer group")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -152,9 +160,13 @@ public class GetKafka extends AbstractProcessor {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
- .fromPropertyDescriptor(CLIENT_NAME)
- .defaultValue("NiFi-" + getIdentifier())
- .build();
+ .fromPropertyDescriptor(CLIENT_NAME)
+ .defaultValue("NiFi-" + getIdentifier())
+ .build();
+ final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(GROUP_ID)
+ .defaultValue(getIdentifier())
+ .build();
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(ZOOKEEPER_CONNECTION_STRING);
@@ -163,6 +175,7 @@ public class GetKafka extends AbstractProcessor {
props.add(BATCH_SIZE);
props.add(MESSAGE_DEMARCATOR);
props.add(clientNameWithDefault);
+ props.add(groupIdWithDefault);
props.add(KAFKA_TIMEOUT);
props.add(ZOOKEEPER_TIMEOUT);
return props;
@@ -184,10 +197,13 @@ public class GetKafka extends AbstractProcessor {
final Properties props = new Properties();
props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
- props.setProperty("group.id", getIdentifier());
+ props.setProperty("group.id", context.getProperty(GROUP_ID).getValue());
+ props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
props.setProperty("auto.commit.enable", "true"); // just be explicit
props.setProperty("auto.offset.reset", "smallest");
+ props.setProperty("zk.connectiontimeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
+ props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
consumer = Consumer.createJavaConsumerConnector(consumerConfig);
@@ -236,7 +252,7 @@ public class GetKafka extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
+ final ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
if (iterator == null) {
return;
}
@@ -293,7 +309,7 @@ public class GetKafka extends AbstractProcessor {
}
// add the message to the FlowFile's contents
- final boolean firstMessage = (msgCount == 0);
+ final boolean firstMessage = msgCount == 0;
flowFile = session.append(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {