You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/26 17:38:26 UTC
[17/51] [abbrv] ignite git commit: IGNITE-4539 - Added RocketMQ
integration
IGNITE-4539 - Added RocketMQ integration
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/335f2431
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/335f2431
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/335f2431
Branch: refs/heads/ignite-5075-cacheStart
Commit: 335f24317da9f0083834bb9e34622b85d282f42b
Parents: ee1b19d
Author: Roman Shtykh <rs...@yahoo.com>
Authored: Wed Apr 26 14:24:20 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Apr 26 14:25:12 2017 +0300
----------------------------------------------------------------------
modules/rocketmq/README.txt | 25 +++
modules/rocketmq/pom.xml | 81 +++++++
.../stream/rocketmq/RocketMQStreamer.java | 151 +++++++++++++
.../ignite/stream/rocketmq/package-info.java | 21 ++
.../stream/rocketmq/RocketMQStreamerTest.java | 214 +++++++++++++++++++
.../rocketmq/RocketMQStreamerTestSuite.java | 37 ++++
.../stream/rocketmq/TestRocketMQServer.java | 148 +++++++++++++
.../ignite/stream/rocketmq/package-info.java | 21 ++
parent/pom.xml | 5 +
pom.xml | 1 +
10 files changed, 704 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/README.txt
----------------------------------------------------------------------
diff --git a/modules/rocketmq/README.txt b/modules/rocketmq/README.txt
new file mode 100644
index 0000000..55a117b
--- /dev/null
+++ b/modules/rocketmq/README.txt
@@ -0,0 +1,25 @@
+Apache Ignite RocketMQ Streamer Module
+--------------------------------------
+
+Apache Ignite RocketMQ Streamer module provides streaming from RocketMQ to Ignite cache.
+
+To use Ignite RocketMQ Streamer module, first import it to your Maven project.
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ ...
+ <dependencies>
+ ...
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-rocketmq</artifactId>
+ <version>${ignite.version}</version>
+ </dependency>
+ ...
+ </dependencies>
+ ...
+</project>
+
+Then initialize and start the streamer, for example as done in RocketMQStreamerTest.java.
http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/pom.xml
----------------------------------------------------------------------
diff --git a/modules/rocketmq/pom.xml b/modules/rocketmq/pom.xml
new file mode 100644
index 0000000..3b317fa
--- /dev/null
+++ b/modules/rocketmq/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-rocketmq</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <url>http://ignite.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-namesrv</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-broker</artifactId>
+ <version>${rocketmq.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java
new file mode 100644
index 0000000..67f1ce5
--- /dev/null
+++ b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/RocketMQStreamer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.rocketmq;
+
+import java.util.List;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * Streamer that subscribes to a RocketMQ topic and feeds messages into {@link IgniteDataStreamer} instance.
+ */
+public class RocketMQStreamer<K, V> extends StreamAdapter<List<MessageExt>, K, V> implements MessageListenerConcurrently {
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** RocketMQ consumer. */
+ private DefaultMQPushConsumer consumer;
+
+ /** State. */
+ private volatile boolean stopped = true;
+
+ /** Topic to subscribe to. */
+ private String topic;
+
+ /** Consumer group. */
+ private String consumerGrp;
+
+ /** Name server address. */
+ private String nameSrvAddr;
+
+ /**
+ * Starts streamer.
+ *
+ * @throws IgniteException If failed.
+ */
+ public void start() {
+ if (!stopped)
+ throw new IgniteException("Attempted to start an already started RocketMQ streamer");
+
+ // validate parameters.
+ A.notNull(getStreamer(), "streamer");
+ A.notNull(getIgnite(), "ignite");
+ A.notNull(topic, "topic");
+ A.notNull(consumerGrp, "consumer group");
+ A.notNullOrEmpty(nameSrvAddr, "nameserver address");
+ A.ensure(null != getMultipleTupleExtractor(), "Multiple tuple extractor must be configured");
+
+ log = getIgnite().log();
+
+ consumer = new DefaultMQPushConsumer(consumerGrp);
+
+ consumer.setNamesrvAddr(nameSrvAddr);
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+
+ try {
+ consumer.subscribe(topic, "*");
+ }
+ catch (MQClientException e) {
+ throw new IgniteException("Failed to subscribe to " + topic, e);
+ }
+
+ consumer.registerMessageListener(this);
+
+ try {
+ consumer.start();
+ }
+ catch (MQClientException e) {
+ throw new IgniteException("Failed to start the streamer", e);
+ }
+
+ stopped = false;
+ }
+
+ /**
+ * Stops streamer.
+ */
+ public void stop() {
+ if (consumer != null)
+ consumer.shutdown();
+
+ stopped = true;
+ }
+
+ /**
+ * Implements {@link MessageListenerConcurrently#consumeMessage(List, ConsumeConcurrentlyContext)} to receive
+ * messages.
+ *
+ * {@inheritDoc}
+ */
+ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext context) {
+ if (log.isDebugEnabled())
+ log.debug("Received " + msgs.size() + " messages");
+
+ addMessage(msgs);
+
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+
+ /**
+ * Sets the topic to subscribe to.
+ *
+ * @param topic The topic to subscribe to.
+ */
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * Sets the name of the consumer group.
+ *
+ * @param consumerGrp Consumer group name.
+ */
+ public void setConsumerGrp(String consumerGrp) {
+ this.consumerGrp = consumerGrp;
+ }
+
+ /**
+ * Sets the name server address.
+ *
+ * @param nameSrvAddr Name server address.
+ */
+ public void setNameSrvAddr(String nameSrvAddr) {
+ this.nameSrvAddr = nameSrvAddr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java
new file mode 100644
index 0000000..f743696
--- /dev/null
+++ b/modules/rocketmq/src/main/java/org/apache/ignite/stream/rocketmq/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains implementation of RocketMQStreamer.
+ */
+package org.apache.ignite.stream.rocketmq;
http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java
new file mode 100644
index 0000000..59451e9
--- /dev/null
+++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.rocketmq;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.NAME_SERVER_PORT;
+import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.TEST_IP;
+
+/**
+ * Test for {@link RocketMQStreamer}.
+ */
+public class RocketMQStreamerTest extends GridCommonAbstractTest {
+ /** Test topic. */
+ private static final String TOPIC_NAME = "testTopic";
+
+ /** Test consumer group. */
+ private static final String CONSUMER_GRP = "testConsumerGrp";
+
+ /** Test server. */
+ private static TestRocketMQServer testRocketMQServer;
+
+ /** Number of events to handle. */
+ private static final int EVT_NUM = 1000;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected void beforeTest() throws Exception {
+ grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ grid().cache(null).clear();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected void beforeTestsStarted() throws Exception {
+ testRocketMQServer = new TestRocketMQServer(log);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ if (testRocketMQServer != null)
+ testRocketMQServer.shutdown();
+ }
+
+ /** Constructor. */
+ public RocketMQStreamerTest() {
+ super(true);
+ }
+
+ /**
+ * Tests data is properly injected into the grid.
+ *
+ * @throws Exception If fails.
+ */
+ public void testStreamer() throws Exception {
+ RocketMQStreamer<String, byte[]> streamer = null;
+
+ Ignite ignite = grid();
+
+ try (IgniteDataStreamer<String, byte[]> dataStreamer = ignite.dataStreamer(null)) {
+ dataStreamer.allowOverwrite(true);
+ dataStreamer.autoFlushFrequency(10);
+
+ streamer = new RocketMQStreamer<>();
+
+ //configure.
+ streamer.setIgnite(ignite);
+ streamer.setStreamer(dataStreamer);
+ streamer.setNameSrvAddr(TEST_IP + ":" + NAME_SERVER_PORT);
+ streamer.setConsumerGrp(CONSUMER_GRP);
+ streamer.setTopic(TOPIC_NAME);
+ streamer.setMultipleTupleExtractor(new TestTupleExtractor());
+
+ streamer.start();
+
+ IgniteCache<String, String> cache = ignite.cache(null);
+
+ assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+ final CountDownLatch latch = new CountDownLatch(EVT_NUM);
+
+ IgniteBiPredicate<UUID, CacheEvent> putLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+ @Override public boolean apply(UUID uuid, CacheEvent evt) {
+ assert evt != null;
+
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(putLsnr, null, EVT_CACHE_OBJECT_PUT);
+
+ produceData();
+
+ assertTrue(latch.await(30, TimeUnit.SECONDS));
+
+ assertEquals(EVT_NUM, cache.size(CachePeekMode.PRIMARY));
+ }
+ finally {
+ if (streamer != null)
+ streamer.stop();
+ }
+ }
+
+ /**
+ * Test tuple extractor.
+ */
+ public static class TestTupleExtractor implements StreamMultipleTupleExtractor<List<MessageExt>, String, byte[]> {
+
+ /** {@inheritDoc} */
+ @Override public Map<String, byte[]> extract(List<MessageExt> msgs) {
+ final Map<String, byte[]> map = new HashMap<>();
+
+ for (MessageExt msg : msgs)
+ map.put(msg.getMsgId(), msg.getBody());
+
+ return map;
+ }
+ }
+
+ /**
+ * Adds data to RocketMQ.
+ *
+ * @throws Exception If fails.
+ */
+ private void produceData() throws Exception {
+ initTopic(TOPIC_NAME, TEST_IP + ":" + NAME_SERVER_PORT);
+
+ DefaultMQProducer producer = new DefaultMQProducer("testProducerGrp");
+
+ producer.setNamesrvAddr(TEST_IP + ":" + NAME_SERVER_PORT);
+
+ try {
+ producer.start();
+
+ for (int i = 0; i < EVT_NUM; i++)
+ producer.send(new Message(TOPIC_NAME, "", String.valueOf(i).getBytes("UTF-8")));
+ }
+ catch (Exception e) {
+ throw new Exception(e);
+ }
+ finally {
+ producer.shutdown();
+ }
+ }
+
+ /**
+ * Initializes RocketMQ topic.
+ *
+ * @param topic Topic.
+ * @param nsAddr Nameserver address.
+ * @throws IgniteInterruptedCheckedException If fails.
+ */
+ private void initTopic(String topic, String nsAddr) throws Exception {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
+ defaultMQAdminExt.setNamesrvAddr(nsAddr);
+ try {
+ defaultMQAdminExt.start();
+
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setTopicName(topic);
+ topicConfig.setReadQueueNums(4);
+ topicConfig.setWriteQueueNums(4);
+
+ defaultMQAdminExt.createAndUpdateTopicConfig(testRocketMQServer.getBrokerAddr(), topicConfig);
+
+ U.sleep(100);
+ }
+ finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java
new file mode 100644
index 0000000..e761f1b
--- /dev/null
+++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/RocketMQStreamerTestSuite.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.rocketmq;
+
+import junit.framework.TestSuite;
+
+/**
+ * Apache RocketMQ streamers tests.
+ */
+public class RocketMQStreamerTestSuite extends TestSuite {
+ /**
+ * @return Test suite containing {@link RocketMQStreamerTest}.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Apache RocketMQ streamer Test Suite");
+
+ suite.addTest(new TestSuite(RocketMQStreamerTest.class));
+
+ return suite;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java
new file mode 100644
index 0000000..beece8e
--- /dev/null
+++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/TestRocketMQServer.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.rocketmq;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import static java.io.File.separator;
+
+/**
+ * Test RocketMQ server handling a broker and a nameserver.
+ */
+class TestRocketMQServer {
+ /** Nameserver port. */
+ protected static final int NAME_SERVER_PORT = 9000;
+
+ /** Broker port. */
+ private static final int BROKER_PORT = 8000;
+
+ /** Broker HA port. */
+ private static final int HA_PORT = 8001;
+
+ /** Test ip address. */
+ protected static final String TEST_IP = "127.0.0.1";
+
+ /** Test broker name. */
+ private static final String TEST_BROKER = "testBroker";
+
+ /** Test cluster name. */
+ private static final String TEST_CLUSTER = "testCluster";
+
+ /** Nameserver. */
+ private static NamesrvController nameSrv;
+
+ /** Broker. */
+ private static BrokerController broker;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /**
+ * Starts the test nameserver and broker; a startup failure is rethrown as a {@link RuntimeException} keeping its cause.
+ *
+ * @param log Logger.
+ */
+ TestRocketMQServer(IgniteLogger log) {
+ this.log = log;
+
+ try {
+ startNameServer();
+ startBroker();
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to start RocketMQ: " + e, e);
+ }
+ }
+
+ /**
+ * Starts a test nameserver.
+ *
+ * @throws Exception If fails.
+ */
+ private void startNameServer() throws Exception {
+ NamesrvConfig namesrvConfig = new NamesrvConfig();
+ NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig();
+
+ namesrvConfig.setKvConfigPath(System.getProperty("java.io.tmpdir") + separator + "namesrv" + separator + "kvConfig.json");
+ nameServerNettyServerConfig.setListenPort(NAME_SERVER_PORT);
+
+ nameSrv = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
+
+ nameSrv.initialize();
+ nameSrv.start();
+
+ log.info("Started nameserver at " + NAME_SERVER_PORT);
+ }
+
+ /**
+ * Starts a test broker.
+ *
+ * @throws Exception If fails.
+ */
+ private void startBroker() throws Exception {
+ BrokerConfig brokerCfg = new BrokerConfig();
+ NettyServerConfig nettySrvCfg = new NettyServerConfig();
+ MessageStoreConfig storeCfg = new MessageStoreConfig();
+
+ brokerCfg.setBrokerName(TEST_BROKER);
+ brokerCfg.setBrokerClusterName(TEST_CLUSTER);
+ brokerCfg.setBrokerIP1(TEST_IP);
+ brokerCfg.setNamesrvAddr(TEST_IP + ":" + NAME_SERVER_PORT);
+
+ storeCfg.setStorePathRootDir(System.getProperty("java.io.tmpdir") + separator + "store-" + UUID.randomUUID());
+ storeCfg.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + separator + "commitlog");
+ storeCfg.setHaListenPort(HA_PORT);
+
+ nettySrvCfg.setListenPort(BROKER_PORT);
+
+ broker = new BrokerController(brokerCfg, nettySrvCfg, new NettyClientConfig(), storeCfg);
+
+ broker.initialize();
+ broker.start();
+
+ log.info("Started broker [" + TEST_BROKER + "] at " + BROKER_PORT);
+ }
+
+ /**
+ * Obtains the broker address.
+ *
+ * @return Broker address.
+ */
+ String getBrokerAddr() {
+ return broker.getBrokerAddr();
+ }
+
+ /**
+ * Shuts test server down.
+ */
+ void shutdown() {
+ if (broker != null)
+ broker.shutdown();
+
+ if (nameSrv != null)
+ nameSrv.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java
----------------------------------------------------------------------
diff --git a/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java
new file mode 100644
index 0000000..eebf084
--- /dev/null
+++ b/modules/rocketmq/src/test/java/org/apache/ignite/stream/rocketmq/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains implementation of RocketMQStreamer tests.
+ */
+package org.apache.ignite.stream.rocketmq;
http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 3d0f413..a59d8c9 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -94,6 +94,7 @@
<osgi.core.version>5.0.0</osgi.core.version>
<osgi.enterprise.version>5.0.0</osgi.enterprise.version>
<paho.version>1.0.2</paho.version>
+ <rocketmq.version>4.0.0-incubating</rocketmq.version>
<scala210.jline.version>2.10.4</scala210.jline.version>
<scala210.library.version>2.10.4</scala210.library.version>
<scala211.library.version>2.11.7</scala211.library.version>
@@ -449,6 +450,10 @@
<title>SpringData integration</title>
<packages>org.apache.ignite.springdata.repository*</packages>
</group>
+ <group>
+ <title>RocketMQ integration</title>
+ <packages>org.apache.ignite.stream.rocketmq*</packages>
+ </group>
</groups>
<header>
<![CDATA[
http://git-wip-us.apache.org/repos/asf/ignite/blob/335f2431/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8a05342..9c6a0d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@
<module>modules/kubernetes</module>
<module>modules/zeromq</module>
<module>modules/hibernate-core</module>
+ <module>modules/rocketmq</module>
</modules>
<profiles>