You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2018/10/10 14:06:29 UTC

ignite git commit: IGNITE-9126 Update Apache Kafka dependency - Fixes #4909.

Repository: ignite
Updated Branches:
  refs/heads/master 8305d64ad -> bbd325566


IGNITE-9126 Update Apache Kafka dependency - Fixes #4909.

Signed-off-by: Dmitriy Pavlov <dp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbd32556
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbd32556
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbd32556

Branch: refs/heads/master
Commit: bbd325566cc4066ebd70b58278ab33052fb5e436
Parents: 8305d64
Author: Max-Pudov <pu...@gmail.com>
Authored: Wed Oct 10 17:06:16 2018 +0300
Committer: Dmitriy Pavlov <dp...@apache.org>
Committed: Wed Oct 10 17:06:16 2018 +0300

----------------------------------------------------------------------
 modules/kafka/pom.xml                                 | 14 ++++++++++++++
 .../apache/ignite/stream/kafka/TestKafkaBroker.java   |  9 ++++++---
 .../stream/kafka/connect/IgniteSinkConnectorTest.java |  9 ++++++---
 .../kafka/connect/IgniteSourceConnectorTest.java      | 10 +++++++---
 parent/pom.xml                                        |  2 +-
 5 files changed, 34 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
index 18ffcaa..63b2af2 100644
--- a/modules/kafka/pom.xml
+++ b/modules/kafka/pom.xml
@@ -56,6 +56,13 @@
 
         <dependency>
             <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.11</artifactId>
             <version>${kafka.version}</version>
             <scope>test</scope>
@@ -73,6 +80,13 @@
             <artifactId>curator-test</artifactId>
             <version>${curator.version}</version>
             <scope>test</scope>
+            <!-- https://github.com/confluentinc/kafka-connect-elasticsearch/issues/143 -->
+            <exclusions>
+                <exclusion>
+                    <artifactId>guava</artifactId>
+                    <groupId>com.google.guava</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
index 4f0d1d3..9b9b377 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -27,7 +27,8 @@ import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.common.utils.SystemTime;
 import kafka.utils.TestUtils;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
@@ -101,7 +102,9 @@ public class TestKafkaBroker {
 
         servers.add(kafkaSrv);
 
-        TestUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
+        KafkaZkClient client = kafkaSrv.zkClient();
+
+        TestUtils.createTopic(client, topic, partitions, replicationFactor,
             scala.collection.JavaConversions.asScalaBuffer(servers), new Properties());
     }
 
@@ -154,7 +157,7 @@ public class TestKafkaBroker {
     private void setupKafkaServer() throws IOException {
         kafkaCfg = new KafkaConfig(getKafkaConfig());
 
-        kafkaSrv = TestUtils.createServer(kafkaCfg, SystemTime$.MODULE$);
+        kafkaSrv = TestUtils.createServer(kafkaCfg, new SystemTime());
 
         kafkaSrv.startup();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
index 90306a7..d710a75 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
@@ -48,6 +49,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.FutureCallback;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
@@ -95,15 +97,16 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
         for (String topic : TOPICS)
             kafkaBroker.createTopic(topic, PARTITIONS, REPLICATION_FACTOR);
 
-        WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps());
+        Map<String, String> props = makeWorkerProps();
+        WorkerConfig workerCfg = new StandaloneConfig(props);
 
         OffsetBackingStore offBackingStore = mock(OffsetBackingStore.class);
         offBackingStore.configure(workerCfg);
 
-        worker = new Worker(WORKER_ID, new SystemTime(), workerCfg, offBackingStore);
+        worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore);
         worker.start();
 
-        herder = new StandaloneHerder(worker);
+        herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg));
         herder.start();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
index cc487aa..8717044 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
@@ -47,11 +47,13 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.FutureCallback;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
@@ -98,15 +100,16 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
     @Override protected void beforeTest() throws Exception {
         kafkaBroker = new TestKafkaBroker();
 
-        WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps());
+        Map<String, String> props = makeWorkerProps();
+        WorkerConfig workerCfg = new StandaloneConfig(props);
 
         MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
         offBackingStore.configure(workerCfg);
 
-        worker = new Worker(WORKER_ID, new SystemTime(), workerCfg, offBackingStore);
+        worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore);
         worker.start();
 
-        herder = new StandaloneHerder(worker);
+        herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg));
         herder.start();
     }
 
@@ -250,6 +253,7 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-grp");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
         props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
             "org.apache.kafka.common.serialization.StringDeserializer");

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index cfc3483..8cd08ee 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -89,7 +89,7 @@
         <jsonlib.bundle.version>2.4_1</jsonlib.bundle.version>
         <jsonlib.version>2.4</jsonlib.version>
         <jtidy.version>r938</jtidy.version>
-        <kafka.version>0.10.0.1</kafka.version>
+        <kafka.version>1.1.1</kafka.version>
         <karaf.version>4.0.2</karaf.version>
         <log4j.version>2.11.0</log4j.version>
         <lucene.bundle.version>7.4.0_1</lucene.bundle.version>