You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/06/15 02:41:43 UTC

[3/3] incubator-rocketmq-externals git commit: Release rocketmq-jms 1.0.0 version

Release rocketmq-jms 1.0.0 version


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/c4b20122
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/c4b20122
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/c4b20122

Branch: refs/heads/release-rocketmq-jms-1.0.0
Commit: c4b20122a5f75d48167220155efb56eccd308023
Parents: 
Author: yukon <yu...@apache.org>
Authored: Thu Jun 15 10:40:04 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Thu Jun 15 10:41:16 2017 +0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 README.md                                       |  32 ++
 rocketmq-jms/.gitignore                         |   5 +
 rocketmq-jms/.travis.yml                        |  43 ++
 rocketmq-jms/README.md                          |  31 ++
 rocketmq-jms/core/pom.xml                       |  48 ++
 .../rocketmq/jms/domain/CommonConstant.java     |  36 ++
 .../rocketmq/jms/domain/CommonContext.java      | 183 ++++++++
 .../rocketmq/jms/domain/JmsBaseConnection.java  | 172 ++++++++
 .../jms/domain/JmsBaseConnectionFactory.java    | 146 +++++++
 .../jms/domain/JmsBaseConnectionMetaData.java   | 134 ++++++
 .../rocketmq/jms/domain/JmsBaseConstant.java    |  86 ++++
 .../jms/domain/JmsBaseMessageConsumer.java      | 168 +++++++
 .../jms/domain/JmsBaseMessageProducer.java      | 281 ++++++++++++
 .../rocketmq/jms/domain/JmsBaseSession.java     | 308 +++++++++++++
 .../rocketmq/jms/domain/JmsBaseTopic.java       |  53 +++
 .../rocketmq/jms/domain/RMQPushConsumerExt.java | 128 ++++++
 .../jms/domain/message/JmsBaseMessage.java      | 434 +++++++++++++++++++
 .../jms/domain/message/JmsBytesMessage.java     | 245 +++++++++++
 .../jms/domain/message/JmsObjectMessage.java    |  41 ++
 .../jms/domain/message/JmsTextMessage.java      |  48 ++
 .../apache/rocketmq/jms/util/ExceptionUtil.java |  41 ++
 .../rocketmq/jms/util/MessageConverter.java     | 182 ++++++++
 .../rocketmq/jms/util/MsgConvertUtil.java       |  90 ++++
 .../apache/rocketmq/jms/util/URISpecParser.java |  61 +++
 .../core/src/main/resources/application.conf    |   1 +
 .../apache/rocketmq/jms/JmsTestListener.java    |  67 +++
 .../org/apache/rocketmq/jms/JmsTestUtil.java    |  54 +++
 .../jms/domain/message/JmsBytesMessageTest.java | 103 +++++
 .../domain/message/JmsMessageConvertTest.java   |  52 +++
 .../domain/message/JmsObjectMessageTest.java    |  92 ++++
 .../jms/domain/message/JmsTextMessageTest.java  |  50 +++
 .../jms/integration/IntegrationTestBase.java    | 199 +++++++++
 .../rocketmq/jms/integration/JmsClientIT.java   | 191 ++++++++
 .../rocketmq/jms/integration/JmsConsumerIT.java | 131 ++++++
 .../rocketmq/jms/util/URISpecParserTest.java    |  43 ++
 rocketmq-jms/pom.xml                            | 196 +++++++++
 rocketmq-jms/spring/pom.xml                     |  82 ++++
 .../SimpleExMessageListenerContainer.java       |  90 ++++
 .../rocketmq/jms/spring/JmsConsumeIT.java       |  61 +++
 .../rocketmq/jms/spring/JmsProduceIT.java       |  93 ++++
 .../rocketmq/jms/spring/SpringTestBase.java     |  41 ++
 .../spring/src/test/resources/consumer.xml      |  51 +++
 .../spring/src/test/resources/producer.xml      |  43 ++
 rocketmq-jms/style/copyright/Apache.xml         |  24 +
 .../style/copyright/profiles_settings.xml       |  64 +++
 rocketmq-jms/style/rmq_checkstyle.xml           | 135 ++++++
 rocketmq-jms/style/rmq_codeStyle.xml            | 157 +++++++
 48 files changed, 5017 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..485dee6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..07a5fa6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,32 @@
