Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/01 11:24:13 UTC
[04/12] ignite git commit: IGNITE-2016: Kafka Connect integration -
reflected review comments (avoiding setting same task parameters more than
once). - Fixes #335.
IGNITE-2016: Kafka Connect integration - reflected review comments (avoiding setting same task parameters more than once). - Fixes #335.
Signed-off-by: shtykh_roman <rs...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c92c2747
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c92c2747
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c92c2747
Branch: refs/heads/ignite-2224
Commit: c92c274737391546b3dfe3ccbe527329c462d95f
Parents: 861236a
Author: shtykh_roman <rs...@yahoo.com>
Authored: Mon Feb 1 10:35:02 2016 +0900
Committer: shtykh_roman <rs...@yahoo.com>
Committed: Mon Feb 1 10:35:02 2016 +0900
----------------------------------------------------------------------
modules/kafka/README.txt | 111 +++++-
modules/kafka/pom.xml | 69 ++--
.../ignite/stream/kafka/KafkaStreamer.java | 2 +-
.../kafka/connect/IgniteSinkConnector.java | 91 +++++
.../kafka/connect/IgniteSinkConstants.java | 38 ++
.../stream/kafka/connect/IgniteSinkTask.java | 165 ++++++++
.../kafka/IgniteKafkaStreamerSelfTestSuite.java | 9 +-
.../stream/kafka/KafkaEmbeddedBroker.java | 387 -------------------
.../kafka/KafkaIgniteStreamerSelfTest.java | 13 +-
.../ignite/stream/kafka/SimplePartitioner.java | 53 ---
.../ignite/stream/kafka/TestKafkaBroker.java | 237 ++++++++++++
.../kafka/connect/IgniteSinkConnectorTest.java | 250 ++++++++++++
.../kafka/src/test/resources/example-ignite.xml | 71 ++++
parent/pom.xml | 9 +-
14 files changed, 1011 insertions(+), 494 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/README.txt
----------------------------------------------------------------------
diff --git a/modules/kafka/README.txt b/modules/kafka/README.txt
index 1eaf861..f4e56bd 100644
--- a/modules/kafka/README.txt
+++ b/modules/kafka/README.txt
@@ -1,16 +1,17 @@
Apache Ignite Kafka Streamer Module
-------------------------
+-----------------------------------
Apache Ignite Kafka Streamer module provides streaming from Kafka to Ignite cache.
-To enable Kafka Streamer module when starting a standalone node, move 'optional/ignite-Kafka' folder to
-'libs' folder before running 'ignite.{sh|bat}' script. The content of the module folder will
-be added to classpath in this case.
+There are two ways this can be achieved:
+- importing Kafka Streamer module in your Maven project and instantiating KafkaStreamer for data streaming;
+- using Kafka Connect functionality.
-Importing Ignite Kafka Streamer Module In Maven Project
--------------------------------------
+Below are the details.
-If you are using Maven to manage dependencies of your project, you can add JCL module
+## Importing Ignite Kafka Streamer Module In Maven Project
+
+If you are using Maven to manage dependencies of your project, you can add Kafka module
dependency like this (replace '${ignite.version}' with actual Ignite version you are
interested in):
@@ -30,3 +31,99 @@ interested in):
</dependencies>
...
</project>
+
+
+## Streaming Data via Kafka Connect
+
+Sink Connector will help you export data from Kafka to Ignite cache. It polls data from Kafka topics and writes it to the user-specified cache.
+For more information on Kafka Connect, see [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
+
+The connector can be found in 'optional/ignite-kafka'. It and its dependencies must be on the classpath of a running Kafka instance,
+as described in the following subsection.
+
+### Setting up and Running
+
+1. Put the following jar files on Kafka's classpath
+- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
+- ignite-core-x.x.x-SNAPSHOT.jar
+- cache-api-1.0.0.jar
+- ignite-spring-1.5.0-SNAPSHOT.jar
+- spring-aop-4.1.0.RELEASE.jar
+- spring-beans-4.1.0.RELEASE.jar
+- spring-context-4.1.0.RELEASE.jar
+- spring-core-4.1.0.RELEASE.jar
+- spring-expression-4.1.0.RELEASE.jar
+- commons-logging-1.1.1.jar
+
+2. Prepare worker configurations, e.g.,
+```
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+internal.key.converter=org.apache.kafka.connect.storage.StringConverter
+internal.value.converter=org.apache.kafka.connect.storage.StringConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+```
+
+3. Prepare connector configurations, e.g.,
+```
+# connector
+name=string-ignite-connector
+connector.class=IgniteSinkConnector
+tasks.max=2
+topics=testTopic1,testTopic2
+
+# cache
+cacheName=cache1
+cacheAllowOverwrite=true
+igniteCfg=/some-path/ignite.xml
+```
+where 'cacheName' is the name of a cache defined in '/some-path/ignite.xml'; data from the topics 'testTopic1' and 'testTopic2'
+is pulled and stored in that cache. Set 'cacheAllowOverwrite' to true to allow overwriting existing values in the cache.
+You can also set 'cachePerNodeDataSize' and 'cachePerNodeParOps' to adjust the per-node buffer size and the maximum number
+of parallel stream operations for a single node.
+
+See example-ignite.xml in tests for a simple cache configuration file example.
+
+4. Start connector, for instance, as follows,
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+## Checking the Flow
+
+To perform a very basic functionality check, you can do the following,
+
+1. Start Zookeeper
+```
+bin/zookeeper-server-start.sh config/zookeeper.properties
+```
+
+2. Start Kafka server
+```
+bin/kafka-server-start.sh config/server.properties
+```
+
+3. Provide some data input to the Kafka server
+```
+bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
+k1,v1
+```
+
+4. Start the connector. For example,
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+5. Check the value is in the cache. For example, via REST,
+```
+http://node1:8080/ignite?cmd=size&cacheName=cache1
+```
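The connector properties described in the README above can be sanity-checked before the connector is started. The following is a minimal sketch and not part of this commit: `ConnectorPropsCheck` and its methods are hypothetical names, and the code uses only `java.util.Properties` from the JDK. It assumes the keys shown in the README example (`topics`, `cacheName`, `igniteCfg`, `cacheAllowOverwrite`).

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper, not part of the commit: parses and checks connector properties. */
final class ConnectorPropsCheck {
    /** Example connector configuration, copied from the README section above. */
    static final String EXAMPLE =
        "name=string-ignite-connector\n" +
        "connector.class=IgniteSinkConnector\n" +
        "tasks.max=2\n" +
        "topics=testTopic1,testTopic2\n" +
        "cacheName=cache1\n" +
        "cacheAllowOverwrite=true\n" +
        "igniteCfg=/some-path/ignite.xml\n";

    /** Loads properties from text in the standard .properties format. */
    static Properties load(String text) {
        Properties props = new Properties();

        try {
            props.load(new StringReader(text));
        }
        catch (IOException e) {
            throw new IllegalArgumentException("Cannot parse properties", e);
        }

        return props;
    }

    /** Returns the names of required keys that are missing or empty. */
    static List<String> missingRequired(Properties props) {
        List<String> missing = new ArrayList<>();

        for (String key : new String[] {"topics", "cacheName", "igniteCfg"}) {
            String val = props.getProperty(key);

            if (val == null || val.trim().isEmpty())
                missing.add(key);
        }

        return missing;
    }

    /** Splits the comma-separated 'topics' value into individual topic names. */
    static List<String> topics(Properties props) {
        List<String> res = new ArrayList<>();

        for (String t : props.getProperty("topics", "").split(","))
            if (!t.trim().isEmpty())
                res.add(t.trim());

        return res;
    }

    public static void main(String[] args) {
        Properties props = load(EXAMPLE);

        System.out.println("Missing keys: " + missingRequired(props));
        System.out.println("Topics: " + topics(props));
        System.out.println("Allow overwrite: " +
            Boolean.parseBoolean(props.getProperty("cacheAllowOverwrite")));
    }
}
```

The three required keys mirror the ones that `IgniteSinkConnector.start` validates in this commit; the optional settings are left unchecked here.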
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
index e00b190..0ac0487 100644
--- a/modules/kafka/pom.xml
+++ b/modules/kafka/pom.xml
@@ -20,7 +20,8 @@
<!--
POM file.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -43,48 +44,28 @@
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sf.jopt-simple</groupId>
- <artifactId>jopt-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>${zookeeper.version}</version>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-api</artifactId>
+ <version>${kafka.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-log4j</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-runtime</artifactId>
+ <version>${kafka.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.ow2.asm</groupId>
- <artifactId>asm-all</artifactId>
- <version>${asm.version}</version>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
@@ -96,11 +77,33 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>${easymock.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index cbc5b1b..487c369 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -224,4 +224,4 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
new file mode 100644
index 0000000..9385920
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Sink connector to manage sink tasks that transfer Kafka data to Ignite grid.
+ */
+public class IgniteSinkConnector extends SinkConnector {
+ /** Sink properties. */
+ private Map<String, String> configProps;
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return AppInfoParser.getVersion();
+ }
+
+ /**
+ * A sink lifecycle method. Validates grid-specific sink properties.
+ *
+ * @param props Sink properties.
+ */
+ @Override public void start(Map<String, String> props) {
+ configProps = props;
+
+ try {
+ A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics");
+ A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
+ A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file");
+ }
+ catch (IllegalArgumentException e) {
+ throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", e);
+ }
+ }
+
+ /**
+ * Obtains a sink task class to be instantiated for feeding data into grid.
+ *
+ * @return IgniteSinkTask class.
+ */
+ @Override public Class<? extends Task> taskClass() {
+ return IgniteSinkTask.class;
+ }
+
+ /**
+ * Builds each config for <tt>maxTasks</tt> tasks.
+ *
+ * @param maxTasks Max number of tasks.
+ * @return Task configs.
+ */
+ @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+ List<Map<String, String>> taskConfigs = new ArrayList<>();
+ Map<String, String> taskProps = new HashMap<>();
+
+ taskProps.putAll(configProps);
+
+ for (int i = 0; i < maxTasks; i++)
+ taskConfigs.add(taskProps);
+
+ return taskConfigs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ // No-op.
+ }
+}
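`taskConfigs` above adds the same `taskProps` map instance once per task, so every entry in the returned list refers to one shared mutable map. Whether that matters depends on how the Connect runtime treats the returned maps, which this commit does not state. A minimal sketch of a variant that gives each task its own copy follows; `TaskConfigSketch` and `perTaskConfigs` are hypothetical names, and the code uses only JDK collections rather than the Kafka Connect API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch, not part of the commit. */
final class TaskConfigSketch {
    /** Builds maxTasks configs, each an independent copy of the connector properties. */
    static List<Map<String, String>> perTaskConfigs(Map<String, String> connectorProps, int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();

        for (int i = 0; i < maxTasks; i++)
            taskConfigs.add(new HashMap<>(connectorProps));

        return taskConfigs;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();

        props.put("cacheName", "cache1");

        List<Map<String, String>> cfgs = perTaskConfigs(props, 2);

        // Changing one task's map leaves the other task's map untouched.
        cfgs.get(0).put("cacheName", "changed");

        System.out.println(cfgs.get(1).get("cacheName"));
    }
}
```

Copying costs one small map per task and removes any coupling between task configurations.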
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
new file mode 100644
index 0000000..7680d96
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink configuration strings.
+ */
+public class IgniteSinkConstants {
+ /** Ignite configuration file path. */
+ public static final String CACHE_CFG_PATH = "igniteCfg";
+
+ /** Cache name. */
+ public static final String CACHE_NAME = "cacheName";
+
+ /** Flag to enable overwriting existing values in cache. */
+ public static final String CACHE_ALLOW_OVERWRITE = "cacheAllowOverwrite";
+
+ /** Size of per-node buffer before data is sent to remote node. */
+ public static final String CACHE_PER_NODE_DATA_SIZE = "cachePerNodeDataSize";
+
+ /** Maximum number of parallel stream operations per node. */
+ public static final String CACHE_PER_NODE_PAR_OPS = "cachePerNodeParOps";
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
new file mode 100644
index 0000000..3d9a00d
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task to consume sequences of SinkRecords and write data to grid.
+ */
+public class IgniteSinkTask extends SinkTask {
+ /** Logger. */
+ private static final Logger log = LoggerFactory.getLogger(IgniteSinkTask.class);
+
+ /** Flag for stopped state. */
+ private static volatile boolean stopped = true;
+
+ /** Ignite grid configuration file. */
+ private static String igniteConfigFile;
+
+ /** Cache name. */
+ private static String cacheName;
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return new IgniteSinkConnector().version();
+ }
+
+ /**
+ * Initializes grid client from configPath.
+ *
+ * @param props Task properties.
+ */
+ @Override public void start(Map<String, String> props) {
+ // Each task has the same parameters -- avoid setting more than once.
+ if (cacheName != null)
+ return;
+
+ cacheName = props.get(IgniteSinkConstants.CACHE_NAME);
+ igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH);
+
+ if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))
+ StreamerContext.getStreamer().allowOverwrite(
+ Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)));
+
+ if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))
+ StreamerContext.getStreamer().perNodeBufferSize(
+ Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)));
+
+ if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))
+ StreamerContext.getStreamer().perNodeParallelOperations(
+ Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));
+
+ stopped = false;
+ }
+
+ /**
+ * Buffers records.
+ *
+ * @param records Records to inject into grid.
+ */
+ @SuppressWarnings("unchecked")
+ @Override public void put(Collection<SinkRecord> records) {
+ try {
+ for (SinkRecord record : records) {
+ if (record.key() != null) {
+ // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
+ StreamerContext.getStreamer().addData(record.key(), record.value());
+ }
+ else {
+ log.error("Failed to stream a record with null key!");
+ }
+
+ }
+ }
+ catch (ConnectException e) {
+ log.error("Failed adding record", e);
+
+ throw new ConnectException(e);
+ }
+ }
+
+ /**
+ * Pushes buffered data to grid. Flush interval is configured by worker configurations.
+ *
+ * @param offsets Offset information.
+ */
+ @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ if (stopped)
+ return;
+
+ StreamerContext.getStreamer().flush();
+ }
+
+ /**
+ * Stops the grid client.
+ */
+ @Override public void stop() {
+ if (stopped)
+ return;
+
+ stopped = true;
+
+ StreamerContext.getStreamer().close();
+ StreamerContext.getIgnite().close();
+ }
+
+ /**
+ * Streamer context initializing grid and data streamer instances on demand.
+ */
+ public static class StreamerContext {
+ /** Constructor. */
+ private StreamerContext() {
+ }
+
+ /** Instance holder. */
+ private static class Holder {
+ private static final Ignite IGNITE = Ignition.start(igniteConfigFile);
+ private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName);
+ }
+
+ /**
+ * Obtains grid instance.
+ *
+ * @return Grid instance.
+ */
+ public static Ignite getIgnite() {
+ return Holder.IGNITE;
+ }
+
+ /**
+ * Obtains data streamer instance.
+ *
+ * @return Data streamer instance.
+ */
+ public static IgniteDataStreamer getStreamer() {
+ return Holder.STREAMER;
+ }
+ }
+}
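`StreamerContext` above relies on the initialization-on-demand holder idiom: the JVM initializes the nested `Holder` class, and therefore starts the grid, only when `getIgnite()` or `getStreamer()` is first called. This is why `start` assigns `cacheName` and `igniteConfigFile` before it touches the streamer. The sketch below shows the idiom and this ordering dependency in isolation, using only the JDK; `LazyHolderSketch`, `configPath` and `resource()` are hypothetical stand-ins, with a plain string in place of the Ignite instance.

```java
/** Hypothetical sketch of the holder idiom, not part of the commit. */
final class LazyHolderSketch {
    /** Stand-in for igniteConfigFile: must be set before the first resource() call. */
    static volatile String configPath;

    /** Not instantiable. */
    private LazyHolderSketch() {
    }

    /** Initialized by the JVM once, on first access, in a thread-safe way. */
    private static class Holder {
        private static final String RESOURCE = "started with " + configPath;
    }

    /** Triggers Holder initialization on the first call and returns the same value afterwards. */
    static String resource() {
        return Holder.RESOURCE;
    }

    public static void main(String[] args) {
        configPath = "/some-path/ignite.xml";

        System.out.println(resource());

        // Later changes are not picked up: the holder captured the earlier value.
        configPath = "/other-path/ignite.xml";

        System.out.println(resource());
    }
}
```

The same property that makes the idiom thread-safe also means the held value cannot be refreshed within one JVM, which is worth keeping in mind when a resource held this way is closed and a restart is expected.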
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
index 9115ab4..731f540 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
@@ -18,9 +18,10 @@
package org.apache.ignite.stream.kafka;
import junit.framework.TestSuite;
+import org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest;
/**
- * Apache Kafka streamer tests.
+ * Apache Kafka streamers tests.
*/
public class IgniteKafkaStreamerSelfTestSuite extends TestSuite {
/**
@@ -30,8 +31,12 @@ public class IgniteKafkaStreamerSelfTestSuite extends TestSuite {
public static TestSuite suite() throws Exception {
TestSuite suite = new TestSuite("Apache Kafka streamer Test Suite");
+ // Kafka streamer.
suite.addTest(new TestSuite(KafkaIgniteStreamerSelfTest.class));
+ // Kafka streamer via Connect API.
+ suite.addTest(new TestSuite(IgniteSinkConnectorTest.class));
+
return suite;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
deleted file mode 100644
index 5e7cee7..0000000
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.kafka;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-import kafka.admin.AdminUtils;
-import kafka.api.LeaderAndIsr;
-import kafka.api.PartitionStateInfo;
-import kafka.api.Request;
-import kafka.producer.KeyedMessage;
-import kafka.producer.Producer;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.StringEncoder;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-
-/**
- * Kafka Embedded Broker.
- */
-public class KafkaEmbeddedBroker {
- /** Default ZooKeeper host. */
- private static final String ZK_HOST = "localhost";
-
- /** Broker port. */
- private static final int BROKER_PORT = 9092;
-
- /** ZooKeeper connection timeout. */
- private static final int ZK_CONNECTION_TIMEOUT = 6000;
-
- /** ZooKeeper session timeout. */
- private static final int ZK_SESSION_TIMEOUT = 6000;
-
- /** ZooKeeper port. */
- private static int zkPort = 0;
-
- /** Is ZooKeeper ready. */
- private boolean zkReady;
-
- /** Kafka config. */
- private KafkaConfig brokerCfg;
-
- /** Kafka server. */
- private KafkaServer kafkaSrv;
-
- /** ZooKeeper client. */
- private ZkClient zkClient;
-
- /** Embedded ZooKeeper. */
- private EmbeddedZooKeeper zooKeeper;
-
- /**
- * Creates an embedded Kafka broker.
- */
- public KafkaEmbeddedBroker() {
- try {
- setupEmbeddedZooKeeper();
-
- setupEmbeddedKafkaServer();
- }
- catch (IOException | InterruptedException e) {
- throw new RuntimeException("Failed to start Kafka broker " + e);
- }
- }
-
- /**
- * @return ZooKeeper address.
- */
- public static String getZKAddress() {
- return ZK_HOST + ':' + zkPort;
- }
-
- /**
- * Creates a Topic.
- *
- * @param topic Topic name.
- * @param partitions Number of partitions for the topic.
- * @param replicationFactor Replication factor.
- * @throws TimeoutException If operation is timed out.
- * @throws InterruptedException If interrupted.
- */
- public void createTopic(String topic, int partitions, int replicationFactor)
- throws TimeoutException, InterruptedException {
- AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
-
- waitUntilMetadataIsPropagated(topic, 0, 10000, 100);
- }
-
- /**
- * Sends message to Kafka broker.
- *
- * @param keyedMessages List of keyed messages.
- * @return Producer used to send the message.
- */
- public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
- Producer<String, String> producer = new Producer<>(getProducerConfig());
-
- producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
-
- return producer;
- }
-
- /**
- * Shuts down Kafka broker.
- */
- public void shutdown() {
- zkReady = false;
-
- if (kafkaSrv != null)
- kafkaSrv.shutdown();
-
- List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerCfg.logDirs());
-
- for (String logDir : logDirs)
- U.delete(new File(logDir));
-
- if (zkClient != null) {
- zkClient.close();
-
- zkClient = null;
- }
-
- if (zooKeeper != null) {
-
- try {
- zooKeeper.shutdown();
- }
- catch (IOException e) {
- // No-op.
- }
-
- zooKeeper = null;
- }
- }
-
- /**
- * @return ZooKeeper client.
- */
- private ZkClient getZkClient() {
- A.ensure(zkReady, "Zookeeper not setup yet");
- A.notNull(zkClient, "Zookeeper client is not yet initialized");
-
- return zkClient;
- }
-
- /**
- * Checks if topic metadata is propagated.
- *
- * @param topic Topic name.
- * @param part Partition.
- * @return {@code True} if propagated, otherwise {@code false}.
- */
- private boolean isMetadataPropagated(String topic, int part) {
- scala.Option<PartitionStateInfo> partStateOption =
- kafkaSrv.apis().metadataCache().getPartitionInfo(topic, part);
-
- if (!partStateOption.isDefined())
- return false;
-
- PartitionStateInfo partState = partStateOption.get();
-
- LeaderAndIsr LeaderAndIsr = partState.leaderIsrAndControllerEpoch().leaderAndIsr();
-
- return ZkUtils.getLeaderForPartition(getZkClient(), topic, part) != null &&
- Request.isValidBrokerId(LeaderAndIsr.leader()) && LeaderAndIsr.isr().size() >= 1;
- }
-
- /**
- * Waits until metadata is propagated.
- *
- * @param topic Topic name.
- * @param part Partition.
- * @param timeout Timeout value in millis.
- * @param interval Interval in millis to sleep.
- * @throws TimeoutException If operation is timed out.
- * @throws InterruptedException If interrupted.
- */
- private void waitUntilMetadataIsPropagated(String topic, int part, long timeout, long interval)
- throws TimeoutException, InterruptedException {
- int attempt = 1;
-
- long startTime = System.currentTimeMillis();
-
- while (true) {
- if (isMetadataPropagated(topic, part))
- return;
-
- long duration = System.currentTimeMillis() - startTime;
-
- if (duration < timeout)
- Thread.sleep(interval);
- else
- throw new TimeoutException("Metadata propagation is timed out, attempt " + attempt);
-
- attempt++;
- }
- }
-
- /**
- * Sets up embedded Kafka server.
- *
- * @throws IOException If failed.
- */
- private void setupEmbeddedKafkaServer() throws IOException {
- A.ensure(zkReady, "Zookeeper should be setup before hand");
-
- brokerCfg = new KafkaConfig(getBrokerConfig());
-
- kafkaSrv = new KafkaServer(brokerCfg, SystemTime$.MODULE$);
-
- kafkaSrv.startup();
- }
-
- /**
- * Sets up embedded ZooKeeper.
- *
- * @throws IOException If failed.
- * @throws InterruptedException If interrupted.
- */
- private void setupEmbeddedZooKeeper() throws IOException, InterruptedException {
- EmbeddedZooKeeper zooKeeper = new EmbeddedZooKeeper(ZK_HOST, zkPort);
-
- zooKeeper.startup();
-
- zkPort = zooKeeper.getActualPort();
-
- zkClient = new ZkClient(getZKAddress(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
-
- zkReady = true;
- }
-
- /**
- * @return Kafka broker address.
- */
- private static String getBrokerAddress() {
- return ZK_HOST + ':' + BROKER_PORT;
- }
-
- /**
- * Gets Kafka broker config.
- *
- * @return Kafka broker config.
- * @throws IOException If failed.
- */
- private static Properties getBrokerConfig() throws IOException {
- Properties props = new Properties();
-
- props.put("broker.id", "0");
- props.put("host.name", ZK_HOST);
- props.put("port", "" + BROKER_PORT);
- props.put("log.dir", createTempDir("_cfg").getAbsolutePath());
- props.put("zookeeper.connect", getZKAddress());
- props.put("log.flush.interval.messages", "1");
- props.put("replica.socket.timeout.ms", "1500");
-
- return props;
- }
-
- /**
- * @return Kafka Producer config.
- */
- private static ProducerConfig getProducerConfig() {
- Properties props = new Properties();
-
- props.put("metadata.broker.list", getBrokerAddress());
- props.put("serializer.class", StringEncoder.class.getName());
- props.put("key.serializer.class", StringEncoder.class.getName());
- props.put("partitioner.class", SimplePartitioner.class.getName());
-
- return new ProducerConfig(props);
- }
-
- /**
- * Creates temp directory.
- *
- * @param prefix Prefix.
- * @return Created file.
- * @throws IOException If failed.
- */
- private static File createTempDir( String prefix) throws IOException {
- Path path = Files.createTempDirectory(prefix);
-
- return path.toFile();
- }
-
- /**
- * Creates embedded ZooKeeper.
- */
- private static class EmbeddedZooKeeper {
- /** Default ZooKeeper host. */
- private final String zkHost;
-
- /** Default ZooKeeper port. */
- private final int zkPort;
-
- /** NIO context factory. */
- private NIOServerCnxnFactory factory;
-
- /** Snapshot directory. */
- private File snapshotDir;
-
- /** Log directory. */
- private File logDir;
-
- /**
- * Creates an embedded ZooKeeper.
- *
- * @param zkHost ZooKeeper host.
- * @param zkPort ZooKeeper port.
- */
- EmbeddedZooKeeper(String zkHost, int zkPort) {
- this.zkHost = zkHost;
- this.zkPort = zkPort;
- }
-
- /**
- * Starts up ZooKeeper.
- *
- * @throws IOException If failed.
- * @throws InterruptedException If interrupted.
- */
- void startup() throws IOException, InterruptedException {
- snapshotDir = createTempDir("_ss");
-
- logDir = createTempDir("_log");
-
- ZooKeeperServer zkSrv = new ZooKeeperServer(snapshotDir, logDir, 500);
-
- factory = new NIOServerCnxnFactory();
-
- factory.configure(new InetSocketAddress(zkHost, zkPort), 16);
-
- factory.startup(zkSrv);
- }
-
- /**
- * @return Actual port ZooKeeper is started.
- */
- int getActualPort() {
- return factory.getLocalPort();
- }
-
- /**
- * Shuts down ZooKeeper.
- *
- * @throws IOException If failed.
- */
- void shutdown() throws IOException {
- if (factory != null) {
- factory.shutdown();
-
- U.delete(snapshotDir);
-
- U.delete(logDir);
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 927ba3d..829c877 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.consumer.ConsumerConfig;
import kafka.producer.KeyedMessage;
@@ -40,14 +41,13 @@ import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
-import static org.apache.ignite.stream.kafka.KafkaEmbeddedBroker.getZKAddress;
/**
* Tests {@link KafkaStreamer}.
*/
public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
/** Embedded Kafka. */
- private KafkaEmbeddedBroker embeddedBroker;
+ private TestKafkaBroker embeddedBroker;
/** Count. */
private static final int CNT = 100;
@@ -77,7 +77,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
@Override protected void beforeTest() throws Exception {
grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
- embeddedBroker = new KafkaEmbeddedBroker();
+ embeddedBroker = new TestKafkaBroker();
}
/** {@inheritDoc} */
@@ -176,7 +176,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
kafkaStmr.setThreads(4);
// Set the consumer configuration.
- kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(getZKAddress(), "groupX"));
+ kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX"));
// Set the decoders.
StringDecoder strDecoder = new StringDecoder(new VerifiableProperties());
@@ -199,7 +199,8 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
- latch.await();
+ // Checks that all events are processed within 10 seconds.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
for (Map.Entry<String, String> entry : keyValMap.entrySet())
assertEquals(entry.getValue(), cache.get(entry.getKey()));
@@ -232,4 +233,4 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
return new ConsumerConfig(props);
}
-}
\ No newline at end of file
+}
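Note on the latch change above: replacing latch.await() with assertTrue(latch.await(10, TimeUnit.SECONDS)) turns a potential hang into a test failure. The sketch below illustrates only that JDK behaviour and is not code from the commit. The class BoundedLatchWait and its parameters are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of the commit) showing a bounded latch wait. */
final class BoundedLatchWait {
    /**
     * Creates a latch expecting {@code expected} events, delivers {@code delivered}
     * of them from separate threads, and waits at most {@code timeoutMs} milliseconds.
     *
     * @return {@code true} only if the latch reached zero before the timeout.
     */
    static boolean awaitAll(int expected, int delivered, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(expected);

        for (int i = 0; i < delivered; i++)
            new Thread(latch::countDown).start();

        try {
            // Unlike the no-argument await(), this returns false on timeout
            // instead of blocking forever when events are lost.
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag and report the wait as unsuccessful.
            Thread.currentThread().interrupt();

            return false;
        }
    }
}
```

In a test, wrapping the returned boolean in assertTrue makes a lost event fail the test once the timeout elapses.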
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
deleted file mode 100644
index b49bebe..0000000
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.kafka;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Simple partitioner for Kafka.
- */
-@SuppressWarnings("UnusedDeclaration")
-public class SimplePartitioner implements Partitioner {
- /**
- * Constructs instance.
- *
- * @param props Properties.
- */
- public SimplePartitioner(VerifiableProperties props) {
- // No-op.
- }
-
- /**
- * Partitions the key based on the key value.
- *
- * @param key Key.
- * @param partSize Partition size.
- * @return partition Partition.
- */
- public int partition(Object key, int partSize) {
- String keyStr = (String)key;
-
- String[] keyValues = keyStr.split("\\.");
-
- Integer intKey = Integer.parseInt(keyValues[3]);
-
- return intKey > 0 ? intKey % partSize : 0;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
new file mode 100644
index 0000000..70acb78
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Producer;
+import kafka.producer.ProducerConfig;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.TestUtils;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.curator.test.TestingServer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import scala.Tuple2;
+
+/**
+ * Kafka Test Broker.
+ */
+public class TestKafkaBroker {
+ /** ZooKeeper connection timeout. */
+ private static final int ZK_CONNECTION_TIMEOUT = 6000;
+
+ /** ZooKeeper session timeout. */
+ private static final int ZK_SESSION_TIMEOUT = 6000;
+
+ /** ZooKeeper port. */
+ private static final int ZK_PORT = 21811;
+
+ /** Broker host. */
+ private static final String BROKER_HOST = "localhost";
+
+ /** Broker port. */
+ private static final int BROKER_PORT = 9092;
+
+ /** Kafka config. */
+ private KafkaConfig kafkaCfg;
+
+ /** Kafka server. */
+ private KafkaServer kafkaSrv;
+
+ /** ZooKeeper. */
+ private TestingServer zkServer;
+
+ /** Kafka Zookeeper utils. */
+ private ZkUtils zkUtils;
+
+ /**
+ * Kafka broker constructor.
+ */
+ public TestKafkaBroker() {
+ try {
+ setupZooKeeper();
+
+ setupKafkaServer();
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to start Kafka: " + e, e);
+ }
+ }
+
+ /**
+ * Creates a topic.
+ *
+ * @param topic Topic name.
+ * @param partitions Number of partitions for the topic.
+ * @param replicationFactor Replication factor.
+ * @throws TimeoutException If operation is timed out.
+ * @throws InterruptedException If interrupted.
+ */
+ public void createTopic(String topic, int partitions, int replicationFactor)
+ throws TimeoutException, InterruptedException {
+ List<KafkaServer> servers = new ArrayList<>();
+
+ servers.add(kafkaSrv);
+
+ TestUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
+ scala.collection.JavaConversions.asScalaBuffer(servers), new Properties());
+ }
+
+ /**
+ * Sends messages to the Kafka broker.
+ *
+ * @param keyedMessages List of keyed messages.
+ * @return Producer used to send the messages.
+ */
+ public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
+ Producer<String, String> producer = new Producer<>(getProducerConfig());
+
+ producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+
+ return producer;
+ }
+
+ /**
+ * Shuts down test Kafka broker.
+ */
+ public void shutdown() {
+ if (zkUtils != null)
+ zkUtils.close();
+
+ if (kafkaSrv != null)
+ kafkaSrv.shutdown();
+
+ if (zkServer != null) {
+ try {
+ zkServer.stop();
+ }
+ catch (IOException e) {
+ // No-op.
+ }
+ }
+
+ List<String> logDirs = scala.collection.JavaConversions.seqAsJavaList(kafkaCfg.logDirs());
+
+ for (String logDir : logDirs)
+ U.delete(new File(logDir));
+ }
+
+ /**
+ * Sets up test Kafka broker.
+ *
+ * @throws IOException If failed.
+ */
+ private void setupKafkaServer() throws IOException {
+ kafkaCfg = new KafkaConfig(getKafkaConfig());
+
+ kafkaSrv = TestUtils.createServer(kafkaCfg, SystemTime$.MODULE$);
+
+ kafkaSrv.startup();
+ }
+
+ /**
+ * Sets up ZooKeeper test server.
+ *
+ * @throws Exception If failed.
+ */
+ private void setupZooKeeper() throws Exception {
+ zkServer = new TestingServer(ZK_PORT, true);
+
+ Tuple2<ZkClient, ZkConnection> zkTuple = ZkUtils.createZkClientAndConnection(zkServer.getConnectString(),
+ ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
+
+ zkUtils = new ZkUtils(zkTuple._1(), zkTuple._2(), false);
+ }
+
+ /**
+ * Obtains Kafka config.
+ *
+ * @return Kafka config.
+ * @throws IOException If failed.
+ */
+ private Properties getKafkaConfig() throws IOException {
+ Properties props = new Properties();
+
+ props.put("broker.id", "0");
+ props.put("zookeeper.connect", zkServer.getConnectString());
+ props.put("host.name", BROKER_HOST);
+ props.put("port", BROKER_PORT);
+ props.put("offsets.topic.replication.factor", "1");
+ props.put("log.dir", createTmpDir("_cfg").getAbsolutePath());
+ props.put("log.flush.interval.messages", "1");
+
+ return props;
+ }
+
+ /**
+ * Obtains broker address.
+ *
+ * @return Kafka broker address.
+ */
+ public String getBrokerAddress() {
+ return BROKER_HOST + ":" + BROKER_PORT;
+ }
+
+ /**
+ * Obtains Zookeeper address.
+ *
+ * @return Zookeeper address.
+ */
+ public String getZookeeperAddress() {
+ return BROKER_HOST + ":" + ZK_PORT;
+ }
+
+ /**
+ * Obtains producer config.
+ *
+ * @return Kafka Producer config.
+ */
+ private ProducerConfig getProducerConfig() {
+ Properties props = new Properties();
+
+ props.put("metadata.broker.list", getBrokerAddress());
+ props.put("bootstrap.servers", getBrokerAddress());
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+
+ return new ProducerConfig(props);
+ }
+
+ /**
+ * Creates temporary directory.
+ *
+ * @param prefix Prefix.
+ * @return Created file.
+ * @throws IOException If failed.
+ */
+ private static File createTmpDir(String prefix) throws IOException {
+ Path path = Files.createTempDirectory(prefix);
+
+ return path.toFile();
+ }
+}
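Note on the temporary directory handling above: TestKafkaBroker creates its log directory with Files.createTempDirectory and removes it in shutdown() through Ignite's internal U.delete. The sketch below shows roughly the same lifecycle using only the JDK, for readers without Ignite on the classpath. It is an assumption-laden illustration, not the commit's code: TmpDirLifecycle and deleteRecursively are hypothetical names, and U.delete may behave differently in edge cases.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical JDK-only stand-in for the temp directory lifecycle. */
final class TmpDirLifecycle {
    /** Creates a temporary directory with the given name prefix. */
    static File createTmpDir(String prefix) {
        try {
            return Files.createTempDirectory(prefix).toFile();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Deletes the directory and everything below it.
     *
     * @return {@code true} if the directory no longer exists afterwards.
     */
    static boolean deleteRecursively(File dir) {
        try (Stream<Path> paths = Files.walk(dir.toPath())) {
            // Reverse ordering visits children before their parent directory.
            paths.sorted(Comparator.reverseOrder())
                .map(Path::toFile)
                .forEach(File::delete);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return !dir.exists();
    }
}
```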
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
new file mode 100644
index 0000000..a8583d0
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Producer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.kafka.TestKafkaBroker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.FutureCallback;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.mock;
+
+/**
+ * Tests for {@link IgniteSinkConnector}.
+ */
+public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
+ /** Number of input messages. */
+ private static final int EVENT_CNT = 10000;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "testCache";
+
+ /** Test topics. */
+ private static final String[] TOPICS = {"test1", "test2"};
+
+ /** Number of Kafka partitions. */
+ private static final int PARTITIONS = 3;
+
+ /** Kafka replication factor. */
+ private static final int REPLICATION_FACTOR = 1;
+
+ /** Test Kafka broker. */
+ private TestKafkaBroker kafkaBroker;
+
+ /** Worker to run tasks. */
+ private Worker worker;
+
+ /** Workers' herder. */
+ private Herder herder;
+
+ /** Ignite server node. */
+ private Ignite grid;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected void beforeTest() throws Exception {
+ IgniteConfiguration cfg = loadConfiguration("modules/kafka/src/test/resources/example-ignite.xml");
+
+ cfg.setClientMode(false);
+
+ grid = startGrid("igniteServerNode", cfg);
+
+ kafkaBroker = new TestKafkaBroker();
+
+ for (String topic : TOPICS)
+ kafkaBroker.createTopic(topic, PARTITIONS, REPLICATION_FACTOR);
+
+ WorkerConfig workerConfig = new StandaloneConfig(makeWorkerProps());
+
+ OffsetBackingStore offsetBackingStore = mock(OffsetBackingStore.class);
+ offsetBackingStore.configure(anyObject(Map.class));
+
+ worker = new Worker(workerConfig, offsetBackingStore);
+ worker.start();
+
+ herder = new StandaloneHerder(worker);
+ herder.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ herder.stop();
+
+ worker.stop();
+
+ kafkaBroker.shutdown();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Tests the whole data flow from injecting data to Kafka to transferring it to the grid. It reads from two
+ * specified Kafka topics, because a sink task can read from multiple topics.
+ *
+ * @throws Exception Thrown in case of failure.
+ */
+ public void testSinkPuts() throws Exception {
+ Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
+
+ FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+ @Override
+ public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+ if (error != null)
+ throw new RuntimeException("Failed to create a job!");
+ }
+ });
+
+ herder.putConnectorConfig(
+ sinkProps.get(ConnectorConfig.NAME_CONFIG),
+ sinkProps, false, cb);
+
+ cb.get();
+
+ final CountDownLatch latch = new CountDownLatch(EVENT_CNT * TOPICS.length);
+
+ final IgnitePredicate<Event> putLsnr = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ assert evt != null;
+
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(putLsnr, EVT_CACHE_OBJECT_PUT);
+
+ IgniteCache<String, String> cache = grid.cache(CACHE_NAME);
+
+ assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+ Map<String, String> keyValMap = new HashMap<>(EVENT_CNT * TOPICS.length);
+
+ // Produces events for each of the specified topics.
+ for (String topic : TOPICS)
+ keyValMap.putAll(produceStream(topic));
+
+ // Checks that all events are processed within 10 seconds.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(putLsnr);
+
+ // Checks that each event was processed properly.
+ for (Map.Entry<String, String> entry : keyValMap.entrySet())
+ assertEquals(entry.getValue(), cache.get(entry.getKey()));
+
+ assertEquals(EVENT_CNT * TOPICS.length, cache.size(CachePeekMode.PRIMARY));
+ }
+
+ /**
+ * Sends messages to Kafka.
+ *
+ * @param topic Topic name.
+ * @return Map of key-value messages.
+ */
+ private Map<String, String> produceStream(String topic) {
+ List<KeyedMessage<String, String>> messages = new ArrayList<>(EVENT_CNT);
+
+ Map<String, String> keyValMap = new HashMap<>();
+
+ for (int evt = 0; evt < EVENT_CNT; evt++) {
+ long runtime = System.currentTimeMillis();
+
+ String key = topic + "_" + String.valueOf(evt);
+ String msg = runtime + String.valueOf(evt);
+
+ messages.add(new KeyedMessage<>(topic, key, msg));
+
+ keyValMap.put(key, msg);
+ }
+
+ Producer<String, String> producer = kafkaBroker.sendMessages(messages);
+
+ producer.close();
+
+ return keyValMap;
+ }
+
+ /**
+ * Creates properties for test sink connector.
+ *
+ * @param topics Topics.
+ * @return Test sink connector properties.
+ */
+ private Map<String, String> makeSinkProps(String topics) {
+ Map<String, String> props = new HashMap<>();
+
+ props.put(ConnectorConfig.TOPICS_CONFIG, topics);
+ props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+ props.put(ConnectorConfig.NAME_CONFIG, "test-connector");
+ props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnector.class.getName());
+ props.put(IgniteSinkConstants.CACHE_NAME, "testCache");
+ props.put(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE, "true");
+ props.put(IgniteSinkConstants.CACHE_CFG_PATH, "example-ignite.xml");
+
+ return props;
+ }
+
+ /**
+ * Creates properties for Kafka Connect workers.
+ *
+ * @return Worker configurations.
+ */
+ private Map<String, String> makeWorkerProps() {
+ Map<String, String> props = new HashMap<>();
+
+ props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put("internal.key.converter.schemas.enable", "false");
+ props.put("internal.value.converter.schemas.enable", "false");
+ props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put("key.converter.schemas.enable", "false");
+ props.put("value.converter.schemas.enable", "false");
+ props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+ // Fast flushing for testing.
+ props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+ return props;
+ }
+}
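Note on the expected event count above: testSinkPuts sizes its latch as EVENT_CNT * TOPICS.length, which only matches the final cache size if keys never collide across topics. produceStream ensures this by prefixing each key with the topic name. The sketch below reproduces just that key scheme without Kafka or Ignite. SinkTestData and the fixed timestamp parameter are hypothetical and introduced for determinism; the test itself uses System.currentTimeMillis().

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, broker-free reproduction of the test's key/value scheme. */
final class SinkTestData {
    /** Builds the key/value pairs for one topic: key is "topic_evt", value is timestamp + evt. */
    static Map<String, String> produce(String topic, int eventCnt, long timestamp) {
        Map<String, String> keyValMap = new HashMap<>();

        for (int evt = 0; evt < eventCnt; evt++)
            keyValMap.put(topic + "_" + evt, timestamp + String.valueOf(evt));

        return keyValMap;
    }

    /** Merges the pairs of all topics. Topic-prefixed keys keep the entries distinct. */
    static Map<String, String> produceAll(String[] topics, int eventCnt, long timestamp) {
        Map<String, String> all = new HashMap<>(eventCnt * topics.length);

        for (String topic : topics)
            all.putAll(produce(topic, eventCnt, timestamp));

        return all;
    }
}
```

With distinct topic names, the merged map holds eventCnt * topics.length entries, which is the count the latch and the final cache size assertion both rely on.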
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/resources/example-ignite.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/resources/example-ignite.xml b/modules/kafka/src/test/resources/example-ignite.xml
new file mode 100644
index 0000000..fbb05d3
--- /dev/null
+++ b/modules/kafka/src/test/resources/example-ignite.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Ignite configuration with all defaults and enabled p2p deployment and enabled events.
+ Used for testing IgniteSink running Ignite in client mode.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <!-- Enable client mode. -->
+ <property name="clientMode" value="true"/>
+
+ <!-- Cache accessed from IgniteSink. -->
+ <property name="cacheConfiguration">
+ <list>
+ <!-- Partitioned cache example configuration, with settings adjusted to match the server nodes'. -->
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="name" value="testCache"/>
+ </bean>
+ </list>
+ </property>
+
+ <!-- Enable cache events. -->
+ <property name="includeEventTypes">
+ <list>
+ <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). -->
+ <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
+ </list>
+ </property>
+
+ <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:47500</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 8f02fec..21d8c69 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -61,6 +61,7 @@
<commons.lang.version>2.6</commons.lang.version>
<cron4j.version>2.2.5</cron4j.version>
<curator.version>2.9.1</curator.version>
+ <easymock.version>3.4</easymock.version>
<ezmorph.bundle.version>1.0.6_1</ezmorph.bundle.version>
<ezmorph.version>1.0.6</ezmorph.version>
<flume.ng.version>1.6.0</flume.ng.version>
@@ -82,11 +83,9 @@
<jsonlib.bundle.version>2.4_1</jsonlib.bundle.version>
<jsonlib.version>2.4</jsonlib.version>
<jtidy.version>r938</jtidy.version>
- <kafka.bundle.version>0.8.2.1_1</kafka.bundle.version>
- <kafka.clients.bundle.version>0.8.2.0_1</kafka.clients.bundle.version>
- <kafka.clients.version>0.8.2.0</kafka.clients.version>
- <kafka.version>0.8.2.1</kafka.version>
- <kafka.version>0.8.2.1</kafka.version>
+ <kafka.bundle.version>0.9.0.0_1</kafka.bundle.version>
+ <kafka.clients.bundle.version>0.9.0.0_1</kafka.clients.bundle.version>
+ <kafka.version>0.9.0.0</kafka.version>
<karaf.version>4.0.2</karaf.version>
<lucene.bundle.version>3.5.0_1</lucene.bundle.version>
<lucene.version>3.5.0</lucene.version>