You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/06/15 02:41:42 UTC

[2/3] incubator-rocketmq-externals git commit: Release rocketmq-jms 1.0.0 version

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
new file mode 100644
index 0000000..ea4b49e
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.BaseEncoding;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * Base {@link Message} implementation that keeps JMS headers and user properties in two in-memory maps and the
+ * body in a {@link Serializable} field.
+ * <p>
+ * The standard JMS header setters only delegate to {@link ExceptionUtil#handleUnSupportedException()}; header
+ * values are written through {@link #setHeader(String, Object)} instead.
+ */
+public class JmsBaseMessage implements Message {
+    /**
+     * Message properties
+     */
+    protected Map<String, Object> properties = Maps.newHashMap();
+    /**
+     * Message headers
+     */
+    protected Map<String, Object> headers = Maps.newHashMap();
+    /**
+     * Message body
+     */
+    protected Serializable body;
+
+    /**
+     * @return the header stored under {@code JmsBaseConstant.JMS_MESSAGE_ID}, or {@code null} if it is not set
+     */
+    @Override
+    public String getJMSMessageID() {
+        return (String) headers.get(JmsBaseConstant.JMS_MESSAGE_ID);
+    }
+
+    /**
+     * Sets the message ID.
+     * <p/>
+     * <P>JMS providers set this field when a message is sent, so users are not allowed to set the message ID
+     * themselves. This method only delegates to {@link ExceptionUtil#handleUnSupportedException()}.
+     *
+     * @param id the ID of the message (not stored)
+     * @see javax.jms.Message#getJMSMessageID()
+     */
+    @Override
+    public void setJMSMessageID(String id) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * @return the header stored under {@code JmsBaseConstant.JMS_TIMESTAMP}, or {@code 0} if it is absent or null
+     */
+    @Override
+    public long getJMSTimestamp() {
+        Object timestamp = headers.get(JmsBaseConstant.JMS_TIMESTAMP);
+        // Checking the value (not just the key) avoids unboxing a header that was stored as null.
+        return timestamp == null ? 0L : (Long) timestamp;
+    }
+
+    /**
+     * Not supported; only delegates to {@link ExceptionUtil#handleUnSupportedException()}.
+     */
+    @Override
+    public void setJMSTimestamp(long timestamp) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * Returns the correlation ID as bytes.
+     *
+     * @return the Base64-decoded correlation ID; the raw bytes of the string if it is not valid Base64; or
+     * {@code null} if no correlation ID is set
+     */
+    @Override
+    public byte[] getJMSCorrelationIDAsBytes() {
+        String jmsCorrelationID = getJMSCorrelationID();
+        if (jmsCorrelationID == null) {
+            return null;
+        }
+        try {
+            return BaseEncoding.base64().decode(jmsCorrelationID);
+        }
+        catch (IllegalArgumentException e) {
+            // BaseEncoding#decode signals malformed input with IllegalArgumentException; the ID is then treated
+            // as plain text. NOTE(review): getBytes() uses the platform default charset, as before.
+            return jmsCorrelationID.getBytes();
+        }
+    }
+
+    /**
+     * Base64-encodes the given bytes and passes the result to {@link #setJMSCorrelationID(String)}.
+     *
+     * @param correlationID the correlation ID bytes; {@code null} is forwarded as a {@code null} string
+     */
+    @Override
+    public void setJMSCorrelationIDAsBytes(byte[] correlationID) {
+        String encodedText = correlationID == null ? null : BaseEncoding.base64().encode(correlationID);
+        setJMSCorrelationID(encodedText);
+    }
+
+    /**
+     * @return the header stored under {@code JmsBaseConstant.JMS_CORRELATION_ID}, or {@code null} if it is not set
+     */
+    @Override
+    public String getJMSCorrelationID() {
+        return (String) headers.get(JmsBaseConstant.JMS_CORRELATION_ID);
+    }
+
+    /**
+     * Not supported; only delegates to {@link ExceptionUtil#handleUnSupportedException()}.
+     */
+    @Override
+    public void setJMSCorrelationID(String correlationID) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * @return the header stored under {@code JmsBaseConstant.JMS_REPLY_TO}, or {@code null} if it is not set
+     */
+    @Override
+    public Destination getJMSReplyTo() {
+        return (Destination) headers.get(JmsBaseConstant.JMS_REPLY_TO);
+    }
+
+    /**
+     * Not supported; only delegates to {@link ExceptionUtil#handleUnSupportedException()}.
+     */
+    @Override
+    public void setJMSReplyTo(Destination replyTo) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * @return a reflective dump of this message's fields built by {@link ToStringBuilder}
+     */
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this);
+    }
+
+    /**
+     * @return the header stored under {@code JmsBaseConstant.JMS_DESTINATION}, or {@code null} if it is not set
+     */
+    @Override
+    public Destination getJMSDestination() {
+        return (Destination) headers.get(JmsBaseConstant.JMS_DESTINATION);
+    }
+
+    /**
+     * Not supported; only delegates to {@link ExceptionUtil#handleUnSupportedException()}.
+     */
+    @Override
+    public void setJMSDestination(Destination destination) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * Returns the body cast to the requested type.
+     *
+     * @param clazz the type the body is expected to have
+     * @return the body as {@code T}
+     * @throws IllegalArgumentException if the body is not an instance of {@code clazz} (this includes a null body)
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T getBody(Class<T> clazz) throws JMSException {
+        if (clazz.isInstance(body)) {
+            return (T) body;
+        }
+        else {
+            throw new IllegalArgumentException("The class " + clazz
+                + " is unknown to this implementation");
+        }
+    }
+
+    /**
+     * @return the header stored under {@code JmsBaseConstant.JMS_DELIVERY_MODE}, or {@code 0} if it is absent or
+     * null
+     */
+    @Override
+    public int getJMSDeliveryMode() {
+        Object deliveryMode = headers.get(JmsBaseConstant.JMS_DELIVERY_MODE);
+        return deliveryMode == null ? 0 : (Integer) deliveryMode;
+    }
+
+    /**
+     * Sets the <CODE>DeliveryMode</CODE> value for this message.
+     * <p/>
+     * <P>JMS providers set this field when a message is sent. RocketMQ only supports DeliveryMode.PERSISTENT, so
+     * users are not allowed to set this themselves; the current mode can be read with
+     * <CODE>getJMSDeliveryMode</CODE>. This method only delegates to
+     * {@link ExceptionUtil#handleUnSupportedException()}.
+     *
+     * @param deliveryMode the delivery mode for this message (not stored)
+     * @see javax.jms.Message#getJMSDeliveryMode()
+     * @see javax.jms.DeliveryMode
+     */
+    @Override
+    public void setJMSDeliveryMode(int deliveryMode) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * @param clazz the type to test the body against
+     * @return {@code true} if the body is an instance of {@code clazz}; {@code false} otherwise, including when
+     * the body is null
+     */
+    public boolean isBodyAssignableTo(Class<?> clazz) throws JMSException {
+        return clazz.isInstance(body);
+    }
+
+    /**
+     * @return the header stored under {@code JmsBaseConstant.JMS_REDELIVERED}, or {@code false} if it is absent
+     * or null
+     */
+    @Override
+    public boolean getJMSRedelivered() {
+        Object redelivered = headers.get(JmsBaseConstant.JMS_REDELIVERED);
+        return redelivered != null && (Boolean) redelivered;
+    }
+
+    /**
+     * Not supported; only delegates to {@link ExceptionUtil#handleUnSupportedException()}.
+     */
+    @Override
+    public void setJMSRedelivered(boolean redelivered) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * Copies headers and properties from the source message. Entries whose key already exists in this message
+     * are kept and not overwritten.
+     *
+     * @param sourceMessage source message
+     * @throws IllegalArgumentException if a copied property value is rejected by
+     * {@link #setObjectProperty(String, Object)}
+     */
+    public void copyMetaData(JmsBaseMessage sourceMessage) {
+        for (Map.Entry<String, Object> entry : sourceMessage.getHeaders().entrySet()) {
+            if (!headerExits(entry.getKey())) {
+                setHeader(entry.getKey(), entry.getValue());
+            }
+        }
+        for (Map.Entry<String, Object> entry : sourceMessage.getProperties().entrySet()) {
+            if (!propertyExists(entry.getKey())) {
+                setObjectProperty(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * @return the header stored under {@code JmsBaseConstant.JMS_TYPE}, or {@code null} if it is not set
+     */
+    @Override
+    public String getJMSType() {
+        return (String) headers.get(JmsBaseConstant.JMS_TYPE);
+    }
+
+    /**
+     * Not supported; only delegates to {@link ExceptionUtil#handleUnSupportedException()}.
+     */
+    @Override
+    public void setJMSType(String type) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * @return the live header map of this message (not a copy)
+     */
+    public Map<String, Object> getHeaders() {
+        return this.headers;
+    }
+
+    /**
+     * @return the header stored under {@code JmsBaseConstant.JMS_EXPIRATION}, or {@code 0} if it is absent or null
+     */
+    @Override
+    public long getJMSExpiration() {
+        Object expiration = headers.get(JmsBaseConstant.JMS_EXPIRATION);
+        return expiration == null ? 0L : (Long) expiration;
+    }
+
+    /**
+     * Not supported; only delegates to {@link ExceptionUtil#handleUnSupportedException()}.
+     */
+    @Override
+    public void setJMSExpiration(long expiration) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * @param name header name
+     * @return {@code true} if the header map contains the given key
+     */
+    public boolean headerExits(String name) {
+        return this.headers.containsKey(name);
+    }
+
+    /**
+     * @return the header stored under {@code JmsBaseConstant.JMS_PRIORITY}, or {@code 5} if it is absent or null
+     */
+    @Override
+    public int getJMSPriority() {
+        Object priority = headers.get(JmsBaseConstant.JMS_PRIORITY);
+        // NOTE(review): javax.jms.Message.DEFAULT_PRIORITY is 4; confirm that 5 is the intended fallback.
+        return priority == null ? 5 : (Integer) priority;
+    }
+
+    /**
+     * Not supported; only delegates to {@link ExceptionUtil#handleUnSupportedException()}.
+     */
+    @Override
+    public void setJMSPriority(int priority) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * Stores a header value, replacing any previous value for the same name.
+     *
+     * @param name header name
+     * @param value header value
+     */
+    public void setHeader(String name, Object value) {
+        this.headers.put(name, value);
+    }
+
+    /**
+     * @return the live property map of this message (not a copy)
+     */
+    public Map<String, Object> getProperties() {
+        return this.properties;
+    }
+
+    /**
+     * Replaces the property map with the given instance; the values are not validated.
+     *
+     * @param properties the new property map
+     */
+    public void setProperties(Map<String, Object> properties) {
+        this.properties = properties;
+    }
+
+    /**
+     * Always fails: acknowledging through the message is not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void acknowledge() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    /**
+     * Removes all properties; headers and body are left untouched.
+     */
+    @Override
+    public void clearProperties() {
+        this.properties.clear();
+    }
+
+    /**
+     * Sets the body to {@code null}.
+     */
+    @Override
+    public void clearBody() {
+        this.body = null;
+    }
+
+    @Override
+    public boolean propertyExists(String name) {
+        return properties.containsKey(name);
+    }
+
+    /**
+     * @return the property as a boolean, parsing its string form if it is not a {@link Boolean}; {@code false}
+     * if the property is absent or null
+     */
+    @Override
+    public boolean getBooleanProperty(String name) throws JMSException {
+        Object value = lookupProperty(name);
+        if (value == null) {
+            return false;
+        }
+        return value instanceof Boolean ? (Boolean) value : Boolean.valueOf(value.toString());
+    }
+
+    /**
+     * @return the property as a byte, parsing its string form if it is not a {@link Byte}; {@code 0} if the
+     * property is absent or null
+     * @throws NumberFormatException if the string form cannot be parsed
+     */
+    @Override
+    public byte getByteProperty(String name) throws JMSException {
+        Object value = lookupProperty(name);
+        if (value == null) {
+            return 0;
+        }
+        return value instanceof Byte ? (Byte) value : Byte.valueOf(value.toString());
+    }
+
+    /**
+     * @return the property as a short, parsing its string form if it is not a {@link Short}; {@code 0} if the
+     * property is absent or null
+     * @throws NumberFormatException if the string form cannot be parsed
+     */
+    @Override
+    public short getShortProperty(String name) throws JMSException {
+        Object value = lookupProperty(name);
+        if (value == null) {
+            return 0;
+        }
+        return value instanceof Short ? (Short) value : Short.valueOf(value.toString());
+    }
+
+    /**
+     * @return the property as an int, parsing its string form if it is not an {@link Integer}; {@code 0} if the
+     * property is absent or null
+     * @throws NumberFormatException if the string form cannot be parsed
+     */
+    @Override
+    public int getIntProperty(String name) throws JMSException {
+        Object value = lookupProperty(name);
+        if (value == null) {
+            return 0;
+        }
+        return value instanceof Integer ? (Integer) value : Integer.valueOf(value.toString());
+    }
+
+    /**
+     * @return the property as a long, parsing its string form if it is not a {@link Long}; {@code 0} if the
+     * property is absent or null
+     * @throws NumberFormatException if the string form cannot be parsed
+     */
+    @Override
+    public long getLongProperty(String name) throws JMSException {
+        Object value = lookupProperty(name);
+        if (value == null) {
+            return 0L;
+        }
+        return value instanceof Long ? (Long) value : Long.valueOf(value.toString());
+    }
+
+    /**
+     * @return the property as a float, parsing its string form if it is not a {@link Float}; {@code 0} if the
+     * property is absent or null
+     * @throws NumberFormatException if the string form cannot be parsed
+     */
+    @Override
+    public float getFloatProperty(String name) throws JMSException {
+        Object value = lookupProperty(name);
+        if (value == null) {
+            return 0f;
+        }
+        return value instanceof Float ? (Float) value : Float.valueOf(value.toString());
+    }
+
+    /**
+     * @return the property as a double, parsing its string form if it is not a {@link Double}; {@code 0} if the
+     * property is absent or null
+     * @throws NumberFormatException if the string form cannot be parsed
+     */
+    @Override
+    public double getDoubleProperty(String name) throws JMSException {
+        Object value = lookupProperty(name);
+        if (value == null) {
+            return 0d;
+        }
+        return value instanceof Double ? (Double) value : Double.valueOf(value.toString());
+    }
+
+    /**
+     * @return the string form of the property, or {@code null} if the property is absent or null
+     */
+    @Override
+    public String getStringProperty(String name) throws JMSException {
+        Object value = lookupProperty(name);
+        return value == null ? null : value.toString();
+    }
+
+    @Override
+    public Object getObjectProperty(String name) throws JMSException {
+        return this.properties.get(name);
+    }
+
+    /**
+     * @return an enumeration over a snapshot of the property names taken at call time
+     */
+    @Override
+    public Enumeration<?> getPropertyNames() throws JMSException {
+        // Copy the keys so later changes to the properties do not affect the enumeration. The JDK enumeration
+        // throws NoSuchElementException once exhausted, as the Enumeration contract requires.
+        return Collections.enumeration(new ArrayList<String>(this.properties.keySet()));
+    }
+
+    @Override
+    public void setBooleanProperty(String name, boolean value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setByteProperty(String name, byte value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setShortProperty(String name, short value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setIntProperty(String name, int value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setLongProperty(String name, long value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setFloatProperty(String name, float value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setDoubleProperty(String name, double value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setStringProperty(String name, String value) {
+        setObjectProperty(name, value);
+    }
+
+    /**
+     * Stores a property value, replacing any previous value for the same name.
+     *
+     * @param name property name
+     * @param value a {@link Number}, {@link String} or {@link Boolean}
+     * @throws IllegalArgumentException if the value is of any other type; a null value is rejected as well
+     */
+    @Override
+    public void setObjectProperty(String name, Object value) {
+        if (value instanceof Number || value instanceof String || value instanceof Boolean) {
+            this.properties.put(name, value);
+        }
+        else {
+            throw new IllegalArgumentException(
+                "Value should be boolean, byte, short, int, long, float, double, and String.");
+        }
+    }
+
+    /**
+     * Returns the raw property value, or {@code null} when the property does not exist.
+     */
+    private Object lookupProperty(String name) throws JMSException {
+        return propertyExists(name) ? getObjectProperty(name) : null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
new file mode 100644
index 0000000..b1e85b0
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * The <CODE>BytesMessage</CODE> methods are based largely on those found in <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>. <P> Notice:Although the JMS API allows the use of message properties with byte
+ * messages, they are typically not used, since the inclusion of properties may affect the format. <P>
+ */
+public class JmsBytesMessage extends JmsBaseMessage implements BytesMessage {
+    private DataInputStream dataAsInput;
+    private DataOutputStream dataAsOutput;
+    private ByteArrayOutputStream bytesOut;
+    private byte[] bytesIn;
+
+    /**
+     * Message created for reading
+     *
+     * @param data
+     */
+    public JmsBytesMessage(byte[] data) {
+        this.bytesIn = data;
+        dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length));
+    }
+
+    /**
+     * Message created to be sent
+     */
+    public JmsBytesMessage() {
+        bytesOut = new ByteArrayOutputStream();
+        dataAsOutput = new DataOutputStream(bytesOut);
+    }
+
+    public long getBodyLength() throws JMSException {
+        return getData().length;
+    }
+
+    /**
+     * @return the data
+     */
+    public byte[] getData() {
+        if (bytesOut != null) {
+            return bytesOut.toByteArray();
+        }
+        else {
+            return bytesIn;
+        }
+
+    }
+
+    public boolean readBoolean() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public byte readByte() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readUnsignedByte() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public short readShort() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readUnsignedShort() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public char readChar() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readInt() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public long readLong() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public float readFloat() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public double readDouble() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public String readUTF() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readBytes(byte[] value) throws JMSException {
+        return readBytes(value, value.length);
+    }
+
+    public int readBytes(byte[] value, int length) throws JMSException {
+        if (length > value.length) {
+            throw new IndexOutOfBoundsException("length must be smaller than the length of value");
+        }
+        if (dataAsInput == null) {
+            throw new MessageNotReadableException("Message is not readable! ");
+        }
+        try {
+            int offset = 0;
+            while (offset < length) {
+                int read = dataAsInput.read(value, offset, length - offset);
+                if (read < 0) {
+                    break;
+                }
+                offset += read;
+            }
+
+            if (offset == 0 && length != 0) {
+                return -1;
+            }
+            else {
+                return offset;
+            }
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+
+    }
+
+    public void writeBoolean(boolean value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeByte(byte value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeShort(short value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeChar(char value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeInt(int value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeLong(long value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeFloat(float value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeDouble(double value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeUTF(String value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeBytes(byte[] value) throws JMSException {
+        if (dataAsOutput == null) {
+            throw new MessageNotWriteableException("Message is not writable! ");
+        }
+        try {
+            dataAsOutput.write(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+        if (dataAsOutput == null) {
+            throw new MessageNotWriteableException("Message is not writable! ");
+        }
+        try {
+            dataAsOutput.write(value, offset, length);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeObject(Object value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void reset() throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    private JMSException handleOutputException(final IOException e) {
+        JMSException ex = new JMSException(e.getMessage());
+        ex.initCause(e);
+        ex.setLinkedException(e);
+        return ex;
+    }
+
+    private JMSException handleInputException(final IOException e) {
+        JMSException ex;
+        if (e instanceof EOFException) {
+            ex = new MessageEOFException(e.getMessage());
+        }
+        else {
+            ex = new MessageFormatException(e.getMessage());
+        }
+        ex.initCause(e);
+        ex.setLinkedException(e);
+        return ex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
new file mode 100644
index 0000000..f67da14
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.Serializable;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+/**
+ * {@link ObjectMessage} implementation that stores its payload in the inherited {@code body} field.
+ */
+public class JmsObjectMessage extends JmsBaseMessage implements ObjectMessage {
+
+    /**
+     * Creates a message carrying the given object.
+     *
+     * @param object the payload; may be {@code null}
+     */
+    public JmsObjectMessage(Serializable object) {
+        this.body = object;
+    }
+
+    /**
+     * Creates a message with no payload.
+     */
+    public JmsObjectMessage() {
+
+    }
+
+    /**
+     * @return the payload, or {@code null} if none has been set
+     */
+    @Override
+    public Serializable getObject() throws JMSException {
+        return this.body;
+    }
+
+    /**
+     * Replaces the payload.
+     *
+     * @param object the new payload; may be {@code null}
+     */
+    @Override
+    public void setObject(Serializable object) throws JMSException {
+        this.body = object;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
new file mode 100644
index 0000000..ce19b51
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+/**
+ * {@link TextMessage} implementation. The text is kept both in its own field and in the inherited {@code body}
+ * field, and the two are always updated together.
+ */
+public class JmsTextMessage extends JmsBaseMessage implements TextMessage {
+    private String text;
+
+    /**
+     * Creates a message with no text.
+     */
+    public JmsTextMessage() {
+
+    }
+
+    /**
+     * Creates a message carrying the given text.
+     *
+     * @param text the text; may be {@code null}
+     */
+    public JmsTextMessage(String text) {
+        // Assign the fields directly rather than calling the overridable setText(), so a subclass override
+        // is never invoked on a partially constructed instance.
+        this.body = text;
+        this.text = text;
+    }
+
+    /**
+     * Clears both the text and the inherited body.
+     */
+    @Override
+    public void clearBody() {
+        this.text = null;
+        super.clearBody();
+    }
+
+    /**
+     * @return the text, or {@code null} if none has been set
+     */
+    @Override
+    public String getText() throws JMSException {
+        return this.text;
+    }
+
+    /**
+     * Replaces the text and the inherited body with the given value.
+     *
+     * @param text the new text; may be {@code null}
+     */
+    @Override
+    public void setText(String text) {
+        this.body = text;
+        this.text = text;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
new file mode 100644
index 0000000..bd926e5
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import com.google.common.base.Preconditions;
+import javax.jms.JMSException;
+
+/**
+ * Static helpers for raising and converting exceptions in the JMS layer.
+ */
+public class ExceptionUtil {
+    /**
+     * {@code true} when the JVM was started with {@code -Dskip.set.exception=true}; read once when this class is
+     * initialized.
+     */
+    public static final boolean SKIP_SET_EXCEPTION
+        = Boolean.parseBoolean(System.getProperty("skip.set.exception", "false"));
+
+    /**
+     * Signals an unsupported operation, unless {@link #SKIP_SET_EXCEPTION} is set, in which case the call
+     * returns silently.
+     *
+     * @throws UnsupportedOperationException if {@link #SKIP_SET_EXCEPTION} is {@code false}
+     */
+    public static void handleUnSupportedException() {
+        if (SKIP_SET_EXCEPTION) {
+            return;
+        }
+        throw new UnsupportedOperationException("Operation unsupported! If you want to skip this Exception," +
+            " use '-Dskip.set.exception=true' in JVM options.");
+    }
+
+    /**
+     * Builds a {@link JMSException} with the given message and the given exception as its cause.
+     *
+     * @param e the original exception; must not be {@code null}
+     * @param extra the message of the new exception; must not be {@code null}
+     * @return the new exception (not thrown)
+     * @throws NullPointerException if either argument is {@code null}
+     */
+    public static JMSException convertToJmsException(Exception e, String extra) {
+        // checkNotNull returns its argument, so validation and use are combined; 'extra' is still checked first.
+        JMSException converted = new JMSException(Preconditions.checkNotNull(extra));
+        converted.initCause(Preconditions.checkNotNull(e));
+        return converted;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
new file mode 100644
index 0000000..3cf03f9
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.jms.BytesMessage;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseTopic;
+import org.apache.rocketmq.jms.domain.message.JmsBaseMessage;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+
+import static org.apache.rocketmq.jms.domain.JmsBaseMessageProducer.initRocketMQHeaders;
+
+/**
+ * Translates messages between the JMS representation ({@link JmsBaseMessage} and its subclasses)
+ * and the RocketMQ representation ({@link Message} / {@link MessageExt}).
+ */
+public class MessageConverter {
+    /**
+     * Extracts the body of a JMS message as the byte array that is sent to RocketMQ.
+     *
+     * @param jmsMessage a text, object or bytes message
+     * @return UTF-8 bytes for a text message, the Java-serialized form for an object message,
+     * or the raw data of a bytes message
+     * @throws IllegalArgumentException if the body is empty or the message type is not supported
+     * @throws Exception if reading the JMS message or serializing its content fails
+     */
+    public static byte[] getContentFromJms(javax.jms.Message jmsMessage) throws Exception {
+        if (jmsMessage instanceof TextMessage) {
+            String text = ((TextMessage) jmsMessage).getText();
+            if (StringUtils.isEmpty(text)) {
+                throw new IllegalArgumentException("Message body length is zero");
+            }
+            return MsgConvertUtil.string2Bytes(text, Charsets.UTF_8.toString());
+        }
+        if (jmsMessage instanceof ObjectMessage) {
+            java.io.Serializable object = ((ObjectMessage) jmsMessage).getObject();
+            if (object == null) {
+                throw new IllegalArgumentException("Message body length is zero");
+            }
+            return MsgConvertUtil.objectSerialize(object);
+        }
+        if (jmsMessage instanceof BytesMessage) {
+            // Only this module's own BytesMessage implementation is handled here; the cast
+            // fails for any other implementation.
+            JmsBytesMessage bytesMessage = (JmsBytesMessage) jmsMessage;
+            if (bytesMessage.getBodyLength() == 0) {
+                throw new IllegalArgumentException("Message body length is zero");
+            }
+            return bytesMessage.getData();
+        }
+        throw new IllegalArgumentException("Unknown message type " + jmsMessage.getJMSType());
+    }
+
+    /**
+     * Builds the JMS view of a message received from RocketMQ.
+     * <p>
+     * The concrete JMS type is chosen from the {@link MsgConvertUtil#JMS_MSGMODEL} user property;
+     * a message without that property is exposed as a bytes message. Properties that carry JMS
+     * header names become headers, everything else becomes a JMS property.
+     *
+     * @param msg the RocketMQ message
+     * @return the corresponding JMS message
+     * @throws IllegalArgumentException if the stored destination property is not of the form
+     * {@code topic:type}
+     * @throws Exception if the body cannot be decoded
+     */
+    public static JmsBaseMessage convert2JMSMessage(MessageExt msg) throws Exception {
+        String msgModel = msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL);
+        JmsBaseMessage message;
+        if (MsgConvertUtil.MSGMODEL_OBJ.equals(msgModel)) {
+            message = new JmsObjectMessage(MsgConvertUtil.objectDeserialize(msg.getBody()));
+        }
+        else if (MsgConvertUtil.MSGMODEL_TEXT.equals(msgModel)) {
+            message = new JmsTextMessage(MsgConvertUtil.bytes2String(msg.getBody(),
+                Charsets.UTF_8.toString()));
+        }
+        else {
+            // Either an explicit bytes message, or a plain rocketmq producer that sent the
+            // message without setting JMS_MSGMODEL.
+            message = new JmsBytesMessage(msg.getBody());
+        }
+
+        //-------------------------set headers-------------------------
+        message.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + msg.getMsgId());
+        message.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.valueOf(msg.getReconsumeTimes() > 0));
+
+        Map<String, Object> properties = new HashMap<String, Object>();
+        Map<String, String> propertiesMap = msg.getProperties();
+        if (propertiesMap != null) {
+            for (Map.Entry<String, String> entry : propertiesMap.entrySet()) {
+                String properName = entry.getKey();
+                String properValue = entry.getValue();
+                if (JmsBaseConstant.JMS_DESTINATION.equals(properName)) {
+                    if (null != properValue) {
+                        String[] msgTuple = properValue.split(":");
+                        if (msgTuple.length < 2) {
+                            // Fail with a readable message instead of an index error.
+                            throw new IllegalArgumentException("Property " + properName
+                                + " must look like 'topic:type' but was '" + properValue + "'");
+                        }
+                        message.setHeader(JmsBaseConstant.JMS_DESTINATION,
+                            new JmsBaseTopic(msgTuple[0], msgTuple[1]));
+                    }
+                }
+                else if (JmsBaseConstant.JMS_DELIVERY_MODE.equals(properName)
+                    || JmsBaseConstant.JMS_PRIORITY.equals(properName)
+                    || JmsBaseConstant.JMS_TIMESTAMP.equals(properName)
+                    || JmsBaseConstant.JMS_EXPIRATION.equals(properName)
+                    || JmsBaseConstant.JMS_CORRELATION_ID.equals(properName)
+                    || JmsBaseConstant.JMS_TYPE.equals(properName)) {
+                    // These JMS headers are copied through unchanged, as strings.
+                    message.setHeader(properName, properValue);
+                }
+                else if (JmsBaseConstant.JMS_MESSAGE_ID.equals(properName)
+                    || JmsBaseConstant.JMS_REDELIVERED.equals(properName)) {
+                    // Already derived above from msg.getMsgId() and msg.getReconsumeTimes().
+                    continue;
+                }
+                else {
+                    properties.put(properName, properValue);
+                }
+            }
+        }
+
+        // TODO: decide which RocketMQ system properties, if any, should be promoted to headers.
+        message.setProperties(properties);
+
+        return message;
+    }
+
+    /**
+     * Builds the RocketMQ message that carries a JMS message.
+     * <p>
+     * Topic and tag are taken from the {@link JmsBaseTopic} stored in the destination header;
+     * the entries returned by {@code initRocketMQHeaders} are mapped onto the dedicated RocketMQ
+     * setters where one exists and stored as user properties otherwise.
+     *
+     * @param jmsMsg the JMS message, with its destination header already set
+     * @return the RocketMQ message ready to be sent
+     * @throws NullPointerException if the destination header or its message type is missing
+     * @throws IllegalStateException if the message type contains {@code ||}
+     * @throws Exception if the body cannot be extracted
+     */
+    public static Message convert2RMQMessage(JmsBaseMessage jmsMsg) throws Exception {
+        Message rocketmqMsg = new MessageExt();
+        // 1. Transform message body
+        rocketmqMsg.setBody(MessageConverter.getContentFromJms(jmsMsg));
+
+        // 2. Transform topic and messageType
+        JmsBaseTopic destination = (JmsBaseTopic) jmsMsg.getHeaders().get(JmsBaseConstant.JMS_DESTINATION);
+        Preconditions.checkNotNull(destination,
+            "The destination header must be set before a message can be converted");
+        String topic = destination.getMessageTopic();
+        rocketmqMsg.setTopic(topic);
+        String messageType = destination.getMessageType();
+        Preconditions.checkNotNull(messageType, "The destination has no message type");
+        Preconditions.checkState(!messageType.contains("||"),
+            "'||' can not be in the destination when sending a message");
+        rocketmqMsg.setTags(messageType);
+
+        // 3. Transform message properties
+        Properties properties = initRocketMQHeaders(jmsMsg, topic, messageType);
+        for (String name : properties.stringPropertyNames()) {
+            String value = properties.getProperty(name);
+            if (MessageConst.PROPERTY_KEYS.equals(name)) {
+                rocketmqMsg.setKeys(value);
+            } else if (MessageConst.PROPERTY_TAGS.equals(name)) {
+                rocketmqMsg.setTags(value);
+            } else if (MessageConst.PROPERTY_DELAY_TIME_LEVEL.equals(name)) {
+                rocketmqMsg.setDelayTimeLevel(Integer.parseInt(value));
+            } else if (MessageConst.PROPERTY_WAIT_STORE_MSG_OK.equals(name)) {
+                rocketmqMsg.setWaitStoreMsgOK(Boolean.parseBoolean(value));
+            } else if (MessageConst.PROPERTY_BUYER_ID.equals(name)) {
+                rocketmqMsg.setBuyerId(value);
+            } else {
+                rocketmqMsg.putUserProperty(name, value);
+            }
+        }
+
+        return rocketmqMsg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
new file mode 100644
index 0000000..ec55bbc
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Constants and low-level encoding helpers shared by the JMS/RocketMQ message conversion code.
+ */
+public class MsgConvertUtil {
+
+    public static final byte[] EMPTY_BYTES = new byte[0];
+    public static final String EMPTY_STRING = "";
+
+    /** Name of the user property that records which JMS message type a body was produced from. */
+    public static final String JMS_MSGMODEL = "jmsMsgModel";
+    /**
+     * To adapt this scene: "Notify client try to receive ObjectMessage sent by JMS client" Set notify out message
+     * model, value can be textMessage OR objectMessage
+     */
+    public static final String COMPATIBLE_FIELD_MSGMODEL = "notifyOutMsgModel";
+
+    public static final String MSGMODEL_TEXT = "textMessage";
+    public static final String MSGMODEL_BYTES = "bytesMessage";
+    public static final String MSGMODEL_OBJ = "objectMessage";
+
+    public static final String MSG_TOPIC = "msgTopic";
+    public static final String MSG_TYPE = "msgType";
+
+    /**
+     * Serializes an object with standard Java serialization.
+     *
+     * @param object the object to write
+     * @return the serialized form
+     * @throws IOException if the object cannot be serialized
+     */
+    public static byte[] objectSerialize(Object object) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        try {
+            oos.writeObject(object);
+        }
+        finally {
+            // Closing also flushes buffered data into baos, on the failure path as well.
+            oos.close();
+        }
+        return baos.toByteArray();
+    }
+
+    /**
+     * Restores an object previously written by {@link #objectSerialize(Object)}.
+     * <p>
+     * NOTE(review): this uses native Java deserialization. If {@code bytes} can originate from an
+     * untrusted producer, restricting the accepted classes should be considered.
+     *
+     * @param bytes the serialized form
+     * @return the deserialized object
+     * @throws IOException if the bytes are not a valid serialization stream
+     * @throws ClassNotFoundException if the class of the serialized object cannot be found
+     */
+    public static Serializable objectDeserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+        try {
+            // Read before closing; the stream must still be open while the object is decoded.
+            return (Serializable) ois.readObject();
+        }
+        finally {
+            ois.close();
+        }
+    }
+
+    /**
+     * Encodes a string with the named charset.
+     *
+     * @param s the text to encode, may be {@code null}
+     * @param charset the charset name
+     * @return the encoded bytes; {@link #EMPTY_BYTES} when {@code s} is {@code null}; {@code null}
+     * when encoding fails, for example because the charset name is unknown
+     */
+    public static final byte[] string2Bytes(String s, String charset) {
+        if (null == s) {
+            return EMPTY_BYTES;
+        }
+        try {
+            return s.getBytes(charset);
+        }
+        catch (Exception ignored) {
+            // Kept for backward compatibility: callers receive null instead of an exception.
+            return null;
+        }
+    }
+
+    /**
+     * Decodes bytes with the named charset.
+     *
+     * @param bs the bytes to decode, may be {@code null}
+     * @param charset the charset name
+     * @return the decoded text; {@link #EMPTY_STRING} when {@code bs} is {@code null}; {@code null}
+     * when decoding fails, for example because the charset name is unknown
+     */
+    public static final String bytes2String(byte[] bs, String charset) {
+        if (null == bs) {
+            return EMPTY_STRING;
+        }
+        try {
+            return new String(bs, charset);
+        }
+        catch (Exception ignored) {
+            // Kept for backward compatibility: callers receive null instead of an exception.
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
new file mode 100644
index 0000000..9b29928
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+
+public abstract class URISpecParser {
+
+    private static final String DEFAULT_BROKER = "rocketmq";
+
+    /**
+     * ConnectionUrl spec is broker://ip:port?key1=value1&key2=value2
+     *
+     * @param uri Just like broker://ip:port?key1=value1&key2=value2
+     * @return The parameters' map
+     */
+    public static Map<String, String> parseURI(String uri) {
+        Preconditions.checkArgument(null != uri && !uri.trim().isEmpty(), "Uri can not be empty!");
+
+        Map<String, String> results = Maps.newHashMap();
+        String broker = uri.substring(0, uri.indexOf(":"));
+        results.put(CommonConstant.PROVIDER, broker);
+
+        if (broker.equals(DEFAULT_BROKER)) {
+            //Special handle for alibaba inner mq broker
+            String queryStr = uri.substring(uri.indexOf("?") + 1, uri.length());
+            if (StringUtils.isNotEmpty(queryStr)) {
+                String[] params = queryStr.split("&");
+                for (String param : params) {
+                    if (param.contains("=")) {
+                        String[] values = param.split("=", 2);
+                        results.put(values[0], values[1]);
+                    }
+                }
+            }
+        }
+        else {
+            throw new IllegalArgumentException("Broker must be rocketmq");
+        }
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/resources/application.conf b/rocketmq-jms/core/src/main/resources/application.conf
new file mode 100644
index 0000000..713c915
--- /dev/null
+++ b/rocketmq-jms/core/src/main/resources/application.conf
@@ -0,0 +1 @@
+version = ${project.version}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
new file mode 100644
index 0000000..d77b13e
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import org.junit.Assert;
+
+public class JmsTestListener implements MessageListener {
+
+    private int expectd;
+    private CountDownLatch latch;
+    private AtomicInteger consumedNum = new AtomicInteger(0);
+
+    public JmsTestListener() {
+        this.expectd = 10;
+    }
+    public JmsTestListener(int expectd) {
+        this.expectd = expectd;
+    }
+    public JmsTestListener(int expected, CountDownLatch latch) {
+        this.expectd = expected;
+        this.latch = latch;
+    }
+    @Override
+    public void onMessage(Message message) {
+        try {
+            Assert.assertNotNull(message);
+            Assert.assertNotNull(message.getJMSMessageID());
+            if (consumedNum.incrementAndGet() == expectd && latch != null) {
+                latch.countDown();
+            }
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public int getConsumedNum() {
+        return consumedNum.get();
+    }
+
+    public void setLatch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+
+    public void setExpectd(int expectd) {
+        this.expectd = expectd;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
new file mode 100644
index 0000000..855cb19
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageProducer;
+import org.apache.rocketmq.jms.domain.RMQPushConsumerExt;
+import org.junit.Assert;
+
+/**
+ * Test helpers that look into the static producer and consumer registries through reflection.
+ */
+public class JmsTestUtil {
+    /**
+     * Looks up a producer in the static {@code producerMap} field of {@link JmsBaseMessageProducer}.
+     *
+     * @param producerId the key to look up, must not be {@code null}
+     * @return the registered producer, or {@code null} if the map has no entry for the id
+     */
+    public static MQProducer getMQProducer(String producerId) throws Exception {
+        Assert.assertNotNull(producerId);
+        @SuppressWarnings("unchecked")
+        ConcurrentMap<String, MQProducer> producers =
+            (ConcurrentMap<String, MQProducer>) readStaticField(JmsBaseMessageProducer.class, "producerMap");
+        return producers.get(producerId);
+    }
+
+    /**
+     * Looks up a consumer in the static {@code consumerMap} field of {@link JmsBaseMessageConsumer}.
+     *
+     * @param consumerId the key to look up, must not be {@code null}
+     * @return the registered consumer, or {@code null} if the map has no entry for the id
+     */
+    public static RMQPushConsumerExt getRMQPushConsumerExt(String consumerId) throws Exception {
+        Assert.assertNotNull(consumerId);
+        @SuppressWarnings("unchecked")
+        ConcurrentMap<String, RMQPushConsumerExt> consumers =
+            (ConcurrentMap<String, RMQPushConsumerExt>) readStaticField(JmsBaseMessageConsumer.class, "consumerMap");
+        return consumers.get(consumerId);
+    }
+
+    /**
+     * Asserts the registration state of a consumer.
+     *
+     * @param consumerId the consumer to inspect
+     * @param isNull whether the consumer is expected to be absent from the registry
+     * @param isStarted the expected started flag; only checked when the consumer is present
+     */
+    public static void checkConsumerState(String consumerId, boolean isNull, boolean isStarted) throws Exception {
+        RMQPushConsumerExt consumer = getRMQPushConsumerExt(consumerId);
+        if (isNull) {
+            Assert.assertNull(consumer);
+            return;
+        }
+        Assert.assertNotNull(consumer);
+        Assert.assertEquals(isStarted, consumer.isStarted());
+    }
+
+    /** Reads a (possibly private) static field of {@code owner} by name. */
+    private static Object readStaticField(Class<?> owner, String fieldName) throws Exception {
+        Field field = owner.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.get(null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
new file mode 100644
index 0000000..9fe9f5e
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for reading from and writing to {@link JmsBytesMessage}.
+ */
+public class JmsBytesMessageTest {
+
+    private byte[] receiveData = "receive data test".getBytes();
+    private byte[] sendData = "send data test".getBytes();
+
+    /** Decodes bytes with the platform charset, mirroring how the fixtures were encoded. */
+    private static String text(byte[] bytes) {
+        return new String(bytes);
+    }
+
+    /** getData() returns the body both for a received message and for one that was written to. */
+    @Test
+    public void testGetData() throws Exception {
+        JmsBytesMessage received = new JmsBytesMessage(receiveData);
+
+        System.out.println(text(received.getData()));
+        Assert.assertEquals(text(receiveData), text(received.getData()));
+
+        JmsBytesMessage outgoing = new JmsBytesMessage();
+        outgoing.writeBytes(sendData, 0, sendData.length);
+
+        System.out.println(text(outgoing.getData()));
+        Assert.assertEquals(text(sendData), text(outgoing.getData()));
+    }
+
+    /** getBodyLength() matches the length of the array the message was created from. */
+    @Test
+    public void testGetBodyLength() throws Exception {
+        JmsBytesMessage received = new JmsBytesMessage(receiveData);
+
+        System.out.println(received.getBodyLength());
+        Assert.assertEquals(received.getBodyLength(), receiveData.length);
+    }
+
+    /** readBytes(byte[]) fills a buffer of body size with the complete body. */
+    @Test
+    public void testReadBytes() throws Exception {
+        JmsBytesMessage received = new JmsBytesMessage(receiveData);
+        Assert.assertEquals(received.getBodyLength(), receiveData.length);
+
+        byte[] buffer = new byte[receiveData.length];
+        received.readBytes(buffer);
+
+        System.out.println(text(buffer));
+        Assert.assertEquals(text(buffer), text(receiveData));
+    }
+
+    /** Successive readBytes(byte[], int) calls continue where the previous one stopped. */
+    @Test
+    public void testReadBytes1() throws Exception {
+        JmsBytesMessage received = new JmsBytesMessage(receiveData);
+
+        byte[] firstChunk = new byte[2];
+        received.readBytes(firstChunk, 2);
+        System.out.println(text(firstChunk));
+        Assert.assertEquals(text(receiveData).substring(0, 2), text(firstChunk));
+
+        byte[] secondChunk = new byte[2];
+        received.readBytes(secondChunk, 2);
+        System.out.println(text(secondChunk));
+        Assert.assertEquals(text(receiveData).substring(2, 4), text(secondChunk));
+    }
+
+    /** writeBytes(byte[]) stores the complete array as the body. */
+    @Test
+    public void testWriteBytes() throws Exception {
+        JmsBytesMessage outgoing = new JmsBytesMessage();
+        outgoing.writeBytes(sendData);
+
+        System.out.println(text(outgoing.getData()));
+        Assert.assertEquals(text(outgoing.getData()), text(sendData));
+    }
+
+    /**
+     * Only constructs the two kinds of message. The calls that would mix the read and write
+     * modes are left disabled because they were recorded as throwing NullPointerException.
+     */
+    @Test
+    public void testException() throws Exception {
+        JmsBytesMessage writeOnly = new JmsBytesMessage();
+        byte[] buffer = new byte[receiveData.length];
+        // Disabled, throws NullPointerException:
+        //     writeOnly.readBytes(buffer);
+
+        JmsBytesMessage readOnly = new JmsBytesMessage(sendData);
+        // Disabled, throws NullPointerException:
+        //     readOnly.writeBytes("hello again".getBytes());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
new file mode 100644
index 0000000..b570142
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
@@ -0,0 +1,52 @@
+package org.apache.rocketmq.jms.domain.message;
+
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseTopic;
+import org.apache.rocketmq.jms.util.MessageConverter;
+import org.apache.rocketmq.jms.util.MsgConvertUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JmsMessageConvertTest {
+    @Test
+    public void testCovert2RMQ() throws Exception {
+        //init jmsBaseMessage
+        String topic = "TestTopic";
+        String messageType = "TagA";
+
+        JmsBaseMessage jmsBaseMessage = new JmsTextMessage("testText");
+        jmsBaseMessage.setHeader(JmsBaseConstant.JMS_DESTINATION, new JmsBaseTopic(topic, messageType));
+        jmsBaseMessage.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:null");
+        jmsBaseMessage.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.FALSE);
+
+        jmsBaseMessage.setObjectProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT);
+        jmsBaseMessage.setObjectProperty(MsgConvertUtil.MSG_TOPIC, topic);
+        jmsBaseMessage.setObjectProperty(MsgConvertUtil.MSG_TYPE, messageType);
+        jmsBaseMessage.setObjectProperty(MessageConst.PROPERTY_TAGS, messageType);
+        jmsBaseMessage.setObjectProperty(MessageConst.PROPERTY_KEYS, messageType);
+
+        //convert to RMQMessage
+        MessageExt message = (MessageExt)MessageConverter.convert2RMQMessage(jmsBaseMessage);
+
+        System.out.println(message);
+
+        //then convert back to jmsBaseMessage
+        JmsBaseMessage jmsBaseMessageBack = MessageConverter.convert2JMSMessage(message);
+
+        JmsTextMessage jmsTextMessage = (JmsTextMessage) jmsBaseMessage;
+        JmsTextMessage jmsTextMessageBack = (JmsTextMessage) jmsBaseMessageBack;
+
+        Assert.assertEquals(jmsTextMessage.getText(), jmsTextMessageBack.getText());
+        Assert.assertEquals(jmsTextMessage.getJMSDestination().toString(), jmsTextMessageBack.getJMSDestination().toString());
+        Assert.assertEquals(jmsTextMessage.getJMSMessageID(), jmsTextMessageBack.getJMSMessageID());
+        Assert.assertEquals(jmsTextMessage.getJMSRedelivered(), jmsTextMessageBack.getJMSRedelivered());
+        Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.JMS_MSGMODEL), jmsTextMessageBack.getHeaders().get(MsgConvertUtil.JMS_MSGMODEL));
+        Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.MSG_TOPIC), jmsTextMessageBack.getHeaders().get(MsgConvertUtil.MSG_TOPIC));
+        Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.MSG_TYPE), jmsTextMessageBack.getHeaders().get(MsgConvertUtil.MSG_TYPE));
+        Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_TAGS), jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_TAGS));
+        Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_KEYS), jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_KEYS));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
new file mode 100644
index 0000000..6951976
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.Serializable;
+import javax.jms.JMSException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Unit tests for the payload accessors of {@link JmsObjectMessage}. */
+public class JmsObjectMessageTest {
+
+    /** getObject() must return a payload equal to the constructor argument; a JMSException fails the test. */
+    @Test
+    public void testGetObject() throws JMSException {
+        JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20));
+        Assert.assertEquals(new User("jack", 20), jmsObjectMessage.getObject());
+    }
+
+    /** getBody(Object.class) must expose the same payload as getObject(); a JMSException fails the test. */
+    @Test
+    public void testGetBody() throws JMSException {
+        JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20));
+        User user = (User) jmsObjectMessage.getBody(Object.class);
+        Assert.assertEquals("jack", user.getName());
+        Assert.assertEquals(20, user.getAge());
+        Assert.assertEquals(jmsObjectMessage.getObject(), jmsObjectMessage.getBody(Object.class));
+    }
+
+    /**
+     * Serializable payload fixture. It is a static nested class so that serializing it does not
+     * require the enclosing test instance, which is not serializable.
+     */
+    private static class User implements Serializable {
+        private String name;
+        private int age;
+
+        private User(String name, int age) {
+            this.name = name;
+            this.age = age;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            User user = (User) obj;
+            return age == user.age && (name != null ? name.equals(user.name) : user.name == null);
+        }
+
+        /** Kept consistent with equals(): users that compare equal produce the same hash code. */
+        @Override
+        public int hashCode() {
+            return 31 * (name != null ? name.hashCode() : 0) + age;
+        }
+
+        public int getAge() {
+            return age;
+        }
+
+        public void setAge(int age) {
+            this.age = age;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
new file mode 100644
index 0000000..d3c8287
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import javax.jms.JMSException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Unit tests for the text accessors of {@link JmsTextMessage}. */
+public class JmsTextMessageTest {
+    private String text = "jmsTextMessage test";
+
+    /**
+     * getBody(String.class) must return the text passed to the constructor.
+     *
+     * @throws JMSException if the body cannot be read, which fails the test
+     */
+    @Test
+    public void testGetBody() throws JMSException {
+        JmsTextMessage jmsTextMessage = new JmsTextMessage(text);
+        Assert.assertEquals(text, jmsTextMessage.getBody(String.class));
+    }
+
+    /**
+     * getText() must return the value previously stored through setText().
+     *
+     * @throws JMSException if the text cannot be read, which fails the test
+     */
+    @Test
+    public void testSetGetText() throws JMSException {
+        JmsTextMessage jmsTextMessage = new JmsTextMessage();
+        jmsTextMessage.setText(text);
+        Assert.assertEquals(text, jmsTextMessage.getText());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
new file mode 100644
index 0000000..02fe111
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IntegrationTestBase {
+    public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class);
+
+    protected static Random random = new Random();
+    protected static final String SEP = File.separator;
+
+
+    protected static String topic = "jms-test";
+    protected static String topic2 = "jms-test-2";
+    protected static String messageType = "TagA";
+    protected static String producerId = "PID-jms-test";
+    protected static String consumerId = "CID-jms-test";
+    protected static String consumerId2 = "CID-jms-test-2";
+    protected static String nameServer;
+    protected static String text = "English test";
+    protected static int consumeThreadNums = 16;
+
+
+
+
+    protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
+    protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
+    protected static final List<File> TMPE_FILES = new ArrayList<File>();
+    protected static final List<BrokerController> BROKER_CONTROLLERS =  new ArrayList<BrokerController>();
+    protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<NamesrvController>();
+
+
+    private static String createBaseDir() {
+        String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID();
+        final File file = new File(baseDir);
+        if (file.exists()) {
+            System.out.println(String.format("[%s] has already existed, please bake up and remove it for integration tests", baseDir));
+            System.exit(1);
+        }
+        TMPE_FILES.add(file);
+        return baseDir;
+    }
+
+    public static NamesrvController createAndStartNamesrv() {
+        String baseDir = createBaseDir();
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig();
+        namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json");
+
+        nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000));
+        NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
+        try {
+            Assert.assertTrue(namesrvController.initialize());
+            logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
+            namesrvController.start();
+        } catch (Exception e) {
+            System.out.println("Name Server start failed");
+            System.exit(1);
+        }
+        NAMESRV_CONTROLLERS.add(namesrvController);
+        return namesrvController;
+
+    }
+
+
+    public static BrokerController createAndStartBroker(String nsAddr) {
+        String baseDir = createBaseDir();
+        BrokerConfig brokerConfig = new BrokerConfig();
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
+        brokerConfig.setBrokerIP1("127.0.0.1");
+        brokerConfig.setNamesrvAddr(nsAddr);
+        storeConfig.setStorePathRootDir(baseDir);
+        storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
+        storeConfig.setHaListenPort(8000 + random.nextInt(1000));
+        nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
+        BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
+        try {
+            Assert.assertTrue(brokerController.initialize());
+            logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+            brokerController.start();
+        } catch (Exception e) {
+            System.out.println("Broker start failed");
+            System.exit(1);
+        }
+        BROKER_CONTROLLERS.add(brokerController);
+        return brokerController;
+    }
+
+
+
+    protected static DefaultMQAdminExt defaultMQAdminExt;
+
+    static {
+        //clear the environment
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override public void run() {
+                if (defaultMQAdminExt != null) {
+                    defaultMQAdminExt.shutdown();
+                }
+                for (NamesrvController namesrvController: NAMESRV_CONTROLLERS) {
+                    if (namesrvController != null) {
+                        namesrvController.shutdown();
+                    }
+                }
+                for (BrokerController brokerController: BROKER_CONTROLLERS) {
+                    if (brokerController != null) {
+                        brokerController.shutdown();
+                    }
+                }
+                for (File file : TMPE_FILES) {
+                    deleteFile(file);
+                }
+            }
+        });
+
+
+        NamesrvController namesrvController = IntegrationTestBase.createAndStartNamesrv();
+        nameServer = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
+        BrokerController brokerController = createAndStartBroker(nameServer);
+
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExt.setNamesrvAddr(nameServer);
+        try {
+            defaultMQAdminExt.start();
+        } catch (MQClientException e) {
+            System.out.println("DefaultMQAdminExt start failed");
+            System.exit(1);
+        }
+
+        createTopic(topic, brokerController.getBrokerAddr());
+
+
+    }
+
+    public static void deleteFile(File file) {
+        if (!file.exists()) {
+            return;
+        }
+        if (file.isFile()) {
+            file.delete();
+        } else if (file.isDirectory()) {
+            File[] files = file.listFiles();
+            for (int i = 0;i < files.length;i ++) {
+                deleteFile(files[i]);
+            }
+            file.delete();
+        }
+    }
+    public static void createTopic(String topic, String addr) {
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topic);
+        topicConfig.setReadQueueNums(4);
+        topicConfig.setWriteQueueNums(4);
+        try {
+            defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+        } catch (Exception e) {
+            logger.error("Create topic:{} addr:{} failed", addr, topic);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
new file mode 100644
index 0000000..367700a
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.jms.JmsTestListener;
+import org.apache.rocketmq.jms.JmsTestUtil;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt;
+
+/** Integration tests that drive the JMS client against the environment started by {@link IntegrationTestBase}. */
+public class JmsClientIT extends IntegrationTestBase {
+
+    /** Verifies that the settings passed in the connection URI reach the underlying RocketMQ consumer and producer. */
+    @Test
+    public void testConfigInURI() throws Exception {
+        // createConnection() builds the same URI this test used to assemble inline.
+        Connection connection = createConnection(producerId, consumerId);
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            connection.start();
+
+            Destination destination = session.createTopic(topic + ":" + messageType);
+            session.createConsumer(destination);
+            session.createProducer(destination);
+
+            DefaultMQPushConsumer rmqPushConsumer = (DefaultMQPushConsumer) getRMQPushConsumerExt(consumerId).getConsumer();
+            Assert.assertNotNull(rmqPushConsumer);
+            Assert.assertEquals(consumerId, rmqPushConsumer.getConsumerGroup());
+            Assert.assertEquals("JMS_TEST", rmqPushConsumer.getInstanceName());
+            Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMax());
+            Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMin());
+            Assert.assertEquals(nameServer, rmqPushConsumer.getNamesrvAddr());
+
+            DefaultMQProducer mqProducer = (DefaultMQProducer) JmsTestUtil.getMQProducer(producerId);
+            Assert.assertNotNull(mqProducer);
+            Assert.assertEquals(producerId, mqProducer.getProducerGroup());
+            Assert.assertEquals("JMS_TEST", mqProducer.getInstanceName());
+            Assert.assertEquals(10 * 1000, mqProducer.getSendMsgTimeout());
+            Assert.assertEquals(nameServer, mqProducer.getNamesrvAddr());
+
+            Thread.sleep(2000);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Creates a connection whose URI carries the given producer and consumer ids plus the shared test
+     * settings: name server, consume thread count, a 10 s send timeout and instance name "JMS_TEST".
+     */
+    private Connection createConnection(String producerGroup, String consumerGroup) throws Exception {
+        JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new
+            URI(String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s",
+            CommonConstant.PRODUCERID, producerGroup,
+            CommonConstant.CONSUMERID, consumerGroup,
+            CommonConstant.NAMESERVER, nameServer,
+            CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums,
+            CommonConstant.SEND_TIMEOUT_MILLIS, 10*1000,
+            CommonConstant.INSTANCE_NAME, "JMS_TEST")));
+        return connectionFactory.createConnection();
+    }
+
+    /**
+     * Sends {@code count} text messages, asserting that each has no JMS message id before the send and
+     * a non-null one afterwards; a {@code null} destination means "use the producer's own destination".
+     */
+    private void sendTextMessages(Session session, MessageProducer messageProducer, Destination destination,
+        int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage(text + i);
+            Assert.assertNull(message.getJMSMessageID());
+            if (destination == null) {
+                messageProducer.send(message);
+            } else {
+                messageProducer.send(destination, message);
+            }
+            Assert.assertNotNull(message.getJMSMessageID());
+        }
+    }
+
+    /** Two listeners on different topics of one session must each receive the 10 messages sent to their topic. */
+    @Test
+    public void testProducerAndConsume_TwoConsumer() throws Exception {
+        Connection connection = createConnection(producerId, consumerId);
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destinationA = session.createTopic("TopicA");
+            Destination destinationB = session.createTopic("TopicB");
+            final CountDownLatch countDownLatch = new CountDownLatch(2);
+            JmsTestListener listenerA = new JmsTestListener(10, countDownLatch);
+            JmsTestListener listenerB = new JmsTestListener(10, countDownLatch);
+
+            //two consumers
+            MessageConsumer messageConsumerA = session.createConsumer(destinationA);
+            messageConsumerA.setMessageListener(listenerA);
+            MessageConsumer messageConsumerB = session.createConsumer(destinationB);
+            messageConsumerB.setMessageListener(listenerB);
+            //producer
+            MessageProducer messageProducer = session.createProducer(destinationA);
+            connection.start();
+
+            sendTextMessages(session, messageProducer, null, 10);
+            sendTextMessages(session, messageProducer, destinationB, 10);
+
+            if (countDownLatch.await(30, TimeUnit.SECONDS)) {
+                Thread.sleep(2000);
+            }
+            Assert.assertEquals(10, listenerA.getConsumedNum());
+            Assert.assertEquals(10, listenerB.getConsumedNum());
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * A consumer on "topic:tagA" must receive only the 20 tag-A messages, while a consumer on the bare
+     * "topic" (created with a different consumer id) must receive all 40 messages of both tags.
+     */
+    @Test
+    public void testProducerAndConsume_TagFilter() throws Exception {
+        Connection connection = createConnection(producerId, consumerId);
+        Connection anotherConnection = null;
+        try {
+            anotherConnection = createConnection(producerId, consumerId + "other");
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Session anotherSession = anotherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Destination destinationA = session.createTopic("topic:tagA");
+            Destination destinationB = session.createTopic("topic:tagB");
+            final CountDownLatch countDownLatch = new CountDownLatch(2);
+            // NOTE(review): built with 10 although 20 tag-A messages are asserted below - confirm against JmsTestListener.
+            JmsTestListener listenerForTagA = new JmsTestListener(10, countDownLatch);
+            JmsTestListener listenerForAll = new JmsTestListener(40, countDownLatch);
+
+            session.createConsumer(destinationA).setMessageListener(listenerForTagA);
+            anotherSession.createConsumer(session.createTopic("topic")).setMessageListener(listenerForAll);
+            //producer
+            MessageProducer messageProducer = session.createProducer(destinationA);
+            connection.start();
+            anotherConnection.start();
+
+            sendTextMessages(session, messageProducer, null, 20);
+            sendTextMessages(session, messageProducer, destinationB, 20);
+
+            if (countDownLatch.await(30, TimeUnit.SECONDS)) {
+                Thread.sleep(2000);
+            }
+            Assert.assertEquals(20, listenerForTagA.getConsumedNum());
+            Assert.assertEquals(40, listenerForAll.getConsumedNum());
+        } finally {
+            // Close both connections even if closing the first one throws.
+            try {
+                connection.close();
+            } finally {
+                if (anotherConnection != null) {
+                    anotherConnection.close();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
new file mode 100644
index 0000000..6cbb7b1
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.net.URI;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer;
+import org.apache.rocketmq.jms.domain.RMQPushConsumerExt;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.jms.JmsTestUtil.checkConsumerState;
+import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt;
+
+/** Integration tests for consumer start-up and for the reference counting of the per-consumer-id push consumer. */
+public class JmsConsumerIT extends IntegrationTestBase {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /** Asserts that each delivered message and its JMS message id are non-null; exceptions are rethrown unchecked. */
+    private  MessageListener listener = new MessageListener() {
+        @Override
+        public void onMessage(Message message) {
+            try {
+                Assert.assertNotNull(message);
+                Assert.assertNotNull(message.getJMSMessageID());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    };
+
+    /** Opens a connection configured with the shared consumer id and the test name server. */
+    private Connection openConnection() throws Exception {
+        URI uri = new URI("rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + nameServer);
+        return new JmsBaseConnectionFactory(uri).createConnection();
+    }
+
+    /** Starts two consumers one by one, then the connection twice, checking the consumer state after each step. */
+    @Test
+    public void testStartIdempotency() throws Exception {
+        Connection connection = openConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        checkConsumerState(consumerId, true, false);
+        try {
+            Destination firstTopic = session.createTopic(topic + ":" + messageType);
+            MessageConsumer firstConsumer = session.createConsumer(firstTopic);
+            firstConsumer.setMessageListener(listener);
+
+            checkConsumerState(consumerId, false, false);
+
+            ((JmsBaseMessageConsumer) firstConsumer).startConsumer();
+            checkConsumerState(consumerId, false, true);
+
+            Destination secondTopic = session.createTopic(topic2 + ":" + messageType);
+            MessageConsumer secondConsumer = session.createConsumer(secondTopic);
+            secondConsumer.setMessageListener(listener);
+
+            ((JmsBaseMessageConsumer) secondConsumer).startConsumer();
+            checkConsumerState(consumerId, false, true);
+
+            //the start is idempotent
+            connection.start();
+            connection.start();
+
+            Thread.sleep(5000);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Every consumer created for the consumer id must raise the reference count of the push consumer
+     * by one; closing the session must drop the count to zero, stop the consumer and unregister it.
+     */
+    @Test
+    public void testReferenceCount() throws Exception {
+        Connection connection = openConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        connection.start();
+        try {
+            Destination destination = session.createTopic(topic + ":" + messageType);
+            MessageConsumer consumer = session.createConsumer(destination);
+            consumer.setMessageListener(listener);
+
+            RMQPushConsumerExt sharedConsumer = getRMQPushConsumerExt(consumerId);
+            Assert.assertNotNull(sharedConsumer);
+            Assert.assertEquals(1, sharedConsumer.getReferenceCount());
+
+            session.createConsumer(destination);
+            Assert.assertEquals(2, sharedConsumer.getReferenceCount());
+
+            session.createConsumer(session.createTopic(topic + ":" + messageType));
+            Assert.assertEquals(3, sharedConsumer.getReferenceCount());
+
+            session.close();
+            Assert.assertEquals(0, sharedConsumer.getReferenceCount());
+            Assert.assertEquals(false, sharedConsumer.isStarted());
+            Assert.assertNull(getRMQPushConsumerExt(consumerId));
+
+            Thread.sleep(5000);
+        } finally {
+            connection.close();
+        }
+    }
+}