You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2018/10/10 14:06:29 UTC
ignite git commit: IGNITE-9126 Update Apache Kafka dependency - Fixes #4909.
Repository: ignite
Updated Branches:
refs/heads/master 8305d64ad -> bbd325566
IGNITE-9126 Update Apache Kafka dependency - Fixes #4909.
Signed-off-by: Dmitriy Pavlov <dp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbd32556
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbd32556
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbd32556
Branch: refs/heads/master
Commit: bbd325566cc4066ebd70b58278ab33052fb5e436
Parents: 8305d64
Author: Max-Pudov <pu...@gmail.com>
Authored: Wed Oct 10 17:06:16 2018 +0300
Committer: Dmitriy Pavlov <dp...@apache.org>
Committed: Wed Oct 10 17:06:16 2018 +0300
----------------------------------------------------------------------
modules/kafka/pom.xml | 14 ++++++++++++++
.../apache/ignite/stream/kafka/TestKafkaBroker.java | 9 ++++++---
.../stream/kafka/connect/IgniteSinkConnectorTest.java | 9 ++++++---
.../kafka/connect/IgniteSourceConnectorTest.java | 10 +++++++---
parent/pom.xml | 2 +-
5 files changed, 34 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
index 18ffcaa..63b2af2 100644
--- a/modules/kafka/pom.xml
+++ b/modules/kafka/pom.xml
@@ -56,6 +56,13 @@
<dependency>
<groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
@@ -73,6 +80,13 @@
<artifactId>curator-test</artifactId>
<version>${curator.version}</version>
<scope>test</scope>
+ <!-- https://github.com/confluentinc/kafka-connect-elasticsearch/issues/143 -->
+ <exclusions>
+ <exclusion>
+ <artifactId>guava</artifactId>
+ <groupId>com.google.guava</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
index 4f0d1d3..9b9b377 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -27,7 +27,8 @@ import java.util.Properties;
import java.util.concurrent.TimeoutException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.common.utils.SystemTime;
import kafka.utils.TestUtils;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
@@ -101,7 +102,9 @@ public class TestKafkaBroker {
servers.add(kafkaSrv);
- TestUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
+ KafkaZkClient client = kafkaSrv.zkClient();
+
+ TestUtils.createTopic(client, topic, partitions, replicationFactor,
scala.collection.JavaConversions.asScalaBuffer(servers), new Properties());
}
@@ -154,7 +157,7 @@ public class TestKafkaBroker {
private void setupKafkaServer() throws IOException {
kafkaCfg = new KafkaConfig(getKafkaConfig());
- kafkaSrv = TestUtils.createServer(kafkaCfg, SystemTime$.MODULE$);
+ kafkaSrv = TestUtils.createServer(kafkaCfg, new SystemTime());
kafkaSrv.startup();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
index 90306a7..d710a75 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
@@ -48,6 +49,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.FutureCallback;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
@@ -95,15 +97,16 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
for (String topic : TOPICS)
kafkaBroker.createTopic(topic, PARTITIONS, REPLICATION_FACTOR);
- WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps());
+ Map<String, String> props = makeWorkerProps();
+ WorkerConfig workerCfg = new StandaloneConfig(props);
OffsetBackingStore offBackingStore = mock(OffsetBackingStore.class);
offBackingStore.configure(workerCfg);
- worker = new Worker(WORKER_ID, new SystemTime(), workerCfg, offBackingStore);
+ worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore);
worker.start();
- herder = new StandaloneHerder(worker);
+ herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg));
herder.start();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
index cc487aa..8717044 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
@@ -47,11 +47,13 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.FutureCallback;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
@@ -98,15 +100,16 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
@Override protected void beforeTest() throws Exception {
kafkaBroker = new TestKafkaBroker();
- WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps());
+ Map<String, String> props = makeWorkerProps();
+ WorkerConfig workerCfg = new StandaloneConfig(props);
MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
offBackingStore.configure(workerCfg);
- worker = new Worker(WORKER_ID, new SystemTime(), workerCfg, offBackingStore);
+ worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore);
worker.start();
- herder = new StandaloneHerder(worker);
+ herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg));
herder.start();
}
@@ -250,6 +253,7 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-grp");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index cfc3483..8cd08ee 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -89,7 +89,7 @@
<jsonlib.bundle.version>2.4_1</jsonlib.bundle.version>
<jsonlib.version>2.4</jsonlib.version>
<jtidy.version>r938</jtidy.version>
- <kafka.version>0.10.0.1</kafka.version>
+ <kafka.version>1.1.1</kafka.version>
<karaf.version>4.0.2</karaf.version>
<log4j.version>2.11.0</log4j.version>
<lucene.bundle.version>7.4.0_1</lucene.bundle.version>