You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/07/06 16:30:36 UTC
[17/28] incubator-ignite git commit: ignite-428 Implement
IgniteKafkaStreamer to stream data from Apache Kafka
ignite-428 Implement IgniteKafkaStreamer to stream data from Apache Kafka
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2c41739d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2c41739d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2c41739d
Branch: refs/heads/ignite-gg-9323
Commit: 2c41739dcd83751270b6bd30c6f2595edc68a1b1
Parents: 9f6a7f9
Author: vishal.garg <vi...@workday.com>
Authored: Mon Jun 22 19:35:08 2015 -0700
Committer: agura <ag...@gridgain.com>
Committed: Fri Jul 3 19:39:11 2015 +0300
----------------------------------------------------------------------
modules/kafka/pom.xml | 128 +++++++
.../ignite/stream/kafka/KafkaStreamer.java | 179 +++++++++
.../stream/kafka/KafkaEmbeddedBroker.java | 373 +++++++++++++++++++
.../kafka/KafkaIgniteStreamerSelfTest.java | 230 ++++++++++++
.../ignite/stream/kafka/SimplePartitioner.java | 46 +++
pom.xml | 1 +
6 files changed, 957 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
new file mode 100644
index 0000000..165ec1c
--- /dev/null
+++ b/modules/kafka/pom.xml
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-kafka</artifactId>
+ <version>1.1.1-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>0.8.2.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm-all</artifactId>
+ <version>4.2</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.gridgain</groupId>
+ <artifactId>ignite-shmem</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils</artifactId>
+ <version>1.8.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
new file mode 100644
index 0000000..e0240ce
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.stream.*;
+
+import kafka.consumer.*;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.*;
+import kafka.serializer.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Server that subscribes to topic messages from a Kafka broker and streams them as key-value pairs into an {@link
+ * org.apache.ignite.IgniteDataStreamer} instance.
+ * <p>
+ * Uses Kafka's High Level Consumer API to read messages from Kafka.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">Consumer Group
+ * Example</a>
+ */
+public class KafkaStreamer<T, K, V>
+ extends StreamAdapter<T, K, V> {
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Executor used to submit kafka streams. */
+ private ExecutorService executor;
+
+ /** Topic. */
+ private String topic;
+
+ /** Number of threads to process kafka streams. */
+ private int threads;
+
+ /** Kafka Consumer Config. */
+ private ConsumerConfig consumerConfig;
+
+ /** Key Decoder. */
+ private Decoder<K> keyDecoder;
+
+ /** Value Decoder. */
+ private Decoder<V> valueDecoder;
+
+ /** Kafka Consumer connector. */
+ private ConsumerConnector consumer;
+
+ /**
+ * Sets the topic.
+ *
+ * @param topic Topic Name.
+ */
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * Sets the threads.
+ *
+ * @param threads Number of Threads.
+ */
+ public void setThreads(final int threads) {
+ this.threads = threads;
+ }
+
+ /**
+ * Sets the consumer config.
+ *
+ * @param consumerConfig Consumer configuration.
+ */
+ public void setConsumerConfig(final ConsumerConfig consumerConfig) {
+ this.consumerConfig = consumerConfig;
+ }
+
+ /**
+ * Sets the key decoder.
+ *
+ * @param keyDecoder Key Decoder.
+ */
+ public void setKeyDecoder(final Decoder<K> keyDecoder) {
+ this.keyDecoder = keyDecoder;
+ }
+
+ /**
+ * Sets the value decoder.
+ *
+ * @param valueDecoder Value Decoder
+ */
+ public void setValueDecoder(final Decoder<V> valueDecoder) {
+ this.valueDecoder = valueDecoder;
+ }
+
+ /**
+ * Starts streamer.
+ *
+ * @throws IgniteException If failed.
+ */
+ public void start() {
+ A.notNull(getStreamer(), "streamer");
+ A.notNull(getIgnite(), "ignite");
+ A.notNull(topic, "topic");
+ A.notNull(keyDecoder, "key decoder");
+ A.notNull(valueDecoder, "value decoder");
+ A.notNull(consumerConfig, "kafka consumer config");
+ A.ensure(threads > 0, "threads > 0");
+
+ log = getIgnite().log();
+
+ consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
+
+ Map<String, Integer> topicCountMap = new HashMap<>();
+ topicCountMap.put(topic, threads);
+
+ Map<String, List<KafkaStream<K, V>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder,
+ valueDecoder);
+
+ List<KafkaStream<K, V>> streams = consumerMap.get(topic);
+
+ // Now launch all the consumer threads.
+ executor = Executors.newFixedThreadPool(threads);
+
+ // Now create an object to consume the messages.
+ for (final KafkaStream<K,V> stream : streams) {
+ executor.submit(new Runnable() {
+ @Override public void run() {
+
+ ConsumerIterator<K, V> it = stream.iterator();
+
+ while (it.hasNext()) {
+ final MessageAndMetadata<K, V> messageAndMetadata = it.next();
+ getStreamer().addData(messageAndMetadata.key(), messageAndMetadata.message());
+ }
+ }
+ });
+ }
+ }
+
+ /**
+  * Stops streamer.
+  * <p>
+  * Shuts down the Kafka consumer connector (if one was created) and then the executor, waiting up to
+  * 5 seconds for the consumer tasks to finish. If the calling thread is interrupted while waiting, its
+  * interrupt status is restored before returning.
+  */
+ public void stop() {
+     if (consumer != null)
+         consumer.shutdown();
+
+     if (executor != null) {
+         executor.shutdown();
+
+         try {
+             if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
+                 if (log.isDebugEnabled())
+                     log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+         }
+         catch (InterruptedException e) {
+             // Keep the interruption visible to the caller instead of silently consuming it.
+             Thread.currentThread().interrupt();
+
+             if (log.isDebugEnabled())
+                 log.debug("Interrupted during shutdown, exiting uncleanly");
+         }
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
new file mode 100644
index 0000000..28533f7
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import org.apache.commons.io.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.zookeeper.server.*;
+
+import kafka.admin.*;
+import kafka.api.*;
+import kafka.api.Request;
+import kafka.producer.*;
+import kafka.server.*;
+import kafka.utils.*;
+import org.I0Itec.zkclient.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Kafka Embedded Broker.
+ */
+public class KafkaEmbeddedBroker {
+
+ /** Default ZooKeeper Host. */
+ private static final String ZK_HOST = "localhost";
+
+ /** Broker Port. */
+ private static final int BROKER_PORT = 9092;
+
+ /** ZooKeeper Connection Timeout. */
+ private static final int ZK_CONNECTION_TIMEOUT = 6000;
+
+ /** ZooKeeper Session Timeout. */
+ private static final int ZK_SESSION_TIMEOUT = 6000;
+
+ /** ZooKeeper port. */
+ private static int zkPort = 0;
+
+ /** Is ZooKeeper Ready. */
+ private boolean zkReady;
+
+ /** Kafka Config. */
+ private KafkaConfig brokerConfig;
+
+ /** Kafka Server. */
+ private KafkaServer kafkaServer;
+
+ /** ZooKeeper Client. */
+ private ZkClient zkClient;
+
+ /** Embedded ZooKeeper. */
+ private EmbeddedZooKeeper zooKeeper;
+
+ /**
+  * Creates an embedded Kafka Broker.
+  * <p>
+  * Sets up the embedded ZooKeeper first and then the embedded Kafka server.
+  *
+  * @throws RuntimeException If either setup step fails; the original exception is attached as the cause.
+  */
+ public KafkaEmbeddedBroker() {
+     try {
+         setupEmbeddedZooKeeper();
+         setupEmbeddedKafkaServer();
+     }
+     catch (IOException | InterruptedException e) {
+         // Restore the interrupt status so callers can still observe the interruption.
+         if (e instanceof InterruptedException)
+             Thread.currentThread().interrupt();
+
+         // Pass the exception as the cause so the original stack trace is not lost.
+         throw new RuntimeException("failed to start Kafka Broker " + e, e);
+     }
+ }
+
+ /**
+ * @return ZooKeeper Address.
+ */
+ public static String getZKAddress() {
+ return ZK_HOST + ":" + zkPort;
+ }
+
+ /**
+  * Creates a Topic and waits until the metadata of each of its partitions is propagated.
+  *
+  * @param topic topic name
+  * @param partitions number of partitions for the topic
+  * @param replicationFactor replication factor
+  * @throws TimeoutException if the metadata of a partition is not propagated in time
+  * @throws InterruptedException if the thread is interrupted while waiting
+  */
+ public void createTopic(String topic, final int partitions, final int replicationFactor)
+     throws TimeoutException, InterruptedException {
+     AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
+
+     // Wait for every partition, not only partition 0: messages are spread over all of them.
+     for (int partition = 0; partition < partitions; partition++)
+         waitUntilMetadataIsPropagated(topic, partition, 10000, 100);
+ }
+
+ /**
+ * Sends message to Kafka Broker.
+ *
+ * @param keyedMessages List of Keyed Messages.
+ * @return Producer used to send the message.
+ */
+ public Producer sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
+ Producer<String, String> producer = new Producer<>(getProducerConfig());
+ producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+ return producer;
+ }
+
+ /**
+ * Shuts down Kafka Broker.
+ *
+ * @throws IOException
+ */
+ public void shutdown()
+ throws IOException {
+
+ zkReady = false;
+
+ if (kafkaServer != null)
+ kafkaServer.shutdown();
+
+ List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerConfig.logDirs());
+
+ for (String logDir : logDirs) {
+ FileUtils.deleteDirectory(new File(logDir));
+ }
+
+ if (zkClient != null) {
+ zkClient.close();
+ zkClient = null;
+ }
+
+ if (zooKeeper != null) {
+
+ try {
+ zooKeeper.shutdown();
+ }
+ catch (IOException e) {
+ // ignore
+ }
+
+ zooKeeper = null;
+ }
+
+ }
+
+ /**
+ * @return the Zookeeper Client
+ */
+ private ZkClient getZkClient() {
+ A.ensure(zkReady, "Zookeeper not setup yet");
+ A.notNull(zkClient, "Zookeeper client is not yet initialized");
+
+ return zkClient;
+ }
+
+ /**
+ * Checks if topic metadata is propagated.
+ *
+ * @param topic topic name
+ * @param partition partition
+ * @return true if propagated otherwise false
+ */
+ private boolean isMetadataPropagated(final String topic, final int partition) {
+ final scala.Option<PartitionStateInfo> partitionStateOption = kafkaServer.apis().metadataCache().getPartitionInfo(
+ topic, partition);
+ if (partitionStateOption.isDefined()) {
+ final PartitionStateInfo partitionState = partitionStateOption.get();
+ final LeaderAndIsr leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch().leaderAndIsr();
+
+ if (ZkUtils.getLeaderForPartition(getZkClient(), topic, partition) != null
+ && Request.isValidBrokerId(leaderAndInSyncReplicas.leader())
+ && leaderAndInSyncReplicas.isr().size() >= 1)
+ return true;
+
+ }
+ return false;
+ }
+
+ /**
+ * Waits until metadata is propagated.
+ *
+ * @param topic topic name
+ * @param partition partition
+ * @param timeout timeout value in millis
+ * @param interval interval in millis to sleep
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ private void waitUntilMetadataIsPropagated(final String topic, final int partition, final long timeout,
+ final long interval) throws TimeoutException, InterruptedException {
+ int attempt = 1;
+ final long startTime = System.currentTimeMillis();
+
+ while (true) {
+ if (isMetadataPropagated(topic, partition))
+ return;
+
+ final long duration = System.currentTimeMillis() - startTime;
+
+ if (duration < timeout)
+ Thread.sleep(interval);
+ else
+ throw new TimeoutException("metadata propagate timed out, attempt=" + attempt);
+
+ attempt++;
+ }
+
+ }
+
+ /**
+ * Sets up embedded Kafka Server
+ *
+ * @throws IOException
+ */
+ private void setupEmbeddedKafkaServer()
+ throws IOException {
+ A.ensure(zkReady, "Zookeeper should be setup before hand");
+
+ brokerConfig = new KafkaConfig(getBrokerConfig());
+ kafkaServer = new KafkaServer(brokerConfig, SystemTime$.MODULE$);
+ kafkaServer.startup();
+ }
+
+ /**
+  * Sets up embedded ZooKeeper.
+  * <p>
+  * Starts the embedded server, records the port it actually bound to, connects the ZooKeeper client to it and
+  * marks ZooKeeper as ready.
+  *
+  * @throws IOException
+  * @throws InterruptedException
+  */
+ private void setupEmbeddedZooKeeper()
+     throws IOException, InterruptedException {
+     // Assign the field (not a shadowing local) so that shutdown() can stop this server later.
+     zooKeeper = new EmbeddedZooKeeper(ZK_HOST, zkPort);
+     zooKeeper.startup();
+     zkPort = zooKeeper.getActualPort();
+     zkClient = new ZkClient(getZKAddress(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
+     zkReady = true;
+ }
+
+ /**
+ * @return Kafka Broker Address.
+ */
+ private static String getBrokerAddress() {
+ return ZK_HOST + ":" + BROKER_PORT;
+ }
+
+ /**
+ * Gets KafKa Brofer Config
+ *
+ * @return Kafka Broker Config
+ * @throws IOException
+ */
+ private static Properties getBrokerConfig()
+ throws IOException {
+ Properties props = new Properties();
+ props.put("broker.id", "0");
+ props.put("host.name", ZK_HOST);
+ props.put("port", "" + BROKER_PORT);
+ props.put("log.dir", createTempDir("_cfg").getAbsolutePath());
+ props.put("zookeeper.connect", getZKAddress());
+ props.put("log.flush.interval.messages", "1");
+ props.put("replica.socket.timeout.ms", "1500");
+ return props;
+ }
+
+ /**
+ * @return Kafka Producer Config
+ */
+ private static ProducerConfig getProducerConfig() {
+ Properties props = new Properties();
+ props.put("metadata.broker.list", getBrokerAddress());
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("key.serializer.class", "kafka.serializer.StringEncoder");
+ props.put("partitioner.class", "org.apache.ignite.kafka.SimplePartitioner");
+ return new ProducerConfig(props);
+ }
+
+ /**
+ * Creates Temp Directory
+ *
+ * @param prefix prefix
+ * @return Created File.
+ * @throws IOException
+ */
+ private static File createTempDir(final String prefix)
+ throws IOException {
+ final Path path = Files.createTempDirectory(prefix);
+ return path.toFile();
+
+ }
+
+ /**
+ * Creates Embedded ZooKeeper.
+ */
+ private static class EmbeddedZooKeeper {
+ /** Default ZooKeeper Host. */
+ private final String zkHost;
+
+ /** Default ZooKeeper Port. */
+ private final int zkPort;
+
+ /** NIO Context Factory. */
+ private NIOServerCnxnFactory factory;
+
+ /** Snapshot Directory. */
+ private File snapshotDir;
+
+ /** Log Directory. */
+ private File logDir;
+
+ /**
+ * Creates an embedded Zookeeper
+ * @param zkHost zookeeper host
+ * @param zkPort zookeeper port
+ */
+ EmbeddedZooKeeper(final String zkHost, final int zkPort) {
+ this.zkHost = zkHost;
+ this.zkPort = zkPort;
+ }
+
+ /**
+ * Starts up ZooKeeper.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void startup()
+ throws IOException, InterruptedException {
+ snapshotDir = createTempDir("_ss");
+ logDir = createTempDir("_log");
+ ZooKeeperServer zooServer = new ZooKeeperServer(snapshotDir, logDir, 500);
+ factory = new NIOServerCnxnFactory();
+ factory.configure(new InetSocketAddress(zkHost, zkPort), 16);
+ factory.startup(zooServer);
+ }
+
+ /**
+ *
+ * @return actual port zookeeper is started
+ */
+ int getActualPort() {
+ return factory.getLocalPort();
+ }
+
+ /**
+ * Shuts down ZooKeeper.
+ *
+ * @throws IOException
+ */
+ void shutdown()
+ throws IOException {
+ if (factory != null) {
+ factory.shutdown();
+
+ U.delete(snapshotDir);
+ U.delete(logDir);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
new file mode 100644
index 0000000..5972639
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import kafka.consumer.*;
+import kafka.producer.*;
+import kafka.serializer.*;
+import kafka.utils.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Tests {@link KafkaStreamer}.
+ */
+public class KafkaIgniteStreamerSelfTest
+ extends GridCommonAbstractTest {
+ /** Embedded Kafka. */
+ private KafkaEmbeddedBroker embeddedBroker;
+
+ /** Count. */
+ private static final int CNT = 100;
+
+ /** Test Topic. */
+ private static final String TOPIC_NAME = "page_visits";
+
+ /** Kafka Partition. */
+ private static final int PARTITIONS = 4;
+
+ /** Kafka Replication Factor. */
+ private static final int REPLICATION_FACTOR = 1;
+
+ /** Topic Message Key Prefix. */
+ private static final String KEY_PREFIX = "192.168.2.";
+
+ /** Topic Message Value Url. */
+ private static final String VALUE_URL = ",www.example.com,";
+
+ /** Constructor. */
+ public KafkaIgniteStreamerSelfTest() {
+ super(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void beforeTest()
+ throws Exception {
+ grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+ embeddedBroker = new KafkaEmbeddedBroker();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void afterTest()
+ throws Exception {
+ grid().cache(null).clear();
+
+ embeddedBroker.shutdown();
+ }
+
+ /**
+ * Tests Kafka streamer.
+ *
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ public void testKafkaStreamer()
+ throws TimeoutException, InterruptedException {
+ embeddedBroker.createTopic(TOPIC_NAME, PARTITIONS, REPLICATION_FACTOR);
+
+ Map<String, String> keyValueMap = produceStream(TOPIC_NAME);
+ consumerStream(TOPIC_NAME, keyValueMap);
+ }
+
+ /**
+  * Produces/Sends messages to Kafka.
+  * <p>
+  * Builds {@code CNT} messages whose keys are {@code KEY_PREFIX} followed by a distinct number from
+  * {@code 1..CNT} taken in random order, sends them through the embedded broker and closes the producer.
+  *
+  * @param topic Topic name.
+  * @return Map of key value messages.
+  */
+ private Map<String, String> produceStream(final String topic) {
+     final Map<String, String> keyValueMap = new HashMap<>();
+
+     // Generate random subnets: exactly CNT distinct values, 1..CNT, in shuffled order.
+     List<Integer> subnet = new ArrayList<>(CNT);
+
+     for (int i = 1; i <= CNT; i++)
+         subnet.add(i);
+
+     Collections.shuffle(subnet);
+
+     final List<KeyedMessage<String, String>> messages = new ArrayList<>(CNT);
+
+     for (int event = 0; event < CNT; event++) {
+         long runtime = System.currentTimeMillis();
+         String ip = KEY_PREFIX + subnet.get(event);
+         String msg = runtime + VALUE_URL + ip;
+
+         messages.add(new KeyedMessage<>(topic, ip, msg));
+         keyValueMap.put(ip, msg);
+     }
+
+     final Producer<String, String> producer = embeddedBroker.sendMessages(messages);
+     producer.close();
+
+     return keyValueMap;
+ }
+
+ /**
+  * Consumes Kafka Stream via ignite.
+  * <p>
+  * Configures a {@link KafkaStreamer} on top of a data streamer for the default cache, waits until {@code CNT}
+  * cache PUT events have been observed and then checks that every expected entry is present in the cache.
+  *
+  * @param topic Topic name.
+  * @param keyValueMap Expected key value map.
+  * @throws TimeoutException TimeoutException.
+  * @throws InterruptedException InterruptedException.
+  */
+ private void consumerStream(final String topic, final Map<String, String> keyValueMap)
+     throws TimeoutException, InterruptedException {
+
+     KafkaStreamer<String, String, String> kafkaStmr = null;
+
+     final Ignite ignite = grid();
+
+     try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
+
+         stmr.allowOverwrite(true);
+         stmr.autoFlushFrequency(10);
+
+         // Configure Kafka streamer.
+         kafkaStmr = new KafkaStreamer<>();
+
+         // Get the cache.
+         IgniteCache<String, String> cache = ignite.cache(null);
+
+         // Set ignite instance.
+         kafkaStmr.setIgnite(ignite);
+
+         // Set data streamer instance.
+         kafkaStmr.setStreamer(stmr);
+
+         // Set the topic.
+         kafkaStmr.setTopic(topic);
+
+         // Set the number of threads.
+         kafkaStmr.setThreads(4);
+
+         // Set the consumer configuration.
+         kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(KafkaEmbeddedBroker.getZKAddress(),
+             "groupX"));
+
+         // Set the decoders.
+         StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties());
+         kafkaStmr.setKeyDecoder(stringDecoder);
+         kafkaStmr.setValueDecoder(stringDecoder);
+
+         // Register the PUT listener before any data can flow, so that no event is missed
+         // between the streamer start and the listener registration.
+         final CountDownLatch latch = new CountDownLatch(CNT);
+
+         IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+             @Override
+             public boolean apply(UUID uuid, CacheEvent evt) {
+                 latch.countDown();
+                 return true;
+             }
+         };
+
+         ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+
+         // Start kafka streamer.
+         kafkaStmr.start();
+
+         latch.await();
+
+         for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
+             final String key = entry.getKey();
+             final String value = entry.getValue();
+
+             final String cacheValue = cache.get(key);
+             assertEquals(value, cacheValue);
+         }
+     }
+
+     finally {
+         // Shutdown kafka streamer. It is still null if the data streamer could not be created.
+         if (kafkaStmr != null)
+             kafkaStmr.stop();
+     }
+ }
+
+ /**
+ * Creates default consumer config.
+ *
+ * @param zooKeeper Zookeeper address <server:port>.
+ * @param groupId Group Id for kafka subscriber.
+ * @return {@link ConsumerConfig} kafka consumer configuration.
+ */
+ private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String groupId) {
+ A.notNull(zooKeeper, "zookeeper");
+ A.notNull(groupId, "groupId");
+
+ Properties props = new Properties();
+ props.put("zookeeper.connect", zooKeeper);
+ props.put("group.id", groupId);
+ props.put("zookeeper.session.timeout.ms", "400");
+ props.put("zookeeper.sync.time.ms", "200");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("auto.offset.reset", "smallest");
+
+ return new ConsumerConfig(props);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
new file mode 100644
index 0000000..b836b44
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import kafka.producer.*;
+
+/**
+ * Simple Partitioner for Kafka.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class SimplePartitioner
+    implements Partitioner {
+
+    /**
+     * Creates a partitioner without any configuration.
+     */
+    public SimplePartitioner() {
+        // No-op.
+    }
+
+    /**
+     * Creates a partitioner from producer properties. The Kafka 0.8 producer instantiates the class named by
+     * {@code partitioner.class} reflectively through a constructor with this signature.
+     *
+     * @param props Producer properties (not used).
+     */
+    public SimplePartitioner(kafka.utils.VerifiableProperties props) {
+        // No-op.
+    }
+
+    /**
+     * Partitions the key based on the key value.
+     * <p>
+     * The key is cast to a {@code String} and split on dots; the fourth segment is parsed as an integer. A
+     * positive value is mapped to {@code value % partitionSize}, any other value to partition {@code 0}.
+     *
+     * @param key Key.
+     * @param partitionSize Partition size.
+     * @return partition Partition.
+     */
+    public int partition(Object key, int partitionSize) {
+        String keyStr = (String)key;
+        String[] keyValues = keyStr.split("\\.");
+
+        int intKey = Integer.parseInt(keyValues[3]);
+
+        return intKey > 0 ? intKey % partitionSize : 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a6d1609..b47d34b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
<module>modules/gce</module>
<module>modules/cloud</module>
<module>modules/mesos</module>
+ <module>modules/kafka</module>
</modules>
<profiles>