You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2015/12/16 20:26:15 UTC
[2/2] nifi git commit: NIFI-1218 upgraded Kafka to 0.9.0.0 client API
Tested and validated that it is still compatible with 0.8.* Kafka brokers
NIFI-1218 upgraded Kafka to 0.9.0.0 client API. Tested and validated that it is still compatible with 0.8.* Kafka brokers.
Signed-off-by: jpercivall <jo...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/37635232
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/37635232
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/37635232
Branch: refs/heads/master
Commit: 37635232c708ed5bdb63033751a74a9aca40b12f
Parents: b19ff7c
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Wed Dec 16 12:49:44 2015 -0500
Committer: jpercivall <jo...@yahoo.com>
Committed: Wed Dec 16 14:25:28 2015 -0500
----------------------------------------------------------------------
.../nifi-kafka-bundle/nifi-kafka-processors/pom.xml | 6 +++---
.../org/apache/nifi/processors/kafka/KafkaUtils.java | 3 ++-
.../org/apache/nifi/processors/kafka/PutKafka.java | 3 +--
.../org/apache/nifi/processors/kafka/TestPutKafka.java | 13 +++++++++++++
4 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/37635232/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
index afbffbe..9fb5589 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
@@ -37,12 +37,12 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>0.8.2.2</version>
+ <version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.1</artifactId>
- <version>0.8.2.2</version>
+ <artifactId>kafka_2.10</artifactId>
+ <version>0.9.0.0</version>
<exclusions>
<!-- Transitive dependencies excluded because they are located
in a legacy Maven repository, which Maven 3 doesn't support. -->
http://git-wip-us.apache.org/repos/asf/nifi/blob/37635232/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
index 657d88b..a1e9f65 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
@@ -25,6 +25,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
import kafka.admin.AdminUtils;
import kafka.api.TopicMetadata;
import kafka.utils.ZKStringSerializer;
+import kafka.utils.ZkUtils;
import scala.collection.JavaConversions;
/**
@@ -50,7 +51,7 @@ class KafkaUtils {
}
});
scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
- .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
+ .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), ZkUtils.apply(zkClient, false));
return topicMetadatas.size();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/37635232/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index b5766e4..eec03ea 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -75,8 +76,6 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
import org.apache.nifi.util.LongHolder;
-import scala.actors.threadpool.Arrays;
-
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
http://git-wip-us.apache.org/repos/asf/nifi/blob/37635232/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 17d1cc8..e352840 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.Callback;
@@ -474,6 +475,18 @@ public class TestPutKafka {
@Override
public void close() {
}
+
+ @Override
+ public void close(long arg0, TimeUnit arg1) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void flush() {
+ // TODO Auto-generated method stub
+
+ }
}
}