You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/07/06 13:02:52 UTC
[01/14] incubator-ignite git commit: # ignite_1055
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-929 97f0f3678 -> 7bef55b93
# ignite_1055
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7030ee19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7030ee19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7030ee19
Branch: refs/heads/ignite-929
Commit: 7030ee191f9e229c73a85cb48e01c8255a53a0cc
Parents: b437ec7
Author: Atri <at...@gmail.com>
Authored: Mon Jun 29 21:44:01 2015 +0530
Committer: ashutak <as...@gridgain.com>
Committed: Thu Jul 2 14:05:50 2015 +0300
----------------------------------------------------------------------
bin/ignite.bat | 8 ++++++--
bin/ignite.sh | 6 +++++-
bin/include/parseargs.bat | 1 +
bin/include/parseargs.sh | 3 +++
.../ignite/startup/cmdline/CommandLineTransformer.java | 9 +++++++++
5 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7030ee19/bin/ignite.bat
----------------------------------------------------------------------
diff --git a/bin/ignite.bat b/bin/ignite.bat
index 1a4a58c..43cc58b 100644
--- a/bin/ignite.bat
+++ b/bin/ignite.bat
@@ -129,8 +129,12 @@ set RESTART_SUCCESS_OPT=-DIGNITE_SUCCESS_FILE=%RESTART_SUCCESS_FILE%
::
:: You can specify IGNITE_JMX_PORT environment variable for overriding automatically found JMX port
::
-for /F "tokens=*" %%A in ('""%JAVA_HOME%\bin\java" -cp %CP% org.apache.ignite.internal.util.portscanner.GridJmxPortFinder"') do (
- set JMX_PORT=%%A
+:: This is executed if -nojmx is not specified
+::
+if not "%NO_JMX%" == "1" (
+ for /F "tokens=*" %%A in ('""%JAVA_HOME%\bin\java" -cp %CP% org.apache.ignite.internal.util.portscanner.GridJmxPortFinder"') do (
+ set JMX_PORT=%%A
+ )
)
::
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7030ee19/bin/ignite.sh
----------------------------------------------------------------------
diff --git a/bin/ignite.sh b/bin/ignite.sh
index 660a80f..7fbca57 100755
--- a/bin/ignite.sh
+++ b/bin/ignite.sh
@@ -71,7 +71,11 @@ RESTART_SUCCESS_OPT="-DIGNITE_SUCCESS_FILE=${RESTART_SUCCESS_FILE}"
#
# You can specify IGNITE_JMX_PORT environment variable for overriding automatically found JMX port
#
-findAvailableJmxPort
+# This is executed when -nojmx is not specified
+#
+if [ "${NOJMX}" == "0" ] ; then
+ findAvailableJmxPort
+fi
# Mac OS specific support to display correct name in the dock.
osname=`uname`
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7030ee19/bin/include/parseargs.bat
----------------------------------------------------------------------
diff --git a/bin/include/parseargs.bat b/bin/include/parseargs.bat
index e8c764a..c4b35d2 100644
--- a/bin/include/parseargs.bat
+++ b/bin/include/parseargs.bat
@@ -24,6 +24,7 @@
:: QUIET
:: NO_PAUSE
:: JVM_XOPTS
+:: NOJMX
::
:: Script setups reasonable defaults (see below) for omitted arguments.
::
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7030ee19/bin/include/parseargs.sh
----------------------------------------------------------------------
diff --git a/bin/include/parseargs.sh b/bin/include/parseargs.sh
index 8045348..3ab255e 100755
--- a/bin/include/parseargs.sh
+++ b/bin/include/parseargs.sh
@@ -22,6 +22,7 @@
# INTERACTIVE
# QUIET
# JVM_XOPTS
+# NOJMX
#
# Script setups reasonable defaults (see below) for omitted arguments.
#
@@ -35,6 +36,7 @@
CONFIG=${DEFAULT_CONFIG}
INTERACTIVE="0"
+NOJMX="0"
QUIET="-DIGNITE_QUIET=true"
JVM_XOPTS=""
@@ -42,6 +44,7 @@ while [ $# -gt 0 ]
do
case "$1" in
-i) INTERACTIVE="1";;
+ -nojmx) NOJMX="1";;
-v) QUIET="-DIGNITE_QUIET=false";;
-J*) JVM_XOPTS="$JVM_XOPTS ${1:2}";;
*) CONFIG="$1";;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7030ee19/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineTransformer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineTransformer.java b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineTransformer.java
index 6b2c809..ae0c587 100644
--- a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineTransformer.java
+++ b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineTransformer.java
@@ -56,6 +56,9 @@ public class CommandLineTransformer {
/** No pause mode. */
private boolean noPause;
+ /** No JMX mode. */
+ private boolean noJMX;
+
/** Supported parameter, parsed manually. */
private String jvmOptions = "";
@@ -132,6 +135,11 @@ public class CommandLineTransformer {
break;
+ case "-nojmx":
+ noJMX = true;
+
+ break;
+
default:
argsList.add(arg);
}
@@ -152,6 +160,7 @@ public class CommandLineTransformer {
addArgWithValue(sb, "INTERACTIVE", formatBooleanValue(interactive));
addArgWithValue(sb, "QUIET", "-DIGNITE_QUIET=" + !verbose);
addArgWithValue(sb, "NO_PAUSE", formatBooleanValue(noPause));
+ addArgWithValue(sb, "NO_JMX", formatBooleanValue(noJMX));
parseJvmOptionsAndSpringConfig(args);
[14/14] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-7' into ignite-929
Posted by av...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-7' into ignite-929
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7bef55b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7bef55b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7bef55b9
Branch: refs/heads/ignite-929
Commit: 7bef55b939c09d7bc3a511a9dfbba62bb1dd2ae0
Parents: 97f0f36 8fc9e4e
Author: Anton <av...@gridgain.com>
Authored: Mon Jul 6 14:02:35 2015 +0300
Committer: Anton <av...@gridgain.com>
Committed: Mon Jul 6 14:02:35 2015 +0300
----------------------------------------------------------------------
bin/ignite.bat | 8 +-
bin/ignite.sh | 6 +-
bin/include/parseargs.bat | 1 +
bin/include/parseargs.sh | 3 +
.../internal/interop/InteropIgnition.java | 31 +-
.../ignite/internal/util/IgniteUtils.java | 6 +-
.../startup/cmdline/CommandLineStartup.java | 3 +-
.../startup/cmdline/CommandLineTransformer.java | 9 +
...acheAtomicReplicatedNodeRestartSelfTest.java | 10 +
.../TcpDiscoveryNodeConsistentIdSelfTest.java | 80 ++++
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +
modules/kafka/pom.xml | 116 ++++++
.../ignite/stream/kafka/KafkaStreamer.java | 220 +++++++++++
.../kafka/IgniteKafkaStreamerSelfTestSuite.java | 37 ++
.../stream/kafka/KafkaEmbeddedBroker.java | 378 +++++++++++++++++++
.../kafka/KafkaIgniteStreamerSelfTest.java | 227 +++++++++++
.../ignite/stream/kafka/SimplePartitioner.java | 53 +++
pom.xml | 1 +
18 files changed, 1175 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
[09/14] incubator-ignite git commit: disable failing test
Posted by av...@apache.org.
disable failing test
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b1342ba1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b1342ba1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b1342ba1
Branch: refs/heads/ignite-929
Commit: b1342ba19078bd8f742d4d1d605bd9ccc19c82b5
Parents: 334fee9
Author: Denis Magda <dm...@gridgain.com>
Authored: Mon Jul 6 10:30:23 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Mon Jul 6 10:30:23 2015 +0300
----------------------------------------------------------------------
.../IgniteCacheAtomicReplicatedNodeRestartSelfTest.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1342ba1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
index 52d8871..c802d7c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
@@ -27,7 +27,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*;
public class IgniteCacheAtomicReplicatedNodeRestartSelfTest extends GridCacheReplicatedNodeRestartSelfTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-747");
+ //fail("https://issues.apache.org/jira/browse/IGNITE-747");
}
/** {@inheritDoc} */
@@ -36,6 +36,11 @@ public class IgniteCacheAtomicReplicatedNodeRestartSelfTest extends GridCacheRep
}
/** {@inheritDoc} */
+ @Override public void testRestartWithPutEightNodesTwoBackups() throws Throwable {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1095");
+ }
+
+ /** {@inheritDoc} */
@Override protected CacheAtomicityMode atomicityMode() {
return ATOMIC;
}
[05/14] incubator-ignite git commit: ignite-428 Review fixes
Posted by av...@apache.org.
ignite-428 Review fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/63fce5a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/63fce5a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/63fce5a4
Branch: refs/heads/ignite-929
Commit: 63fce5a4920531ad8cd9afc47d829bb2fa4bc438
Parents: 2c41739
Author: agura <ag...@gridgain.com>
Authored: Thu Jun 25 22:18:00 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri Jul 3 19:39:12 2015 +0300
----------------------------------------------------------------------
modules/kafka/pom.xml | 18 +-
.../ignite/stream/kafka/KafkaStreamer.java | 121 ++++++---
.../stream/kafka/KafkaEmbeddedBroker.java | 251 ++++++++++---------
.../kafka/KafkaIgniteStreamerSelfTest.java | 131 +++++-----
.../ignite/stream/kafka/SimplePartitioner.java | 27 +-
5 files changed, 293 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
index 165ec1c..43909bc 100644
--- a/modules/kafka/pom.xml
+++ b/modules/kafka/pom.xml
@@ -31,7 +31,7 @@
</parent>
<artifactId>ignite-kafka</artifactId>
- <version>1.1.1-SNAPSHOT</version>
+ <version>1.1.6-SNAPSHOT</version>
<dependencies>
<dependency>
@@ -39,6 +39,7 @@
<artifactId>ignite-core</artifactId>
<version>${project.version}</version>
</dependency>
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
@@ -66,6 +67,7 @@
</exclusion>
</exclusions>
</dependency>
+
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
@@ -73,12 +75,6 @@
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.4</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-log4j</artifactId>
<version>${project.version}</version>
@@ -90,7 +86,6 @@
<version>4.2</version>
</dependency>
-
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
@@ -103,13 +98,6 @@
</dependency>
<dependency>
- <groupId>commons-beanutils</groupId>
- <artifactId>commons-beanutils</artifactId>
- <version>1.8.3</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index e0240ce..e9ad0bd 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -30,16 +30,17 @@ import java.util.*;
import java.util.concurrent.*;
/**
- * Server that subscribes to topic messages from Kafka broker, streams its to key-value pairs into {@link
- * org.apache.ignite.IgniteDataStreamer} instance.
+ * Server that subscribes to topic messages from Kafka broker and streams its to key-value pairs into
+ * {@link IgniteDataStreamer} instance.
* <p>
- * Uses Kafka's High Level Consumer API to read messages from Kafka
+ * Uses Kafka's High Level Consumer API to read messages from Kafka.
*
* @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">Consumer Group
* Example</a>
*/
-public class KafkaStreamer<T, K, V>
- extends StreamAdapter<T, K, V> {
+public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+ /** Retry timeout. */
+ private static final long DFLT_RETRY_TIMEOUT = 10000;
/** Logger. */
private IgniteLogger log;
@@ -53,61 +54,78 @@ public class KafkaStreamer<T, K, V>
/** Number of threads to process kafka streams. */
private int threads;
- /** Kafka Consumer Config. */
- private ConsumerConfig consumerConfig;
+ /** Kafka consumer config. */
+ private ConsumerConfig consumerCfg;
- /** Key Decoder. */
+ /** Key decoder. */
private Decoder<K> keyDecoder;
- /** Value Decoder. */
- private Decoder<V> valueDecoder;
+ /** Value decoder. */
+ private Decoder<V> valDecoder;
- /** Kafka Consumer connector. */
+ /** Kafka consumer connector. */
private ConsumerConnector consumer;
+ /** Retry timeout. */
+ private long retryTimeout = DFLT_RETRY_TIMEOUT;
+
+ /** Stopped. */
+ private volatile boolean stopped;
+
/**
- * Sets the topic.
+ * Sets the topic name.
*
- * @param topic Topic Name.
+ * @param topic Topic name.
*/
- public void setTopic(final String topic) {
+ public void setTopic(String topic) {
this.topic = topic;
}
/**
* Sets the threads.
*
- * @param threads Number of Threads.
+ * @param threads Number of threads.
*/
- public void setThreads(final int threads) {
+ public void setThreads(int threads) {
this.threads = threads;
}
/**
* Sets the consumer config.
*
- * @param consumerConfig Consumer configuration.
+ * @param consumerCfg Consumer configuration.
*/
- public void setConsumerConfig(final ConsumerConfig consumerConfig) {
- this.consumerConfig = consumerConfig;
+ public void setConsumerConfig(ConsumerConfig consumerCfg) {
+ this.consumerCfg = consumerCfg;
}
/**
* Sets the key decoder.
*
- * @param keyDecoder Key Decoder.
+ * @param keyDecoder Key decoder.
*/
- public void setKeyDecoder(final Decoder<K> keyDecoder) {
+ public void setKeyDecoder(Decoder<K> keyDecoder) {
this.keyDecoder = keyDecoder;
}
/**
* Sets the value decoder.
*
- * @param valueDecoder Value Decoder
+ * @param valDecoder Value decoder.
*/
- public void setValueDecoder(final Decoder<V> valueDecoder) {
- this.valueDecoder = valueDecoder;
+ public void setValueDecoder(Decoder<V> valDecoder) {
+ this.valDecoder = valDecoder;
+ }
+
+ /**
+ * Sets the retry timeout.
+ *
+ * @param retryTimeout Retry timeout.
+ */
+ public void setRetryTimeout(long retryTimeout) {
+ A.ensure(retryTimeout > 0, "retryTimeout > 0");
+
+ this.retryTimeout = retryTimeout;
}
/**
@@ -120,35 +138,56 @@ public class KafkaStreamer<T, K, V>
A.notNull(getIgnite(), "ignite");
A.notNull(topic, "topic");
A.notNull(keyDecoder, "key decoder");
- A.notNull(valueDecoder, "value decoder");
- A.notNull(consumerConfig, "kafka consumer config");
+ A.notNull(valDecoder, "value decoder");
+ A.notNull(consumerCfg, "kafka consumer config");
A.ensure(threads > 0, "threads > 0");
log = getIgnite().log();
- consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
+ consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerCfg);
+
+ Map<String, Integer> topicCntMap = new HashMap<>();
- Map<String, Integer> topicCountMap = new HashMap<>();
- topicCountMap.put(topic, threads);
+ topicCntMap.put(topic, threads);
- Map<String, List<KafkaStream<K, V>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder,
- valueDecoder);
+ Map<String, List<KafkaStream<K, V>>> consumerMap =
+ consumer.createMessageStreams(topicCntMap, keyDecoder, valDecoder);
List<KafkaStream<K, V>> streams = consumerMap.get(topic);
// Now launch all the consumer threads.
executor = Executors.newFixedThreadPool(threads);
+ stopped = false;
+
// Now create an object to consume the messages.
- for (final KafkaStream<K,V> stream : streams) {
+ for (final KafkaStream<K, V> stream : streams) {
executor.submit(new Runnable() {
@Override public void run() {
-
- ConsumerIterator<K, V> it = stream.iterator();
-
- while (it.hasNext()) {
- final MessageAndMetadata<K, V> messageAndMetadata = it.next();
- getStreamer().addData(messageAndMetadata.key(), messageAndMetadata.message());
+ while (!stopped) {
+ try {
+ for (ConsumerIterator<K, V> it = stream.iterator(); it.hasNext() && !stopped; ) {
+ MessageAndMetadata<K, V> msg = it.next();
+
+ try {
+ getStreamer().addData(msg.key(), msg.message());
+ }
+ catch (Exception e) {
+ U.warn(log, "Message is ignored due to an error [msg=" + msg + ']', e);
+ }
+ }
+ }
+ catch (Exception e) {
+ U.warn(log, "Message can't be consumed from stream. Retry after " +
+ retryTimeout + " ms.", e);
+
+ try {
+ Thread.sleep(retryTimeout);
+ }
+ catch (InterruptedException ie) {
+ // No-op.
+ }
+ }
}
}
});
@@ -159,6 +198,8 @@ public class KafkaStreamer<T, K, V>
* Stops streamer.
*/
public void stop() {
+ stopped = true;
+
if (consumer != null)
consumer.shutdown();
@@ -168,11 +209,11 @@ public class KafkaStreamer<T, K, V>
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
if (log.isDebugEnabled())
- log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+ log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
}
catch (InterruptedException e) {
if (log.isDebugEnabled())
- log.debug("Interrupted during shutdown, exiting uncleanly");
+ log.debug("Interrupted during shutdown, exiting uncleanly.");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
index 28533f7..98b9e4c 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
@@ -17,7 +17,6 @@
package org.apache.ignite.stream.kafka;
-import org.apache.commons.io.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.zookeeper.server.*;
@@ -25,6 +24,7 @@ import kafka.admin.*;
import kafka.api.*;
import kafka.api.Request;
import kafka.producer.*;
+import kafka.serializer.*;
import kafka.server.*;
import kafka.utils.*;
import org.I0Itec.zkclient.*;
@@ -39,106 +39,104 @@ import java.util.concurrent.*;
* Kafka Embedded Broker.
*/
public class KafkaEmbeddedBroker {
-
- /** Default ZooKeeper Host. */
+ /** Default ZooKeeper host. */
private static final String ZK_HOST = "localhost";
- /** Broker Port. */
+ /** Broker port. */
private static final int BROKER_PORT = 9092;
- /** ZooKeeper Connection Timeout. */
+ /** ZooKeeper connection timeout. */
private static final int ZK_CONNECTION_TIMEOUT = 6000;
- /** ZooKeeper Session Timeout. */
+ /** ZooKeeper session timeout. */
private static final int ZK_SESSION_TIMEOUT = 6000;
/** ZooKeeper port. */
private static int zkPort = 0;
- /** Is ZooKeeper Ready. */
+ /** Is ZooKeeper ready. */
private boolean zkReady;
- /** Kafka Config. */
- private KafkaConfig brokerConfig;
+ /** Kafka config. */
+ private KafkaConfig brokerCfg;
- /** Kafka Server. */
- private KafkaServer kafkaServer;
+ /** Kafka server. */
+ private KafkaServer kafkaSrv;
- /** ZooKeeper Client. */
+ /** ZooKeeper client. */
private ZkClient zkClient;
/** Embedded ZooKeeper. */
private EmbeddedZooKeeper zooKeeper;
/**
- * Creates an embedded Kafka Broker.
+ * Creates an embedded Kafka broker.
*/
public KafkaEmbeddedBroker() {
try {
setupEmbeddedZooKeeper();
+
setupEmbeddedKafkaServer();
}
catch (IOException | InterruptedException e) {
- throw new RuntimeException("failed to start Kafka Broker " + e);
+ throw new RuntimeException("Failed to start Kafka broker " + e);
}
-
}
/**
- * @return ZooKeeper Address.
+ * @return ZooKeeper address.
*/
public static String getZKAddress() {
- return ZK_HOST + ":" + zkPort;
+ return ZK_HOST + ':' + zkPort;
}
/**
* Creates a Topic.
*
- * @param topic topic name
- * @param partitions number of paritions for the topic
- * @param replicationFactor replication factor
- * @throws TimeoutException
- * @throws InterruptedException
+ * @param topic Topic name.
+ * @param partitions Number of partitions for the topic.
+ * @param replicationFactor Replication factor.
+ * @throws TimeoutException If operation is timed out.
+ * @throws InterruptedException If interrupted.
*/
- public void createTopic(String topic, final int partitions, final int replicationFactor)
+ public void createTopic(String topic, int partitions, int replicationFactor)
throws TimeoutException, InterruptedException {
AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
+
waitUntilMetadataIsPropagated(topic, 0, 10000, 100);
}
/**
- * Sends message to Kafka Broker.
+ * Sends message to Kafka broker.
*
- * @param keyedMessages List of Keyed Messages.
+ * @param keyedMessages List of keyed messages.
* @return Producer used to send the message.
*/
- public Producer sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
+ public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
Producer<String, String> producer = new Producer<>(getProducerConfig());
+
producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+
return producer;
}
/**
- * Shuts down Kafka Broker.
- *
- * @throws IOException
+ * Shuts down Kafka broker.
*/
- public void shutdown()
- throws IOException {
-
+ public void shutdown() {
zkReady = false;
- if (kafkaServer != null)
- kafkaServer.shutdown();
+ if (kafkaSrv != null)
+ kafkaSrv.shutdown();
- List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerConfig.logDirs());
+ List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerCfg.logDirs());
- for (String logDir : logDirs) {
- FileUtils.deleteDirectory(new File(logDir));
- }
+ for (String logDir : logDirs)
+ U.delete(new File(logDir));
if (zkClient != null) {
zkClient.close();
+
zkClient = null;
}
@@ -148,16 +146,15 @@ public class KafkaEmbeddedBroker {
zooKeeper.shutdown();
}
catch (IOException e) {
- // ignore
+ // No-op.
}
zooKeeper = null;
}
-
}
/**
- * @return the Zookeeper Client
+ * @return ZooKeeper client.
*/
private ZkClient getZkClient() {
A.ensure(zkReady, "Zookeeper not set up yet");
@@ -169,102 +166,105 @@ public class KafkaEmbeddedBroker {
/**
* Checks if topic metadata is propagated.
*
- * @param topic topic name
- * @param partition partition
- * @return true if propagated otherwise false
+ * @param topic Topic name.
+ * @param part Partition.
+ * @return {@code True} if propagated, otherwise {@code false}.
*/
- private boolean isMetadataPropagated(final String topic, final int partition) {
- final scala.Option<PartitionStateInfo> partitionStateOption = kafkaServer.apis().metadataCache().getPartitionInfo(
- topic, partition);
- if (partitionStateOption.isDefined()) {
- final PartitionStateInfo partitionState = partitionStateOption.get();
- final LeaderAndIsr leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch().leaderAndIsr();
-
- if (ZkUtils.getLeaderForPartition(getZkClient(), topic, partition) != null
- && Request.isValidBrokerId(leaderAndInSyncReplicas.leader())
- && leaderAndInSyncReplicas.isr().size() >= 1)
- return true;
+ private boolean isMetadataPropagated(String topic, int part) {
+ scala.Option<PartitionStateInfo> partStateOption =
+ kafkaSrv.apis().metadataCache().getPartitionInfo(topic, part);
- }
- return false;
+ if (!partStateOption.isDefined())
+ return false;
+
+ PartitionStateInfo partState = partStateOption.get();
+
+ LeaderAndIsr LeaderAndIsr = partState.leaderIsrAndControllerEpoch().leaderAndIsr();
+
+ return ZkUtils.getLeaderForPartition(getZkClient(), topic, part) != null &&
+ Request.isValidBrokerId(LeaderAndIsr.leader()) && LeaderAndIsr.isr().size() >= 1;
}
/**
* Waits until metadata is propagated.
*
- * @param topic topic name
- * @param partition partition
- * @param timeout timeout value in millis
- * @param interval interval in millis to sleep
- * @throws TimeoutException
- * @throws InterruptedException
+ * @param topic Topic name.
+ * @param part Partition.
+ * @param timeout Timeout value in millis.
+ * @param interval Interval in millis to sleep.
+ * @throws TimeoutException If operation is timed out.
+ * @throws InterruptedException If interrupted.
*/
- private void waitUntilMetadataIsPropagated(final String topic, final int partition, final long timeout,
- final long interval) throws TimeoutException, InterruptedException {
+ private void waitUntilMetadataIsPropagated(String topic, int part, long timeout, long interval)
+ throws TimeoutException, InterruptedException {
int attempt = 1;
- final long startTime = System.currentTimeMillis();
+
+ long startTime = System.currentTimeMillis();
while (true) {
- if (isMetadataPropagated(topic, partition))
+ if (isMetadataPropagated(topic, part))
return;
- final long duration = System.currentTimeMillis() - startTime;
+ long duration = System.currentTimeMillis() - startTime;
if (duration < timeout)
Thread.sleep(interval);
else
- throw new TimeoutException("metadata propagate timed out, attempt=" + attempt);
+ throw new TimeoutException("Metadata propagation is timed out, attempt " + attempt);
attempt++;
}
-
}
/**
- * Sets up embedded Kafka Server
+ * Sets up embedded Kafka server.
*
- * @throws IOException
+ * @throws IOException If failed.
*/
- private void setupEmbeddedKafkaServer()
- throws IOException {
+ private void setupEmbeddedKafkaServer() throws IOException {
A.ensure(zkReady, "Zookeeper should be set up beforehand");
- brokerConfig = new KafkaConfig(getBrokerConfig());
- kafkaServer = new KafkaServer(brokerConfig, SystemTime$.MODULE$);
- kafkaServer.startup();
+ brokerCfg = new KafkaConfig(getBrokerConfig());
+
+ kafkaSrv = new KafkaServer(brokerCfg, SystemTime$.MODULE$);
+
+ kafkaSrv.startup();
}
/**
- * Sets up embedded zooKeeper
+ * Sets up embedded ZooKeeper.
*
- * @throws IOException
- * @throws InterruptedException
+ * @throws IOException If failed.
+ * @throws InterruptedException If interrupted.
*/
- private void setupEmbeddedZooKeeper()
- throws IOException, InterruptedException {
+ private void setupEmbeddedZooKeeper() throws IOException, InterruptedException {
EmbeddedZooKeeper zooKeeper = new EmbeddedZooKeeper(ZK_HOST, zkPort);
+
zooKeeper.startup();
+
zkPort = zooKeeper.getActualPort();
+
zkClient = new ZkClient(getZKAddress(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
+
zkReady = true;
}
/**
- * @return Kafka Broker Address.
+ * @return Kafka broker address.
*/
private static String getBrokerAddress() {
- return ZK_HOST + ":" + BROKER_PORT;
+ return ZK_HOST + ':' + BROKER_PORT;
}
/**
- * Gets KafKa Brofer Config
+ * Gets Kafka broker config.
*
- * @return Kafka Broker Config
- * @throws IOException
+ * @return Kafka broker config.
+ * @throws IOException If failed.
*/
- private static Properties getBrokerConfig()
- throws IOException {
+ private static Properties getBrokerConfig() throws IOException {
Properties props = new Properties();
+
props.put("broker.id", "0");
props.put("host.name", ZK_HOST);
props.put("port", "" + BROKER_PORT);
@@ -272,60 +272,63 @@ public class KafkaEmbeddedBroker {
props.put("zookeeper.connect", getZKAddress());
props.put("log.flush.interval.messages", "1");
props.put("replica.socket.timeout.ms", "1500");
+
return props;
}
/**
- * @return Kafka Producer Config
+ * @return Kafka Producer config.
*/
private static ProducerConfig getProducerConfig() {
Properties props = new Properties();
+
props.put("metadata.broker.list", getBrokerAddress());
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("key.serializer.class", "kafka.serializer.StringEncoder");
- props.put("partitioner.class", "org.apache.ignite.kafka.SimplePartitioner");
+ props.put("serializer.class", StringEncoder.class.getName());
+ props.put("key.serializer.class", StringEncoder.class.getName());
+ props.put("partitioner.class", SimplePartitioner.class.getName());
+
return new ProducerConfig(props);
}
/**
- * Creates Temp Directory
+ * Creates temp directory.
*
- * @param prefix prefix
- * @return Created File.
- * @throws IOException
+ * @param prefix Prefix.
+ * @return Created file.
+ * @throws IOException If failed.
*/
- private static File createTempDir(final String prefix)
- throws IOException {
- final Path path = Files.createTempDirectory(prefix);
- return path.toFile();
+ private static File createTempDir( String prefix) throws IOException {
+ Path path = Files.createTempDirectory(prefix);
+ return path.toFile();
}
/**
- * Creates Embedded ZooKeeper.
+ * Creates embedded ZooKeeper.
*/
private static class EmbeddedZooKeeper {
- /** Default ZooKeeper Host. */
+ /** Default ZooKeeper host. */
private final String zkHost;
- /** Default ZooKeeper Port. */
+ /** Default ZooKeeper port. */
private final int zkPort;
- /** NIO Context Factory. */
+ /** NIO context factory. */
private NIOServerCnxnFactory factory;
- /** Snapshot Directory. */
+ /** Snapshot directory. */
private File snapshotDir;
- /** Log Directory. */
+ /** Log directory. */
private File logDir;
/**
- * Creates an embedded Zookeeper
- * @param zkHost zookeeper host
- * @param zkPort zookeeper port
+ * Creates an embedded ZooKeeper.
+ *
+ * @param zkHost ZooKeeper host.
+ * @param zkPort ZooKeeper port.
*/
- EmbeddedZooKeeper(final String zkHost, final int zkPort) {
+ EmbeddedZooKeeper(String zkHost, int zkPort) {
this.zkHost = zkHost;
this.zkPort = zkPort;
}
@@ -333,22 +336,25 @@ public class KafkaEmbeddedBroker {
/**
* Starts up ZooKeeper.
*
- * @throws IOException
- * @throws InterruptedException
+ * @throws IOException If failed.
+ * @throws InterruptedException If interrupted.
*/
- void startup()
- throws IOException, InterruptedException {
+ void startup() throws IOException, InterruptedException {
snapshotDir = createTempDir("_ss");
+
logDir = createTempDir("_log");
- ZooKeeperServer zooServer = new ZooKeeperServer(snapshotDir, logDir, 500);
+
+ ZooKeeperServer zkSrv = new ZooKeeperServer(snapshotDir, logDir, 500);
+
factory = new NIOServerCnxnFactory();
+
factory.configure(new InetSocketAddress(zkHost, zkPort), 16);
- factory.startup(zooServer);
+
+ factory.startup(zkSrv);
}
/**
- *
- * @return actual port zookeeper is started
+ * @return Actual port ZooKeeper is started.
*/
int getActualPort() {
return factory.getLocalPort();
@@ -357,17 +363,16 @@ public class KafkaEmbeddedBroker {
/**
* Shuts down ZooKeeper.
*
- * @throws IOException
+ * @throws IOException If failed.
*/
- void shutdown()
- throws IOException {
+ void shutdown() throws IOException {
if (factory != null) {
factory.shutdown();
U.delete(snapshotDir);
+
U.delete(logDir);
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 5972639..2473990 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -32,31 +32,31 @@ import java.util.*;
import java.util.concurrent.*;
import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.stream.kafka.KafkaEmbeddedBroker.*;
/**
* Tests {@link KafkaStreamer}.
*/
-public class KafkaIgniteStreamerSelfTest
- extends GridCommonAbstractTest {
+public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
/** Embedded Kafka. */
private KafkaEmbeddedBroker embeddedBroker;
/** Count. */
private static final int CNT = 100;
- /** Test Topic. */
+ /** Test topic. */
private static final String TOPIC_NAME = "page_visits";
- /** Kafka Partition. */
+ /** Kafka partition. */
private static final int PARTITIONS = 4;
- /** Kafka Replication Factor. */
+ /** Kafka replication factor. */
private static final int REPLICATION_FACTOR = 1;
- /** Topic Message Key Prefix. */
+ /** Topic message key prefix. */
private static final String KEY_PREFIX = "192.168.2.";
- /** Topic Message Value Url. */
+ /** Topic message value URL. */
private static final String VALUE_URL = ",www.example.com,";
/** Constructor. */
@@ -65,18 +65,15 @@ public class KafkaIgniteStreamerSelfTest
}
/** {@inheritDoc} */
- @Override
- protected void beforeTest()
- throws Exception {
+ @SuppressWarnings("unchecked")
+ @Override protected void beforeTest() throws Exception {
grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
embeddedBroker = new KafkaEmbeddedBroker();
}
/** {@inheritDoc} */
- @Override
- protected void afterTest()
- throws Exception {
+ @Override protected void afterTest() throws Exception {
grid().cache(null).clear();
embeddedBroker.shutdown();
@@ -85,76 +82,80 @@ public class KafkaIgniteStreamerSelfTest
/**
* Tests Kafka streamer.
*
- * @throws TimeoutException
- * @throws InterruptedException
+ * @throws TimeoutException If timed out.
+ * @throws InterruptedException If interrupted.
*/
- public void testKafkaStreamer()
- throws TimeoutException, InterruptedException {
+ public void testKafkaStreamer() throws TimeoutException, InterruptedException {
embeddedBroker.createTopic(TOPIC_NAME, PARTITIONS, REPLICATION_FACTOR);
- Map<String, String> keyValueMap = produceStream(TOPIC_NAME);
- consumerStream(TOPIC_NAME, keyValueMap);
+ Map<String, String> keyValMap = produceStream(TOPIC_NAME);
+
+ consumerStream(TOPIC_NAME, keyValMap);
}
/**
- * Produces/Sends messages to Kafka.
+ * Sends messages to Kafka.
*
* @param topic Topic name.
* @return Map of key value messages.
*/
- private Map<String, String> produceStream(final String topic) {
- final Map<String, String> keyValueMap = new HashMap<>();
-
+ private Map<String, String> produceStream(String topic) {
// Generate random subnets.
List<Integer> subnet = new ArrayList<>();
- int i = 0;
- while (i <= CNT)
- subnet.add(++i);
+ for (int i = 1; i <= CNT; i++)
+ subnet.add(i);
Collections.shuffle(subnet);
- final List<KeyedMessage<String, String>> messages = new ArrayList<>();
- for (int event = 0; event < CNT; event++) {
- long runtime = new Date().getTime();
- String ip = KEY_PREFIX + subnet.get(event);
+ List<KeyedMessage<String, String>> messages = new ArrayList<>(CNT);
+
+ Map<String, String> keyValMap = new HashMap<>();
+
+ for (int evt = 0; evt < CNT; evt++) {
+ long runtime = System.currentTimeMillis();
+
+ String ip = KEY_PREFIX + subnet.get(evt);
+
String msg = runtime + VALUE_URL + ip;
+
messages.add(new KeyedMessage<>(topic, ip, msg));
- keyValueMap.put(ip, msg);
+
+ keyValMap.put(ip, msg);
}
- final Producer<String, String> producer = embeddedBroker.sendMessages(messages);
+ Producer<String, String> producer = embeddedBroker.sendMessages(messages);
+
producer.close();
- return keyValueMap;
+ return keyValMap;
}
/**
- * Consumes Kafka Stream via ignite.
+ * Consumes Kafka stream via Ignite.
*
* @param topic Topic name.
- * @param keyValueMap Expected key value map.
- * @throws TimeoutException TimeoutException.
- * @throws InterruptedException InterruptedException.
+ * @param keyValMap Expected key value map.
+ * @throws TimeoutException If timed out.
+ * @throws InterruptedException If interrupted.
*/
- private void consumerStream(final String topic, final Map<String, String> keyValueMap)
+ private void consumerStream(String topic, Map<String, String> keyValMap)
throws TimeoutException, InterruptedException {
-
KafkaStreamer<String, String, String> kafkaStmr = null;
- final Ignite ignite = grid();
- try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
+ Ignite ignite = grid();
+ try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
stmr.allowOverwrite(true);
stmr.autoFlushFrequency(10);
- // Configure socket streamer.
+ // Configure Kafka streamer.
kafkaStmr = new KafkaStreamer<>();
// Get the cache.
IgniteCache<String, String> cache = ignite.cache(null);
- // Set ignite instance.
+ // Set Ignite instance.
kafkaStmr.setIgnite(ignite);
// Set data streamer instance.
@@ -167,58 +168,55 @@ public class KafkaIgniteStreamerSelfTest
kafkaStmr.setThreads(4);
// Set the consumer configuration.
- kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(KafkaEmbeddedBroker.getZKAddress(),
- "groupX"));
+ kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(getZKAddress(), "groupX"));
// Set the decoders.
- StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties());
- kafkaStmr.setKeyDecoder(stringDecoder);
- kafkaStmr.setValueDecoder(stringDecoder);
+ StringDecoder strDecoder = new StringDecoder(new VerifiableProperties());
+
+ kafkaStmr.setKeyDecoder(strDecoder);
+ kafkaStmr.setValueDecoder(strDecoder);
// Start kafka streamer.
kafkaStmr.start();
final CountDownLatch latch = new CountDownLatch(CNT);
+
IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
- @Override
- public boolean apply(UUID uuid, CacheEvent evt) {
+ @Override public boolean apply(UUID uuid, CacheEvent evt) {
latch.countDown();
+
return true;
}
};
ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
- latch.await();
- for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
- final String key = entry.getKey();
- final String value = entry.getValue();
+ latch.await();
- final String cacheValue = cache.get(key);
- assertEquals(value, cacheValue);
- }
+ for (Map.Entry<String, String> entry : keyValMap.entrySet())
+ assertEquals(entry.getValue(), cache.get(entry.getKey()));
}
-
finally {
- // Shutdown kafka streamer.
- kafkaStmr.stop();
+ if (kafkaStmr != null)
+ kafkaStmr.stop();
}
}
/**
* Creates default consumer config.
*
- * @param zooKeeper Zookeeper address <server:port>.
- * @param groupId Group Id for kafka subscriber.
- * @return {@link ConsumerConfig} kafka consumer configuration.
+ * @param zooKeeper ZooKeeper address <server:port>.
+ * @param grpId Group Id for kafka subscriber.
+ * @return Kafka consumer configuration.
*/
- private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String groupId) {
+ private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String grpId) {
A.notNull(zooKeeper, "zookeeper");
- A.notNull(groupId, "groupId");
+ A.notNull(grpId, "groupId");
Properties props = new Properties();
+
props.put("zookeeper.connect", zooKeeper);
- props.put("group.id", groupId);
+ props.put("group.id", grpId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
@@ -226,5 +224,4 @@ public class KafkaIgniteStreamerSelfTest
return new ConsumerConfig(props);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
index b836b44..1ef4f77 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
@@ -18,29 +18,36 @@
package org.apache.ignite.stream.kafka;
import kafka.producer.*;
+import kafka.utils.*;
/**
- * Simple Partitioner for Kafka.
+ * Simple partitioner for Kafka.
*/
@SuppressWarnings("UnusedDeclaration")
-public class SimplePartitioner
- implements Partitioner {
+public class SimplePartitioner implements Partitioner {
+ /**
+ * Constructs instance.
+ *
+ * @param props Properties.
+ */
+ public SimplePartitioner(VerifiableProperties props) {
+ // No-op.
+ }
/**
* Partitions the key based on the key value.
*
* @param key Key.
- * @param partitionSize Partition size.
+ * @param partSize Partition size.
* @return partition Partition.
*/
- public int partition(Object key, int partitionSize) {
- int partition = 0;
+ public int partition(Object key, int partSize) {
String keyStr = (String)key;
+
String[] keyValues = keyStr.split("\\.");
+
Integer intKey = Integer.parseInt(keyValues[3]);
- if (intKey > 0) {
- partition = intKey % partitionSize;
- }
- return partition;
+
+ return intKey > 0 ? intKey % partSize : 0;
}
}
[07/14] incubator-ignite git commit: Kafka streamer test suite added
Posted by av...@apache.org.
Kafka streamer test suite added
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f72b2914
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f72b2914
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f72b2914
Branch: refs/heads/ignite-929
Commit: f72b291498d413d870669e8dcd7cc2a5f3b81b51
Parents: b6803e7
Author: agura <ag...@gridgain.com>
Authored: Fri Jul 3 21:18:57 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri Jul 3 21:18:57 2015 +0300
----------------------------------------------------------------------
.../kafka/IgniteKafkaStreamerSelfTestSuite.java | 37 ++++++++++++++++++++
1 file changed, 37 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f72b2914/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
new file mode 100644
index 0000000..a2dfd17
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import junit.framework.*;
+
+/**
+ * Apache Kafka streamer tests.
+ */
+public class IgniteKafkaStreamerSelfTestSuite extends TestSuite {
+ /**
+ * @return Test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Apache Kafka streamer Test Suite");
+
+ suite.addTest(new TestSuite(KafkaIgniteStreamerSelfTest.class));
+
+ return suite;
+ }
+}
[06/14] incubator-ignite git commit: Kafka module version changed
Posted by av...@apache.org.
Kafka module version changed
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b6803e7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b6803e7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b6803e7e
Branch: refs/heads/ignite-929
Commit: b6803e7e1d71d6f6b54b0e9cd769ec6ca91d0564
Parents: 63fce5a
Author: agura <ag...@gridgain.com>
Authored: Fri Jul 3 19:54:19 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri Jul 3 19:54:19 2015 +0300
----------------------------------------------------------------------
modules/kafka/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6803e7e/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
index 43909bc..d0e9cd9 100644
--- a/modules/kafka/pom.xml
+++ b/modules/kafka/pom.xml
@@ -31,7 +31,7 @@
</parent>
<artifactId>ignite-kafka</artifactId>
- <version>1.1.6-SNAPSHOT</version>
+ <version>1.2.1-SNAPSHOT</version>
<dependencies>
<dependency>
[13/14] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-sprint-7' into ignite-sprint-7
Posted by av...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-7' into ignite-sprint-7
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8fc9e4eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8fc9e4eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8fc9e4eb
Branch: refs/heads/ignite-929
Commit: 8fc9e4ebf391562686b4159fe52ab045fd47279a
Parents: 2acacd9 a577d27
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jul 6 12:31:02 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jul 6 12:31:02 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 6 +-
...acheAtomicReplicatedNodeRestartSelfTest.java | 10 +++
.../TcpDiscoveryNodeConsistentIdSelfTest.java | 80 ++++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +
4 files changed, 97 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[11/14] incubator-ignite git commit: #ignite-1086:
ClusterNode.consistentId() should be the same after restart.
Posted by av...@apache.org.
#ignite-1086: ClusterNode.consistentId() should be the same after restart.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a577d27b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a577d27b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a577d27b
Branch: refs/heads/ignite-929
Commit: a577d27b6a857e80d4ddf48a635dce09123eec8a
Parents: 3a4d008
Author: ivasilinets <iv...@gridgain.com>
Authored: Mon Jul 6 11:19:00 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Mon Jul 6 11:19:00 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 6 +-
.../TcpDiscoveryNodeConsistentIdSelfTest.java | 80 ++++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +
3 files changed, 87 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a577d27b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f457d6c..46a23d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -8048,9 +8048,13 @@ public abstract class IgniteUtils {
public static String consistentId(Collection<String> addrs, int port) {
assert !F.isEmpty(addrs);
+ List<String> sortedAddrs = new ArrayList<>(addrs);
+
+ Collections.sort(sortedAddrs);
+
StringBuilder sb = new StringBuilder();
- for (String addr : addrs)
+ for (String addr : sortedAddrs)
sb.append(addr).append(',');
sb.delete(sb.length() - 1, sb.length());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a577d27b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeConsistentIdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeConsistentIdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeConsistentIdSelfTest.java
new file mode 100644
index 0000000..d159d72
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeConsistentIdSelfTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ * Test for {@link TcpDiscoveryNode#consistentId()}
+ */
+public class TcpDiscoveryNodeConsistentIdSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setLocalHost("0.0.0.0");
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConsistentId() throws Exception {
+ Object id0 = grid(0).localNode().consistentId();
+
+ int port0 = getDiscoveryPort(grid(0));
+
+ for (int i = 0; i < 10; ++i) {
+ stopAllGrids();
+
+ startGrids(1);
+
+ if (port0 == getDiscoveryPort(grid(0)))
+ assertEquals(id0, grid(0).localNode().consistentId());
+ }
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @return Discovery port.
+ */
+ private int getDiscoveryPort(Ignite ignite) {
+ return ((TcpDiscoveryNode) ignite.cluster().localNode()).discoveryPort();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a577d27b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index ea5a7ac..498f50c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -55,6 +55,8 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
+ suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class));
+
return suite;
}
}
[10/14] incubator-ignite git commit: disable failing test
Posted by av...@apache.org.
disable failing test
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3a4d008b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3a4d008b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3a4d008b
Branch: refs/heads/ignite-929
Commit: 3a4d008b07ec9971e229e5bdb99bab38858634e5
Parents: b1342ba
Author: Denis Magda <dm...@gridgain.com>
Authored: Mon Jul 6 10:32:06 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Mon Jul 6 10:32:06 2015 +0300
----------------------------------------------------------------------
.../replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4d008b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
index c802d7c..54409d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
@@ -27,7 +27,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*;
public class IgniteCacheAtomicReplicatedNodeRestartSelfTest extends GridCacheReplicatedNodeRestartSelfTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- //fail("https://issues.apache.org/jira/browse/IGNITE-747");
+ fail("https://issues.apache.org/jira/browse/IGNITE-747");
}
/** {@inheritDoc} */
[12/14] incubator-ignite git commit: GG-10501: Fixed.
Posted by av...@apache.org.
GG-10501: Fixed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2acacd98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2acacd98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2acacd98
Branch: refs/heads/ignite-929
Commit: 2acacd98c99210ac97ac2841f37268348e55c946
Parents: f72b291
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jul 6 12:30:36 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jul 6 12:30:36 2015 +0300
----------------------------------------------------------------------
.../internal/interop/InteropIgnition.java | 31 +++++++++++++-------
1 file changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2acacd98/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
index fb0d6e1..2989a17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
@@ -50,24 +50,33 @@ public class InteropIgnition {
*/
public static synchronized InteropProcessor start(@Nullable String springCfgPath, @Nullable String gridName,
int factoryId, long envPtr, long dataPtr) {
- IgniteConfiguration cfg = configuration(springCfgPath);
+ ClassLoader oldClsLdr = Thread.currentThread().getContextClassLoader();
- if (gridName != null)
- cfg.setGridName(gridName);
- else
- gridName = cfg.getGridName();
+ Thread.currentThread().setContextClassLoader(InteropIgnition.class.getClassLoader());
+
+ try {
+ IgniteConfiguration cfg = configuration(springCfgPath);
- InteropBootstrap bootstrap = bootstrap(factoryId);
+ if (gridName != null)
+ cfg.setGridName(gridName);
+ else
+ gridName = cfg.getGridName();
- InteropProcessor proc = bootstrap.start(cfg, envPtr, dataPtr);
+ InteropBootstrap bootstrap = bootstrap(factoryId);
- trackFinalization(proc);
+ InteropProcessor proc = bootstrap.start(cfg, envPtr, dataPtr);
- InteropProcessor old = instances.put(gridName, proc);
+ trackFinalization(proc);
- assert old == null;
+ InteropProcessor old = instances.put(gridName, proc);
- return proc;
+ assert old == null;
+
+ return proc;
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(oldClsLdr);
+ }
}
/**
[02/14] incubator-ignite git commit: # ignite-1055: review (add
option to help)
Posted by av...@apache.org.
# ignite-1055: review (add option to help)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/251f0c88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/251f0c88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/251f0c88
Branch: refs/heads/ignite-929
Commit: 251f0c88377b05821e9cfcb4a6ee6caa9026adb9
Parents: 7030ee1
Author: ashutak <as...@gridgain.com>
Authored: Fri Jul 3 17:51:24 2015 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Fri Jul 3 17:51:24 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/startup/cmdline/CommandLineStartup.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/251f0c88/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineStartup.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineStartup.java b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineStartup.java
index 4ddbbd0..5b39069 100644
--- a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineStartup.java
+++ b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineStartup.java
@@ -158,7 +158,8 @@ public final class CommandLineStartup {
" Where:",
" ?, /help, -help, - show this message.",
" -v - verbose mode (quiet by default).",
- " -np - no pause on exit (pause by default)");
+ " -np - no pause on exit (pause by default)",
+ " -nojmx - disable JMX monitoring (enabled by default)");
if (ignite) {
X.error(
[03/14] incubator-ignite git commit: Merge branch 'ignite-1055' into
ignite-sprint-7
Posted by av...@apache.org.
Merge branch 'ignite-1055' into ignite-sprint-7
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9f6a7f9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9f6a7f9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9f6a7f9b
Branch: refs/heads/ignite-929
Commit: 9f6a7f9bed05bd15807c534eb14d801bedd37970
Parents: 7828d19 251f0c8
Author: ashutak <as...@gridgain.com>
Authored: Fri Jul 3 17:52:00 2015 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Fri Jul 3 17:52:00 2015 +0300
----------------------------------------------------------------------
bin/ignite.bat | 8 ++++++--
bin/ignite.sh | 6 +++++-
bin/include/parseargs.bat | 1 +
bin/include/parseargs.sh | 3 +++
.../apache/ignite/startup/cmdline/CommandLineStartup.java | 3 ++-
.../ignite/startup/cmdline/CommandLineTransformer.java | 9 +++++++++
6 files changed, 26 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
[08/14] incubator-ignite git commit: disable failing test
Posted by av...@apache.org.
disable failing test
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/334fee9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/334fee9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/334fee9e
Branch: refs/heads/ignite-929
Commit: 334fee9e300342c0a9613fdcaba5aa97ef1a86a4
Parents: f72b291
Author: Denis Magda <dm...@gridgain.com>
Authored: Mon Jul 6 10:19:18 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Mon Jul 6 10:19:18 2015 +0300
----------------------------------------------------------------------
.../IgniteCacheAtomicReplicatedNodeRestartSelfTest.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/334fee9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
index 379ed65..52d8871 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
@@ -31,6 +31,11 @@ public class IgniteCacheAtomicReplicatedNodeRestartSelfTest extends GridCacheRep
}
/** {@inheritDoc} */
+ @Override public void testRestartWithPutSixNodesTwoBackups() throws Throwable {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1095");
+ }
+
+ /** {@inheritDoc} */
@Override protected CacheAtomicityMode atomicityMode() {
return ATOMIC;
}
[04/14] incubator-ignite git commit: ignite-428 Implement
IgniteKafkaStreamer to stream data from Apache Kafka
Posted by av...@apache.org.
ignite-428 Implement IgniteKafkaStreamer to stream data from Apache Kafka
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2c41739d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2c41739d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2c41739d
Branch: refs/heads/ignite-929
Commit: 2c41739dcd83751270b6bd30c6f2595edc68a1b1
Parents: 9f6a7f9
Author: vishal.garg <vi...@workday.com>
Authored: Mon Jun 22 19:35:08 2015 -0700
Committer: agura <ag...@gridgain.com>
Committed: Fri Jul 3 19:39:11 2015 +0300
----------------------------------------------------------------------
modules/kafka/pom.xml | 128 +++++++
.../ignite/stream/kafka/KafkaStreamer.java | 179 +++++++++
.../stream/kafka/KafkaEmbeddedBroker.java | 373 +++++++++++++++++++
.../kafka/KafkaIgniteStreamerSelfTest.java | 230 ++++++++++++
.../ignite/stream/kafka/SimplePartitioner.java | 46 +++
pom.xml | 1 +
6 files changed, 957 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
new file mode 100644
index 0000000..165ec1c
--- /dev/null
+++ b/modules/kafka/pom.xml
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-kafka</artifactId>
+ <version>1.1.1-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>0.8.2.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm-all</artifactId>
+ <version>4.2</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.gridgain</groupId>
+ <artifactId>ignite-shmem</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils</artifactId>
+ <version>1.8.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
new file mode 100644
index 0000000..e0240ce
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.stream.*;
+
+import kafka.consumer.*;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.*;
+import kafka.serializer.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Server that subscribes to topic messages from a Kafka broker and streams them as key-value pairs into an {@link
+ * org.apache.ignite.IgniteDataStreamer} instance.
+ * <p>
+ * Uses Kafka's High Level Consumer API to read messages from Kafka
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">Consumer Group
+ * Example</a>
+ */
+public class KafkaStreamer<T, K, V>
+ extends StreamAdapter<T, K, V> {
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Executor used to submit kafka streams. */
+ private ExecutorService executor;
+
+ /** Topic. */
+ private String topic;
+
+ /** Number of threads to process kafka streams. */
+ private int threads;
+
+ /** Kafka Consumer Config. */
+ private ConsumerConfig consumerConfig;
+
+ /** Key Decoder. */
+ private Decoder<K> keyDecoder;
+
+ /** Value Decoder. */
+ private Decoder<V> valueDecoder;
+
+ /** Kafka Consumer connector. */
+ private ConsumerConnector consumer;
+
+ /**
+ * Sets the topic.
+ *
+ * @param topic Topic Name.
+ */
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * Sets the threads.
+ *
+ * @param threads Number of Threads.
+ */
+ public void setThreads(final int threads) {
+ this.threads = threads;
+ }
+
+ /**
+ * Sets the consumer config.
+ *
+ * @param consumerConfig Consumer configuration.
+ */
+ public void setConsumerConfig(final ConsumerConfig consumerConfig) {
+ this.consumerConfig = consumerConfig;
+ }
+
+ /**
+ * Sets the key decoder.
+ *
+ * @param keyDecoder Key Decoder.
+ */
+ public void setKeyDecoder(final Decoder<K> keyDecoder) {
+ this.keyDecoder = keyDecoder;
+ }
+
+ /**
+ * Sets the value decoder.
+ *
+ * @param valueDecoder Value Decoder
+ */
+ public void setValueDecoder(final Decoder<V> valueDecoder) {
+ this.valueDecoder = valueDecoder;
+ }
+
+ /**
+ * Starts streamer.
+ *
+ * @throws IgniteException If failed.
+ */
+ public void start() {
+ A.notNull(getStreamer(), "streamer");
+ A.notNull(getIgnite(), "ignite");
+ A.notNull(topic, "topic");
+ A.notNull(keyDecoder, "key decoder");
+ A.notNull(valueDecoder, "value decoder");
+ A.notNull(consumerConfig, "kafka consumer config");
+ A.ensure(threads > 0, "threads > 0");
+
+ log = getIgnite().log();
+
+ consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
+
+ Map<String, Integer> topicCountMap = new HashMap<>();
+ topicCountMap.put(topic, threads);
+
+ Map<String, List<KafkaStream<K, V>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder,
+ valueDecoder);
+
+ List<KafkaStream<K, V>> streams = consumerMap.get(topic);
+
+ // Now launch all the consumer threads.
+ executor = Executors.newFixedThreadPool(threads);
+
+ // Now create an object to consume the messages.
+ for (final KafkaStream<K,V> stream : streams) {
+ executor.submit(new Runnable() {
+ @Override public void run() {
+
+ ConsumerIterator<K, V> it = stream.iterator();
+
+ while (it.hasNext()) {
+ final MessageAndMetadata<K, V> messageAndMetadata = it.next();
+ getStreamer().addData(messageAndMetadata.key(), messageAndMetadata.message());
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ * Stops streamer.
+ */
+ public void stop() {
+ if (consumer != null)
+ consumer.shutdown();
+
+ if (executor != null) {
+ executor.shutdown();
+
+ try {
+ if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
+ if (log.isDebugEnabled())
+ log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+ }
+ catch (InterruptedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Interrupted during shutdown, exiting uncleanly");
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
new file mode 100644
index 0000000..28533f7
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import org.apache.commons.io.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.zookeeper.server.*;
+
+import kafka.admin.*;
+import kafka.api.*;
+import kafka.api.Request;
+import kafka.producer.*;
+import kafka.server.*;
+import kafka.utils.*;
+import org.I0Itec.zkclient.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Kafka Embedded Broker.
+ */
+public class KafkaEmbeddedBroker {
+
+ /** Default ZooKeeper Host. */
+ private static final String ZK_HOST = "localhost";
+
+ /** Broker Port. */
+ private static final int BROKER_PORT = 9092;
+
+ /** ZooKeeper Connection Timeout. */
+ private static final int ZK_CONNECTION_TIMEOUT = 6000;
+
+ /** ZooKeeper Session Timeout. */
+ private static final int ZK_SESSION_TIMEOUT = 6000;
+
+ /** ZooKeeper port. */
+ private static int zkPort = 0;
+
+ /** Is ZooKeeper Ready. */
+ private boolean zkReady;
+
+ /** Kafka Config. */
+ private KafkaConfig brokerConfig;
+
+ /** Kafka Server. */
+ private KafkaServer kafkaServer;
+
+ /** ZooKeeper Client. */
+ private ZkClient zkClient;
+
+ /** Embedded ZooKeeper. */
+ private EmbeddedZooKeeper zooKeeper;
+
+ /**
+ * Creates an embedded Kafka Broker.
+ */
+ public KafkaEmbeddedBroker() {
+ try {
+ setupEmbeddedZooKeeper();
+ setupEmbeddedKafkaServer();
+ }
+ catch (IOException | InterruptedException e) {
+ throw new RuntimeException("failed to start Kafka Broker " + e);
+ }
+
+ }
+
+ /**
+ * @return ZooKeeper Address.
+ */
+ public static String getZKAddress() {
+ return ZK_HOST + ":" + zkPort;
+ }
+
+ /**
+ * Creates a Topic.
+ *
+ * @param topic topic name
+ * @param partitions number of partitions for the topic
+ * @param replicationFactor replication factor
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ public void createTopic(String topic, final int partitions, final int replicationFactor)
+ throws TimeoutException, InterruptedException {
+ AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
+ waitUntilMetadataIsPropagated(topic, 0, 10000, 100);
+ }
+
+ /**
+ * Sends message to Kafka Broker.
+ *
+ * @param keyedMessages List of Keyed Messages.
+ * @return Producer used to send the message.
+ */
+ public Producer sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
+ Producer<String, String> producer = new Producer<>(getProducerConfig());
+ producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+ return producer;
+ }
+
+ /**
+ * Shuts down Kafka Broker.
+ *
+ * @throws IOException
+ */
+ public void shutdown()
+ throws IOException {
+
+ zkReady = false;
+
+ if (kafkaServer != null)
+ kafkaServer.shutdown();
+
+ List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerConfig.logDirs());
+
+ for (String logDir : logDirs) {
+ FileUtils.deleteDirectory(new File(logDir));
+ }
+
+ if (zkClient != null) {
+ zkClient.close();
+ zkClient = null;
+ }
+
+ if (zooKeeper != null) {
+
+ try {
+ zooKeeper.shutdown();
+ }
+ catch (IOException e) {
+ // ignore
+ }
+
+ zooKeeper = null;
+ }
+
+ }
+
+ /**
+ * @return the Zookeeper Client
+ */
+ private ZkClient getZkClient() {
+ A.ensure(zkReady, "Zookeeper not setup yet");
+ A.notNull(zkClient, "Zookeeper client is not yet initialized");
+
+ return zkClient;
+ }
+
+ /**
+ * Checks if topic metadata is propagated.
+ *
+ * @param topic topic name
+ * @param partition partition
+ * @return {@code true} if propagated, otherwise {@code false}
+ */
+ private boolean isMetadataPropagated(final String topic, final int partition) {
+ final scala.Option<PartitionStateInfo> partitionStateOption = kafkaServer.apis().metadataCache().getPartitionInfo(
+ topic, partition);
+ if (partitionStateOption.isDefined()) {
+ final PartitionStateInfo partitionState = partitionStateOption.get();
+ final LeaderAndIsr leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch().leaderAndIsr();
+
+ if (ZkUtils.getLeaderForPartition(getZkClient(), topic, partition) != null
+ && Request.isValidBrokerId(leaderAndInSyncReplicas.leader())
+ && leaderAndInSyncReplicas.isr().size() >= 1)
+ return true;
+
+ }
+ return false;
+ }
+
+ /**
+ * Waits until metadata is propagated.
+ *
+ * @param topic topic name
+ * @param partition partition
+ * @param timeout timeout value in millis
+ * @param interval interval in millis to sleep
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ private void waitUntilMetadataIsPropagated(final String topic, final int partition, final long timeout,
+ final long interval) throws TimeoutException, InterruptedException {
+ int attempt = 1;
+ final long startTime = System.currentTimeMillis();
+
+ while (true) {
+ if (isMetadataPropagated(topic, partition))
+ return;
+
+ final long duration = System.currentTimeMillis() - startTime;
+
+ if (duration < timeout)
+ Thread.sleep(interval);
+ else
+ throw new TimeoutException("metadata propagate timed out, attempt=" + attempt);
+
+ attempt++;
+ }
+
+ }
+
+ /**
+ * Sets up embedded Kafka Server
+ *
+ * @throws IOException
+ */
+ private void setupEmbeddedKafkaServer()
+ throws IOException {
+ A.ensure(zkReady, "Zookeeper should be setup before hand");
+
+ brokerConfig = new KafkaConfig(getBrokerConfig());
+ kafkaServer = new KafkaServer(brokerConfig, SystemTime$.MODULE$);
+ kafkaServer.startup();
+ }
+
+ /**
+ * Sets up embedded zooKeeper
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private void setupEmbeddedZooKeeper()
+ throws IOException, InterruptedException {
+ EmbeddedZooKeeper zooKeeper = new EmbeddedZooKeeper(ZK_HOST, zkPort);
+ zooKeeper.startup();
+ zkPort = zooKeeper.getActualPort();
+ zkClient = new ZkClient(getZKAddress(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
+ zkReady = true;
+ }
+
+ /**
+ * @return Kafka Broker Address.
+ */
+ private static String getBrokerAddress() {
+ return ZK_HOST + ":" + BROKER_PORT;
+ }
+
+ /**
+ * Gets Kafka Broker Config.
+ *
+ * @return Kafka Broker Config
+ * @throws IOException
+ */
+ private static Properties getBrokerConfig()
+ throws IOException {
+ Properties props = new Properties();
+ props.put("broker.id", "0");
+ props.put("host.name", ZK_HOST);
+ props.put("port", "" + BROKER_PORT);
+ props.put("log.dir", createTempDir("_cfg").getAbsolutePath());
+ props.put("zookeeper.connect", getZKAddress());
+ props.put("log.flush.interval.messages", "1");
+ props.put("replica.socket.timeout.ms", "1500");
+ return props;
+ }
+
+ /**
+ * @return Kafka Producer Config
+ */
+ private static ProducerConfig getProducerConfig() {
+ Properties props = new Properties();
+ props.put("metadata.broker.list", getBrokerAddress());
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("key.serializer.class", "kafka.serializer.StringEncoder");
+ props.put("partitioner.class", "org.apache.ignite.kafka.SimplePartitioner");
+ return new ProducerConfig(props);
+ }
+
+ /**
+ * Creates Temp Directory
+ *
+ * @param prefix prefix
+ * @return Created File.
+ * @throws IOException
+ */
+ private static File createTempDir(final String prefix)
+ throws IOException {
+ final Path path = Files.createTempDirectory(prefix);
+ return path.toFile();
+
+ }
+
+ /**
+ * Creates Embedded ZooKeeper.
+ */
+ private static class EmbeddedZooKeeper {
+ /** Default ZooKeeper Host. */
+ private final String zkHost;
+
+ /** Default ZooKeeper Port. */
+ private final int zkPort;
+
+ /** NIO Context Factory. */
+ private NIOServerCnxnFactory factory;
+
+ /** Snapshot Directory. */
+ private File snapshotDir;
+
+ /** Log Directory. */
+ private File logDir;
+
+ /**
+ * Creates an embedded Zookeeper
+ * @param zkHost zookeeper host
+ * @param zkPort zookeeper port
+ */
+ EmbeddedZooKeeper(final String zkHost, final int zkPort) {
+ this.zkHost = zkHost;
+ this.zkPort = zkPort;
+ }
+
+ /**
+ * Starts up ZooKeeper.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void startup()
+ throws IOException, InterruptedException {
+ snapshotDir = createTempDir("_ss");
+ logDir = createTempDir("_log");
+ ZooKeeperServer zooServer = new ZooKeeperServer(snapshotDir, logDir, 500);
+ factory = new NIOServerCnxnFactory();
+ factory.configure(new InetSocketAddress(zkHost, zkPort), 16);
+ factory.startup(zooServer);
+ }
+
+ /**
+ *
+ * @return actual port ZooKeeper is started on
+ */
+ int getActualPort() {
+ return factory.getLocalPort();
+ }
+
+ /**
+ * Shuts down ZooKeeper.
+ *
+ * @throws IOException
+ */
+ void shutdown()
+ throws IOException {
+ if (factory != null) {
+ factory.shutdown();
+
+ U.delete(snapshotDir);
+ U.delete(logDir);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
new file mode 100644
index 0000000..5972639
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import kafka.consumer.*;
+import kafka.producer.*;
+import kafka.serializer.*;
+import kafka.utils.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Tests {@link KafkaStreamer}.
+ */
+public class KafkaIgniteStreamerSelfTest
+ extends GridCommonAbstractTest {
+ /** Embedded Kafka. */
+ private KafkaEmbeddedBroker embeddedBroker;
+
+ /** Count. */
+ private static final int CNT = 100;
+
+ /** Test Topic. */
+ private static final String TOPIC_NAME = "page_visits";
+
+ /** Kafka Partition. */
+ private static final int PARTITIONS = 4;
+
+ /** Kafka Replication Factor. */
+ private static final int REPLICATION_FACTOR = 1;
+
+ /** Topic Message Key Prefix. */
+ private static final String KEY_PREFIX = "192.168.2.";
+
+ /** Topic Message Value Url. */
+ private static final String VALUE_URL = ",www.example.com,";
+
+ /** Constructor. */
+ public KafkaIgniteStreamerSelfTest() {
+ super(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void beforeTest()
+ throws Exception {
+ grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+ embeddedBroker = new KafkaEmbeddedBroker();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void afterTest()
+ throws Exception {
+ grid().cache(null).clear();
+
+ embeddedBroker.shutdown();
+ }
+
+ /**
+ * Tests Kafka streamer.
+ *
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ public void testKafkaStreamer()
+ throws TimeoutException, InterruptedException {
+ embeddedBroker.createTopic(TOPIC_NAME, PARTITIONS, REPLICATION_FACTOR);
+
+ Map<String, String> keyValueMap = produceStream(TOPIC_NAME);
+ consumerStream(TOPIC_NAME, keyValueMap);
+ }
+
+ /**
+ * Produces/Sends messages to Kafka.
+ *
+ * @param topic Topic name.
+ * @return Map of key value messages.
+ */
+ private Map<String, String> produceStream(final String topic) {
+ final Map<String, String> keyValueMap = new HashMap<>();
+
+ // Generate random subnets.
+ List<Integer> subnet = new ArrayList<>();
+
+ int i = 0;
+ while (i <= CNT)
+ subnet.add(++i);
+
+ Collections.shuffle(subnet);
+
+ final List<KeyedMessage<String, String>> messages = new ArrayList<>();
+ for (int event = 0; event < CNT; event++) {
+ long runtime = new Date().getTime();
+ String ip = KEY_PREFIX + subnet.get(event);
+ String msg = runtime + VALUE_URL + ip;
+ messages.add(new KeyedMessage<>(topic, ip, msg));
+ keyValueMap.put(ip, msg);
+ }
+
+ final Producer<String, String> producer = embeddedBroker.sendMessages(messages);
+ producer.close();
+
+ return keyValueMap;
+ }
+
+ /**
+ * Consumes Kafka stream via Ignite.
+ *
+ * @param topic Topic name.
+ * @param keyValueMap Expected key value map.
+ * @throws TimeoutException TimeoutException.
+ * @throws InterruptedException InterruptedException.
+ */
+ private void consumerStream(final String topic, final Map<String, String> keyValueMap)
+ throws TimeoutException, InterruptedException {
+
+ KafkaStreamer<String, String, String> kafkaStmr = null;
+
+ final Ignite ignite = grid();
+ try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
+
+ stmr.allowOverwrite(true);
+ stmr.autoFlushFrequency(10);
+
+ // Configure socket streamer.
+ kafkaStmr = new KafkaStreamer<>();
+
+ // Get the cache.
+ IgniteCache<String, String> cache = ignite.cache(null);
+
+ // Set ignite instance.
+ kafkaStmr.setIgnite(ignite);
+
+ // Set data streamer instance.
+ kafkaStmr.setStreamer(stmr);
+
+ // Set the topic.
+ kafkaStmr.setTopic(topic);
+
+ // Set the number of threads.
+ kafkaStmr.setThreads(4);
+
+ // Set the consumer configuration.
+ kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(KafkaEmbeddedBroker.getZKAddress(),
+ "groupX"));
+
+ // Set the decoders.
+ StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties());
+ kafkaStmr.setKeyDecoder(stringDecoder);
+ kafkaStmr.setValueDecoder(stringDecoder);
+
+ // Start kafka streamer.
+ kafkaStmr.start();
+
+ final CountDownLatch latch = new CountDownLatch(CNT);
+ IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+ @Override
+ public boolean apply(UUID uuid, CacheEvent evt) {
+ latch.countDown();
+ return true;
+ }
+ };
+
+ ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+ latch.await();
+
+ for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
+ final String key = entry.getKey();
+ final String value = entry.getValue();
+
+ final String cacheValue = cache.get(key);
+ assertEquals(value, cacheValue);
+ }
+ }
+
+ finally {
+ // Shutdown kafka streamer.
+ kafkaStmr.stop();
+ }
+ }
+
+ /**
+ * Creates default consumer config.
+ *
+ * @param zooKeeper ZooKeeper address in {@code server:port} format.
+ * @param groupId Group Id for kafka subscriber.
+ * @return {@link ConsumerConfig} kafka consumer configuration.
+ */
+ private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String groupId) {
+ A.notNull(zooKeeper, "zookeeper");
+ A.notNull(groupId, "groupId");
+
+ Properties props = new Properties();
+ props.put("zookeeper.connect", zooKeeper);
+ props.put("group.id", groupId);
+ props.put("zookeeper.session.timeout.ms", "400");
+ props.put("zookeeper.sync.time.ms", "200");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("auto.offset.reset", "smallest");
+
+ return new ConsumerConfig(props);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
new file mode 100644
index 0000000..b836b44
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import kafka.producer.*;
+
+/**
+ * Simple Partitioner for Kafka.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class SimplePartitioner
+ implements Partitioner {
+
+ /**
+ * Partitions the key based on the key value.
+ *
+ * @param key Key.
+ * @param partitionSize Partition size.
+ * @return partition Partition.
+ */
+ public int partition(Object key, int partitionSize) {
+ int partition = 0;
+ String keyStr = (String)key;
+ String[] keyValues = keyStr.split("\\.");
+ Integer intKey = Integer.parseInt(keyValues[3]);
+ if (intKey > 0) {
+ partition = intKey % partitionSize;
+ }
+ return partition;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a6d1609..b47d34b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
<module>modules/gce</module>
<module>modules/cloud</module>
<module>modules/mesos</module>
+ <module>modules/kafka</module>
</modules>
<profiles>