You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/08 15:18:19 UTC
[05/50] incubator-ignite git commit: ignite-428 Review fixes
ignite-428 Review fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/63fce5a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/63fce5a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/63fce5a4
Branch: refs/heads/ignite-648-failover
Commit: 63fce5a4920531ad8cd9afc47d829bb2fa4bc438
Parents: 2c41739
Author: agura <ag...@gridgain.com>
Authored: Thu Jun 25 22:18:00 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri Jul 3 19:39:12 2015 +0300
----------------------------------------------------------------------
modules/kafka/pom.xml | 18 +-
.../ignite/stream/kafka/KafkaStreamer.java | 121 ++++++---
.../stream/kafka/KafkaEmbeddedBroker.java | 251 ++++++++++---------
.../kafka/KafkaIgniteStreamerSelfTest.java | 131 +++++-----
.../ignite/stream/kafka/SimplePartitioner.java | 27 +-
5 files changed, 293 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
index 165ec1c..43909bc 100644
--- a/modules/kafka/pom.xml
+++ b/modules/kafka/pom.xml
@@ -31,7 +31,7 @@
</parent>
<artifactId>ignite-kafka</artifactId>
- <version>1.1.1-SNAPSHOT</version>
+ <version>1.1.6-SNAPSHOT</version>
<dependencies>
<dependency>
@@ -39,6 +39,7 @@
<artifactId>ignite-core</artifactId>
<version>${project.version}</version>
</dependency>
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
@@ -66,6 +67,7 @@
</exclusion>
</exclusions>
</dependency>
+
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
@@ -73,12 +75,6 @@
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.4</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-log4j</artifactId>
<version>${project.version}</version>
@@ -90,7 +86,6 @@
<version>4.2</version>
</dependency>
-
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
@@ -103,13 +98,6 @@
</dependency>
<dependency>
- <groupId>commons-beanutils</groupId>
- <artifactId>commons-beanutils</artifactId>
- <version>1.8.3</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index e0240ce..e9ad0bd 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -30,16 +30,17 @@ import java.util.*;
import java.util.concurrent.*;
/**
- * Server that subscribes to topic messages from Kafka broker, streams its to key-value pairs into {@link
- * org.apache.ignite.IgniteDataStreamer} instance.
+ * Server that subscribes to topic messages from Kafka broker and streams them as key-value pairs into
+ * {@link IgniteDataStreamer} instance.
* <p>
- * Uses Kafka's High Level Consumer API to read messages from Kafka
+ * Uses Kafka's High Level Consumer API to read messages from Kafka.
*
* @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">Consumer Consumer Group
* Example</a>
*/
-public class KafkaStreamer<T, K, V>
- extends StreamAdapter<T, K, V> {
+public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+ /** Retry timeout. */
+ private static final long DFLT_RETRY_TIMEOUT = 10000;
/** Logger. */
private IgniteLogger log;
@@ -53,61 +54,78 @@ public class KafkaStreamer<T, K, V>
/** Number of threads to process kafka streams. */
private int threads;
- /** Kafka Consumer Config. */
- private ConsumerConfig consumerConfig;
+ /** Kafka consumer config. */
+ private ConsumerConfig consumerCfg;
- /** Key Decoder. */
+ /** Key decoder. */
private Decoder<K> keyDecoder;
- /** Value Decoder. */
- private Decoder<V> valueDecoder;
+ /** Value decoder. */
+ private Decoder<V> valDecoder;
- /** Kafka Consumer connector. */
+ /** Kafka consumer connector. */
private ConsumerConnector consumer;
+ /** Retry timeout. */
+ private long retryTimeout = DFLT_RETRY_TIMEOUT;
+
+ /** Stopped. */
+ private volatile boolean stopped;
+
/**
- * Sets the topic.
+ * Sets the topic name.
*
- * @param topic Topic Name.
+ * @param topic Topic name.
*/
- public void setTopic(final String topic) {
+ public void setTopic(String topic) {
this.topic = topic;
}
/**
* Sets the threads.
*
- * @param threads Number of Threads.
+ * @param threads Number of threads.
*/
- public void setThreads(final int threads) {
+ public void setThreads(int threads) {
this.threads = threads;
}
/**
* Sets the consumer config.
*
- * @param consumerConfig Consumer configuration.
+ * @param consumerCfg Consumer configuration.
*/
- public void setConsumerConfig(final ConsumerConfig consumerConfig) {
- this.consumerConfig = consumerConfig;
+ public void setConsumerConfig(ConsumerConfig consumerCfg) {
+ this.consumerCfg = consumerCfg;
}
/**
* Sets the key decoder.
*
- * @param keyDecoder Key Decoder.
+ * @param keyDecoder Key decoder.
*/
- public void setKeyDecoder(final Decoder<K> keyDecoder) {
+ public void setKeyDecoder(Decoder<K> keyDecoder) {
this.keyDecoder = keyDecoder;
}
/**
* Sets the value decoder.
*
- * @param valueDecoder Value Decoder
+ * @param valDecoder Value decoder.
*/
- public void setValueDecoder(final Decoder<V> valueDecoder) {
- this.valueDecoder = valueDecoder;
+ public void setValueDecoder(Decoder<V> valDecoder) {
+ this.valDecoder = valDecoder;
+ }
+
+ /**
+ * Sets the retry timeout.
+ *
+ * @param retryTimeout Retry timeout.
+ */
+ public void setRetryTimeout(long retryTimeout) {
+ A.ensure(retryTimeout > 0, "retryTimeout > 0");
+
+ this.retryTimeout = retryTimeout;
}
/**
@@ -120,35 +138,56 @@ public class KafkaStreamer<T, K, V>
A.notNull(getIgnite(), "ignite");
A.notNull(topic, "topic");
A.notNull(keyDecoder, "key decoder");
- A.notNull(valueDecoder, "value decoder");
- A.notNull(consumerConfig, "kafka consumer config");
+ A.notNull(valDecoder, "value decoder");
+ A.notNull(consumerCfg, "kafka consumer config");
A.ensure(threads > 0, "threads > 0");
log = getIgnite().log();
- consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
+ consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerCfg);
+
+ Map<String, Integer> topicCntMap = new HashMap<>();
- Map<String, Integer> topicCountMap = new HashMap<>();
- topicCountMap.put(topic, threads);
+ topicCntMap.put(topic, threads);
- Map<String, List<KafkaStream<K, V>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder,
- valueDecoder);
+ Map<String, List<KafkaStream<K, V>>> consumerMap =
+ consumer.createMessageStreams(topicCntMap, keyDecoder, valDecoder);
List<KafkaStream<K, V>> streams = consumerMap.get(topic);
// Now launch all the consumer threads.
executor = Executors.newFixedThreadPool(threads);
+ stopped = false;
+
// Now create an object to consume the messages.
- for (final KafkaStream<K,V> stream : streams) {
+ for (final KafkaStream<K, V> stream : streams) {
executor.submit(new Runnable() {
@Override public void run() {
-
- ConsumerIterator<K, V> it = stream.iterator();
-
- while (it.hasNext()) {
- final MessageAndMetadata<K, V> messageAndMetadata = it.next();
- getStreamer().addData(messageAndMetadata.key(), messageAndMetadata.message());
+ while (!stopped) {
+ try {
+ for (ConsumerIterator<K, V> it = stream.iterator(); it.hasNext() && !stopped; ) {
+ MessageAndMetadata<K, V> msg = it.next();
+
+ try {
+ getStreamer().addData(msg.key(), msg.message());
+ }
+ catch (Exception e) {
+ U.warn(log, "Message is ignored due to an error [msg=" + msg + ']', e);
+ }
+ }
+ }
+ catch (Exception e) {
+ U.warn(log, "Message can't be consumed from stream. Retry after " +
+ retryTimeout + " ms.", e);
+
+ try {
+ Thread.sleep(retryTimeout);
+ }
+ catch (InterruptedException ie) {
+ // No-op.
+ }
+ }
}
}
});
@@ -159,6 +198,8 @@ public class KafkaStreamer<T, K, V>
* Stops streamer.
*/
public void stop() {
+ stopped = true;
+
if (consumer != null)
consumer.shutdown();
@@ -168,11 +209,11 @@ public class KafkaStreamer<T, K, V>
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
if (log.isDebugEnabled())
- log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+ log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
}
catch (InterruptedException e) {
if (log.isDebugEnabled())
- log.debug("Interrupted during shutdown, exiting uncleanly");
+ log.debug("Interrupted during shutdown, exiting uncleanly.");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
index 28533f7..98b9e4c 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
@@ -17,7 +17,6 @@
package org.apache.ignite.stream.kafka;
-import org.apache.commons.io.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.zookeeper.server.*;
@@ -25,6 +24,7 @@ import kafka.admin.*;
import kafka.api.*;
import kafka.api.Request;
import kafka.producer.*;
+import kafka.serializer.*;
import kafka.server.*;
import kafka.utils.*;
import org.I0Itec.zkclient.*;
@@ -39,106 +39,104 @@ import java.util.concurrent.*;
* Kafka Embedded Broker.
*/
public class KafkaEmbeddedBroker {
-
- /** Default ZooKeeper Host. */
+ /** Default ZooKeeper host. */
private static final String ZK_HOST = "localhost";
- /** Broker Port. */
+ /** Broker port. */
private static final int BROKER_PORT = 9092;
- /** ZooKeeper Connection Timeout. */
+ /** ZooKeeper connection timeout. */
private static final int ZK_CONNECTION_TIMEOUT = 6000;
- /** ZooKeeper Session Timeout. */
+ /** ZooKeeper session timeout. */
private static final int ZK_SESSION_TIMEOUT = 6000;
/** ZooKeeper port. */
private static int zkPort = 0;
- /** Is ZooKeeper Ready. */
+ /** Is ZooKeeper ready. */
private boolean zkReady;
- /** Kafka Config. */
- private KafkaConfig brokerConfig;
+ /** Kafka config. */
+ private KafkaConfig brokerCfg;
- /** Kafka Server. */
- private KafkaServer kafkaServer;
+ /** Kafka server. */
+ private KafkaServer kafkaSrv;
- /** ZooKeeper Client. */
+ /** ZooKeeper client. */
private ZkClient zkClient;
/** Embedded ZooKeeper. */
private EmbeddedZooKeeper zooKeeper;
/**
- * Creates an embedded Kafka Broker.
+ * Creates an embedded Kafka broker.
*/
public KafkaEmbeddedBroker() {
try {
setupEmbeddedZooKeeper();
+
setupEmbeddedKafkaServer();
}
catch (IOException | InterruptedException e) {
- throw new RuntimeException("failed to start Kafka Broker " + e);
+ throw new RuntimeException("Failed to start Kafka broker " + e);
}
-
}
/**
- * @return ZooKeeper Address.
+ * @return ZooKeeper address.
*/
public static String getZKAddress() {
- return ZK_HOST + ":" + zkPort;
+ return ZK_HOST + ':' + zkPort;
}
/**
* Creates a Topic.
*
- * @param topic topic name
- * @param partitions number of paritions for the topic
- * @param replicationFactor replication factor
- * @throws TimeoutException
- * @throws InterruptedException
+ * @param topic Topic name.
+ * @param partitions Number of partitions for the topic.
+ * @param replicationFactor Replication factor.
+ * @throws TimeoutException If operation is timed out.
+ * @throws InterruptedException If interrupted.
*/
- public void createTopic(String topic, final int partitions, final int replicationFactor)
+ public void createTopic(String topic, int partitions, int replicationFactor)
throws TimeoutException, InterruptedException {
AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
+
waitUntilMetadataIsPropagated(topic, 0, 10000, 100);
}
/**
- * Sends message to Kafka Broker.
+ * Sends message to Kafka broker.
*
- * @param keyedMessages List of Keyed Messages.
+ * @param keyedMessages List of keyed messages.
* @return Producer used to send the message.
*/
- public Producer sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
+ public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
Producer<String, String> producer = new Producer<>(getProducerConfig());
+
producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+
return producer;
}
/**
- * Shuts down Kafka Broker.
- *
- * @throws IOException
+ * Shuts down Kafka broker.
*/
- public void shutdown()
- throws IOException {
-
+ public void shutdown() {
zkReady = false;
- if (kafkaServer != null)
- kafkaServer.shutdown();
+ if (kafkaSrv != null)
+ kafkaSrv.shutdown();
- List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerConfig.logDirs());
+ List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerCfg.logDirs());
- for (String logDir : logDirs) {
- FileUtils.deleteDirectory(new File(logDir));
- }
+ for (String logDir : logDirs)
+ U.delete(new File(logDir));
if (zkClient != null) {
zkClient.close();
+
zkClient = null;
}
@@ -148,16 +146,15 @@ public class KafkaEmbeddedBroker {
zooKeeper.shutdown();
}
catch (IOException e) {
- // ignore
+ // No-op.
}
zooKeeper = null;
}
-
}
/**
- * @return the Zookeeper Client
+ * @return ZooKeeper client.
*/
private ZkClient getZkClient() {
A.ensure(zkReady, "Zookeeper not setup yet");
@@ -169,102 +166,105 @@ public class KafkaEmbeddedBroker {
/**
* Checks if topic metadata is propagated.
*
- * @param topic topic name
- * @param partition partition
- * @return true if propagated otherwise false
+ * @param topic Topic name.
+ * @param part Partition.
+ * @return {@code True} if propagated, otherwise {@code false}.
*/
- private boolean isMetadataPropagated(final String topic, final int partition) {
- final scala.Option<PartitionStateInfo> partitionStateOption = kafkaServer.apis().metadataCache().getPartitionInfo(
- topic, partition);
- if (partitionStateOption.isDefined()) {
- final PartitionStateInfo partitionState = partitionStateOption.get();
- final LeaderAndIsr leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch().leaderAndIsr();
-
- if (ZkUtils.getLeaderForPartition(getZkClient(), topic, partition) != null
- && Request.isValidBrokerId(leaderAndInSyncReplicas.leader())
- && leaderAndInSyncReplicas.isr().size() >= 1)
- return true;
+ private boolean isMetadataPropagated(String topic, int part) {
+ scala.Option<PartitionStateInfo> partStateOption =
+ kafkaSrv.apis().metadataCache().getPartitionInfo(topic, part);
- }
- return false;
+ if (!partStateOption.isDefined())
+ return false;
+
+ PartitionStateInfo partState = partStateOption.get();
+
+ LeaderAndIsr LeaderAndIsr = partState.leaderIsrAndControllerEpoch().leaderAndIsr();
+
+ return ZkUtils.getLeaderForPartition(getZkClient(), topic, part) != null &&
+ Request.isValidBrokerId(LeaderAndIsr.leader()) && LeaderAndIsr.isr().size() >= 1;
}
/**
* Waits until metadata is propagated.
*
- * @param topic topic name
- * @param partition partition
- * @param timeout timeout value in millis
- * @param interval interval in millis to sleep
- * @throws TimeoutException
- * @throws InterruptedException
+ * @param topic Topic name.
+ * @param part Partition.
+ * @param timeout Timeout value in millis.
+ * @param interval Interval in millis to sleep.
+ * @throws TimeoutException If operation is timed out.
+ * @throws InterruptedException If interrupted.
*/
- private void waitUntilMetadataIsPropagated(final String topic, final int partition, final long timeout,
- final long interval) throws TimeoutException, InterruptedException {
+ private void waitUntilMetadataIsPropagated(String topic, int part, long timeout, long interval)
+ throws TimeoutException, InterruptedException {
int attempt = 1;
- final long startTime = System.currentTimeMillis();
+
+ long startTime = System.currentTimeMillis();
while (true) {
- if (isMetadataPropagated(topic, partition))
+ if (isMetadataPropagated(topic, part))
return;
- final long duration = System.currentTimeMillis() - startTime;
+ long duration = System.currentTimeMillis() - startTime;
if (duration < timeout)
Thread.sleep(interval);
else
- throw new TimeoutException("metadata propagate timed out, attempt=" + attempt);
+ throw new TimeoutException("Metadata propagation is timed out, attempt " + attempt);
attempt++;
}
-
}
/**
- * Sets up embedded Kafka Server
+ * Sets up embedded Kafka server.
*
- * @throws IOException
+ * @throws IOException If failed.
*/
- private void setupEmbeddedKafkaServer()
- throws IOException {
+ private void setupEmbeddedKafkaServer() throws IOException {
A.ensure(zkReady, "Zookeeper should be setup before hand");
- brokerConfig = new KafkaConfig(getBrokerConfig());
- kafkaServer = new KafkaServer(brokerConfig, SystemTime$.MODULE$);
- kafkaServer.startup();
+ brokerCfg = new KafkaConfig(getBrokerConfig());
+
+ kafkaSrv = new KafkaServer(brokerCfg, SystemTime$.MODULE$);
+
+ kafkaSrv.startup();
}
/**
- * Sets up embedded zooKeeper
+ * Sets up embedded ZooKeeper.
*
- * @throws IOException
- * @throws InterruptedException
+ * @throws IOException If failed.
+ * @throws InterruptedException If interrupted.
*/
- private void setupEmbeddedZooKeeper()
- throws IOException, InterruptedException {
+ private void setupEmbeddedZooKeeper() throws IOException, InterruptedException {
EmbeddedZooKeeper zooKeeper = new EmbeddedZooKeeper(ZK_HOST, zkPort);
+
zooKeeper.startup();
+
zkPort = zooKeeper.getActualPort();
+
zkClient = new ZkClient(getZKAddress(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
+
zkReady = true;
}
/**
- * @return Kafka Broker Address.
+ * @return Kafka broker address.
*/
private static String getBrokerAddress() {
- return ZK_HOST + ":" + BROKER_PORT;
+ return ZK_HOST + ':' + BROKER_PORT;
}
/**
- * Gets KafKa Brofer Config
+ * Gets Kafka broker config.
*
- * @return Kafka Broker Config
- * @throws IOException
+ * @return Kafka broker config.
+ * @throws IOException If failed.
*/
- private static Properties getBrokerConfig()
- throws IOException {
+ private static Properties getBrokerConfig() throws IOException {
Properties props = new Properties();
+
props.put("broker.id", "0");
props.put("host.name", ZK_HOST);
props.put("port", "" + BROKER_PORT);
@@ -272,60 +272,63 @@ public class KafkaEmbeddedBroker {
props.put("zookeeper.connect", getZKAddress());
props.put("log.flush.interval.messages", "1");
props.put("replica.socket.timeout.ms", "1500");
+
return props;
}
/**
- * @return Kafka Producer Config
+ * @return Kafka Producer config.
*/
private static ProducerConfig getProducerConfig() {
Properties props = new Properties();
+
props.put("metadata.broker.list", getBrokerAddress());
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("key.serializer.class", "kafka.serializer.StringEncoder");
- props.put("partitioner.class", "org.apache.ignite.kafka.SimplePartitioner");
+ props.put("serializer.class", StringEncoder.class.getName());
+ props.put("key.serializer.class", StringEncoder.class.getName());
+ props.put("partitioner.class", SimplePartitioner.class.getName());
+
return new ProducerConfig(props);
}
/**
- * Creates Temp Directory
+ * Creates temp directory.
*
- * @param prefix prefix
- * @return Created File.
- * @throws IOException
+ * @param prefix Prefix.
+ * @return Created file.
+ * @throws IOException If failed.
*/
- private static File createTempDir(final String prefix)
- throws IOException {
- final Path path = Files.createTempDirectory(prefix);
- return path.toFile();
+ private static File createTempDir( String prefix) throws IOException {
+ Path path = Files.createTempDirectory(prefix);
+ return path.toFile();
}
/**
- * Creates Embedded ZooKeeper.
+ * Creates embedded ZooKeeper.
*/
private static class EmbeddedZooKeeper {
- /** Default ZooKeeper Host. */
+ /** Default ZooKeeper host. */
private final String zkHost;
- /** Default ZooKeeper Port. */
+ /** Default ZooKeeper port. */
private final int zkPort;
- /** NIO Context Factory. */
+ /** NIO context factory. */
private NIOServerCnxnFactory factory;
- /** Snapshot Directory. */
+ /** Snapshot directory. */
private File snapshotDir;
- /** Log Directory. */
+ /** Log directory. */
private File logDir;
/**
- * Creates an embedded Zookeeper
- * @param zkHost zookeeper host
- * @param zkPort zookeeper port
+ * Creates an embedded ZooKeeper.
+ *
+ * @param zkHost ZooKeeper host.
+ * @param zkPort ZooKeeper port.
*/
- EmbeddedZooKeeper(final String zkHost, final int zkPort) {
+ EmbeddedZooKeeper(String zkHost, int zkPort) {
this.zkHost = zkHost;
this.zkPort = zkPort;
}
@@ -333,22 +336,25 @@ public class KafkaEmbeddedBroker {
/**
* Starts up ZooKeeper.
*
- * @throws IOException
- * @throws InterruptedException
+ * @throws IOException If failed.
+ * @throws InterruptedException If interrupted.
*/
- void startup()
- throws IOException, InterruptedException {
+ void startup() throws IOException, InterruptedException {
snapshotDir = createTempDir("_ss");
+
logDir = createTempDir("_log");
- ZooKeeperServer zooServer = new ZooKeeperServer(snapshotDir, logDir, 500);
+
+ ZooKeeperServer zkSrv = new ZooKeeperServer(snapshotDir, logDir, 500);
+
factory = new NIOServerCnxnFactory();
+
factory.configure(new InetSocketAddress(zkHost, zkPort), 16);
- factory.startup(zooServer);
+
+ factory.startup(zkSrv);
}
/**
- *
- * @return actual port zookeeper is started
+ * @return Actual port ZooKeeper is started on.
*/
int getActualPort() {
return factory.getLocalPort();
@@ -357,17 +363,16 @@ public class KafkaEmbeddedBroker {
/**
* Shuts down ZooKeeper.
*
- * @throws IOException
+ * @throws IOException If failed.
*/
- void shutdown()
- throws IOException {
+ void shutdown() throws IOException {
if (factory != null) {
factory.shutdown();
U.delete(snapshotDir);
+
U.delete(logDir);
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 5972639..2473990 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -32,31 +32,31 @@ import java.util.*;
import java.util.concurrent.*;
import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.stream.kafka.KafkaEmbeddedBroker.*;
/**
* Tests {@link KafkaStreamer}.
*/
-public class KafkaIgniteStreamerSelfTest
- extends GridCommonAbstractTest {
+public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
/** Embedded Kafka. */
private KafkaEmbeddedBroker embeddedBroker;
/** Count. */
private static final int CNT = 100;
- /** Test Topic. */
+ /** Test topic. */
private static final String TOPIC_NAME = "page_visits";
- /** Kafka Partition. */
+ /** Kafka partition. */
private static final int PARTITIONS = 4;
- /** Kafka Replication Factor. */
+ /** Kafka replication factor. */
private static final int REPLICATION_FACTOR = 1;
- /** Topic Message Key Prefix. */
+ /** Topic message key prefix. */
private static final String KEY_PREFIX = "192.168.2.";
- /** Topic Message Value Url. */
+ /** Topic message value URL. */
private static final String VALUE_URL = ",www.example.com,";
/** Constructor. */
@@ -65,18 +65,15 @@ public class KafkaIgniteStreamerSelfTest
}
/** {@inheritDoc} */
- @Override
- protected void beforeTest()
- throws Exception {
+ @SuppressWarnings("unchecked")
+ @Override protected void beforeTest() throws Exception {
grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
embeddedBroker = new KafkaEmbeddedBroker();
}
/** {@inheritDoc} */
- @Override
- protected void afterTest()
- throws Exception {
+ @Override protected void afterTest() throws Exception {
grid().cache(null).clear();
embeddedBroker.shutdown();
@@ -85,76 +82,80 @@ public class KafkaIgniteStreamerSelfTest
/**
* Tests Kafka streamer.
*
- * @throws TimeoutException
- * @throws InterruptedException
+ * @throws TimeoutException If timed out.
+ * @throws InterruptedException If interrupted.
*/
- public void testKafkaStreamer()
- throws TimeoutException, InterruptedException {
+ public void testKafkaStreamer() throws TimeoutException, InterruptedException {
embeddedBroker.createTopic(TOPIC_NAME, PARTITIONS, REPLICATION_FACTOR);
- Map<String, String> keyValueMap = produceStream(TOPIC_NAME);
- consumerStream(TOPIC_NAME, keyValueMap);
+ Map<String, String> keyValMap = produceStream(TOPIC_NAME);
+
+ consumerStream(TOPIC_NAME, keyValMap);
}
/**
- * Produces/Sends messages to Kafka.
+ * Sends messages to Kafka.
*
* @param topic Topic name.
* @return Map of key value messages.
*/
- private Map<String, String> produceStream(final String topic) {
- final Map<String, String> keyValueMap = new HashMap<>();
-
+ private Map<String, String> produceStream(String topic) {
// Generate random subnets.
List<Integer> subnet = new ArrayList<>();
- int i = 0;
- while (i <= CNT)
- subnet.add(++i);
+ for (int i = 1; i <= CNT; i++)
+ subnet.add(i);
Collections.shuffle(subnet);
- final List<KeyedMessage<String, String>> messages = new ArrayList<>();
- for (int event = 0; event < CNT; event++) {
- long runtime = new Date().getTime();
- String ip = KEY_PREFIX + subnet.get(event);
+ List<KeyedMessage<String, String>> messages = new ArrayList<>(CNT);
+
+ Map<String, String> keyValMap = new HashMap<>();
+
+ for (int evt = 0; evt < CNT; evt++) {
+ long runtime = System.currentTimeMillis();
+
+ String ip = KEY_PREFIX + subnet.get(evt);
+
String msg = runtime + VALUE_URL + ip;
+
messages.add(new KeyedMessage<>(topic, ip, msg));
- keyValueMap.put(ip, msg);
+
+ keyValMap.put(ip, msg);
}
- final Producer<String, String> producer = embeddedBroker.sendMessages(messages);
+ Producer<String, String> producer = embeddedBroker.sendMessages(messages);
+
producer.close();
- return keyValueMap;
+ return keyValMap;
}
/**
- * Consumes Kafka Stream via ignite.
+ * Consumes Kafka stream via Ignite.
*
* @param topic Topic name.
- * @param keyValueMap Expected key value map.
- * @throws TimeoutException TimeoutException.
- * @throws InterruptedException InterruptedException.
+ * @param keyValMap Expected key value map.
+ * @throws TimeoutException If timed out.
+ * @throws InterruptedException If interrupted.
*/
- private void consumerStream(final String topic, final Map<String, String> keyValueMap)
+ private void consumerStream(String topic, Map<String, String> keyValMap)
throws TimeoutException, InterruptedException {
-
KafkaStreamer<String, String, String> kafkaStmr = null;
- final Ignite ignite = grid();
- try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
+ Ignite ignite = grid();
+ try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
stmr.allowOverwrite(true);
stmr.autoFlushFrequency(10);
- // Configure socket streamer.
+ // Configure Kafka streamer.
kafkaStmr = new KafkaStreamer<>();
// Get the cache.
IgniteCache<String, String> cache = ignite.cache(null);
- // Set ignite instance.
+ // Set Ignite instance.
kafkaStmr.setIgnite(ignite);
// Set data streamer instance.
@@ -167,58 +168,55 @@ public class KafkaIgniteStreamerSelfTest
kafkaStmr.setThreads(4);
// Set the consumer configuration.
- kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(KafkaEmbeddedBroker.getZKAddress(),
- "groupX"));
+ kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(getZKAddress(), "groupX"));
// Set the decoders.
- StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties());
- kafkaStmr.setKeyDecoder(stringDecoder);
- kafkaStmr.setValueDecoder(stringDecoder);
+ StringDecoder strDecoder = new StringDecoder(new VerifiableProperties());
+
+ kafkaStmr.setKeyDecoder(strDecoder);
+ kafkaStmr.setValueDecoder(strDecoder);
// Start kafka streamer.
kafkaStmr.start();
final CountDownLatch latch = new CountDownLatch(CNT);
+
IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
- @Override
- public boolean apply(UUID uuid, CacheEvent evt) {
+ @Override public boolean apply(UUID uuid, CacheEvent evt) {
latch.countDown();
+
return true;
}
};
ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
- latch.await();
- for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
- final String key = entry.getKey();
- final String value = entry.getValue();
+ latch.await();
- final String cacheValue = cache.get(key);
- assertEquals(value, cacheValue);
- }
+ for (Map.Entry<String, String> entry : keyValMap.entrySet())
+ assertEquals(entry.getValue(), cache.get(entry.getKey()));
}
-
finally {
- // Shutdown kafka streamer.
- kafkaStmr.stop();
+ if (kafkaStmr != null)
+ kafkaStmr.stop();
}
}
/**
* Creates default consumer config.
*
- * @param zooKeeper Zookeeper address <server:port>.
- * @param groupId Group Id for kafka subscriber.
- * @return {@link ConsumerConfig} kafka consumer configuration.
+ * @param zooKeeper ZooKeeper address <server:port>.
+ * @param grpId Group Id for kafka subscriber.
+ * @return Kafka consumer configuration.
*/
- private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String groupId) {
+ private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String grpId) {
A.notNull(zooKeeper, "zookeeper");
- A.notNull(groupId, "groupId");
+ A.notNull(grpId, "groupId");
Properties props = new Properties();
+
props.put("zookeeper.connect", zooKeeper);
- props.put("group.id", groupId);
+ props.put("group.id", grpId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
@@ -226,5 +224,4 @@ public class KafkaIgniteStreamerSelfTest
return new ConsumerConfig(props);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
index b836b44..1ef4f77 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
@@ -18,29 +18,36 @@
package org.apache.ignite.stream.kafka;
import kafka.producer.*;
+import kafka.utils.*;
/**
- * Simple Partitioner for Kafka.
+ * Simple partitioner for Kafka.
*/
@SuppressWarnings("UnusedDeclaration")
-public class SimplePartitioner
- implements Partitioner {
+public class SimplePartitioner implements Partitioner {
+ /**
+ * Constructs instance.
+ *
+ * @param props Properties.
+ */
+ public SimplePartitioner(VerifiableProperties props) {
+ // No-op.
+ }
/**
* Partitions the key based on the key value.
*
* @param key Key.
- * @param partitionSize Partition size.
+ * @param partSize Partition size.
* @return partition Partition.
*/
- public int partition(Object key, int partitionSize) {
- int partition = 0;
+ public int partition(Object key, int partSize) {
String keyStr = (String)key;
+
String[] keyValues = keyStr.split("\\.");
+
Integer intKey = Integer.parseInt(keyValues[3]);
- if (intKey > 0) {
- partition = intKey % partitionSize;
- }
- return partition;
+
+ return intKey > 0 ? intKey % partSize : 0;
}
}