+# RocketMQ Externals
+
+There are some RocketMQ external projects, with the purpose of growing the RocketMQ community.
+
+## RocketMQ-Console-Ng
+A console for RocketMQ
+
+## RocketMQ-JMS
+RocketMQ-JMS is an implementation of the JMS specification, using Apache RocketMQ as the broker. JMS 1.1 support is in progress, and JMS 2.0 is our final target.
+
+## RocketMQ-Flume-Ng
+
+This project is used to receive and send messages between
+[RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume)
+
+1. Firstly, please get familiar with [RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume).
+2. Ensure that the [RocketMQ](http://rocketmq.incubator.apache.org/dowloading/releases) jar exists in your local Maven repository.
+3. Execute the following command in rocketmq-flume root directory
+
+   `mvn clean install dependency:copy-dependencies`
+
+4. Copy the jar that rocketmq-flume depends on to `$FLUME_HOME/lib` (the specific jar will be given later)
+
+
+## RocketMQ-Spark
+
+Apache Spark-Streaming integration with RocketMQ. Both push and pull consumer modes are provided.
+For more details please refer to rocketmq-spark README.md.
+
+## RocketMQ-Docker
+Apache RocketMQ Docker provides Dockerfile and bash scripts for building and running docker image.
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/.gitignore
----------------------------------------------------------------------
diff --git a/rocketmq-jms/.gitignore b/rocketmq-jms/.gitignore
new file mode 100644
index 0000000..d2e5aaf
--- /dev/null
+++ b/rocketmq-jms/.gitignore
@@ -0,0 +1,5 @@
+.idea/
+*.iml
+*.ipr
+*.iws
+target/

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/.travis.yml
----------------------------------------------------------------------
diff --git a/rocketmq-jms/.travis.yml b/rocketmq-jms/.travis.yml
new file mode 100644
index 0000000..9f430b2
--- /dev/null
+++ b/rocketmq-jms/.travis.yml
@@ -0,0 +1,43 @@
+notifications:
+  email:
+    recipients:
+      - zhangke.huangshan@gmail.com
+      - zhendongliu92@gmail.com
+  on_success: change
+  on_failure: always
+
+language: java
+
+matrix:
+  include:
+  # On OSX, run with default JDK only.
+  # - os: osx
+  # On Linux, run with specific JDKs only.
+  # - os: linux
+  #  env: CUSTOM_JDK="oraclejdk8"
+  - os: linux
+    env: CUSTOM_JDK="oraclejdk7"
+  #- os: linux
+  #  env: CUSTOM_JDK="openjdk7"
+
+before_install:
+  - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
+  - cat ~/.mavenrc
+  - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
+  - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
+
+#os:
+#  - linux
+#  - osx
+#jdk:
+#  - oraclejdk8
+#  - oraclejdk7
+#  - openjdk7
+
+
+script:
+  - travis_retry mvn -B clean install jacoco:report coveralls:report
+
+#after_success:
+#  - mvn clean install
+#  - mvn sonar:sonar

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-jms/README.md b/rocketmq-jms/README.md
new file mode 100644
index 0000000..a05e27e
--- /dev/null
+++ b/rocketmq-jms/README.md
@@ -0,0 +1,31 @@
+# RocketMQ-JMS   [![Build Status](https://travis-ci.org/rocketmq/rocketmq-jms.svg?branch=master)](https://travis-ci.org/rocketmq/rocketmq-jms) [![Coverage Status](https://coveralls.io/repos/github/rocketmq/rocketmq-jms/badge.svg?branch=master)](https://coveralls.io/github/rocketmq/rocketmq-jms?branch=master)
+
+
+## Introduction
+RocketMQ-JMS is an implementation of the JMS specification, using Apache RocketMQ as the broker.
+JMS 1.1 support is in progress, and JMS 2.0 is our final target.   
+
+The first version of RocketMQ-JMS will be released soon, and new features will be developed on the "v1.1" branch.
+Please visit the [issue board](https://github.com/rocketmq/rocketmq-jms/issues) to see features in next version. 
+
+
+## Building
+
+  > cd rocketmq-jms  
+  > mvn clean install  
+  
+  **run unit test:**  
+  > mvn test    
+  
+  **run integration test:**  
+  > mvn verify
+  
+  **see jacoco code coverage report**
+  > open core/target/site/jacoco/index.html  
+  > open core/target/site/jacoco-it/index.html  
+  > open spring/target/site/jacoco-it/index.html 
+  
+  
+## Guidelines
+
+ Please see [Coding Guidelines Introduction](http://rocketmq.apache.org/docs/code-guidelines/)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/pom.xml b/rocketmq-jms/core/pom.xml
new file mode 100644
index 0000000..1b36e14
--- /dev/null
+++ b/rocketmq-jms/core/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-jms-all</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-jms</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
new file mode 100644
index 0000000..80a8b64
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+public interface CommonConstant {
+
+    String PRODUCERID = "producerId";
+
+    String CONSUMERID = "consumerId";
+
+    String PROVIDER = "provider";
+
+    String NAMESERVER = "nameServer";
+
+    String INSTANCE_NAME = "instanceName";
+
+    String CONSUME_THREAD_NUMS = "consumeThreadNums";
+
+    String SEND_TIMEOUT_MILLIS = "sendMsgTimeoutMillis";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
new file mode 100644
index 0000000..c8e4276
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+public class CommonContext {
+    private String accessKey;
+    private String secretKey;
+
+    private String consumerId;
+    private String producerId;
+
+    private String provider;
+
+    private String appId;
+
+    private String nameServer;
+
+    /**
+     * MQType
+     */
+    private String mqType;
+
+    /**
+     * Used to distinguish client JVM processes
+     */
+    private String instanceName;
+    /**
+     * Size of the consumer thread pool
+     */
+    private int consumeThreadNums;
+    /**
+     * Timeout in milliseconds for sending a message
+     */
+    private int sendMsgTimeoutMillis = -1;
+
+    /**
+     * @return the appId
+     */
+    public String getAppId() {
+        return appId;
+    }
+
+    /**
+     * @param appId the appId to set
+     */
+    public void setAppId(String appId) {
+        this.appId = appId;
+    }
+
+    /**
+     * @return the provider
+     */
+    public String getProvider() {
+        return provider;
+    }
+
+    /**
+     * @param provider the provider to set
+     */
+    public void setProvider(String provider) {
+        this.provider = provider;
+    }
+
+    /**
+     * @return the instanceName
+     */
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    /**
+     * @param instanceName the instanceName to set
+     */
+    public void setInstanceName(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    /**
+     * @return the accessKey
+     */
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    /**
+     * @param accessKey the accessKey to set
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    /**
+     * @return the secretKey
+     */
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    /**
+     * @param secretKey the secretKey to set
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    /**
+     * @return consumer thread nums
+     */
+    public int getConsumeThreadNums() {
+        return consumeThreadNums;
+    }
+
+    /**
+     * @param consumeThreadNums
+     */
+    public void setConsumeThreadNums(int consumeThreadNums) {
+        this.consumeThreadNums = consumeThreadNums;
+    }
+
+    public String getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(String consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    public String getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(String producerId) {
+        this.producerId = producerId;
+    }
+
+    public int getSendMsgTimeoutMillis() {
+        return sendMsgTimeoutMillis;
+    }
+
+    public void setSendMsgTimeoutMillis(int sendMsgTimeoutMillis) {
+        this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
+    }
+
+    public String getMqType() {
+        return mqType;
+    }
+
+    public void setMqType(String mqType) {
+        this.mqType = mqType;
+    }
+
+    public String getNameServer() {
+        return nameServer;
+    }
+
+    public void setNameServer(String nameServer) {
+        this.nameServer = nameServer;
+    }
+
+    @Override
+    public String toString() {
+        return ReflectionToStringBuilder.toString(this, ToStringStyle.DEFAULT_STYLE);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
new file mode 100644
index 0000000..4c809c7
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.commons.lang.StringUtils;
+
+public class JmsBaseConnection implements Connection {
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    protected String clientID;
+    protected ExceptionListener exceptionListener;
+    protected CommonContext context;
+    protected JmsBaseSession session;
+
+    public JmsBaseConnection(Map<String, String> connectionParams) {
+
+        this.clientID = UUID.randomUUID().toString();
+
+        context = new CommonContext();
+
+        //At least one should be set
+        context.setProducerId(connectionParams.get(CommonConstant.PRODUCERID));
+        context.setConsumerId(connectionParams.get(CommonConstant.CONSUMERID));
+
+        //optional
+        context.setProvider(connectionParams.get(CommonConstant.PROVIDER));
+
+        String nameServer = connectionParams.get(CommonConstant.NAMESERVER);
+        String consumerThreadNums = connectionParams.get(CommonConstant.CONSUME_THREAD_NUMS);
+        String sendMsgTimeoutMillis = connectionParams.get(CommonConstant.SEND_TIMEOUT_MILLIS);
+        String instanceName = connectionParams.get(CommonConstant.INSTANCE_NAME);
+
+        if (StringUtils.isNotEmpty(nameServer)) {
+            context.setNameServer(nameServer);
+        }
+        if (StringUtils.isNotEmpty(instanceName)) {
+            context.setInstanceName(connectionParams.get(CommonConstant.INSTANCE_NAME));
+        }
+
+        if (StringUtils.isNotEmpty(consumerThreadNums)) {
+            context.setConsumeThreadNums(Integer.parseInt(consumerThreadNums));
+        }
+        if (StringUtils.isNotEmpty(sendMsgTimeoutMillis)) {
+            context.setSendMsgTimeoutMillis(Integer.parseInt(sendMsgTimeoutMillis));
+        }
+    }
+
+    @Override
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+
+        Preconditions.checkArgument(!transacted, "Not support transaction Session !");
+        Preconditions.checkArgument(Session.AUTO_ACKNOWLEDGE == acknowledgeMode,
+            "Not support this acknowledge mode: " + acknowledgeMode);
+
+        if (null != this.session) {
+            return this.session;
+        }
+        synchronized (this) {
+            if (null != this.session) {
+                return this.session;
+            }
+            this.session = new JmsBaseSession(this, transacted, acknowledgeMode, context);
+            if (isStarted()) {
+                this.session.start();
+            }
+            return this.session;
+        }
+    }
+
+    @Override
+    public String getClientID() throws JMSException {
+        return this.clientID;
+    }
+
+    @Override
+    public void setClientID(String clientID) throws JMSException {
+        this.clientID = clientID;
+    }
+
+    @Override
+    public ConnectionMetaData getMetaData() throws JMSException {
+        return new JmsBaseConnectionMetaData();
+    }
+
+    @Override
+    public ExceptionListener getExceptionListener() throws JMSException {
+        return this.exceptionListener;
+    }
+
+    @Override
+    public void setExceptionListener(ExceptionListener listener) throws JMSException {
+        this.exceptionListener = listener;
+    }
+
+    @Override
+    public void start() throws JMSException {
+        if (started.compareAndSet(false, true)) {
+            if (this.session != null) {
+                this.session.start();
+            }
+
+        }
+    }
+
+    @Override
+    public void stop() throws JMSException {
+        //Stop the connection before closing it.
+        //Do nothing here.
+    }
+
+    @Override
+    public void close() throws JMSException {
+        if (started.compareAndSet(true, false)) {
+            if (this.session != null) {
+                this.session.close();
+            }
+
+        }
+    }
+
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Destination destination,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Whether the connection is started.
+     *
+     * @return whether the connection is started.
+     */
+    public boolean isStarted() {
+        return started.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
new file mode 100644
index 0000000..1b9da06
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import org.apache.rocketmq.jms.util.URISpecParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsBaseConnectionFactory implements ConnectionFactory {
+
+    private static Logger logger = LoggerFactory
+        .getLogger(JmsBaseConnectionFactory.class);
+    /**
+     * Synchronization monitor for the shared Connection
+     */
+    private final Object connectionMonitor = new Object();
+    /**
+     * Can be configured in a consistent way without too much URL hacking.
+     */
+    protected URI connectionUri;
+    /**
+     * Store connection uri query parameters.
+     */
+    protected Map<String, String> connectionParams;
+    /**
+     * Wrapped Connection
+     */
+    protected JmsBaseConnection connection;
+
+    public JmsBaseConnectionFactory() {
+
+    }
+
+    public JmsBaseConnectionFactory(URI connectionUri) {
+        setConnectionUri(connectionUri);
+    }
+
+    public void setConnectionUri(URI connectionUri) {
+        Preconditions.checkNotNull(connectionUri, "Please set URI !");
+        this.connectionUri = connectionUri;
+        this.connectionParams = URISpecParser.parseURI(connectionUri.toString());
+
+        if (null != connectionParams) {
+            Preconditions.checkState(null != connectionParams.get(CommonConstant.CONSUMERID) ||
+                null != connectionParams.get(CommonConstant.PRODUCERID), "Please set consumerId or ProducerId !");
+        }
+
+    }
+
+    @Override
+    public Connection createConnection() throws JMSException {
+        synchronized (this.connectionMonitor) {
+            if (this.connection == null) {
+                initConnection();
+            }
+            return this.connection;
+        }
+    }
+
+    /**
+     * Using userName and Password to create a connection
+     *
+     * @param userName ignored
+     * @param password ignored
+     * @return the new JMS Connection
+     * @throws JMSException
+     */
+    @Override
+    public Connection createConnection(String userName, String password) throws JMSException {
+        logger.debug("Using userName and Password to create a connection.");
+        return this.createConnection();
+    }
+
+    /**
+     * Initialize the underlying shared Connection.
+     * <p/>
+     * Closes and reinitializes the Connection if an underlying Connection is present already.
+     *
+     * @throws javax.jms.JMSException if thrown by JMS API methods
+     */
+    protected void initConnection() throws JMSException {
+        synchronized (this.connectionMonitor) {
+            if (this.connection != null) {
+                closeConnection(this.connection);
+            }
+            this.connection = doCreateConnection();
+            logger.debug("Established shared JMS Connection: {}", this.connection);
+        }
+    }
+
+    /**
+     * Close the given Connection.
+     *
+     * @param con the Connection to close
+     */
+    protected void closeConnection(Connection con) {
+        logger.debug("Closing shared JMS Connection: {}", this.connection);
+        try {
+            try {
+                con.stop();
+            }
+            finally {
+                con.close();
+            }
+        }
+        catch (Throwable ex) {
+            logger.error("Could not close shared JMS Connection.", ex);
+        }
+    }
+
+    /**
+     * Create a JMS Connection
+     *
+     * @return the new JMS Connection
+     * @throws javax.jms.JMSException if thrown by JMS API methods
+     */
+    protected JmsBaseConnection doCreateConnection() throws JMSException {
+        Preconditions.checkState(null != this.connectionParams && this.connectionParams.size() > 0,
+            "Connection Parameters can not be null!");
+        this.connection = new JmsBaseConnection(this.connectionParams);
+
+        return connection;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
new file mode 100644
index 0000000..ee549aa
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+
+/**
+ * {@link ConnectionMetaData} implementation for the RocketMQ JMS provider.
+ * <p>
+ * All version information is resolved once, when the class is initialized:
+ * the JMS version comes from the implementation version of the {@code javax.jms} package, and the
+ * provider version comes from the {@code version} property of the classpath resource
+ * {@code /application.conf}. Whenever a value cannot be determined, the version string stays
+ * {@code null} and the major/minor numbers stay {@code 0}; class initialization never fails
+ * because of missing or malformed version information.
+ */
+public class JmsBaseConnectionMetaData implements ConnectionMetaData {
+    public static final String JMS_VERSION;
+    public static final int JMS_MAJOR_VERSION;
+    public static final int JMS_MINOR_VERSION;
+
+    public static final String PROVIDER_VERSION;
+    public static final int PROVIDER_MAJOR_VERSION;
+    public static final int PROVIDER_MINOR_VERSION;
+
+    public static final String PROVIDER_NAME = "Apache RocketMQ";
+
+    public static final JmsBaseConnectionMetaData INSTANCE = new JmsBaseConnectionMetaData();
+
+    /**
+     * Stream used to read {@code /application.conf} during class initialization. It is closed
+     * once the properties have been loaded and is {@code null} when the resource is missing.
+     * Kept only for backward compatibility.
+     */
+    public static InputStream resourceStream;
+
+    static {
+        // Matches "<major>.<minor>" followed by anything, e.g. "1.1", "1.0.0-SNAPSHOT".
+        Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
+
+        String jmsVersion = null;
+        int jmsMajor = 0;
+        int jmsMinor = 0;
+        try {
+            Package p = Package.getPackage("javax.jms");
+            if (p != null) {
+                jmsVersion = p.getImplementationVersion();
+                // The manifest may not declare an implementation version at all.
+                if (jmsVersion != null) {
+                    Matcher m = pattern.matcher(jmsVersion);
+                    if (m.matches()) {
+                        jmsMajor = Integer.parseInt(m.group(1));
+                        jmsMinor = Integer.parseInt(m.group(2));
+                    }
+                }
+            }
+        }
+        catch (Throwable ignored) {
+            // Best effort: the JMS version is informational only, keep the defaults.
+        }
+        JMS_VERSION = jmsVersion;
+        JMS_MAJOR_VERSION = jmsMajor;
+        JMS_MINOR_VERSION = jmsMinor;
+
+        String providerVersion = null;
+        int providerMajor = 0;
+        int providerMinor = 0;
+        Properties properties = new Properties();
+        try {
+            resourceStream = JmsBaseConnectionMetaData.class.getResourceAsStream("/application.conf");
+            // getResourceAsStream returns null when the resource is not on the classpath.
+            if (resourceStream != null) {
+                properties.load(resourceStream);
+                providerVersion = properties.getProperty("version");
+            }
+
+            // The "version" key may be absent from the file.
+            if (providerVersion != null) {
+                Matcher m = pattern.matcher(providerVersion);
+                if (m.matches()) {
+                    providerMajor = Integer.parseInt(m.group(1));
+                    providerMinor = Integer.parseInt(m.group(2));
+                }
+            }
+        }
+        catch (IOException e) {
+            e.printStackTrace();
+        }
+        catch (RuntimeException e) {
+            // e.g. malformed escapes in the properties file or an out-of-range version number;
+            // must not escape the static initializer, otherwise the class becomes unusable.
+            e.printStackTrace();
+        }
+        finally {
+            if (resourceStream != null) {
+                try {
+                    resourceStream.close();
+                }
+                catch (IOException ignored) {
+                    // Nothing useful can be done if closing a classpath resource fails.
+                }
+            }
+        }
+        PROVIDER_VERSION = providerVersion;
+        PROVIDER_MAJOR_VERSION = providerMajor;
+        PROVIDER_MINOR_VERSION = providerMinor;
+
+    }
+
+    /**
+     * @return the JMS API version string, or {@code null} if it could not be determined
+     */
+    @Override
+    public String getJMSVersion() throws JMSException {
+        return JMS_VERSION;
+    }
+
+    /**
+     * @return the JMS API major version, or {@code 0} if it could not be determined
+     */
+    @Override
+    public int getJMSMajorVersion() throws JMSException {
+        return JMS_MAJOR_VERSION;
+    }
+
+    /**
+     * @return the JMS API minor version, or {@code 0} if it could not be determined
+     */
+    @Override
+    public int getJMSMinorVersion() throws JMSException {
+        return JMS_MINOR_VERSION;
+    }
+
+    /**
+     * @return the provider name, always {@link #PROVIDER_NAME}
+     */
+    @Override
+    public String getJMSProviderName() throws JMSException {
+        return PROVIDER_NAME;
+    }
+
+    /**
+     * @return the provider version string, or {@code null} if it could not be determined
+     */
+    @Override
+    public String getProviderVersion() throws JMSException {
+        return PROVIDER_VERSION;
+    }
+
+    /**
+     * @return the provider major version, or {@code 0} if it could not be determined
+     */
+    @Override
+    public int getProviderMajorVersion() throws JMSException {
+        return PROVIDER_MAJOR_VERSION;
+    }
+
+    /**
+     * @return the provider minor version, or {@code 0} if it could not be determined
+     */
+    @Override
+    public int getProviderMinorVersion() throws JMSException {
+        return PROVIDER_MINOR_VERSION;
+    }
+
+    /**
+     * Lists the JMSX property names known to this provider.
+     * <p>
+     * The spellings match the JMS-defined property constants declared in {@code JmsBaseConstant}.
+     *
+     * @return an enumeration over the JMSX property names
+     */
+    @Override
+    public Enumeration<?> getJMSXPropertyNames() throws JMSException {
+        Vector<String> jmxProperties = new Vector<String>();
+        jmxProperties.add("jmsXUserID");
+        jmxProperties.add("jmsXAppID");
+        jmxProperties.add("jmsXGroupID");
+        jmxProperties.add("jmsXGroupSeq");
+        jmxProperties.add("jmsXState");
+        jmxProperties.add("jmsXDeliveryCount");
+        jmxProperties.add("jmsXProducerTXID");
+        jmxProperties.add("jmsXConsumerTXID");
+        jmxProperties.add("jmsXRcvTimestamp");
+        return jmxProperties.elements();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
new file mode 100644
index 0000000..f0bca28
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+/**
+ * Keys and default values used for JMS message headers and JMS-defined (JMSX) properties.
+ */
+public interface JmsBaseConstant {
+    //------------------------JMS message header constant---------------------------------
+    String JMS_DESTINATION = "jmsDestination";
+    String JMS_DELIVERY_MODE = "jmsDeliveryMode";
+    String JMS_EXPIRATION = "jmsExpiration";
+    String JMS_DELIVERY_TIME = "jmsDeliveryTime";
+    String JMS_PRIORITY = "jmsPriority";
+    String JMS_MESSAGE_ID = "jmsMessageID";
+    String JMS_TIMESTAMP = "jmsTimestamp";
+    String JMS_CORRELATION_ID = "jmsCorrelationID";
+    String JMS_REPLY_TO = "jmsReplyTo";
+    String JMS_TYPE = "jmsType";
+    String JMS_REDELIVERED = "jmsRedelivered";
+
+    //-------------------------JMS defined properties constant----------------------------
+    /**
+     * The identity of the user sending the message (set on send).
+     */
+    String JMS_XUSER_ID = "jmsXUserID";
+    /**
+     * The identity of the application sending the message (set on send).
+     */
+    String JMS_XAPP_ID = "jmsXAppID";
+    /**
+     * The number of message delivery attempts (set on receive).
+     */
+    String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount";
+    /**
+     * The identity of the message group this message is part of.
+     */
+    String JMS_XGROUP_ID = "jmsXGroupID";
+    /**
+     * The sequence number of this message within the group; the first message is 1, the second 2,...
+     */
+    String JMS_XGROUP_SEQ = "jmsXGroupSeq";
+    /**
+     * The transaction identifier of the transaction within which this message was produced (set on send).
+     */
+    String JMS_XPRODUCER_TXID = "jmsXProducerTXID";
+    /**
+     * The transaction identifier of the transaction within which this message was consumed (set on receive).
+     */
+    String JMS_XCONSUMER_TXID = "jmsXConsumerTXID";
+
+    /**
+     * The time JMS delivered the message to the consumer (set on receive).
+     */
+    String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp";
+    /**
+     * Assume there exists a message warehouse that contains a separate copy of each message sent to each consumer and
+     * that these copies exist from the time the original message was sent. Each copy's state is one of: 1(waiting),
+     * 2(ready), 3(expired) or 4(retained). Since state is of no interest to producers and consumers, it is not
+     * provided to either. It is only of relevance to messages looked up in a warehouse, and JMS provides no API for
+     * this.
+     */
+    String JMS_XSTATE = "jmsXState";
+
+    //---------------------------JMS Headers' value constant---------------------------
+    /**
+     * Default time to live in milliseconds: 3 days.
+     * <p>
+     * The leading {@code 3L} forces the product to be evaluated in {@code long} arithmetic, so the
+     * expression cannot silently overflow {@code int} if the duration is ever increased.
+     */
+    long DEFAULT_TIME_TO_LIVE = 3L * 24 * 60 * 60 * 1000;
+
+    /**
+     * Default Jms Type
+     */
+    String DEFAULT_JMS_TYPE = "rocketmq";
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
new file mode 100644
index 0000000..b62e928
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.MapMaker;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+public class JmsBaseMessageConsumer implements MessageConsumer {
+
+    private static final Object LOCK_OBJECT = new Object();
+    //all shared consumers
+    private static ConcurrentMap<String/**consumerId*/, RMQPushConsumerExt> consumerMap = new MapMaker().makeMap();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private CommonContext context;
+    private Destination destination;
+    private MessageListener messageListener;
+
+    public JmsBaseMessageConsumer(Destination destination, CommonContext commonContext,
+        JmsBaseConnection connection) throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            checkArgs(destination, commonContext);
+
+            if (null == consumerMap.get(context.getConsumerId())) {
+                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(context.getConsumerId());
+                if (context.getConsumeThreadNums() > 0) {
+                    consumer.setConsumeThreadMax(context.getConsumeThreadNums());
+                    consumer.setConsumeThreadMin(context.getConsumeThreadNums());
+                }
+                if (!Strings.isNullOrEmpty(context.getNameServer())) {
+                    consumer.setNamesrvAddr(context.getNameServer());
+                }
+                if (!Strings.isNullOrEmpty(context.getInstanceName())) {
+                    consumer.setInstanceName(context.getInstanceName());
+                }
+                consumer.setConsumeMessageBatchMaxSize(1);
+                //add subscribe?
+                RMQPushConsumerExt rocketmqConsumerExt = new RMQPushConsumerExt(consumer);
+                consumerMap.putIfAbsent(context.getConsumerId(), rocketmqConsumerExt);
+            }
+
+            consumerMap.get(context.getConsumerId()).incrementAndGet();
+
+            //If the connection has been started, start the consumer right now.
+            //add start status?
+            RMQPushConsumerExt consumerExt = consumerMap.get(context.getConsumerId());
+            if (connection.isStarted()) {
+                try {
+                    consumerExt.start();
+                }
+                catch (MQClientException mqe) {
+                    JMSException jmsException = new JMSException("Start consumer failed " + context.getConsumerId());
+                    jmsException.initCause(mqe);
+                    throw jmsException;
+                }
+            }
+        }
+
+    }
+
+    private void checkArgs(Destination destination, CommonContext context) throws JMSException {
+        Preconditions.checkNotNull(context.getConsumerId(), "ConsumerId can not be null!");
+        Preconditions.checkNotNull(destination.toString(), "Destination can not be null!");
+        this.context = context;
+        this.destination = destination;
+    }
+
+    @Override
+    public String getMessageSelector() throws JMSException {
+        return null;
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return this.messageListener;
+    }
+
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+        if (null != rocketmqConsumerExt) {
+            try {
+                this.messageListener = listener;
+                String messageTopic = ((JmsBaseTopic) destination).getMessageTopic();
+                String messageType = ((JmsBaseTopic) destination).getMessageType();
+                rocketmqConsumerExt.subscribe(messageTopic, messageType, listener);
+            }
+            catch (MQClientException mqe) {
+                //add what?
+                throw new JMSException(mqe.getMessage());
+            }
+
+        }
+
+    }
+
+    @Override
+    public Message receive() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    @Override
+    public Message receive(long timeout) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    @Override
+    public Message receiveNoWait() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    @Override
+    public void close() throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            if (closed.compareAndSet(false, true)) {
+                RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+                if (null != rocketmqConsumerExt && 0 == rocketmqConsumerExt.decrementAndGet()) {
+                    rocketmqConsumerExt.close();
+                    consumerMap.remove(context.getConsumerId());
+                }
+            }
+        }
+    }
+
+    /**
+     * Start the consumer to get message from the Broker.
+     */
+    public void startConsumer() throws JMSException {
+        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+        if (null != rocketmqConsumerExt) {
+            try {
+                rocketmqConsumerExt.start();
+            }
+            catch (MQClientException mqe) {
+                throw ExceptionUtil.convertToJmsException(mqe, "Start consumer failed");
+            }
+        }
+    }
+
+    public Destination getDestination() throws JMSException {
+        return this.destination;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
new file mode 100644
index 0000000..8dd82f0
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.MapMaker;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentMap;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.jms.domain.message.JmsBaseMessage;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+import org.apache.rocketmq.jms.util.MessageConverter;
+import org.apache.rocketmq.jms.util.MsgConvertUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MessageProducer} backed by a RocketMQ producer.
+ * <p>
+ * All {@code JmsBaseMessageProducer} instances created with the same producer id share one
+ * RocketMQ producer, which is created and started by the first instance. Delivery mode,
+ * priority, time-to-live and the message-id/timestamp switches are fixed to their defaults;
+ * the corresponding setters delegate to {@code ExceptionUtil.handleUnSupportedException()}.
+ */
+public class JmsBaseMessageProducer implements MessageProducer {
+
+    private static final Object LOCK_OBJECT = new Object();
+    // Shared RocketMQ producers keyed by producer id; entries are added only under LOCK_OBJECT.
+    private static final ConcurrentMap<String, MQProducer> producerMap = new MapMaker().makeMap();
+    private static final Logger logger = LoggerFactory.getLogger(JmsBaseMessageProducer.class);
+    private CommonContext context;
+
+    private Destination destination;
+
+    /**
+     * Creates a producer for the given destination, creating and starting the shared RocketMQ
+     * producer for the context's producer id if none is registered yet.
+     *
+     * @param destination the default destination of this producer
+     * @param context configuration (producer id, name server, instance name, send timeout)
+     * @throws JMSException if the RocketMQ producer cannot be started
+     * @throws NullPointerException if the destination, the context or the producer id is null
+     */
+    public JmsBaseMessageProducer(Destination destination, CommonContext context) throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            checkArgs(destination, context);
+
+            final String producerId = context.getProducerId();
+            if (null == producerMap.get(producerId)) {
+                DefaultMQProducer producer = new DefaultMQProducer(producerId);
+                if (!Strings.isNullOrEmpty(context.getNameServer())) {
+                    producer.setNamesrvAddr(context.getNameServer());
+                }
+                if (!Strings.isNullOrEmpty(context.getInstanceName())) {
+                    producer.setInstanceName(context.getInstanceName());
+                }
+                if (context.getSendMsgTimeoutMillis() > 0) {
+                    producer.setSendMsgTimeout(context.getSendMsgTimeoutMillis());
+                }
+                try {
+                    producer.start();
+                }
+                catch (MQClientException mqe) {
+                    throw ExceptionUtil.convertToJmsException(mqe, String.format("Start producer failed:%s", producerId));
+                }
+                // Only register the producer once it has started successfully.
+                producerMap.putIfAbsent(producerId, producer);
+            }
+
+        }
+    }
+
+    /**
+     * Validates the constructor arguments and stores them in the instance fields.
+     *
+     * @throws NullPointerException if the destination, the context or the producer id is null
+     */
+    private void checkArgs(Destination destination, CommonContext context) throws JMSException {
+        // Check the references themselves first so a null argument yields a descriptive message
+        // rather than a bare NullPointerException from the dereference.
+        Preconditions.checkNotNull(context, "Context can not be null!");
+        Preconditions.checkNotNull(destination, "Destination can not be null!");
+        Preconditions.checkNotNull(context.getProducerId(), "ProducerId can not be null!");
+        Preconditions.checkNotNull(destination.toString(), "Destination can not be null!");
+        this.context = context;
+        this.destination = destination;
+    }
+
+    @Override
+    public boolean getDisableMessageID() throws JMSException {
+        return false;
+    }
+
+    @Override
+    public void setDisableMessageID(boolean value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public boolean getDisableMessageTimestamp() throws JMSException {
+        return false;
+    }
+
+    @Override
+    public void setDisableMessageTimestamp(boolean value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public int getDeliveryMode() throws JMSException {
+        return javax.jms.Message.DEFAULT_DELIVERY_MODE;
+    }
+
+    @Override
+    public void setDeliveryMode(int deliveryMode) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public int getPriority() throws JMSException {
+        return javax.jms.Message.DEFAULT_PRIORITY;
+    }
+
+    @Override
+    public void setPriority(int defaultPriority) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public long getTimeToLive() throws JMSException {
+        return JmsBaseConstant.DEFAULT_TIME_TO_LIVE;
+    }
+
+    @Override
+    public void setTimeToLive(long timeToLive) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public Destination getDestination() throws JMSException {
+        return this.destination;
+    }
+
+    /**
+     * No-op: the underlying RocketMQ producer is shared between instances and is not shut down
+     * here.
+     */
+    @Override
+    public void close() throws JMSException {
+        //Nothing to do
+    }
+
+    /**
+     * Sends the message to this producer's default destination.
+     *
+     * @param message the message to be sent
+     * @throws JMSException if the message could not be sent
+     */
+    @Override
+    public void send(javax.jms.Message message) throws JMSException {
+        this.send(getDestination(), message);
+    }
+
+    /**
+     * Send the message to the given Destination. Returns normally on success and sets the
+     * {@code JMS_MESSAGE_ID} header from the send result; throws a JMSException on any failure.
+     * <p>
+     * The message is cast to {@link JmsBaseMessage}; any other message implementation results in
+     * a {@link ClassCastException}.
+     *
+     * @param destination see <CODE>Destination</CODE>
+     * @param message the message to be sent.
+     * @throws javax.jms.JMSException if sending fails; the underlying failure is attached both as
+     * the cause and as the linked exception
+     */
+    @Override
+    public void send(Destination destination, javax.jms.Message message) throws JMSException {
+        JmsBaseMessage jmsMsg = (JmsBaseMessage) message;
+        initJMSHeaders(jmsMsg, destination);
+
+        try {
+            if (context == null) {
+                throw new IllegalStateException("Context should be inited");
+            }
+            org.apache.rocketmq.common.message.Message rocketmqMsg = MessageConverter.convert2RMQMessage(jmsMsg);
+
+            MQProducer producer = producerMap.get(context.getProducerId());
+
+            if (producer == null) {
+                throw new Exception("producer is null ");
+            }
+            SendResult sendResult = producer.send(rocketmqMsg);
+            if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                jmsMsg.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + sendResult.getMsgId());
+            } else {
+                throw new Exception("SendResult is " + (sendResult == null ? "null" : sendResult.toString()));
+            }
+        }
+        catch (Exception e) {
+            logger.error("Send rocketmq message failure !", e);
+            //if fail to send the message, throw out JMSException
+            JMSException jmsException = new JMSException("Send rocketmq message failure!");
+            // Attach the failure as the standard cause too, so it shows up in stack traces.
+            jmsException.initCause(e);
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void send(javax.jms.Message message, int deliveryMode, int priority,
+        long timeToLive) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void send(Destination destination, javax.jms.Message message, int deliveryMode,
+        int priority, long timeToLive) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Init the JmsMessage Headers.
+     * <p/>
+     * <P>The provider initializes these headers on every send; values set by the user beforehand
+     * are overwritten.
+     *
+     * @param jmsMsg message
+     * @param destination the destination the message is being sent to
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    private void initJMSHeaders(JmsBaseMessage jmsMsg, Destination destination) throws JMSException {
+        // Read the clock once so that expiration is exactly timestamp + time-to-live.
+        final long now = System.currentTimeMillis();
+
+        //JMS_DESTINATION default:"topic:message"
+        jmsMsg.setHeader(JmsBaseConstant.JMS_DESTINATION, destination);
+        //JMS_DELIVERY_MODE default : PERSISTENT
+        jmsMsg.setHeader(JmsBaseConstant.JMS_DELIVERY_MODE, javax.jms.Message.DEFAULT_DELIVERY_MODE);
+        //JMS_TIMESTAMP default : current time
+        jmsMsg.setHeader(JmsBaseConstant.JMS_TIMESTAMP, now);
+        //JMS_EXPIRATION default :  3 days
+        //JMS_EXPIRATION = currentTime + time_to_live
+        jmsMsg.setHeader(JmsBaseConstant.JMS_EXPIRATION, now + JmsBaseConstant.DEFAULT_TIME_TO_LIVE);
+        //JMS_PRIORITY default : 4
+        jmsMsg.setHeader(JmsBaseConstant.JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY);
+        //JMS_TYPE default : JmsBaseConstant.DEFAULT_JMS_TYPE
+        jmsMsg.setHeader(JmsBaseConstant.JMS_TYPE, JmsBaseConstant.DEFAULT_JMS_TYPE);
+        //JMS_REPLY_TO,JMS_CORRELATION_ID default : null
+        //JMS_MESSAGE_ID is set by sendResult.
+        //JMS_REDELIVERED is set by broker.
+    }
+
+    /**
+     * Build the RocketMQ message properties for a JMS message.
+     * <p/>
+     * <P>Copies the JMS user properties first and then the JMS headers (a header overrides a user
+     * property with the same key), records the message model (bytes/object/text) and finally the
+     * topic and message type. Entries whose key or value is {@code null} are skipped because
+     * {@link Properties} cannot store them.
+     *
+     * @param jmsMsg message
+     * @param topic the RocketMQ topic; must not be null
+     * @param messageType the message type (tag); must not be null
+     * @return the properties to attach to the RocketMQ message
+     * @throws javax.jms.JMSException
+     */
+    public static Properties initRocketMQHeaders(JmsBaseMessage jmsMsg,
+        String topic, String messageType) throws JMSException {
+        Properties userProperties = new Properties();
+
+        //Jms userProperties to properties
+        copyNonNullEntries(jmsMsg.getProperties(), userProperties);
+        //Jms systemProperties to ROCKETMQ properties
+        copyNonNullEntries(jmsMsg.getHeaders(), userProperties);
+
+        //Jms message Model
+        if (jmsMsg instanceof JmsBytesMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_BYTES);
+        }
+        else if (jmsMsg instanceof JmsObjectMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_OBJ);
+        }
+        else if (jmsMsg instanceof JmsTextMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT);
+        }
+
+        //message topic and tag
+        userProperties.setProperty(MsgConvertUtil.MSG_TOPIC, topic);
+        userProperties.setProperty(MsgConvertUtil.MSG_TYPE, messageType);
+
+        return userProperties;
+    }
+
+    /**
+     * Copies every entry with a non-null key and value into the target, converting values with
+     * {@code toString()}. A null source map is treated as empty.
+     */
+    private static void copyNonNullEntries(Map<String, Object> source, Properties target) {
+        if (source == null) {
+            return;
+        }
+        Iterator<Map.Entry<String, Object>> entries = source.entrySet().iterator();
+        while (entries.hasNext()) {
+            Map.Entry<String, Object> entry = entries.next();
+            if (entry.getKey() != null && entry.getValue() != null) {
+                target.setProperty(entry.getKey(), entry.getValue().toString());
+            }
+        }
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
new file mode 100644
index 0000000..5bf7005
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * {@link Session} implementation that creates bytes, object and text messages, message producers
+ * and message consumers. Most of the remaining JMS operations are not supported and throw
+ * {@link UnsupportedOperationException}.
+ */
+public class JmsBaseSession implements Session {
+    protected CommonContext context;
+    protected JmsBaseConnection connection;
+    /** Consumers created by {@link #createConsumer(Destination)}; started and closed with the session. */
+    protected CopyOnWriteArrayList<JmsBaseMessageConsumer> consumerList =
+        new CopyOnWriteArrayList<JmsBaseMessageConsumer>();
+    private final boolean transacted;
+    private final int acknowledgeMode;
+
+    /**
+     * Creates a session bound to the given connection.
+     *
+     * @param connection connection handed to every consumer this session creates
+     * @param transacted value reported by {@link #getTransacted()}
+     * @param acknowledgeMode value reported by {@link #getAcknowledgeMode()}
+     * @param context context handed to every producer and consumer this session creates
+     */
+    public JmsBaseSession(JmsBaseConnection connection, boolean transacted,
+        int acknowledgeMode, CommonContext context) {
+        this.context = context;
+        this.acknowledgeMode = acknowledgeMode;
+        this.transacted = transacted;
+        this.connection = connection;
+    }
+
+    /** Creates an empty {@link JmsBytesMessage}. */
+    @Override
+    public BytesMessage createBytesMessage() throws JMSException {
+        return new JmsBytesMessage();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public MapMessage createMapMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public Message createMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Creates an empty {@link JmsObjectMessage}. */
+    @Override
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return new JmsObjectMessage();
+    }
+
+    /**
+     * Creates a {@link JmsObjectMessage} carrying the given payload.
+     *
+     * @param object payload passed to the message constructor
+     */
+    @Override
+    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
+        return new JmsObjectMessage(object);
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public StreamMessage createStreamMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Creates an empty {@link JmsTextMessage}. */
+    @Override
+    public TextMessage createTextMessage() throws JMSException {
+        return new JmsTextMessage();
+    }
+
+    /**
+     * Creates a {@link JmsTextMessage} carrying the given text.
+     *
+     * @param text text passed to the message constructor
+     */
+    @Override
+    public TextMessage createTextMessage(String text) throws JMSException {
+        return new JmsTextMessage(text);
+    }
+
+    /** Returns the {@code transacted} flag supplied at construction time. */
+    @Override
+    public boolean getTransacted() throws JMSException {
+        return this.transacted;
+    }
+
+    /** Returns the acknowledge mode supplied at construction time. */
+    @Override
+    public int getAcknowledgeMode() {
+        return this.acknowledgeMode;
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void commit() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void rollback() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Closes every consumer created by this session and then forgets them.
+     * <p>
+     * A consumer that fails to close does not prevent the remaining consumers from being closed;
+     * the first failure is rethrown once all consumers have been processed and the list has been
+     * cleared.
+     *
+     * @throws JMSException the first error raised while closing a consumer
+     */
+    @Override
+    public void close() throws JMSException {
+        JMSException firstFailure = null;
+        // Iterate through the interface type: its close() declares JMSException, so the catch
+        // clause is valid whatever the concrete consumer class declares.
+        for (MessageConsumer messageConsumer : consumerList) {
+            try {
+                messageConsumer.close();
+            } catch (JMSException e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        consumerList.clear();
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void recover() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Always returns {@code null}; this session never stores a listener. */
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return null;
+    }
+
+    /**
+     * Delegates to {@link ExceptionUtil#handleUnSupportedException()}; the listener is not stored.
+     *
+     * @param listener ignored
+     */
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void run() {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Creates a producer for the given destination using this session's context.
+     *
+     * @param destination destination the producer is created for
+     */
+    @Override
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        return new JmsBaseMessageProducer(destination, context);
+    }
+
+    /**
+     * Creates a MessageConsumer.
+     * <p>
+     * Creates a durable consumer for the specified destination and registers it with this
+     * session, so that {@link #start()} and {@link #close()} reach it.
+     *
+     * @param destination equal to Topic:MessageType in ROCKETMQ
+     * @return the newly created consumer
+     * @throws javax.jms.JMSException declared by the JMS interface
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        JmsBaseMessageConsumer messageConsumer = new
+            JmsBaseMessageConsumer(destination, this.context, this.connection);
+        this.consumerList.addIfAbsent(messageConsumer);
+        return messageConsumer;
+    }
+
+    /**
+     * Creates a MessageConsumer with a messageSelector.
+     * <p>
+     * Not supported: ROCKETMQ-JMS does not support using a messageSelector to filter messages.
+     * Always throws {@link UnsupportedOperationException}.
+     *
+     * @param destination equal to Topic:MessageType in ROCKETMQ
+     * @param messageSelector for filtering messages
+     * @throws javax.jms.JMSException declared by the JMS interface
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector)
+        throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Creates a MessageConsumer with a messageSelector.
+     * <p>
+     * Not supported: ROCKETMQ-JMS does not support using a messageSelector to filter messages,
+     * nor rejecting messages from localhost. Always throws {@link UnsupportedOperationException}.
+     *
+     * @param destination equal to Topic:MessageType in ROCKETMQ
+     * @param messageSelector for filtering messages
+     * @param noLocal if true: reject messages from localhost
+     * @throws javax.jms.JMSException declared by the JMS interface
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector,
+        boolean noLocal) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public Queue createQueue(String queueName) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Builds a {@link JmsBaseTopic} from a name of the form {@code messageTopic[:messageType]}.
+     *
+     * @param topicName topic name, optionally followed by {@code ':'} and a message type
+     * @return the parsed topic; the message type is {@code "*"} when none is given
+     * @throws NullPointerException if {@code topicName} is {@code null}
+     * @throws IllegalStateException if splitting on {@code ':'} does not yield one or two parts
+     */
+    @Override
+    public Topic createTopic(String topicName) throws JMSException {
+        Preconditions.checkNotNull(topicName);
+        List<String> msgTuple = Arrays.asList(topicName.split(":"));
+
+        Preconditions.checkState(msgTuple.size() >= 1 && msgTuple.size() <= 2,
+            "Destination must match messageTopic:messageType !");
+
+        //If messageType is null, use * instead.
+        if (1 == msgTuple.size()) {
+            return new JmsBaseTopic(msgTuple.get(0), "*");
+        }
+        return new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1));
+    }
+
+    /**
+     * Creates a MessageConsumer with a durable subscription.
+     * <p>
+     * Not supported: {@link #createConsumer(Destination)} already creates a MessageConsumer with
+     * a durable subscription, so use that method instead. Always throws
+     * {@link UnsupportedOperationException}.
+     *
+     * @param topic destination
+     * @param name subscription name
+     * @throws javax.jms.JMSException declared by the JMS interface
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name)
+        throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Creates a MessageConsumer with a durable subscription.
+     * <p>
+     * Not supported: {@link #createConsumer(Destination)} already creates a MessageConsumer with
+     * a durable subscription, so use that method instead. Always throws
+     * {@link UnsupportedOperationException}.
+     *
+     * @param topic destination
+     * @param name subscription name
+     * @param messageSelector for filtering messages
+     * @param noLocal if true: reject messages from localhost
+     * @throws javax.jms.JMSException declared by the JMS interface
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name,
+        String messageSelector,
+        boolean noLocal) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Creates a temporary queue object named by a random UUID.
+     * <p>
+     * The name is generated once per queue so that repeated {@code getQueueName()} calls on the
+     * same object return the same value; {@code delete()} does nothing.
+     */
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        final String queueName = UUID.randomUUID().toString();
+        return new TemporaryQueue() {
+            public void delete() throws JMSException {
+            }
+
+            public String getQueueName() throws JMSException {
+                return queueName;
+            }
+        };
+    }
+
+    /**
+     * Creates a temporary topic object named by a random UUID.
+     * <p>
+     * The name is generated once per topic so that repeated {@code getTopicName()} calls on the
+     * same object return the same value; {@code delete()} does nothing.
+     */
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        final String topicName = UUID.randomUUID().toString();
+        return new TemporaryTopic() {
+            public void delete() throws JMSException {
+            }
+
+            public String getTopicName() throws JMSException {
+                return topicName;
+            }
+        };
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void unsubscribe(String name) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Starts every consumer created by this session.
+     *
+     * @throws JMSException if a consumer fails to start; later consumers are then not started
+     */
+    public void start() throws JMSException {
+        for (JmsBaseMessageConsumer messageConsumer : consumerList) {
+            messageConsumer.startConsumer();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
new file mode 100644
index 0000000..b7e2fab
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+/**
+ * {@link Topic} made of a message topic and a message type, rendered as
+ * {@code messageTopic:messageType}.
+ */
+public class JmsBaseTopic implements Topic {
+
+    private final String messageTopic;
+    private final String messageType;
+
+    /**
+     * Creates a topic from its two parts.
+     *
+     * @param messageTopic topic part of the destination, must not be {@code null}
+     * @param messageType type part of the destination, must not be {@code null}
+     * @throws NullPointerException if either argument is {@code null}
+     */
+    public JmsBaseTopic(String messageTopic, String messageType) {
+        Preconditions.checkNotNull(messageTopic);
+        Preconditions.checkNotNull(messageType);
+
+        this.messageTopic = messageTopic;
+        this.messageType = messageType;
+    }
+
+    /** Returns the same text as {@link #toString()}. */
+    @Override
+    public String getTopicName() throws JMSException {
+        return this.toString();
+    }
+
+    /** Returns the message topic and the message type joined by {@code ':'}. */
+    @Override
+    public String toString() {
+        return Joiner.on(":").join(this.getMessageTopic(), this.getMessageType());
+    }
+
+    /** Returns the topic part supplied at construction time. */
+    public String getMessageTopic() {
+        return messageTopic;
+    }
+
+    /** Returns the type part supplied at construction time. */
+    public String getMessageType() {
+        return messageType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
new file mode 100644
index 0000000..7a8a9f7
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.util.MessageConverter;
+
+/**
+ * Wraps an {@link MQPushConsumer} and dispatches each received message to the
+ * {@link javax.jms.MessageListener} registered for the message's topic.
+ * <p>
+ * A reference counter is exposed to callers; this class itself never acts on its value.
+ */
+public class RMQPushConsumerExt {
+    private final MQPushConsumer consumer;
+    private final ConcurrentHashMap<String/* Topic */, javax.jms.MessageListener> subscribeTable = new ConcurrentHashMap<String, javax.jms.MessageListener>();
+
+    private final AtomicInteger referenceCount = new AtomicInteger(0);
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    /**
+     * @param consumer consumer to wrap; a {@code null} value is rejected later by {@link #start()}
+     */
+    public RMQPushConsumerExt(MQPushConsumer consumer) {
+        this.consumer = consumer;
+    }
+
+    /** Returns the wrapped consumer. */
+    public MQPushConsumer getConsumer() {
+        return consumer;
+    }
+
+    /** Increments the reference counter and returns the new value. */
+    public int incrementAndGet() {
+        return referenceCount.incrementAndGet();
+    }
+
+    /** Decrements the reference counter and returns the new value. */
+    public int decrementAndGet() {
+        return referenceCount.decrementAndGet();
+    }
+
+    /** Returns the current value of the reference counter. */
+    public int getReferenceCount() {
+        return referenceCount.get();
+    }
+
+    /**
+     * Registers the dispatching listener and starts the wrapped consumer. Calls made while the
+     * started flag is already set do nothing.
+     * <p>
+     * If start-up fails the started flag is reset, so {@link #isStarted()} does not report a
+     * consumer that never came up and a later call attempts the start again.
+     *
+     * @throws MQClientException if the wrapped consumer is {@code null} or fails to start
+     */
+    public void start() throws MQClientException {
+        if (consumer == null) {
+            throw new MQClientException(-1, "consumer is null");
+        }
+
+        if (this.started.compareAndSet(false, true)) {
+            boolean succeeded = false;
+            try {
+                this.consumer.registerMessageListener(new MessageListenerImpl());
+                this.consumer.start();
+                succeeded = true;
+            } finally {
+                if (!succeeded) {
+                    this.started.set(false);
+                }
+            }
+        }
+    }
+
+    /** Shuts the wrapped consumer down if it is currently marked as started. */
+    public void close() {
+        if (this.started.compareAndSet(true, false)) {
+            this.consumer.shutdown();
+        }
+    }
+
+    /**
+     * Registers {@code listener} for {@code topic} and subscribes the wrapped consumer to it.
+     * A listener previously registered for the same topic is replaced.
+     *
+     * @param topic topic to subscribe to
+     * @param subExpression subscription expression passed through to the wrapped consumer
+     * @param listener listener that receives the converted messages of that topic
+     * @throws MQClientException if {@code topic} or {@code listener} is {@code null}, or if the
+     * wrapped consumer rejects the subscription
+     */
+    public void subscribe(String topic, String subExpression, javax.jms.MessageListener listener) throws MQClientException {
+        if (null == topic) {
+            throw new MQClientException(-1, "topic is null");
+        }
+
+        if (null == listener) {
+            throw new MQClientException(-1, "listener is null");
+        }
+
+        try {
+            // The listener is stored before subscribing, so it is already in place when the
+            // subscription takes effect.
+            this.subscribeTable.put(topic, listener);
+            this.consumer.subscribe(topic, subExpression);
+        } catch (MQClientException e) {
+            throw new MQClientException("consumer subscribe exception", e);
+        }
+    }
+
+    /**
+     * Unsubscribes the wrapped consumer from {@code topic} and drops the listener registered
+     * for it, so no stale listener stays referenced. A {@code null} topic is ignored.
+     *
+     * @param topic topic to unsubscribe from
+     */
+    public void unsubscribe(String topic) {
+        if (null != topic) {
+            this.consumer.unsubscribe(topic);
+            this.subscribeTable.remove(topic);
+        }
+    }
+
+    /** Listener handed to the wrapped consumer; routes messages to the per-topic JMS listeners. */
+    class MessageListenerImpl implements MessageListenerConcurrently {
+
+        /**
+         * Delivers every message of the batch, in order, to the listener registered for its
+         * topic.
+         *
+         * @return {@code CONSUME_SUCCESS} when all messages were delivered, or
+         * {@code RECONSUME_LATER} as soon as converting or delivering one of them throws
+         * @throws RuntimeException if no listener is registered for a message's topic
+         */
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgsRMQList, ConsumeConcurrentlyContext contextRMQ) {
+            // Process the whole batch: handling only the first element would acknowledge the
+            // remaining messages without ever delivering them.
+            for (MessageExt msgRMQ : msgsRMQList) {
+                javax.jms.MessageListener listener = RMQPushConsumerExt.this.subscribeTable.get(msgRMQ.getTopic());
+                if (null == listener) {
+                    throw new RuntimeException("MessageListener is null");
+                }
+
+                try {
+                    listener.onMessage(MessageConverter.convert2JMSMessage(msgRMQ));
+                }
+                catch (Exception e) {
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                }
+            }
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+    }
+
+    /** Returns {@code true} while the started flag is set. */
+    public boolean isStarted() {
+        return started.get();
+    }
+
+    /** Returns the negation of {@link #isStarted()}. */
+    public boolean isClosed() {
+        return !isStarted();
+    }
+}