You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/16 22:39:27 UTC

[1/2] nifi git commit: NIFI-1146 Allow GetKafka to be configured with auto.offset.reset to largest or smallest

Repository: nifi
Updated Branches:
  refs/heads/master 64369f67f -> 03a54bf2d


NIFI-1146 Allow GetKafka to be configured with auto.offset.reset to largest or smallest


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b954ca62
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b954ca62
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b954ca62

Branch: refs/heads/master
Commit: b954ca620e619e7961e6a7a58122b844f30862da
Parents: 64369f6
Author: Naveen Madhire <na...@capitalone.com>
Authored: Mon Nov 16 11:59:52 2015 -0600
Committer: Naveen Madhire <na...@capitalone.com>
Committed: Mon Nov 16 11:59:52 2015 -0600

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/GetKafka.java  | 52 +++++++++++++-------
 1 file changed, 33 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b954ca62/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index e10977b..e644064 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -65,13 +65,18 @@ import kafka.message.MessageAndMetadata;
 @CapabilityDescription("Fetches messages from Apache Kafka")
 @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
 @WritesAttributes({
-    @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"),
-    @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If"
-            + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
-    @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"),
-    @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")})
+        @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"),
+        @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If"
+                + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
+        @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"),
+        @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1"),
+        @WritesAttribute(attribute = "auto.offset.reset", description = "If this is set to largest, the consumer may lose some messages when the number of partitions, " +
+                "for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest")})
 public class GetKafka extends AbstractProcessor {
 
+    public static final String SMALLEST = "smallest";
+    public static final String LARGEST = "largest";
+
     public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder()
             .name("ZooKeeper Connection String")
             .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port>"
@@ -141,12 +146,20 @@ public class GetKafka extends AbstractProcessor {
             .expressionLanguageSupported(false)
             .build();
     public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
-        .name("Group ID")
-        .description("A Group ID is used to identify consumers that are within the same consumer group")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .build();
+            .name("Group ID")
+            .description("A Group ID is used to identify consumers that are within the same consumer group")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
+            .name("Auto Offset Reset")
+            .description("Auto Offset Reset indicator")
+            .required(true)
+            .allowableValues(SMALLEST, LARGEST)
+            .defaultValue(LARGEST)
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -163,13 +176,13 @@ public class GetKafka extends AbstractProcessor {
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
-            .fromPropertyDescriptor(CLIENT_NAME)
-            .defaultValue("NiFi-" + getIdentifier())
-            .build();
+                .fromPropertyDescriptor(CLIENT_NAME)
+                .defaultValue("NiFi-" + getIdentifier())
+                .build();
         final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder()
-            .fromPropertyDescriptor(GROUP_ID)
-            .defaultValue(getIdentifier())
-            .build();
+                .fromPropertyDescriptor(GROUP_ID)
+                .defaultValue(getIdentifier())
+                .build();
 
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(ZOOKEEPER_CONNECTION_STRING);
@@ -181,6 +194,7 @@ public class GetKafka extends AbstractProcessor {
         props.add(groupIdWithDefault);
         props.add(KAFKA_TIMEOUT);
         props.add(ZOOKEEPER_TIMEOUT);
+        props.add(AUTO_OFFSET_RESET);
         return props;
     }
 
@@ -204,7 +218,7 @@ public class GetKafka extends AbstractProcessor {
         props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
         props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
         props.setProperty("auto.commit.enable", "true"); // just be explicit
-        props.setProperty("auto.offset.reset", "smallest");
+        props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue());
         props.setProperty("zk.connectiontimeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
         props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
 
@@ -356,4 +370,4 @@ public class GetKafka extends AbstractProcessor {
         }
     }
 
-}
+}
\ No newline at end of file


[2/2] nifi git commit: Changes after review

Posted by ma...@apache.org.
Changes after review


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/03a54bf2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/03a54bf2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/03a54bf2

Branch: refs/heads/master
Commit: 03a54bf2d593e07ab602f6a9425d0231a273ba5a
Parents: b954ca6
Author: Naveen Madhire <na...@capitalone.com>
Authored: Mon Nov 16 13:32:17 2015 -0600
Committer: Naveen Madhire <na...@capitalone.com>
Committed: Mon Nov 16 13:32:17 2015 -0600

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/processors/kafka/GetKafka.java   | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/03a54bf2/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index e644064..4be6194 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -69,9 +69,7 @@ import kafka.message.MessageAndMetadata;
         @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If"
                 + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
         @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"),
-        @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1"),
-        @WritesAttribute(attribute = "auto.offset.reset", description = "If this is set to largest, the consumer may lose some messages when the number of partitions, " +
-                "for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest")})
+        @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")})
 public class GetKafka extends AbstractProcessor {
 
     public static final String SMALLEST = "smallest";
@@ -155,7 +153,7 @@ public class GetKafka extends AbstractProcessor {
 
     public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
             .name("Auto Offset Reset")
-            .description("Auto Offset Reset indicator")
+            .description("Automatically reset the offset to the smallest or largest offset available on the broker")
             .required(true)
             .allowableValues(SMALLEST, LARGEST)
             .defaultValue(LARGEST)