You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/06/15 02:41:43 UTC
[3/3] incubator-rocketmq-externals git commit: Release rocketmq-jms
1.0.0 version
Release rocketmq-jms 1.0.0 version
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/c4b20122
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/c4b20122
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/c4b20122
Branch: refs/heads/release-rocketmq-jms-1.0.0
Commit: c4b20122a5f75d48167220155efb56eccd308023
Parents:
Author: yukon <yu...@apache.org>
Authored: Thu Jun 15 10:40:04 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Thu Jun 15 10:41:16 2017 +0800
----------------------------------------------------------------------
.gitignore | 1 +
README.md | 32 ++
rocketmq-jms/.gitignore | 5 +
rocketmq-jms/.travis.yml | 43 ++
rocketmq-jms/README.md | 31 ++
rocketmq-jms/core/pom.xml | 48 ++
.../rocketmq/jms/domain/CommonConstant.java | 36 ++
.../rocketmq/jms/domain/CommonContext.java | 183 ++++++++
.../rocketmq/jms/domain/JmsBaseConnection.java | 172 ++++++++
.../jms/domain/JmsBaseConnectionFactory.java | 146 +++++++
.../jms/domain/JmsBaseConnectionMetaData.java | 134 ++++++
.../rocketmq/jms/domain/JmsBaseConstant.java | 86 ++++
.../jms/domain/JmsBaseMessageConsumer.java | 168 +++++++
.../jms/domain/JmsBaseMessageProducer.java | 281 ++++++++++++
.../rocketmq/jms/domain/JmsBaseSession.java | 308 +++++++++++++
.../rocketmq/jms/domain/JmsBaseTopic.java | 53 +++
.../rocketmq/jms/domain/RMQPushConsumerExt.java | 128 ++++++
.../jms/domain/message/JmsBaseMessage.java | 434 +++++++++++++++++++
.../jms/domain/message/JmsBytesMessage.java | 245 +++++++++++
.../jms/domain/message/JmsObjectMessage.java | 41 ++
.../jms/domain/message/JmsTextMessage.java | 48 ++
.../apache/rocketmq/jms/util/ExceptionUtil.java | 41 ++
.../rocketmq/jms/util/MessageConverter.java | 182 ++++++++
.../rocketmq/jms/util/MsgConvertUtil.java | 90 ++++
.../apache/rocketmq/jms/util/URISpecParser.java | 61 +++
.../core/src/main/resources/application.conf | 1 +
.../apache/rocketmq/jms/JmsTestListener.java | 67 +++
.../org/apache/rocketmq/jms/JmsTestUtil.java | 54 +++
.../jms/domain/message/JmsBytesMessageTest.java | 103 +++++
.../domain/message/JmsMessageConvertTest.java | 52 +++
.../domain/message/JmsObjectMessageTest.java | 92 ++++
.../jms/domain/message/JmsTextMessageTest.java | 50 +++
.../jms/integration/IntegrationTestBase.java | 199 +++++++++
.../rocketmq/jms/integration/JmsClientIT.java | 191 ++++++++
.../rocketmq/jms/integration/JmsConsumerIT.java | 131 ++++++
.../rocketmq/jms/util/URISpecParserTest.java | 43 ++
rocketmq-jms/pom.xml | 196 +++++++++
rocketmq-jms/spring/pom.xml | 82 ++++
.../SimpleExMessageListenerContainer.java | 90 ++++
.../rocketmq/jms/spring/JmsConsumeIT.java | 61 +++
.../rocketmq/jms/spring/JmsProduceIT.java | 93 ++++
.../rocketmq/jms/spring/SpringTestBase.java | 41 ++
.../spring/src/test/resources/consumer.xml | 51 +++
.../spring/src/test/resources/producer.xml | 43 ++
rocketmq-jms/style/copyright/Apache.xml | 24 +
.../style/copyright/profiles_settings.xml | 64 +++
rocketmq-jms/style/rmq_checkstyle.xml | 135 ++++++
rocketmq-jms/style/rmq_codeStyle.xml | 157 +++++++
48 files changed, 5017 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..485dee6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..07a5fa6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,32 @@
+# RocketMQ Externals
+
+There are some RocketMQ external projects, with the purpose of growing the RocketMQ community.
+
+## RocketMQ-Console-Ng
+A console for RocketMQ
+
+## RocketMQ-JMS
+RocketMQ-JMS is an implementation of the JMS specification that uses Apache RocketMQ as the broker. JMS 1.1 support is in progress, and JMS 2.0 is the final target.
+
+## RocketMQ-Flume-Ng
+
+This project is used to receive and send messages between
+[RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume)
+
+1. Firstly, please get familiar with [RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume).
+2. Ensure that the jar related to [RocketMQ](http://rocketmq.incubator.apache.org/dowloading/releases) exists in local maven repository.
+3. Execute the following command in rocketmq-flume root directory
+
+ `mvn clean install dependency:copy-dependencies`
+
+4. Copy the jars that rocketmq-flume depends on to `$FLUME_HOME/lib` (the specific jars will be listed later).
+
+
+## RocketMQ-Spark
+
+Apache Spark-Streaming integration with RocketMQ. Both push and pull consumer modes are provided.
+For more details please refer to rocketmq-spark README.md.
+
+## RocketMQ-Docker
+Apache RocketMQ Docker provides Dockerfile and bash scripts for building and running docker image.
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/.gitignore
----------------------------------------------------------------------
diff --git a/rocketmq-jms/.gitignore b/rocketmq-jms/.gitignore
new file mode 100644
index 0000000..d2e5aaf
--- /dev/null
+++ b/rocketmq-jms/.gitignore
@@ -0,0 +1,5 @@
+.idea/
+*.iml
+*.ipr
+*.iws
+target/
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/.travis.yml
----------------------------------------------------------------------
diff --git a/rocketmq-jms/.travis.yml b/rocketmq-jms/.travis.yml
new file mode 100644
index 0000000..9f430b2
--- /dev/null
+++ b/rocketmq-jms/.travis.yml
@@ -0,0 +1,43 @@
+notifications:
+ email:
+ recipients:
+ - zhangke.huangshan@gmail.com
+ - zhendongliu92@gmail.com
+ on_success: change
+ on_failure: always
+
+language: java
+
+matrix:
+ include:
+ # On OSX, run with default JDK only.
+ # - os: osx
+ # On Linux, run with specific JDKs only.
+ # - os: linux
+ # env: CUSTOM_JDK="oraclejdk8"
+ - os: linux
+ env: CUSTOM_JDK="oraclejdk7"
+ #- os: linux
+ # env: CUSTOM_JDK="openjdk7"
+
+before_install:
+ - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
+ - cat ~/.mavenrc
+ - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
+ - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
+
+#os:
+# - linux
+# - osx
+#jdk:
+# - oraclejdk8
+# - oraclejdk7
+# - openjdk7
+
+
+script:
+ - travis_retry mvn -B clean install jacoco:report coveralls:report
+
+#after_success:
+# - mvn clean install
+# - mvn sonar:sonar
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-jms/README.md b/rocketmq-jms/README.md
new file mode 100644
index 0000000..a05e27e
--- /dev/null
+++ b/rocketmq-jms/README.md
@@ -0,0 +1,31 @@
+# RocketMQ-JMS [![Build Status](https://travis-ci.org/rocketmq/rocketmq-jms.svg?branch=master)](https://travis-ci.org/rocketmq/rocketmq-jms) [![Coverage Status](https://coveralls.io/repos/github/rocketmq/rocketmq-jms/badge.svg?branch=master)](https://coveralls.io/github/rocketmq/rocketmq-jms?branch=master)
+
+
+## Introduction
+RocketMQ-JMS is an implementation of the JMS specification that uses Apache RocketMQ as the broker.
+JMS 1.1 support is in progress, and JMS 2.0 is the final target.
+
+The first version of RocketMQ-JMS will be released soon; new features will be developed on the "v1.1" branch.
+Please visit the [issue board](https://github.com/rocketmq/rocketmq-jms/issues) to see the features planned for the next version.
+
+
+## Building
+
+ > cd rocketmq-jms
+ > mvn clean install
+
+ **run unit test:**
+ > mvn test
+
+ **run integration test:**
+ > mvn verify
+
+ **see jacoco code coverage report**
+ > open core/target/site/jacoco/index.html
+ > open core/target/site/jacoco-it/index.html
+ > open spring/target/site/jacoco-it/index.html
+
+
+## Guidelines
+
+ Please see [Coding Guidelines Introduction](http://rocketmq.apache.org/docs/code-guidelines/)
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/pom.xml b/rocketmq-jms/core/pom.xml
new file mode 100644
index 0000000..1b36e14
--- /dev/null
+++ b/rocketmq-jms/core/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-jms-all</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-jms</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
new file mode 100644
index 0000000..80a8b64
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+public interface CommonConstant {
+ // Keys that JmsBaseConnection looks up in its connection-parameter map.
+ String PRODUCERID = "producerId"; // producer id; the connection factory requires this or CONSUMERID
+
+ String CONSUMERID = "consumerId"; // consumer id; the connection factory requires this or PRODUCERID
+
+ String PROVIDER = "provider"; // provider; optional
+
+ String NAMESERVER = "nameServer"; // name server; applied only when non-empty
+
+ String INSTANCE_NAME = "instanceName"; // instance name; applied only when non-empty
+
+ String CONSUME_THREAD_NUMS = "consumeThreadNums"; // parsed with Integer.parseInt when non-empty
+
+ String SEND_TIMEOUT_MILLIS = "sendMsgTimeoutMillis"; // parsed with Integer.parseInt when non-empty
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
new file mode 100644
index 0000000..c8e4276
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+/**
+ * Mutable holder for the settings shared by a JMS connection and the objects created from it.
+ * <p>
+ * The class is a plain bean: every property has a getter and a setter and no validation is
+ * performed here.
+ */
+public class CommonContext {
+    private String accessKey;
+    private String secretKey;
+
+    private String consumerId;
+    private String producerId;
+
+    private String provider;
+
+    private String appId;
+
+    private String nameServer;
+
+    /**
+     * MQType
+     */
+    private String mqType;
+
+    /**
+     * Using for distinguishing client jvm process
+     */
+    private String instanceName;
+    /**
+     * Set consumer threadPool Size
+     */
+    private int consumeThreadNums;
+    /**
+     * Set send message timeOut; defaults to -1 until a value is set
+     */
+    private int sendMsgTimeoutMillis = -1;
+
+    /**
+     * @return the appId
+     */
+    public String getAppId() {
+        return appId;
+    }
+
+    /**
+     * @param appId the appId to set
+     */
+    public void setAppId(String appId) {
+        this.appId = appId;
+    }
+
+    /**
+     * @return the provider
+     */
+    public String getProvider() {
+        return provider;
+    }
+
+    /**
+     * @param provider the provider to set
+     */
+    public void setProvider(String provider) {
+        this.provider = provider;
+    }
+
+    /**
+     * @return the instanceName
+     */
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    /**
+     * @param instanceName the instanceName to set
+     */
+    public void setInstanceName(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    /**
+     * @return the accessKey
+     */
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    /**
+     * @param accessKey the accessKey to set
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    /**
+     * @return the secretKey
+     */
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    /**
+     * @param secretKey the secretKey to set
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    /**
+     * @return consumer thread nums
+     */
+    public int getConsumeThreadNums() {
+        return consumeThreadNums;
+    }
+
+    /**
+     * @param consumeThreadNums the consumer thread nums to set
+     */
+    public void setConsumeThreadNums(int consumeThreadNums) {
+        this.consumeThreadNums = consumeThreadNums;
+    }
+
+    /**
+     * @return the consumerId
+     */
+    public String getConsumerId() {
+        return consumerId;
+    }
+
+    /**
+     * @param consumerId the consumerId to set
+     */
+    public void setConsumerId(String consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * @return the producerId
+     */
+    public String getProducerId() {
+        return producerId;
+    }
+
+    /**
+     * @param producerId the producerId to set
+     */
+    public void setProducerId(String producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * @return the send timeout in milliseconds, or -1 when it was never set
+     */
+    public int getSendMsgTimeoutMillis() {
+        return sendMsgTimeoutMillis;
+    }
+
+    /**
+     * @param sendMsgTimeoutMillis the send timeout in milliseconds to set
+     */
+    public void setSendMsgTimeoutMillis(int sendMsgTimeoutMillis) {
+        this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
+    }
+
+    /**
+     * @return the mqType
+     */
+    public String getMqType() {
+        return mqType;
+    }
+
+    /**
+     * @param mqType the mqType to set
+     */
+    public void setMqType(String mqType) {
+        this.mqType = mqType;
+    }
+
+    /**
+     * @return the nameServer
+     */
+    public String getNameServer() {
+        return nameServer;
+    }
+
+    /**
+     * @param nameServer the nameServer to set
+     */
+    public void setNameServer(String nameServer) {
+        this.nameServer = nameServer;
+    }
+
+    /**
+     * Renders the fields reflectively in the default style, leaving out {@code secretKey} so
+     * that the credential does not end up in logs that print this context.
+     *
+     * @return the string form of this context without the secret key
+     */
+    @Override
+    public String toString() {
+        ReflectionToStringBuilder builder =
+            new ReflectionToStringBuilder(this, ToStringStyle.DEFAULT_STYLE);
+        builder.setExcludeFieldNames(new String[] {"secretKey"});
+        return builder.toString();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
new file mode 100644
index 0000000..4c809c7
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * {@link Connection} implementation that is configured from a map of connection parameters
+ * and hands out a single, lazily created {@link JmsBaseSession}.
+ */
+public class JmsBaseConnection implements Connection {
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    protected String clientID;
+    protected ExceptionListener exceptionListener;
+    protected CommonContext context;
+    /**
+     * The one session of this connection. Declared volatile because {@link #createSession}
+     * reads it outside the lock before re-checking it inside the lock.
+     */
+    protected volatile JmsBaseSession session;
+
+    /**
+     * Builds the connection context from the given parameters and assigns a random client id.
+     *
+     * @param connectionParams parameters keyed by the names in {@link CommonConstant}
+     * @throws NumberFormatException if a non-empty thread count or send timeout is not an integer
+     */
+    public JmsBaseConnection(Map<String, String> connectionParams) {
+
+        this.clientID = UUID.randomUUID().toString();
+
+        context = new CommonContext();
+
+        //At least one should be set
+        context.setProducerId(connectionParams.get(CommonConstant.PRODUCERID));
+        context.setConsumerId(connectionParams.get(CommonConstant.CONSUMERID));
+
+        //optional
+        context.setProvider(connectionParams.get(CommonConstant.PROVIDER));
+
+        String nameServer = connectionParams.get(CommonConstant.NAMESERVER);
+        String consumerThreadNums = connectionParams.get(CommonConstant.CONSUME_THREAD_NUMS);
+        String sendMsgTimeoutMillis = connectionParams.get(CommonConstant.SEND_TIMEOUT_MILLIS);
+        String instanceName = connectionParams.get(CommonConstant.INSTANCE_NAME);
+
+        if (StringUtils.isNotEmpty(nameServer)) {
+            context.setNameServer(nameServer);
+        }
+        if (StringUtils.isNotEmpty(instanceName)) {
+            context.setInstanceName(instanceName);
+        }
+
+        if (StringUtils.isNotEmpty(consumerThreadNums)) {
+            context.setConsumeThreadNums(Integer.parseInt(consumerThreadNums));
+        }
+        if (StringUtils.isNotEmpty(sendMsgTimeoutMillis)) {
+            context.setSendMsgTimeoutMillis(Integer.parseInt(sendMsgTimeoutMillis));
+        }
+    }
+
+    /**
+     * Returns the session of this connection, creating it on the first call. Later calls
+     * return the same instance. A newly created session is started right away when the
+     * connection is already started.
+     *
+     * @param transacted must be {@code false}
+     * @param acknowledgeMode must be {@link Session#AUTO_ACKNOWLEDGE}
+     * @return the shared session
+     * @throws IllegalArgumentException if either argument asks for an unsupported mode
+     * @throws JMSException if starting the new session fails
+     */
+    @Override
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+
+        Preconditions.checkArgument(!transacted, "Not support transaction Session !");
+        Preconditions.checkArgument(Session.AUTO_ACKNOWLEDGE == acknowledgeMode,
+            "Not support this acknowledge mode: " + acknowledgeMode);
+
+        JmsBaseSession existing = this.session;
+        if (null != existing) {
+            return existing;
+        }
+        synchronized (this) {
+            if (null != this.session) {
+                return this.session;
+            }
+            // Publish the session before looking at the started flag, so that a concurrent
+            // start() either sees the session or this thread sees the flag already set.
+            this.session = new JmsBaseSession(this, transacted, acknowledgeMode, context);
+            if (isStarted()) {
+                this.session.start();
+            }
+            return this.session;
+        }
+    }
+
+    @Override
+    public String getClientID() throws JMSException {
+        return this.clientID;
+    }
+
+    @Override
+    public void setClientID(String clientID) throws JMSException {
+        this.clientID = clientID;
+    }
+
+    /**
+     * @return a new {@link JmsBaseConnectionMetaData} instance
+     */
+    @Override
+    public ConnectionMetaData getMetaData() throws JMSException {
+        return new JmsBaseConnectionMetaData();
+    }
+
+    @Override
+    public ExceptionListener getExceptionListener() throws JMSException {
+        return this.exceptionListener;
+    }
+
+    @Override
+    public void setExceptionListener(ExceptionListener listener) throws JMSException {
+        this.exceptionListener = listener;
+    }
+
+    /**
+     * Marks the connection as started and starts the session if one exists. Calling it on an
+     * already started connection does nothing.
+     */
+    @Override
+    public void start() throws JMSException {
+        if (started.compareAndSet(false, true)) {
+            if (this.session != null) {
+                this.session.start();
+            }
+
+        }
+    }
+
+    @Override
+    public void stop() throws JMSException {
+        //Stop the connection before closing it.
+        //Do nothing here.
+    }
+
+    /**
+     * Marks the connection as not started and closes the session if one exists. The session
+     * is only closed when the connection was started beforehand.
+     */
+    @Override
+    public void close() throws JMSException {
+        if (started.compareAndSet(true, false)) {
+            if (this.session != null) {
+                this.session.close();
+            }
+
+        }
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Destination destination,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Whether the connection is started.
+     *
+     * @return whether the connection is started.
+     */
+    public boolean isStarted() {
+        return started.get();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
new file mode 100644
index 0000000..1b9da06
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import org.apache.rocketmq.jms.util.URISpecParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsBaseConnectionFactory implements ConnectionFactory {
+
+ private static Logger logger = LoggerFactory
+ .getLogger(JmsBaseConnectionFactory.class);
+ /**
+ * Synchronization monitor for the shared Connection
+ */
+ private final Object connectionMonitor = new Object();
+ /**
+ * Can be configured in a consistent way without too much URL hacking.
+ */
+ protected URI connectionUri;
+ /**
+ * Store connection uri query parameters.
+ */
+ protected Map<String, String> connectionParams;
+ /**
+ * Wrapped Connection
+ */
+ protected JmsBaseConnection connection;
+
+ public JmsBaseConnectionFactory() {
+
+ }
+
+ public JmsBaseConnectionFactory(URI connectionUri) {
+ setConnectionUri(connectionUri);
+ }
+
+ public void setConnectionUri(URI connectionUri) {
+ Preconditions.checkNotNull(connectionUri, "Please set URI !");
+ this.connectionUri = connectionUri;
+ this.connectionParams = URISpecParser.parseURI(connectionUri.toString());
+
+ if (null != connectionParams) {
+ Preconditions.checkState(null != connectionParams.get(CommonConstant.CONSUMERID) ||
+ null != connectionParams.get(CommonConstant.PRODUCERID), "Please set consumerId or ProducerId !");
+ }
+
+ }
+
+ @Override
+ public Connection createConnection() throws JMSException {
+ synchronized (this.connectionMonitor) {
+ if (this.connection == null) {
+ initConnection();
+ }
+ return this.connection;
+ }
+ }
+
+ /**
+ * Using userName and Password to create a connection
+ *
+ * @param userName ignored
+ * @param password ignored
+ * @return the new JMS Connection
+ * @throws JMSException
+ */
+ @Override
+ public Connection createConnection(String userName, String password) throws JMSException {
+ logger.debug("Using userName and Password to create a connection.");
+ return this.createConnection();
+ }
+
+ /**
+ * Initialize the underlying shared Connection.
+ * <p/>
+ * Closes and reInitializes the Connection if an underlying Connection is present already.
+ *
+ * @throws javax.jms.JMSException if thrown by JMS API methods
+ */
+ protected void initConnection() throws JMSException {
+ synchronized (this.connectionMonitor) {
+ if (this.connection != null) {
+ closeConnection(this.connection);
+ }
+ this.connection = doCreateConnection();
+ logger.debug("Established shared JMS Connection: {}", this.connection);
+ }
+ }
+
+ /**
+ * Close the given Connection.
+ *
+ * @param con the Connection to close
+ */
+ protected void closeConnection(Connection con) {
+ logger.debug("Closing shared JMS Connection: {}", this.connection);
+ try {
+ try {
+ con.stop();
+ }
+ finally {
+ con.close();
+ }
+ }
+ catch (Throwable ex) {
+ logger.error("Could not close shared JMS Connection.", ex);
+ }
+ }
+
+ /**
+ * Create a JMS Connection
+ *
+ * @return the new JMS Connection
+ * @throws javax.jms.JMSException if thrown by JMS API methods
+ */
+ protected JmsBaseConnection doCreateConnection() throws JMSException {
+ Preconditions.checkState(null != this.connectionParams && this.connectionParams.size() > 0,
+ "Connection Parameters can not be null!");
+ this.connection = new JmsBaseConnection(this.connectionParams);
+
+ return connection;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
new file mode 100644
index 0000000..ee549aa
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsBaseConnectionMetaData implements ConnectionMetaData {
+ public static final String JMS_VERSION;
+ public static final int JMS_MAJOR_VERSION;
+ public static final int JMS_MINOR_VERSION;
+
+ public static final String PROVIDER_VERSION;
+ public static final int PROVIDER_MAJOR_VERSION;
+ public static final int PROVIDER_MINOR_VERSION;
+
+ public static final String PROVIDER_NAME = "Apache RocketMQ";
+
+ public static final JmsBaseConnectionMetaData INSTANCE = new JmsBaseConnectionMetaData();
+
+ public static InputStream resourceStream;
+
+ static {
+ Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
+
+ String jmsVersion = null;
+ int jmsMajor = 0;
+ int jmsMinor = 0;
+ try {
+ Package p = Package.getPackage("javax.jms");
+ if (p != null) {
+ jmsVersion = p.getImplementationVersion();
+ Matcher m = pattern.matcher(jmsVersion);
+ if (m.matches()) {
+ jmsMajor = Integer.parseInt(m.group(1));
+ jmsMinor = Integer.parseInt(m.group(2));
+ }
+ }
+ }
+ catch (Throwable e) {
+ }
+ JMS_VERSION = jmsVersion;
+ JMS_MAJOR_VERSION = jmsMajor;
+ JMS_MINOR_VERSION = jmsMinor;
+
+ String providerVersion = null;
+ int providerMajor = 0;
+ int providerMinor = 0;
+ Properties properties = new Properties();
+ try {
+ resourceStream = JmsBaseConnectionMetaData.class.getResourceAsStream("/application.conf");
+ properties.load(resourceStream);
+ providerVersion = properties.getProperty("version");
+
+ Matcher m = pattern.matcher(providerVersion);
+ if (m.matches()) {
+ providerMajor = Integer.parseInt(m.group(1));
+ providerMinor = Integer.parseInt(m.group(2));
+ }
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ PROVIDER_VERSION = providerVersion;
+ PROVIDER_MAJOR_VERSION = providerMajor;
+ PROVIDER_MINOR_VERSION = providerMinor;
+
+ }
+
+ public String getJMSVersion() throws JMSException {
+ return JMS_VERSION;
+ }
+
+ public int getJMSMajorVersion() throws JMSException {
+ return JMS_MAJOR_VERSION;
+ }
+
+ public int getJMSMinorVersion() throws JMSException {
+ return JMS_MINOR_VERSION;
+ }
+
+ public String getJMSProviderName() throws JMSException {
+ return PROVIDER_NAME;
+ }
+
+ public String getProviderVersion() throws JMSException {
+ return PROVIDER_VERSION;
+ }
+
+ public int getProviderMajorVersion() throws JMSException {
+ return PROVIDER_MAJOR_VERSION;
+ }
+
+ public int getProviderMinorVersion() throws JMSException {
+ return PROVIDER_MINOR_VERSION;
+ }
+
+ public Enumeration<?> getJMSXPropertyNames() throws JMSException {
+ Vector<String> jmxProperties = new Vector<String>();
+ jmxProperties.add("jmsXUserId");
+ jmxProperties.add("jmsXAppId");
+ jmxProperties.add("jmsXGroupID");
+ jmxProperties.add("jmsXGroupSeq");
+ jmxProperties.add("jmsXState");
+ jmxProperties.add("jmsXDeliveryCount");
+ jmxProperties.add("jmsXProducerTXID");
+ jmxProperties.add("jmsConsumerTXID");
+ jmxProperties.add("jmsRecvTimeStamp");
+ return jmxProperties.elements();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
new file mode 100644
index 0000000..f0bca28
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+/**
+ * String keys for JMS message headers and JMS-defined (JMSX) properties, plus default header values.
+ */
+public interface JmsBaseConstant {
+ //------------------------JMS message header constant---------------------------------
+ String JMS_DESTINATION = "jmsDestination";
+ String JMS_DELIVERY_MODE = "jmsDeliveryMode";
+ String JMS_EXPIRATION = "jmsExpiration";
+ String JMS_DELIVERY_TIME = "jmsDeliveryTime";
+ String JMS_PRIORITY = "jmsPriority";
+ String JMS_MESSAGE_ID = "jmsMessageID";
+ String JMS_TIMESTAMP = "jmsTimestamp";
+ String JMS_CORRELATION_ID = "jmsCorrelationID";
+ String JMS_REPLY_TO = "jmsReplyTo";
+ String JMS_TYPE = "jmsType";
+ String JMS_REDELIVERED = "jmsRedelivered";
+
+ //-------------------------JMS defined properties constant----------------------------
+ /**
+ * The identity of the user sending the message.
+ */
+ String JMS_XUSER_ID = "jmsXUserID";
+ /**
+ * The identity of the application sending the message.
+ */
+ String JMS_XAPP_ID = "jmsXAppID";
+ /**
+ * The number of message delivery attempts.
+ */
+ String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount";
+ /**
+ * The identity of the message group this message is part of.
+ */
+ String JMS_XGROUP_ID = "jmsXGroupID";
+ /**
+ * The sequence number of this message within the group; the first message is 1, the second 2, and so on.
+ */
+ String JMS_XGROUP_SEQ = "jmsXGroupSeq";
+ /**
+ * The transaction identifier of the transaction within which this message was produced.
+ */
+ String JMS_XPRODUCER_TXID = "jmsXProducerTXID";
+ /**
+ * The transaction identifier of the transaction within which this message was consumed.
+ */
+ String JMS_XCONSUMER_TXID = "jmsXConsumerTXID";
+
+ /**
+ * The time JMS delivered the message to the consumer.
+ */
+ String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp";
+ /**
+ * Assume there exists a message warehouse that contains a separate copy of each message sent to each consumer and
+ * that these copies exist from the time the original message was sent. Each copy's state is one of: 1 (waiting),
+ * 2 (ready), 3 (expired) or 4 (retained). Since state is of no interest to producers and consumers, it is not
+ * provided to either. It is only of relevance to messages looked up in a warehouse, and JMS provides no API for this.
+ */
+ String JMS_XSTATE = "jmsXState";
+
+ //---------------------------JMS Headers' value constant---------------------------
+ /**
+ * Default time to live: three days, expressed in milliseconds.
+ */
+ long DEFAULT_TIME_TO_LIVE = 3 * 24 * 60 * 60 * 1000;
+
+ /**
+ * Default value for the JMS type header.
+ */
+ String DEFAULT_JMS_TYPE = "rocketmq";
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
new file mode 100644
index 0000000..b62e928
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.MapMaker;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * {@link MessageConsumer} that receives messages through a RocketMQ push consumer.
+ * <p>
+ * Push consumers are kept in a static registry keyed by consumer id and are reference-counted:
+ * every instance constructed for an id increments the count, and {@link #close()} closes the
+ * underlying consumer and removes it from the registry once the count drops back to zero.
+ * Only listener-based delivery is implemented; the {@code receive} variants throw
+ * {@link UnsupportedOperationException}.
+ */
+public class JmsBaseMessageConsumer implements MessageConsumer {
+
+    /** Guards creation, reference counting and removal of entries in {@link #consumerMap}. */
+    private static final Object LOCK_OBJECT = new Object();
+    //all shared consumers
+    private static ConcurrentMap<String/**consumerId*/, RMQPushConsumerExt> consumerMap = new MapMaker().makeMap();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private CommonContext context;
+    private Destination destination;
+    private MessageListener messageListener;
+
+    /**
+     * Creates a consumer for {@code destination}, creating the shared push consumer for the
+     * context's consumer id if none is registered yet.
+     *
+     * @param destination destination to consume from; must not be null
+     * @param commonContext configuration (consumer id, name server, instance name, thread count); must not be null
+     * @param connection owning connection; if it is already started the push consumer is started immediately
+     * @throws JMSException if the push consumer has to be started and fails to start
+     */
+    public JmsBaseMessageConsumer(Destination destination, CommonContext commonContext,
+        JmsBaseConnection connection) throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            checkArgs(destination, commonContext);
+
+            String consumerId = context.getConsumerId();
+            // Look the shared consumer up once; the registry is only modified under LOCK_OBJECT.
+            RMQPushConsumerExt consumerExt = consumerMap.get(consumerId);
+            if (null == consumerExt) {
+                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerId);
+                if (context.getConsumeThreadNums() > 0) {
+                    consumer.setConsumeThreadMax(context.getConsumeThreadNums());
+                    consumer.setConsumeThreadMin(context.getConsumeThreadNums());
+                }
+                if (!Strings.isNullOrEmpty(context.getNameServer())) {
+                    consumer.setNamesrvAddr(context.getNameServer());
+                }
+                if (!Strings.isNullOrEmpty(context.getInstanceName())) {
+                    consumer.setInstanceName(context.getInstanceName());
+                }
+                consumer.setConsumeMessageBatchMaxSize(1);
+                consumerExt = new RMQPushConsumerExt(consumer);
+                consumerMap.put(consumerId, consumerExt);
+            }
+
+            consumerExt.incrementAndGet();
+
+            //If the connection has been started, start the consumer right now.
+            if (connection.isStarted()) {
+                try {
+                    consumerExt.start();
+                }
+                catch (MQClientException mqe) {
+                    JMSException jmsException = new JMSException("Start consumer failed " + consumerId);
+                    jmsException.initCause(mqe);
+                    throw jmsException;
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Validates the constructor arguments and stores them in the instance fields.
+     * <p>
+     * The objects themselves are checked before they are dereferenced, so a null argument is
+     * reported with its descriptive message rather than as a bare {@link NullPointerException}.
+     *
+     * @throws NullPointerException if the context, its consumer id, or the destination is null
+     */
+    private void checkArgs(Destination destination, CommonContext context) throws JMSException {
+        Preconditions.checkNotNull(context, "Context can not be null!");
+        Preconditions.checkNotNull(context.getConsumerId(), "ConsumerId can not be null!");
+        Preconditions.checkNotNull(destination, "Destination can not be null!");
+        this.context = context;
+        this.destination = destination;
+    }
+
+    /**
+     * Message selectors are not supported.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public String getMessageSelector() throws JMSException {
+        return null;
+    }
+
+    /**
+     * @return the listener most recently passed to {@link #setMessageListener(MessageListener)}, or null
+     */
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return this.messageListener;
+    }
+
+    /**
+     * Registers {@code listener} and subscribes the shared push consumer to this consumer's
+     * topic and message type. Does nothing if no push consumer is registered for the consumer id.
+     *
+     * @param listener listener that will receive the messages
+     * @throws JMSException if the subscription fails; the {@link MQClientException} is attached
+     * both as the cause and as the linked exception
+     */
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+        if (null == rocketmqConsumerExt) {
+            return;
+        }
+        try {
+            this.messageListener = listener;
+            JmsBaseTopic topic = (JmsBaseTopic) destination;
+            rocketmqConsumerExt.subscribe(topic.getMessageTopic(), topic.getMessageType(), listener);
+        }
+        catch (MQClientException mqe) {
+            // Keep the original message but do not lose the underlying failure.
+            JMSException jmsException = new JMSException(mqe.getMessage());
+            jmsException.initCause(mqe);
+            jmsException.setLinkedException(mqe);
+            throw jmsException;
+        }
+    }
+
+    /** Synchronous receive is not supported. */
+    @Override
+    public Message receive() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    /** Synchronous receive is not supported. */
+    @Override
+    public Message receive(long timeout) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    /** Synchronous receive is not supported. */
+    @Override
+    public Message receiveNoWait() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    /**
+     * Releases this instance's reference to the shared push consumer. Only the first call has
+     * any effect; when the reference count reaches zero the push consumer is closed and removed
+     * from the registry.
+     */
+    @Override
+    public void close() throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            if (closed.compareAndSet(false, true)) {
+                RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+                if (null != rocketmqConsumerExt && 0 == rocketmqConsumerExt.decrementAndGet()) {
+                    rocketmqConsumerExt.close();
+                    consumerMap.remove(context.getConsumerId());
+                }
+            }
+        }
+    }
+
+    /**
+     * Start the consumer to get message from the Broker.
+     *
+     * @throws JMSException if the underlying push consumer fails to start
+     */
+    public void startConsumer() throws JMSException {
+        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+        if (null != rocketmqConsumerExt) {
+            try {
+                rocketmqConsumerExt.start();
+            }
+            catch (MQClientException mqe) {
+                throw ExceptionUtil.convertToJmsException(mqe, "Start consumer failed");
+            }
+        }
+    }
+
+    /**
+     * @return the destination this consumer was created for
+     */
+    public Destination getDestination() throws JMSException {
+        return this.destination;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
new file mode 100644
index 0000000..8dd82f0
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.MapMaker;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentMap;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.jms.domain.message.JmsBaseMessage;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+import org.apache.rocketmq.jms.util.MessageConverter;
+import org.apache.rocketmq.jms.util.MsgConvertUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsBaseMessageProducer implements MessageProducer {
+
+ private static final Object LOCK_OBJECT = new Object();
+ private static ConcurrentMap<String, MQProducer> producerMap = new MapMaker().makeMap();
+ private final Logger logger = LoggerFactory.getLogger(JmsBaseMessageProducer.class);
+ private CommonContext context;
+
+ private Destination destination;
+
+ public JmsBaseMessageProducer(Destination destination, CommonContext context) throws JMSException {
+ synchronized (LOCK_OBJECT) {
+ checkArgs(destination, context);
+
+ if (null == producerMap.get(this.context.getProducerId())) {
+ DefaultMQProducer producer = new DefaultMQProducer(context.getProducerId());
+ if (!Strings.isNullOrEmpty(context.getNameServer())) {
+ producer.setNamesrvAddr(context.getNameServer());
+ }
+ if (!Strings.isNullOrEmpty(context.getInstanceName())) {
+ producer.setInstanceName(context.getInstanceName());
+ }
+ if (context.getSendMsgTimeoutMillis() > 0) {
+ producer.setSendMsgTimeout(context.getSendMsgTimeoutMillis());
+ }
+ try {
+ producer.start();
+ }
+ catch (MQClientException mqe) {
+ throw ExceptionUtil.convertToJmsException(mqe, String.format("Start producer failed:%s", context.getProducerId()));
+ }
+ producerMap.putIfAbsent(this.context.getProducerId(), producer);
+ }
+
+ }
+ }
+
+ private void checkArgs(Destination destination, CommonContext context) throws JMSException {
+ Preconditions.checkNotNull(context.getProducerId(), "ProducerId can not be null!");
+ Preconditions.checkNotNull(destination.toString(), "Destination can not be null!");
+ this.context = context;
+ this.destination = destination;
+ }
+
+ @Override
+ public boolean getDisableMessageID() throws JMSException {
+ return false;
+ }
+
+ @Override
+ public void setDisableMessageID(boolean value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ @Override
+ public boolean getDisableMessageTimestamp() throws JMSException {
+ return false;
+ }
+
+ @Override
+ public void setDisableMessageTimestamp(boolean value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ @Override
+ public int getDeliveryMode() throws JMSException {
+ return javax.jms.Message.DEFAULT_DELIVERY_MODE;
+ }
+
+ @Override
+ public void setDeliveryMode(int deliveryMode) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ @Override
+ public int getPriority() throws JMSException {
+ return javax.jms.Message.DEFAULT_PRIORITY;
+ }
+
+ @Override
+ public void setPriority(int defaultPriority) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ @Override
+ public long getTimeToLive() throws JMSException {
+ return JmsBaseConstant.DEFAULT_TIME_TO_LIVE;
+ }
+
+ @Override
+ public void setTimeToLive(long timeToLive) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ @Override
+ public Destination getDestination() throws JMSException {
+ return this.destination;
+ }
+
+ @Override
+ public void close() throws JMSException {
+ //Nothing to do
+ }
+
+ @Override
+ public void send(javax.jms.Message message) throws JMSException {
+ this.send(getDestination(), message);
+ }
+
+ /**
+ * Send the message to the defined Destination success---return normally. Exception---throw out JMSException.
+ *
+ * @param destination see <CODE>Destination</CODE>
+ * @param message the message to be sent.
+ * @throws javax.jms.JMSException
+ */
+ @Override
+ public void send(Destination destination, javax.jms.Message message) throws JMSException {
+ JmsBaseMessage jmsMsg = (JmsBaseMessage) message;
+ initJMSHeaders(jmsMsg, destination);
+
+ try {
+ if (context == null) {
+ throw new IllegalStateException("Context should be inited");
+ }
+ org.apache.rocketmq.common.message.Message rocketmqMsg = MessageConverter.convert2RMQMessage(jmsMsg);
+
+ MQProducer producer = producerMap.get(context.getProducerId());
+
+ if (producer == null) {
+ throw new Exception("producer is null ");
+ }
+ SendResult sendResult = producer.send(rocketmqMsg);
+ if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ jmsMsg.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + sendResult.getMsgId());
+ } else {
+ throw new Exception("SendResult is " + (sendResult == null ? "null" : sendResult.toString()));
+ }
+ }
+ catch (Exception e) {
+ logger.error("Send rocketmq message failure !", e);
+ //if fail to send the message, throw out JMSException
+ JMSException jmsException = new JMSException("Send rocketmq message failure!");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+ }
+
+ @Override
+ public void send(javax.jms.Message message, int deliveryMode, int priority,
+ long timeToLive) throws JMSException {
+ throw new UnsupportedOperationException("Unsupported");
+ }
+
+ @Override
+ public void send(Destination destination, javax.jms.Message message, int deliveryMode,
+ int priority, long timeToLive) throws JMSException {
+ throw new UnsupportedOperationException("Unsupported");
+ }
+
+ /**
+ * Init the JmsMessage Headers.
+ * <p/>
+ * <P>JMS providers init message's headers. Do not allow user to set these by yourself.
+ *
+ * @param jmsMsg message
+ * @param destination
+ * @throws javax.jms.JMSException
+ * @see <CODE>Destination</CODE>
+ */
+ private void initJMSHeaders(JmsBaseMessage jmsMsg, Destination destination) throws JMSException {
+
+ //JMS_DESTINATION default:"topic:message"
+ jmsMsg.setHeader(JmsBaseConstant.JMS_DESTINATION, destination);
+ //JMS_DELIVERY_MODE default : PERSISTENT
+ jmsMsg.setHeader(JmsBaseConstant.JMS_DELIVERY_MODE, javax.jms.Message.DEFAULT_DELIVERY_MODE);
+ //JMS_TIMESTAMP default : current time
+ jmsMsg.setHeader(JmsBaseConstant.JMS_TIMESTAMP, System.currentTimeMillis());
+ //JMS_EXPIRATION default : 3 days
+ //JMS_EXPIRATION = currentTime + time_to_live
+ jmsMsg.setHeader(JmsBaseConstant.JMS_EXPIRATION, System.currentTimeMillis() + JmsBaseConstant.DEFAULT_TIME_TO_LIVE);
+ //JMS_PRIORITY default : 4
+ jmsMsg.setHeader(JmsBaseConstant.JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY);
+ //JMS_TYPE default : open notification service
+ jmsMsg.setHeader(JmsBaseConstant.JMS_TYPE, JmsBaseConstant.DEFAULT_JMS_TYPE);
+ //JMS_REPLY_TO,JMS_CORRELATION_ID default : null
+ //JMS_MESSAGE_ID is set by sendResult.
+ //JMS_REDELIVERED is set by broker.
+ }
+
+ /**
+ * Init the OnsMessage Headers.
+ * <p/>
+ * <P>When converting JmsMessage to OnsMessage, should read from the JmsMessage's Properties and write to the
+ * OnsMessage's Properties.
+ *
+ * @param jmsMsg message
+ * @throws javax.jms.JMSException
+ */
+ public static Properties initRocketMQHeaders(JmsBaseMessage jmsMsg,
+ String topic, String messageType) throws JMSException {
+ Properties userProperties = new Properties();
+
+ //Jms userProperties to properties
+ Map<String, Object> userProps = jmsMsg.getProperties();
+ Iterator<Map.Entry<String, Object>> userPropsIter = userProps.entrySet().iterator();
+ while (userPropsIter.hasNext()) {
+ Map.Entry<String, Object> entry = userPropsIter.next();
+ userProperties.setProperty(entry.getKey(), entry.getValue().toString());
+ }
+ //Jms systemProperties to ROCKETMQ properties
+ Map<String, Object> sysProps = jmsMsg.getHeaders();
+ Iterator<Map.Entry<String, Object>> sysPropsIter = sysProps.entrySet().iterator();
+ while (sysPropsIter.hasNext()) {
+ Map.Entry<String, Object> entry = sysPropsIter.next();
+ userProperties.setProperty(entry.getKey(), entry.getValue().toString());
+ }
+
+ //Jms message Model
+ if (jmsMsg instanceof JmsBytesMessage) {
+ userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_BYTES);
+ }
+ else if (jmsMsg instanceof JmsObjectMessage) {
+ userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_OBJ);
+ }
+ else if (jmsMsg instanceof JmsTextMessage) {
+ userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT);
+ }
+
+ //message topic and tag
+ userProperties.setProperty(MsgConvertUtil.MSG_TOPIC, topic);
+ userProperties.setProperty(MsgConvertUtil.MSG_TYPE, messageType);
+
+ return userProperties;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
new file mode 100644
index 0000000..5bf7005
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * {@link Session} implementation for RocketMQ.
+ * <p>
+ * Supports creating bytes, object and text messages, topics, producers and selector-less
+ * consumers. The {@code transacted} flag and acknowledge mode are stored and reported back,
+ * but {@link #commit()}, {@link #rollback()} and {@link #recover()} are unsupported, as are
+ * queues, browsers, durable subscribers and message selectors.
+ */
+public class JmsBaseSession implements Session {
+    protected CommonContext context;
+    protected JmsBaseConnection connection;
+    /** Consumers created by this session; started by {@link #start()} and closed by {@link #close()}. */
+    protected CopyOnWriteArrayList<JmsBaseMessageConsumer> consumerList =
+        new CopyOnWriteArrayList<JmsBaseMessageConsumer>();
+    private boolean transacted = true;
+    private int acknowledgeMode = AUTO_ACKNOWLEDGE;
+
+    /**
+     * @param connection connection that owns this session
+     * @param transacted value reported by {@link #getTransacted()}
+     * @param acknowledgeMode value reported by {@link #getAcknowledgeMode()}
+     * @param context configuration handed to the producers and consumers this session creates
+     */
+    public JmsBaseSession(JmsBaseConnection connection, boolean transacted,
+        int acknowledgeMode, CommonContext context) {
+        this.context = context;
+        this.acknowledgeMode = acknowledgeMode;
+        this.transacted = transacted;
+        this.connection = connection;
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() throws JMSException {
+        return new JmsBytesMessage();
+    }
+
+    /** Map messages are not supported. */
+    @Override
+    public MapMessage createMapMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Body-less messages are not supported. */
+    @Override
+    public Message createMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return new JmsObjectMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
+        return new JmsObjectMessage(object);
+    }
+
+    /** Stream messages are not supported. */
+    @Override
+    public StreamMessage createStreamMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public TextMessage createTextMessage() throws JMSException {
+        return new JmsTextMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage(String text) throws JMSException {
+        return new JmsTextMessage(text);
+    }
+
+    /** @return the {@code transacted} flag given to the constructor */
+    @Override
+    public boolean getTransacted() throws JMSException {
+        return this.transacted;
+    }
+
+    /** @return the acknowledge mode given to the constructor */
+    @Override
+    public int getAcknowledgeMode() {
+        return this.acknowledgeMode;
+    }
+
+    /** Transactions are not supported. */
+    @Override
+    public void commit() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Transactions are not supported. */
+    @Override
+    public void rollback() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Closes every consumer created by this session and forgets them.
+     *
+     * @throws JMSException if closing a consumer fails
+     */
+    @Override
+    public void close() throws JMSException {
+        for (JmsBaseMessageConsumer messageConsumer : consumerList) {
+            messageConsumer.close();
+        }
+        consumerList.clear();
+    }
+
+    /** Recovery is not supported. */
+    @Override
+    public void recover() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** @return always {@code null}; session-level listeners are not supported */
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return null;
+    }
+
+    /** Session-level listeners are not supported; delegates to {@link ExceptionUtil#handleUnSupportedException()}. */
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /** Not supported. */
+    @Override
+    public void run() {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Creates a producer bound to {@code destination} using this session's context.
+     *
+     * @throws JMSException if the producer cannot be created
+     */
+    @Override
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        return new JmsBaseMessageProducer(destination, context);
+    }
+
+    /**
+     * Create a MessageConsumer.
+     * <p>
+     * Creates a consumer for the specified destination and records it so that
+     * {@link #start()} and {@link #close()} can reach it.
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @throws javax.jms.JMSException if the consumer cannot be created
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        JmsBaseMessageConsumer messageConsumer = new
+            JmsBaseMessageConsumer(destination, this.context, this.connection);
+        this.consumerList.addIfAbsent(messageConsumer);
+        return messageConsumer;
+    }
+
+    /**
+     * Create a MessageConsumer with messageSelector.
+     * <p>
+     * ROCKETMQ-JMS does not support using messageSelector to filter messages.
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @param messageSelector For filtering messages
+     * @throws javax.jms.JMSException declared in the signature
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector)
+        throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+
+    }
+
+    /**
+     * Create a MessageConsumer with messageSelector.
+     * <p>
+     * ROCKETMQ-JMS does not support using messageSelector to filter messages and does not support
+     * rejecting messages from localhost.
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @param messageSelector For filtering messages
+     * @param noLocal If true: reject messages from localhost
+     * @throws javax.jms.JMSException declared in the signature
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector,
+        boolean noLocal) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Queues are not supported. */
+    @Override
+    public Queue createQueue(String queueName) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Creates a topic from a name of the form {@code messageTopic} or {@code messageTopic:messageType}.
+     * When no message type is given, {@code "*"} is used.
+     *
+     * @param topicName topic name, optionally followed by {@code ':'} and a message type
+     * @throws NullPointerException if {@code topicName} is null
+     * @throws IllegalStateException if splitting the name on {@code ':'} does not yield one or two parts
+     */
+    @Override
+    public Topic createTopic(String topicName) throws JMSException {
+        Preconditions.checkNotNull(topicName);
+        List<String> msgTuple = Arrays.asList(topicName.split(":"));
+
+        Preconditions.checkState(msgTuple.size() >= 1 && msgTuple.size() <= 2,
+            "Destination must match messageTopic:messageType !");
+
+        //If messageType is null, use * instead.
+        if (1 == msgTuple.size()) {
+            return new JmsBaseTopic(msgTuple.get(0), "*");
+        }
+        return new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1));
+    }
+
+    /**
+     * Durable subscribers are not supported through this method; use
+     * <CODE>createConsumer(Destination)</CODE> instead.
+     *
+     * @param topic destination
+     * @throws javax.jms.JMSException declared in the signature
+     * @see <CODE>Topic</CODE>
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name)
+        throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Durable subscribers are not supported through this method; use
+     * <CODE>createConsumer(Destination)</CODE> instead.
+     *
+     * @param topic destination
+     * @throws javax.jms.JMSException declared in the signature
+     * @see <CODE>Topic</CODE>
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name,
+        String messageSelector,
+        boolean noLocal) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Queue browsers are not supported. */
+    @Override
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Queue browsers are not supported. */
+    @Override
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Creates a temporary queue whose name is a random UUID.
+     * <p>
+     * The name is generated once per queue, so repeated {@code getQueueName()} calls on the
+     * returned object give the same value. {@code delete()} is a no-op.
+     */
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        final String queueName = UUID.randomUUID().toString();
+        return new TemporaryQueue() {
+            public void delete() throws JMSException {
+            }
+
+            public String getQueueName() throws JMSException {
+                return queueName;
+            }
+        };
+    }
+
+    /**
+     * Creates a temporary topic whose name is a random UUID.
+     * <p>
+     * The name is generated once per topic, so repeated {@code getTopicName()} calls on the
+     * returned object give the same value. {@code delete()} is a no-op.
+     */
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        final String topicName = UUID.randomUUID().toString();
+        return new TemporaryTopic() {
+            public void delete() throws JMSException {
+            }
+
+            public String getTopicName() throws JMSException {
+                return topicName;
+            }
+        };
+    }
+
+    /** Unsubscribing is not supported. */
+    @Override
+    public void unsubscribe(String name) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Starts every consumer created by this session.
+     *
+     * @throws JMSException if a consumer fails to start
+     */
+    public void start() throws JMSException {
+        for (JmsBaseMessageConsumer messageConsumer : consumerList) {
+            messageConsumer.startConsumer();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
new file mode 100644
index 0000000..b7e2fab
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+/**
+ * {@link Topic} implementation identified by a message topic and a message type.
+ *
+ * <p>Both parts are fixed at construction time. The JMS topic name is rendered as
+ * {@code messageTopic:messageType}.
+ */
+public class JmsBaseTopic implements Topic {
+
+    // Never reassigned after construction, so declared final.
+    private final String messageTopic;
+    private final String messageType;
+
+    /**
+     * Creates a topic from its two name components.
+     *
+     * @param messageTopic the topic part of the name; must not be null
+     * @param messageType the type part of the name; must not be null
+     * @throws NullPointerException if either argument is null (the message names the argument)
+     */
+    public JmsBaseTopic(String messageTopic, String messageType) {
+        // Validate both arguments before assigning anything, messageTopic first.
+        Preconditions.checkNotNull(messageTopic, "messageTopic");
+        Preconditions.checkNotNull(messageType, "messageType");
+
+        this.messageTopic = messageTopic;
+        this.messageType = messageType;
+    }
+
+    /**
+     * Returns the topic name, which is the same text as {@link #toString()}.
+     *
+     * @return {@code messageTopic:messageType}
+     * @throws JMSException declared by the JMS API; not thrown by this implementation
+     */
+    @Override
+    public String getTopicName() throws JMSException {
+        return this.toString();
+    }
+
+    /**
+     * Joins the message topic and the message type with a colon.
+     *
+     * @return {@code messageTopic:messageType}
+     */
+    @Override
+    public String toString() {
+        return Joiner.on(":").join(this.getMessageTopic(), this.getMessageType());
+    }
+
+    /**
+     * @return the topic part given at construction
+     */
+    public String getMessageTopic() {
+        return messageTopic;
+    }
+
+    /**
+     * @return the type part given at construction
+     */
+    public String getMessageType() {
+        return messageType;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
new file mode 100644
index 0000000..7a8a9f7
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.util.MessageConverter;
+
+public class RMQPushConsumerExt {
+ private final MQPushConsumer consumer;
+ private final ConcurrentHashMap<String/* Topic */, javax.jms.MessageListener> subscribeTable = new ConcurrentHashMap<String, javax.jms.MessageListener>();
+
+ private AtomicInteger referenceCount = new AtomicInteger(0);
+ private AtomicBoolean started = new AtomicBoolean(false);
+
+ public RMQPushConsumerExt(MQPushConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ public MQPushConsumer getConsumer() {
+ return consumer;
+ }
+
+ public int incrementAndGet() {
+ return referenceCount.incrementAndGet();
+ }
+
+ public int decrementAndGet() {
+ return referenceCount.decrementAndGet();
+ }
+
+ public int getReferenceCount() {
+ return referenceCount.get();
+ }
+ public void start() throws MQClientException {
+ if (consumer == null) {
+ throw new MQClientException(-1, "consumer is null");
+ }
+
+ if (this.started.compareAndSet(false, true)) {
+ this.consumer.registerMessageListener(new MessageListenerImpl());
+ this.consumer.start();
+ }
+ }
+
+
+ public void close() {
+ if (this.started.compareAndSet(true, false)) {
+ this.consumer.shutdown();
+ }
+ }
+
+ public void subscribe(String topic, String subExpression, javax.jms.MessageListener listener) throws MQClientException {
+ if (null == topic) {
+ throw new MQClientException(-1, "topic is null");
+ }
+
+ if (null == listener) {
+ throw new MQClientException(-1, "listener is null");
+ }
+
+ try {
+ this.subscribeTable.put(topic, listener);
+ this.consumer.subscribe(topic, subExpression);
+ } catch (MQClientException e) {
+ throw new MQClientException("consumer subscribe exception", e);
+ }
+ }
+
+ public void unsubscribe(String topic) {
+ if (null != topic) {
+ this.consumer.unsubscribe(topic);
+ }
+ }
+
+ class MessageListenerImpl implements MessageListenerConcurrently {
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgsRMQList, ConsumeConcurrentlyContext contextRMQ) {
+ MessageExt msgRMQ = msgsRMQList.get(0);
+ javax.jms.MessageListener listener = RMQPushConsumerExt.this.subscribeTable.get(msgRMQ.getTopic());
+ if (null == listener) {
+ throw new RuntimeException("MessageListener is null");
+ }
+
+ try {
+ listener.onMessage(MessageConverter.convert2JMSMessage(msgRMQ));
+ }
+ catch (Exception e) {
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ }
+
+
+ public boolean isStarted() {
+ return started.get();
+ }
+
+
+ public boolean isClosed() {
+ return !isStarted();
+ }
+}