You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/06/15 02:41:42 UTC
[2/3] incubator-rocketmq-externals git commit: Release rocketmq-jms
1.0.0 version
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
new file mode 100644
index 0000000..ea4b49e
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.BaseEncoding;
+import java.io.Serializable;
+import java.util.Enumeration;
+import java.util.Map;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+public class JmsBaseMessage implements Message {
+ /**
+ * Message properties
+ */
+ protected Map<String, Object> properties = Maps.newHashMap();
+ /**
+ * Message headers
+ */
+ protected Map<String, Object> headers = Maps.newHashMap();
+ /**
+ * Message body
+ */
+ protected Serializable body;
+
+ @Override
+ public String getJMSMessageID() {
+ return (String) headers.get(JmsBaseConstant.JMS_MESSAGE_ID);
+ }
+
+ /**
+ * Sets the message ID.
+ * <p/>
+ * <P>JMS providers set this field when a message is sent. Do not allow User to set the message ID by yourself.
+ *
+ * @param id the ID of the message
+ * @see javax.jms.Message#getJMSMessageID()
+ */
+
+ @Override
+ public void setJMSMessageID(String id) {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ @Override
+ public long getJMSTimestamp() {
+ if (headers.containsKey(JmsBaseConstant.JMS_TIMESTAMP)) {
+ return (Long) headers.get(JmsBaseConstant.JMS_TIMESTAMP);
+ }
+ return 0;
+ }
+
+ @Override
+ public void setJMSTimestamp(long timestamp) {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+    /**
+     * Returns the correlation ID as bytes.
+     * <p/>
+     * The stored correlation ID is first treated as Base64 text (the form written by
+     * {@code setJMSCorrelationIDAsBytes}). If it is not valid Base64 it is assumed to be a plain
+     * string ID and its UTF-8 bytes are returned instead.
+     *
+     * @return the decoded bytes, or {@code null} when no correlation ID is set
+     */
+    @Override
+    public byte[] getJMSCorrelationIDAsBytes() {
+        String jmsCorrelationID = getJMSCorrelationID();
+        if (jmsCorrelationID == null) {
+            return null;
+        }
+        try {
+            return BaseEncoding.base64().decode(jmsCorrelationID);
+        }
+        catch (IllegalArgumentException notBase64) {
+            // decode() reports malformed input with IllegalArgumentException; only that case
+            // falls back to the raw text. An explicit charset keeps the result independent of
+            // the platform default encoding.
+            return jmsCorrelationID.getBytes(java.nio.charset.Charset.forName("UTF-8"));
+        }
+    }
+
+ @Override
+ public void setJMSCorrelationIDAsBytes(byte[] correlationID) {
+ String encodedText = BaseEncoding.base64().encode(correlationID);
+ setJMSCorrelationID(encodedText);
+ }
+
+ @Override
+ public String getJMSCorrelationID() {
+ if (headers.containsKey(JmsBaseConstant.JMS_CORRELATION_ID)) {
+ return (String) headers.get(JmsBaseConstant.JMS_CORRELATION_ID);
+ }
+ return null;
+ }
+
+ @Override
+ public void setJMSCorrelationID(String correlationID) {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ @Override
+ public Destination getJMSReplyTo() {
+ if (headers.containsKey(JmsBaseConstant.JMS_REPLY_TO)) {
+ return (Destination) headers.get(JmsBaseConstant.JMS_REPLY_TO);
+ }
+ return null;
+ }
+
+ @Override
+ public void setJMSReplyTo(Destination replyTo) {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this);
+ }
+
+ @Override
+ public Destination getJMSDestination() {
+ if (headers.containsKey(JmsBaseConstant.JMS_DESTINATION)) {
+ return (Destination) headers.get(JmsBaseConstant.JMS_DESTINATION);
+ }
+ return null;
+ }
+
+ @Override
+ public void setJMSDestination(Destination destination) {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T getBody(Class<T> clazz) throws JMSException {
+ if (clazz.isInstance(body)) {
+ return (T) body;
+ }
+ else {
+ throw new IllegalArgumentException("The class " + clazz
+ + " is unknown to this implementation");
+ }
+ }
+
+ @Override
+ public int getJMSDeliveryMode() {
+ if (headers.containsKey(JmsBaseConstant.JMS_DELIVERY_MODE)) {
+ return (Integer) headers.get(JmsBaseConstant.JMS_DELIVERY_MODE);
+ }
+ return 0;
+ }
+
+ /**
+ * Sets the <CODE>DeliveryMode</CODE> value for this message.
+ * <p/>
+ * <P>JMS providers set this field when a message is sent. ROCKETMQ only support DeliveryMode.PERSISTENT mode. So do not
+ * allow User to set this by yourself, but you can get the default mode by <CODE>getJMSDeliveryMode</CODE> method.
+ *
+ * @param deliveryMode the delivery mode for this message
+ * @see javax.jms.Message#getJMSDeliveryMode()
+ * @see javax.jms.DeliveryMode
+ */
+
+ @Override
+ public void setJMSDeliveryMode(int deliveryMode) {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public boolean isBodyAssignableTo(Class<?> clazz) throws JMSException {
+ return clazz.isInstance(body);
+ }
+
+ @Override
+ public boolean getJMSRedelivered() {
+ return headers.containsKey(JmsBaseConstant.JMS_REDELIVERED)
+ && (Boolean) headers.get(JmsBaseConstant.JMS_REDELIVERED);
+ }
+
+ @Override
+ public void setJMSRedelivered(boolean redelivered) {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ /**
+ * copy meta data from source message
+ *
+ * @param sourceMessage source message
+ */
+ public void copyMetaData(JmsBaseMessage sourceMessage) {
+ if (!sourceMessage.getHeaders().isEmpty()) {
+ for (Map.Entry<String, Object> entry : sourceMessage.getHeaders().entrySet()) {
+ if (!headerExits(entry.getKey())) {
+ setHeader(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ if (!sourceMessage.getProperties().isEmpty()) {
+ for (Map.Entry<String, Object> entry : sourceMessage.getProperties().entrySet()) {
+ if (!propertyExists(entry.getKey())) {
+ setObjectProperty(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+
+ @Override
+ public String getJMSType() {
+ return (String) headers.get(JmsBaseConstant.JMS_TYPE);
+ }
+
+ @Override
+ public void setJMSType(String type) {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public Map<String, Object> getHeaders() {
+ return this.headers;
+ }
+
+ @Override
+ public long getJMSExpiration() {
+ if (headers.containsKey(JmsBaseConstant.JMS_EXPIRATION)) {
+ return (Long) headers.get(JmsBaseConstant.JMS_EXPIRATION);
+ }
+ return 0;
+ }
+
+ @Override
+ public void setJMSExpiration(long expiration) {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public boolean headerExits(String name) {
+ return this.headers.containsKey(name);
+ }
+
+ @Override
+ public int getJMSPriority() {
+ if (headers.containsKey(JmsBaseConstant.JMS_PRIORITY)) {
+ return (Integer) headers.get(JmsBaseConstant.JMS_PRIORITY);
+ }
+ return 5;
+ }
+
+ @Override
+ public void setJMSPriority(int priority) {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public void setHeader(String name, Object value) {
+ this.headers.put(name, value);
+ }
+
+ public Map<String, Object> getProperties() {
+ return this.properties;
+ }
+
+ public void setProperties(Map<String, Object> properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public void acknowledge() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ @Override
+ public void clearProperties() {
+ this.properties.clear();
+ }
+
+ @Override
+ public void clearBody() {
+ this.body = null;
+ }
+
+ @Override
+ public boolean propertyExists(String name) {
+ return properties.containsKey(name);
+ }
+
+ @Override
+ public boolean getBooleanProperty(String name) throws JMSException {
+ if (propertyExists(name)) {
+ Object value = getObjectProperty(name);
+ return value instanceof Boolean ? (Boolean) value : Boolean.valueOf(value.toString());
+ }
+ return false;
+ }
+
+ @Override
+ public byte getByteProperty(String name) throws JMSException {
+ if (propertyExists(name)) {
+ Object value = getObjectProperty(name);
+ return value instanceof Byte ? (Byte) value : Byte.valueOf(value.toString());
+ }
+ return 0;
+ }
+
+ @Override
+ public short getShortProperty(String name) throws JMSException {
+ if (propertyExists(name)) {
+ Object value = getObjectProperty(name);
+ return value instanceof Short ? (Short) value : Short.valueOf(value.toString());
+ }
+ return 0;
+ }
+
+ @Override
+ public int getIntProperty(String name) throws JMSException {
+ if (propertyExists(name)) {
+ Object value = getObjectProperty(name);
+ return value instanceof Integer ? (Integer) value : Integer.valueOf(value.toString());
+ }
+ return 0;
+ }
+
+ @Override
+ public long getLongProperty(String name) throws JMSException {
+ if (propertyExists(name)) {
+ Object value = getObjectProperty(name);
+ return value instanceof Long ? (Long) value : Long.valueOf(value.toString());
+ }
+ return 0L;
+ }
+
+ @Override
+ public float getFloatProperty(String name) throws JMSException {
+ if (propertyExists(name)) {
+ Object value = getObjectProperty(name);
+ return value instanceof Float ? (Float) value : Float.valueOf(value.toString());
+ }
+ return 0f;
+ }
+
+ @Override
+ public double getDoubleProperty(String name) throws JMSException {
+ if (propertyExists(name)) {
+ Object value = getObjectProperty(name);
+ return value instanceof Double ? (Double) value : Double.valueOf(value.toString());
+ }
+ return 0d;
+ }
+
+ @Override
+ public String getStringProperty(String name) throws JMSException {
+ if (propertyExists(name)) {
+ return getObjectProperty(name).toString();
+ }
+ return null;
+ }
+
+ @Override
+ public Object getObjectProperty(String name) throws JMSException {
+ return this.properties.get(name);
+ }
+
+    /**
+     * Returns an enumeration over the names of all properties currently set.
+     * <p/>
+     * The names are copied first, so the enumeration is a snapshot and is unaffected by later
+     * changes to the property map. Once exhausted, {@code nextElement()} throws
+     * {@link java.util.NoSuchElementException}, as the {@link Enumeration} contract requires.
+     *
+     * @return a snapshot enumeration of the property names
+     */
+    @Override
+    public Enumeration<?> getPropertyNames() throws JMSException {
+        return java.util.Collections.enumeration(
+            new java.util.ArrayList<String>(this.properties.keySet()));
+    }
+
+ @Override
+ public void setBooleanProperty(String name, boolean value) {
+ setObjectProperty(name, value);
+ }
+
+ @Override
+ public void setByteProperty(String name, byte value) {
+ setObjectProperty(name, value);
+ }
+
+ @Override
+ public void setShortProperty(String name, short value) {
+ setObjectProperty(name, value);
+ }
+
+ @Override
+ public void setIntProperty(String name, int value) {
+ setObjectProperty(name, value);
+ }
+
+ @Override
+ public void setLongProperty(String name, long value) {
+ setObjectProperty(name, value);
+ }
+
+    /**
+     * Stores a {@code float} property; delegates to {@code setObjectProperty}.
+     *
+     * @param name the property name
+     * @param value the value to store (boxed as {@code Float})
+     */
+    @Override
+    public void setFloatProperty(String name, float value) {
+        setObjectProperty(name, value);
+    }
+
+ @Override
+ public void setDoubleProperty(String name, double value) {
+ setObjectProperty(name, value);
+ }
+
+ @Override
+ public void setStringProperty(String name, String value) {
+ setObjectProperty(name, value);
+ }
+
+ @Override
+ public void setObjectProperty(String name, Object value) {
+ if (value instanceof Number || value instanceof String || value instanceof Boolean) {
+ this.properties.put(name, value);
+ }
+ else {
+ throw new IllegalArgumentException(
+ "Value should be boolean, byte, short, int, long, float, double, and String.");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
new file mode 100644
index 0000000..b1e85b0
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * The <CODE>BytesMessage</CODE> methods are based largely on those found in <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>. <P> Notice:Although the JMS API allows the use of message properties with byte
+ * messages, they are typically not used, since the inclusion of properties may affect the format. <P>
+ */
+public class JmsBytesMessage extends JmsBaseMessage implements BytesMessage {
+ private DataInputStream dataAsInput;
+ private DataOutputStream dataAsOutput;
+ private ByteArrayOutputStream bytesOut;
+ private byte[] bytesIn;
+
+ /**
+ * Message created for reading
+ *
+ * @param data
+ */
+ public JmsBytesMessage(byte[] data) {
+ this.bytesIn = data;
+ dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length));
+ }
+
+ /**
+ * Message created to be sent
+ */
+ public JmsBytesMessage() {
+ bytesOut = new ByteArrayOutputStream();
+ dataAsOutput = new DataOutputStream(bytesOut);
+ }
+
+ public long getBodyLength() throws JMSException {
+ return getData().length;
+ }
+
+ /**
+ * @return the data
+ */
+ public byte[] getData() {
+ if (bytesOut != null) {
+ return bytesOut.toByteArray();
+ }
+ else {
+ return bytesIn;
+ }
+
+ }
+
+ public boolean readBoolean() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public byte readByte() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public int readUnsignedByte() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public short readShort() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public int readUnsignedShort() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public char readChar() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public int readInt() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public long readLong() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public float readFloat() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public double readDouble() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public String readUTF() throws JMSException {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public int readBytes(byte[] value) throws JMSException {
+ return readBytes(value, value.length);
+ }
+
+ public int readBytes(byte[] value, int length) throws JMSException {
+ if (length > value.length) {
+ throw new IndexOutOfBoundsException("length must be smaller than the length of value");
+ }
+ if (dataAsInput == null) {
+ throw new MessageNotReadableException("Message is not readable! ");
+ }
+ try {
+ int offset = 0;
+ while (offset < length) {
+ int read = dataAsInput.read(value, offset, length - offset);
+ if (read < 0) {
+ break;
+ }
+ offset += read;
+ }
+
+ if (offset == 0 && length != 0) {
+ return -1;
+ }
+ else {
+ return offset;
+ }
+ }
+ catch (IOException e) {
+ throw handleInputException(e);
+ }
+
+ }
+
+ public void writeBoolean(boolean value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public void writeByte(byte value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public void writeShort(short value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public void writeChar(char value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public void writeInt(int value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public void writeLong(long value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public void writeFloat(float value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public void writeDouble(double value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public void writeUTF(String value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public void writeBytes(byte[] value) throws JMSException {
+ if (dataAsOutput == null) {
+ throw new MessageNotWriteableException("Message is not writable! ");
+ }
+ try {
+ dataAsOutput.write(value);
+ }
+ catch (IOException e) {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+ if (dataAsOutput == null) {
+ throw new MessageNotWriteableException("Message is not writable! ");
+ }
+ try {
+ dataAsOutput.write(value, offset, length);
+ }
+ catch (IOException e) {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeObject(Object value) throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ public void reset() throws JMSException {
+ ExceptionUtil.handleUnSupportedException();
+ }
+
+ private JMSException handleOutputException(final IOException e) {
+ JMSException ex = new JMSException(e.getMessage());
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ return ex;
+ }
+
+ private JMSException handleInputException(final IOException e) {
+ JMSException ex;
+ if (e instanceof EOFException) {
+ ex = new MessageEOFException(e.getMessage());
+ }
+ else {
+ ex = new MessageFormatException(e.getMessage());
+ }
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ return ex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
new file mode 100644
index 0000000..f67da14
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.Serializable;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+/**
+ * {@code ObjectMessage} implementation that keeps its payload in the {@code body} field
+ * inherited from {@code JmsBaseMessage}.
+ */
+public class JmsObjectMessage extends JmsBaseMessage implements ObjectMessage {
+
+ /**
+ * Creates a message carrying the given object.
+ *
+ * @param object the payload; stored by reference, not copied
+ */
+ public JmsObjectMessage(Serializable object) {
+ this.body = object;
+ }
+
+ /**
+ * Creates a message with no payload; set one with {@code setObject}.
+ */
+ public JmsObjectMessage() {
+
+ }
+
+ /**
+ * @return the current payload, or {@code null} if none has been set
+ */
+ public Serializable getObject() throws JMSException {
+ return this.body;
+ }
+
+ /**
+ * Replaces the payload.
+ *
+ * @param object the new payload; stored by reference, not copied
+ */
+ public void setObject(Serializable object) throws JMSException {
+ this.body = object;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
new file mode 100644
index 0000000..ce19b51
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+/**
+ * {@code TextMessage} implementation. The text is held both in the local {@code text} field and
+ * in the inherited {@code body} field; {@code setText} and {@code clearBody} keep the two in sync.
+ */
+public class JmsTextMessage extends JmsBaseMessage implements TextMessage {
+ // Typed copy of the payload; mirrors the inherited body field.
+ private String text;
+
+ /**
+ * Creates a message with no text.
+ */
+ public JmsTextMessage() {
+
+ }
+
+ /**
+ * Creates a message carrying the given text.
+ *
+ * @param text the payload
+ */
+ public JmsTextMessage(String text) {
+ // NOTE(review): setText is overridable and is called from the constructor.
+ setText(text);
+ }
+
+ /**
+ * Clears the text and then the inherited body.
+ */
+ public void clearBody() {
+ this.text = null;
+ super.clearBody();
+ }
+
+ /**
+ * @return the current text, or {@code null} if none has been set or it was cleared
+ */
+ public String getText() throws JMSException {
+ return this.text;
+ }
+
+ /**
+ * Sets the text, updating both the inherited body and the local field.
+ *
+ * @param text the new payload
+ */
+ public void setText(String text) {
+ this.body = text;
+ this.text = text;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
new file mode 100644
index 0000000..bd926e5
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import com.google.common.base.Preconditions;
+import javax.jms.JMSException;
+
+/**
+ * Helpers for raising and converting exceptions in the JMS adapter.
+ */
+public class ExceptionUtil {
+    /**
+     * When {@code true}, {@link #handleUnSupportedException()} does nothing instead of throwing.
+     * Read once at class initialization from the system property {@code skip.set.exception}
+     * (default {@code false}).
+     */
+    public static final boolean SKIP_SET_EXCEPTION
+        = Boolean.parseBoolean(System.getProperty("skip.set.exception", "false"));
+
+    /**
+     * Signals that the calling operation is not supported.
+     *
+     * @throws UnsupportedOperationException unless {@link #SKIP_SET_EXCEPTION} is {@code true}
+     */
+    public static void handleUnSupportedException() {
+        if (!ExceptionUtil.SKIP_SET_EXCEPTION) {
+            throw new UnsupportedOperationException("Operation unsupported! If you want to skip this Exception," +
+                " use '-Dskip.set.exception=true' in JVM options.");
+        }
+    }
+
+    /**
+     * Wraps an arbitrary exception in a {@link JMSException}.
+     *
+     * @param e the original exception; attached as both cause and linked exception
+     * @param extra the message for the new exception
+     * @return a new {@code JMSException} carrying {@code extra} as its message
+     * @throws NullPointerException if either argument is {@code null}
+     */
+    public static JMSException convertToJmsException(Exception e, String extra) {
+        Preconditions.checkNotNull(extra);
+        Preconditions.checkNotNull(e);
+        JMSException jmsException = new JMSException(extra);
+        jmsException.initCause(e);
+        // JMS clients commonly read getLinkedException(); set it as well so the original
+        // failure is visible through both accessors.
+        jmsException.setLinkedException(e);
+        return jmsException;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
new file mode 100644
index 0000000..3cf03f9
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.jms.BytesMessage;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseTopic;
+import org.apache.rocketmq.jms.domain.message.JmsBaseMessage;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+
+import static org.apache.rocketmq.jms.domain.JmsBaseMessageProducer.initRocketMQHeaders;
+
+public class MessageConverter {
+ public static byte[] getContentFromJms(javax.jms.Message jmsMessage) throws Exception {
+ byte[] content;
+ if (jmsMessage instanceof TextMessage) {
+ if (StringUtils.isEmpty(((TextMessage) jmsMessage).getText())) {
+ throw new IllegalArgumentException("Message body length is zero");
+ }
+ content = MsgConvertUtil.string2Bytes(((TextMessage) jmsMessage).getText(),
+ Charsets.UTF_8.toString());
+ }
+ else if (jmsMessage instanceof ObjectMessage) {
+ if (((ObjectMessage) jmsMessage).getObject() == null) {
+ throw new IllegalArgumentException("Message body length is zero");
+ }
+ content = MsgConvertUtil.objectSerialize(((ObjectMessage) jmsMessage).getObject());
+ }
+ else if (jmsMessage instanceof BytesMessage) {
+ JmsBytesMessage bytesMessage = (JmsBytesMessage) jmsMessage;
+ if (bytesMessage.getBodyLength() == 0) {
+ throw new IllegalArgumentException("Message body length is zero");
+ }
+ content = bytesMessage.getData();
+ }
+ else {
+ throw new IllegalArgumentException("Unknown message type " + jmsMessage.getJMSType());
+ }
+
+ return content;
+ }
+
+    /**
+     * Builds a JMS message from a received RocketMQ message.
+     * <p/>
+     * The JMS message class is chosen from the {@code jmsMsgModel} user property; messages
+     * without it are treated as bytes messages. Known JMS header names found in the RocketMQ
+     * property map become headers, everything else becomes a JMS property.
+     * <p/>
+     * Numeric headers are converted to the types the {@code JmsBaseMessage} getters cast to:
+     * {@code Integer} for delivery mode and priority, {@code Long} for timestamp and expiration.
+     * Storing the raw strings would make those getters fail with {@code ClassCastException}.
+     *
+     * @param msg the RocketMQ message
+     * @return the equivalent JMS message
+     * @throws NumberFormatException if a numeric header property is not a valid number
+     * @throws Exception if the body cannot be decoded
+     */
+    public static JmsBaseMessage convert2JMSMessage(MessageExt msg) throws Exception {
+        String msgModel = msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL);
+        JmsBaseMessage message;
+        if (MsgConvertUtil.MSGMODEL_BYTES.equals(msgModel)) {
+            message = new JmsBytesMessage(msg.getBody());
+        }
+        else if (MsgConvertUtil.MSGMODEL_OBJ.equals(msgModel)) {
+            message = new JmsObjectMessage(MsgConvertUtil.objectDeserialize(msg.getBody()));
+        }
+        else if (MsgConvertUtil.MSGMODEL_TEXT.equals(msgModel)) {
+            message = new JmsTextMessage(MsgConvertUtil.bytes2String(msg.getBody(),
+                Charsets.UTF_8.toString()));
+        }
+        else {
+            // rocketmq producer sends bytesMessage without setting JMS_MSGMODEL.
+            message = new JmsBytesMessage(msg.getBody());
+        }
+
+        //-------------------------set headers-------------------------
+        Map<String, Object> properties = new HashMap<String, Object>();
+
+        message.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + msg.getMsgId());
+        message.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.valueOf(msg.getReconsumeTimes() > 0));
+
+        Map<String, String> propertiesMap = msg.getProperties();
+        if (propertiesMap != null) {
+            for (Map.Entry<String, String> entry : propertiesMap.entrySet()) {
+                String properName = entry.getKey();
+                String properValue = entry.getValue();
+                if (JmsBaseConstant.JMS_DESTINATION.equals(properName)) {
+                    if (null != properValue) {
+                        // Expected form is "<topic>:<type>".
+                        List<String> msgTuple = Arrays.asList(properValue.split(":"));
+                        message.setHeader(JmsBaseConstant.JMS_DESTINATION,
+                            new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1)));
+                    }
+                }
+                else if (JmsBaseConstant.JMS_DELIVERY_MODE.equals(properName) ||
+                    JmsBaseConstant.JMS_PRIORITY.equals(properName)) {
+                    // Read back through an Integer cast; a null value is left unset so the
+                    // getter's default applies.
+                    if (properValue != null) {
+                        message.setHeader(properName, Integer.valueOf(properValue));
+                    }
+                }
+                else if (JmsBaseConstant.JMS_TIMESTAMP.equals(properName) ||
+                    JmsBaseConstant.JMS_EXPIRATION.equals(properName)) {
+                    // Read back through a Long cast.
+                    if (properValue != null) {
+                        message.setHeader(properName, Long.valueOf(properValue));
+                    }
+                }
+                else if (JmsBaseConstant.JMS_CORRELATION_ID.equals(properName) ||
+                    JmsBaseConstant.JMS_TYPE.equals(properName)) {
+                    message.setHeader(properName, properValue);
+                }
+                else if (JmsBaseConstant.JMS_MESSAGE_ID.equals(properName) ||
+                    JmsBaseConstant.JMS_REDELIVERED.equals(properName)) {
+                    //JMS_MESSAGE_ID should set by msg.getMsgID()
+                    continue;
+                }
+                else {
+                    properties.put(properName, properValue);
+                }
+            }
+        }
+
+        //Handle System properties, put into header.
+        //add what?
+        message.setProperties(properties);
+
+        return message;
+    }
+
+ /**
+ * Builds a RocketMQ message from a JMS message that is about to be sent.
+ * <p/>
+ * The body comes from {@code getContentFromJms}; topic and tags come from the
+ * {@code JMS_DESTINATION} header, which must hold a {@code JmsBaseTopic}; the remaining
+ * attributes come from the {@code Properties} returned by {@code initRocketMQHeaders}.
+ *
+ * @param jmsMsg the JMS message to convert
+ * @return the RocketMQ message (instantiated as a {@code MessageExt})
+ * @throws IllegalStateException if the destination's message type contains {@code "||"}
+ * @throws Exception if the body cannot be extracted
+ */
+ public static Message convert2RMQMessage(JmsBaseMessage jmsMsg) throws Exception {
+ Message rocketmqMsg = new MessageExt();
+ // 1. Transform message body
+ rocketmqMsg.setBody(MessageConverter.getContentFromJms(jmsMsg));
+
+ // 2. Transform topic and messageType
+ // NOTE(review): a missing JMS_DESTINATION header or a null message type fails below with a
+ // NullPointerException; confirm callers always set the destination before converting.
+ JmsBaseTopic destination = (JmsBaseTopic) jmsMsg.getHeaders().get(JmsBaseConstant.JMS_DESTINATION);
+ String topic = destination.getMessageTopic();
+ rocketmqMsg.setTopic(topic);
+ String messageType = destination.getMessageType();
+ Preconditions.checkState(!messageType.contains("||"),
+ "'||' can not be in the destination when sending a message");
+ rocketmqMsg.setTags(messageType);
+
+ // 3. Transform message properties
+ // Names matching RocketMQ's reserved property keys go through the dedicated setters;
+ // everything else is stored as a user property.
+ Properties properties = initRocketMQHeaders(jmsMsg, topic, messageType);
+ for (String name : properties.stringPropertyNames()) {
+ String value = properties.getProperty(name);
+ if (MessageConst.PROPERTY_KEYS.equals(name)) {
+ rocketmqMsg.setKeys(value);
+ } else if (MessageConst.PROPERTY_TAGS.equals(name)) {
+ // Replaces the tags set from the destination's message type above.
+ rocketmqMsg.setTags(value);
+ } else if (MessageConst.PROPERTY_DELAY_TIME_LEVEL.equals(name)) {
+ rocketmqMsg.setDelayTimeLevel(Integer.parseInt(value));
+ } else if (MessageConst.PROPERTY_WAIT_STORE_MSG_OK.equals(name)) {
+ rocketmqMsg.setWaitStoreMsgOK(Boolean.parseBoolean(value));
+ } else if (MessageConst.PROPERTY_BUYER_ID.equals(name)) {
+ rocketmqMsg.setBuyerId(value);
+ } else {
+ rocketmqMsg.putUserProperty(name, value);
+ }
+ }
+
+ return rocketmqMsg;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
new file mode 100644
index 0000000..ec55bbc
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Constants and byte-level helpers used when converting between JMS and RocketMQ messages:
+ * Java object (de)serialization and charset-based string/byte conversion.
+ */
+public class MsgConvertUtil {
+
+    public static final byte[] EMPTY_BYTES = new byte[0];
+    public static final String EMPTY_STRING = "";
+
+    public static final String JMS_MSGMODEL = "jmsMsgModel";
+    /**
+     * To adapt this scene: "Notify client try to receive ObjectMessage sent by JMS client" Set notify out message
+     * model, value can be textMessage OR objectMessage
+     */
+    public static final String COMPATIBLE_FIELD_MSGMODEL = "notifyOutMsgModel";
+
+    public static final String MSGMODEL_TEXT = "textMessage";
+    public static final String MSGMODEL_BYTES = "bytesMessage";
+    public static final String MSGMODEL_OBJ = "objectMessage";
+
+    public static final String MSG_TOPIC = "msgTopic";
+    public static final String MSG_TYPE = "msgType";
+
+    /**
+     * Serializes an object with standard Java serialization.
+     *
+     * @param object the object to serialize
+     * @return the serialized bytes
+     * @throws IOException if the object cannot be written (for example, it is not serializable)
+     */
+    public static byte[] objectSerialize(Object object) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        try {
+            oos.writeObject(object);
+        } finally {
+            // Closing flushes buffered data into baos and also runs when writeObject fails.
+            oos.close();
+        }
+        return baos.toByteArray();
+    }
+
+    /**
+     * Deserializes an object previously produced by Java serialization.
+     *
+     * <p>NOTE(review): this uses Java native deserialization; if {@code bytes} can originate from an
+     * untrusted sender, consider restricting the accepted classes.
+     *
+     * @param bytes the serialized form
+     * @return the deserialized object
+     * @throws IOException if the bytes are not a valid serialization stream
+     * @throws ClassNotFoundException if the class of the serialized object cannot be found
+     */
+    public static Serializable objectDeserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+        try {
+            // Read BEFORE closing: the stream must still be open when the object is decoded.
+            return (Serializable) ois.readObject();
+        } finally {
+            ois.close();
+        }
+    }
+
+    /**
+     * Encodes a string with the named charset.
+     *
+     * @param s the string to encode; {@code null} yields {@link #EMPTY_BYTES}
+     * @param charset the charset name
+     * @return the encoded bytes, or {@code null} if encoding failed (for example, unknown charset)
+     */
+    public static final byte[] string2Bytes(String s, String charset) {
+        if (null == s) {
+            return EMPTY_BYTES;
+        }
+        byte[] bs = null;
+        try {
+            bs = s.getBytes(charset);
+        } catch (Exception ignored) {
+            // Best effort by design: callers receive null when the charset cannot be used.
+        }
+        return bs;
+    }
+
+    /**
+     * Decodes bytes with the named charset.
+     *
+     * @param bs the bytes to decode; {@code null} yields {@link #EMPTY_STRING}
+     * @param charset the charset name
+     * @return the decoded string, or {@code null} if decoding failed (for example, unknown charset)
+     */
+    public static final String bytes2String(byte[] bs, String charset) {
+        if (null == bs) {
+            return EMPTY_STRING;
+        }
+        String s = null;
+        try {
+            s = new String(bs, charset);
+        } catch (Exception ignored) {
+            // Best effort by design: callers receive null when the charset cannot be used.
+        }
+        return s;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
new file mode 100644
index 0000000..9b29928
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+
+/**
+ * Parses connection URIs of the form {@code rocketmq://ip:port?key1=value1&key2=value2}
+ * into a flat parameter map.
+ */
+public abstract class URISpecParser {
+
+    private static final String DEFAULT_BROKER = "rocketmq";
+
+    /**
+     * ConnectionUrl spec is broker://ip:port?key1=value1&key2=value2
+     *
+     * <p>The text before the first {@code ':'} is the broker and is stored under
+     * {@code CommonConstant.PROVIDER}. Each {@code key=value} pair after the first {@code '?'}
+     * is added to the result; pairs without {@code '='} are skipped. A URI without a
+     * {@code '?'} contributes no parameters.
+     *
+     * @param uri Just like broker://ip:port?key1=value1&key2=value2
+     * @return The parameters' map
+     * @throws IllegalArgumentException if the uri is null/blank, has no {@code ':'}, or the broker
+     *         is not {@code rocketmq}
+     */
+    public static Map<String, String> parseURI(String uri) {
+        Preconditions.checkArgument(null != uri && !uri.trim().isEmpty(), "Uri can not be empty!");
+
+        // Without a ':' there is no broker part; fail with an argument error instead of
+        // letting substring(0, -1) throw StringIndexOutOfBoundsException.
+        int brokerEnd = uri.indexOf(":");
+        Preconditions.checkArgument(brokerEnd >= 0, "Uri must look like rocketmq://ip:port?key=value");
+
+        String broker = uri.substring(0, brokerEnd);
+        if (!broker.equals(DEFAULT_BROKER)) {
+            throw new IllegalArgumentException("Broker must be rocketmq");
+        }
+
+        Map<String, String> results = Maps.newHashMap();
+        results.put(CommonConstant.PROVIDER, broker);
+
+        // Only parse parameters when a query part exists; otherwise indexOf returns -1 and the
+        // whole URI would be mistaken for the query string.
+        int queryStart = uri.indexOf("?");
+        if (queryStart < 0) {
+            return results;
+        }
+
+        String queryStr = uri.substring(queryStart + 1);
+        if (StringUtils.isNotEmpty(queryStr)) {
+            for (String param : queryStr.split("&")) {
+                if (param.contains("=")) {
+                    // Limit 2 keeps any further '=' characters inside the value.
+                    String[] values = param.split("=", 2);
+                    results.put(values[0], values[1]);
+                }
+            }
+        }
+        return results;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/main/resources/application.conf b/rocketmq-jms/core/src/main/resources/application.conf
new file mode 100644
index 0000000..713c915
--- /dev/null
+++ b/rocketmq-jms/core/src/main/resources/application.conf
@@ -0,0 +1 @@
+version = ${project.version}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
new file mode 100644
index 0000000..d77b13e
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import org.junit.Assert;
+
+/**
+ * Test listener that counts received messages and counts down an optional latch
+ * when the count reaches the expected number.
+ */
+public class JmsTestListener implements MessageListener {
+
+    /** Number of messages after which the latch is counted down. */
+    private int expectd;
+    /** Latch to release when the expected count is reached; may be null. */
+    private CountDownLatch latch;
+    /** Number of messages seen so far. */
+    private AtomicInteger consumedNum = new AtomicInteger(0);
+
+    /** Creates a listener expecting 10 messages and no latch. */
+    public JmsTestListener() {
+        this(10, null);
+    }
+
+    /** Creates a listener expecting the given number of messages and no latch. */
+    public JmsTestListener(int expectd) {
+        this(expectd, null);
+    }
+
+    /** Creates a listener expecting the given number of messages, releasing {@code latch} once reached. */
+    public JmsTestListener(int expected, CountDownLatch latch) {
+        this.expectd = expected;
+        this.latch = latch;
+    }
+
+    /**
+     * Asserts that the message and its JMS message id are non-null, increments the counter,
+     * and counts the latch down when the counter equals the expected number.
+     * Any exception is rethrown wrapped in a {@link RuntimeException}.
+     */
+    @Override
+    public void onMessage(Message message) {
+        try {
+            Assert.assertNotNull(message);
+            Assert.assertNotNull(message.getJMSMessageID());
+            final int seen = consumedNum.incrementAndGet();
+            final boolean reached = seen == expectd;
+            if (reached && latch != null) {
+                latch.countDown();
+            }
+        } catch (Exception cause) {
+            throw new RuntimeException(cause);
+        }
+    }
+
+    /** Returns how many messages have been received so far. */
+    public int getConsumedNum() {
+        return consumedNum.get();
+    }
+
+    public void setLatch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+
+    public void setExpectd(int expectd) {
+        this.expectd = expectd;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
new file mode 100644
index 0000000..855cb19
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageProducer;
+import org.apache.rocketmq.jms.domain.RMQPushConsumerExt;
+import org.junit.Assert;
+
+/**
+ * Reflection helpers that let tests look into the static producer/consumer registries
+ * held by {@code JmsBaseMessageProducer} and {@code JmsBaseMessageConsumer}.
+ */
+public class JmsTestUtil {
+
+    /**
+     * Looks up the producer registered under the given id in the static {@code producerMap}
+     * field of {@code JmsBaseMessageProducer}.
+     *
+     * @return the producer, or {@code null} if the map has no entry for the id
+     */
+    public static MQProducer getMQProducer(String producerId) throws Exception {
+        Assert.assertNotNull(producerId);
+        ConcurrentMap<String, MQProducer> producers =
+            (ConcurrentMap<String, MQProducer>) readStaticField(JmsBaseMessageProducer.class, "producerMap");
+        return producers.get(producerId);
+    }
+
+    /**
+     * Looks up the consumer registered under the given id in the static {@code consumerMap}
+     * field of {@code JmsBaseMessageConsumer}.
+     *
+     * @return the consumer, or {@code null} if the map has no entry for the id
+     */
+    public static RMQPushConsumerExt getRMQPushConsumerExt(String consumerId) throws Exception {
+        Assert.assertNotNull(consumerId);
+        ConcurrentMap<String, RMQPushConsumerExt> consumers =
+            (ConcurrentMap<String, RMQPushConsumerExt>) readStaticField(JmsBaseMessageConsumer.class, "consumerMap");
+        return consumers.get(consumerId);
+    }
+
+    /**
+     * Asserts the registration and started state of a consumer.
+     *
+     * @param isNull whether the consumer is expected to be absent
+     * @param isStarted expected {@code isStarted()} value; only checked when {@code isNull} is false
+     */
+    public static void checkConsumerState(String consumerId, boolean isNull, boolean isStarted) throws Exception {
+        RMQPushConsumerExt consumer = getRMQPushConsumerExt(consumerId);
+        if (isNull) {
+            Assert.assertNull(consumer);
+            return;
+        }
+        Assert.assertNotNull(consumer);
+        Assert.assertEquals(isStarted, consumer.isStarted());
+    }
+
+    /** Reads a (possibly private) static field of {@code owner} by name. */
+    private static Object readStaticField(Class<?> owner, String fieldName) throws Exception {
+        Field target = owner.getDeclaredField(fieldName);
+        target.setAccessible(true);
+        return target.get(null);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
new file mode 100644
index 0000000..9fe9f5e
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for reading from and writing to {@code JmsBytesMessage}.
+ */
+public class JmsBytesMessageTest {
+
+    private byte[] receiveData = "receive data test".getBytes();
+    private byte[] sendData = "send data test".getBytes();
+
+    /** getData() returns the constructor payload, and the bytes written through writeBytes. */
+    @Test
+    public void testGetData() throws Exception {
+        JmsBytesMessage incoming = new JmsBytesMessage(receiveData);
+        System.out.println(new String(incoming.getData()));
+        Assert.assertEquals(new String(receiveData), new String(incoming.getData()));
+
+        JmsBytesMessage outgoing = new JmsBytesMessage();
+        outgoing.writeBytes(sendData, 0, sendData.length);
+        System.out.println(new String(outgoing.getData()));
+        Assert.assertEquals(new String(sendData), new String(outgoing.getData()));
+    }
+
+    /** getBodyLength() equals the payload length. */
+    @Test
+    public void testGetBodyLength() throws Exception {
+        JmsBytesMessage incoming = new JmsBytesMessage(receiveData);
+
+        System.out.println(incoming.getBodyLength());
+        Assert.assertEquals(incoming.getBodyLength(), receiveData.length);
+    }
+
+    /** readBytes(byte[]) fills a full-size buffer with the payload. */
+    @Test
+    public void testReadBytes() throws Exception {
+        JmsBytesMessage incoming = new JmsBytesMessage(receiveData);
+        Assert.assertEquals(incoming.getBodyLength(), receiveData.length);
+
+        byte[] buffer = new byte[receiveData.length];
+        incoming.readBytes(buffer);
+
+        System.out.println(new String(buffer));
+        Assert.assertEquals(new String(buffer), new String(receiveData));
+    }
+
+    /** Consecutive readBytes(byte[], int) calls continue where the previous one stopped. */
+    @Test
+    public void testReadBytes1() throws Exception {
+        JmsBytesMessage incoming = new JmsBytesMessage(receiveData);
+        String expectedText = new String(receiveData);
+
+        byte[] firstChunk = new byte[2];
+        incoming.readBytes(firstChunk, 2);
+        System.out.println(new String(firstChunk));
+        Assert.assertEquals(expectedText.substring(0, 2), new String(firstChunk));
+
+        byte[] secondChunk = new byte[2];
+        incoming.readBytes(secondChunk, 2);
+        System.out.println(new String(secondChunk));
+        Assert.assertEquals(expectedText.substring(2, 4), new String(secondChunk));
+    }
+
+    /** writeBytes(byte[]) stores the bytes so that getData() returns them. */
+    @Test
+    public void testWriteBytes() throws Exception {
+        JmsBytesMessage outgoing = new JmsBytesMessage();
+        outgoing.writeBytes(sendData);
+
+        System.out.println(new String(outgoing.getData()));
+        Assert.assertEquals(new String(outgoing.getData()), new String(sendData));
+    }
+
+    /**
+     * Only constructs messages; the calls noted below are left disabled because,
+     * per the original author's notes, they throw NullPointerException.
+     */
+    @Test
+    public void testException() throws Exception {
+        JmsBytesMessage writeOnly = new JmsBytesMessage();
+
+        byte[] buffer = new byte[receiveData.length];
+        // Disabled, throws NullPointerException: writeOnly.readBytes(buffer);
+
+        JmsBytesMessage readOnly = new JmsBytesMessage(sendData);
+        // Disabled, throws NullPointerException: readOnly.writeBytes("hello again".getBytes());
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
new file mode 100644
index 0000000..b570142
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
@@ -0,0 +1,52 @@
+package org.apache.rocketmq.jms.domain.message;
+
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseTopic;
+import org.apache.rocketmq.jms.util.MessageConverter;
+import org.apache.rocketmq.jms.util.MsgConvertUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Round-trip test: JMS message -> RocketMQ message -> JMS message.
+ */
+public class JmsMessageConvertTest {
+
+    /**
+     * Converts a text message to a RocketMQ message and back, then compares text,
+     * destination, message id, redelivered flag and a set of header entries.
+     */
+    @Test
+    public void testCovert2RMQ() throws Exception {
+        final String topic = "TestTopic";
+        final String messageType = "TagA";
+
+        // Build the JMS message: headers first, then properties
+        JmsBaseMessage original = new JmsTextMessage("testText");
+        original.setHeader(JmsBaseConstant.JMS_DESTINATION, new JmsBaseTopic(topic, messageType));
+        original.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:null");
+        original.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.FALSE);
+
+        original.setObjectProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT);
+        original.setObjectProperty(MsgConvertUtil.MSG_TOPIC, topic);
+        original.setObjectProperty(MsgConvertUtil.MSG_TYPE, messageType);
+        original.setObjectProperty(MessageConst.PROPERTY_TAGS, messageType);
+        original.setObjectProperty(MessageConst.PROPERTY_KEYS, messageType);
+
+        // JMS -> RocketMQ
+        MessageExt rmqMessage = (MessageExt) MessageConverter.convert2RMQMessage(original);
+        System.out.println(rmqMessage);
+
+        // RocketMQ -> JMS
+        JmsBaseMessage restored = MessageConverter.convert2JMSMessage(rmqMessage);
+
+        JmsTextMessage before = (JmsTextMessage) original;
+        JmsTextMessage after = (JmsTextMessage) restored;
+
+        Assert.assertEquals(before.getText(), after.getText());
+        Assert.assertEquals(before.getJMSDestination().toString(), after.getJMSDestination().toString());
+        Assert.assertEquals(before.getJMSMessageID(), after.getJMSMessageID());
+        Assert.assertEquals(before.getJMSRedelivered(), after.getJMSRedelivered());
+
+        // NOTE(review): these values were set through setObjectProperty above but are compared
+        // through getHeaders() — confirm the header map is the intended place to look.
+        assertSameHeader(before, after, MsgConvertUtil.JMS_MSGMODEL);
+        assertSameHeader(before, after, MsgConvertUtil.MSG_TOPIC);
+        assertSameHeader(before, after, MsgConvertUtil.MSG_TYPE);
+        assertSameHeader(before, after, MessageConst.PROPERTY_TAGS);
+        assertSameHeader(before, after, MessageConst.PROPERTY_KEYS);
+    }
+
+    /** Asserts that both messages hold equal values under the given header key. */
+    private static void assertSameHeader(JmsTextMessage expected, JmsTextMessage actual, String key) {
+        Assert.assertEquals(expected.getHeaders().get(key), actual.getHeaders().get(key));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
new file mode 100644
index 0000000..6951976
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.Serializable;
+import javax.jms.JMSException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@code JmsObjectMessage}.
+ */
+public class JmsObjectMessageTest {
+
+    /**
+     * getObject() returns an object equal to the one passed to the constructor.
+     *
+     * @throws JMSException propagated so that a failing call fails the test instead of
+     *         being printed and ignored
+     */
+    @Test
+    public void testGetObject() throws JMSException {
+        JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20));
+        Assert.assertEquals(new User("jack", 20), jmsObjectMessage.getObject());
+    }
+
+    /**
+     * getBody(Object.class) returns the same payload as getObject().
+     *
+     * @throws JMSException propagated so that a failing call fails the test instead of
+     *         being printed and ignored
+     */
+    @Test
+    public void testGetBody() throws JMSException {
+        JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20));
+
+        User user = (User) jmsObjectMessage.getBody(Object.class);
+        System.out.println(user.getName() + ": " + user.getAge());
+        Assert.assertEquals(jmsObjectMessage.getObject(), jmsObjectMessage.getBody(Object.class));
+    }
+
+    /**
+     * Simple serializable payload. Declared static so an instance carries no hidden reference
+     * to the (non-serializable) enclosing test object.
+     */
+    private static class User implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private String name;
+        private int age;
+
+        private User(String name, int age) {
+            this.name = name;
+            this.age = age;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+
+            User user = (User) obj;
+            if (age != user.getAge()) {
+                return false;
+            }
+            return name != null ? name.equals(user.getName()) : user.getName() == null;
+        }
+
+        /** Kept consistent with {@link #equals(Object)}: derived from name and age. */
+        @Override
+        public int hashCode() {
+            return 31 * (name != null ? name.hashCode() : 0) + age;
+        }
+
+        public int getAge() {
+            return age;
+        }
+
+        public void setAge(int age) {
+            this.age = age;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
new file mode 100644
index 0000000..d3c8287
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import javax.jms.JMSException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@code JmsTextMessage}.
+ */
+public class JmsTextMessageTest {
+    private String text = "jmsTextMessage test";
+
+    /**
+     * getBody(String.class) returns the text passed to the constructor.
+     *
+     * @throws JMSException propagated so that a failing call fails the test instead of
+     *         being printed and ignored
+     */
+    @Test
+    public void testGetBody() throws JMSException {
+        JmsTextMessage jmsTextMessage = new JmsTextMessage(text);
+        Assert.assertEquals(text, jmsTextMessage.getBody(String.class));
+    }
+
+    /**
+     * getText() returns the value stored by setText().
+     *
+     * @throws JMSException propagated so that a failing call fails the test instead of
+     *         being printed and ignored
+     */
+    @Test
+    public void testSetGetText() throws JMSException {
+        JmsTextMessage jmsTextMessage = new JmsTextMessage();
+        jmsTextMessage.setText(text);
+        Assert.assertEquals(text, jmsTextMessage.getText());
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
new file mode 100644
index 0000000..02fe111
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for integration tests. Its static initializer starts an in-process name server
+ * and broker on randomly chosen ports, starts an admin client, creates the default test topic,
+ * and registers a shutdown hook that stops everything and deletes the temporary store directories.
+ */
+public class IntegrationTestBase {
+    public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class);
+
+    protected static Random random = new Random();
+    protected static final String SEP = File.separator;
+
+    protected static String topic = "jms-test";
+    protected static String topic2 = "jms-test-2";
+    protected static String messageType = "TagA";
+    protected static String producerId = "PID-jms-test";
+    protected static String consumerId = "CID-jms-test";
+    protected static String consumerId2 = "CID-jms-test-2";
+    protected static String nameServer;
+    protected static String text = "English test";
+    protected static int consumeThreadNums = 16;
+
+    protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
+    protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
+    protected static final List<File> TMPE_FILES = new ArrayList<File>();
+    protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<BrokerController>();
+    protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<NamesrvController>();
+
+    /**
+     * Picks a fresh, UUID-suffixed directory under the user's home and records it for cleanup.
+     * Exits the JVM if the path already exists.
+     */
+    private static String createBaseDir() {
+        String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID();
+        final File file = new File(baseDir);
+        if (file.exists()) {
+            System.out.println(String.format("[%s] has already existed, please bake up and remove it for integration tests", baseDir));
+            System.exit(1);
+        }
+        TMPE_FILES.add(file);
+        return baseDir;
+    }
+
+    /**
+     * Creates and starts a name server listening on a random port in [9000, 10000).
+     * Exits the JVM if startup throws.
+     *
+     * @return the started controller, also recorded in {@link #NAMESRV_CONTROLLERS}
+     */
+    public static NamesrvController createAndStartNamesrv() {
+        String baseDir = createBaseDir();
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig();
+        namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json");
+
+        nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000));
+        NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
+        try {
+            Assert.assertTrue(namesrvController.initialize());
+            logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
+            namesrvController.start();
+        } catch (Exception e) {
+            // Record the cause before exiting; otherwise the failure reason is lost.
+            logger.error("Name Server start failed", e);
+            System.out.println("Name Server start failed");
+            System.exit(1);
+        }
+        NAMESRV_CONTROLLERS.add(namesrvController);
+        return namesrvController;
+    }
+
+    /**
+     * Creates and starts a broker registered with the given name server. The broker listens on a
+     * random port in [10000, 11000) and its HA port is a random port in [8000, 9000).
+     * Exits the JVM if startup throws.
+     *
+     * @param nsAddr name server address, {@code host:port}
+     * @return the started controller, also recorded in {@link #BROKER_CONTROLLERS}
+     */
+    public static BrokerController createAndStartBroker(String nsAddr) {
+        String baseDir = createBaseDir();
+        BrokerConfig brokerConfig = new BrokerConfig();
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
+        brokerConfig.setBrokerIP1("127.0.0.1");
+        brokerConfig.setNamesrvAddr(nsAddr);
+        storeConfig.setStorePathRootDir(baseDir);
+        storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
+        storeConfig.setHaListenPort(8000 + random.nextInt(1000));
+        nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
+        BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
+        try {
+            Assert.assertTrue(brokerController.initialize());
+            logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+            brokerController.start();
+        } catch (Exception e) {
+            // Record the cause before exiting; otherwise the failure reason is lost.
+            logger.error("Broker start failed", e);
+            System.out.println("Broker start failed");
+            System.exit(1);
+        }
+        BROKER_CONTROLLERS.add(brokerController);
+        return brokerController;
+    }
+
+    protected static DefaultMQAdminExt defaultMQAdminExt;
+
+    static {
+        // Clear the environment on JVM exit: stop clients/servers, then delete temp directories.
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override public void run() {
+                if (defaultMQAdminExt != null) {
+                    defaultMQAdminExt.shutdown();
+                }
+                for (NamesrvController namesrvController : NAMESRV_CONTROLLERS) {
+                    if (namesrvController != null) {
+                        namesrvController.shutdown();
+                    }
+                }
+                for (BrokerController brokerController : BROKER_CONTROLLERS) {
+                    if (brokerController != null) {
+                        brokerController.shutdown();
+                    }
+                }
+                for (File file : TMPE_FILES) {
+                    deleteFile(file);
+                }
+            }
+        });
+
+        NamesrvController namesrvController = IntegrationTestBase.createAndStartNamesrv();
+        nameServer = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
+        BrokerController brokerController = createAndStartBroker(nameServer);
+
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExt.setNamesrvAddr(nameServer);
+        try {
+            defaultMQAdminExt.start();
+        } catch (MQClientException e) {
+            // Record the cause before exiting; otherwise the failure reason is lost.
+            logger.error("DefaultMQAdminExt start failed", e);
+            System.out.println("DefaultMQAdminExt start failed");
+            System.exit(1);
+        }
+
+        createTopic(topic, brokerController.getBrokerAddr());
+    }
+
+    /**
+     * Deletes a file, or a directory together with everything beneath it.
+     * Does nothing if the path does not exist.
+     */
+    public static void deleteFile(File file) {
+        if (!file.exists()) {
+            return;
+        }
+        if (file.isFile()) {
+            file.delete();
+        } else if (file.isDirectory()) {
+            // listFiles() returns null on an I/O error or if the directory vanished meanwhile.
+            File[] files = file.listFiles();
+            if (files != null) {
+                for (File child : files) {
+                    deleteFile(child);
+                }
+            }
+            file.delete();
+        }
+    }
+
+    /**
+     * Creates (or updates) a topic with 4 read and 4 write queues on the broker at {@code addr}.
+     * Failures are logged and not rethrown.
+     *
+     * @param topic topic name
+     * @param addr broker address
+     */
+    public static void createTopic(String topic, String addr) {
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topic);
+        topicConfig.setReadQueueNums(4);
+        topicConfig.setWriteQueueNums(4);
+        try {
+            defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+        } catch (Exception e) {
+            // Arguments follow the placeholder order (topic, then addr); the trailing
+            // throwable is logged with its stack trace.
+            logger.error("Create topic:{} addr:{} failed", topic, addr, e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
new file mode 100644
index 0000000..367700a
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.jms.JmsTestListener;
+import org.apache.rocketmq.jms.JmsTestUtil;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt;
+
+/**
+ * Integration tests for the JMS client created through {@link JmsBaseConnectionFactory}:
+ * settings passed in the connection URI, two consumers on one session, and consumption
+ * through a tagged destination.
+ */
+public class JmsClientIT extends IntegrationTestBase {
+
+    /** Instance name put into the connection URI and asserted on the underlying clients. */
+    private static final String INSTANCE_NAME = "JMS_TEST";
+
+    /** Send timeout put into the connection URI and asserted on the producer. */
+    private static final int SEND_TIMEOUT_MILLIS = 10 * 1000;
+
+    /** Longest time, in seconds, to wait for the listeners to release the latch. */
+    private static final long CONSUME_TIMEOUT_SECONDS = 30;
+
+    /**
+     * Checks that the values passed in the connection URI are the ones reported by the
+     * push consumer and the producer looked up through {@link JmsTestUtil}.
+     */
+    @Test
+    public void testConfigInURI() throws Exception {
+        Connection connection = createConnection(producerId, consumerId);
+        try {
+            // Created inside the try block so the connection is closed even if this fails.
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            connection.start();
+
+            Destination destination = session.createTopic(topic + ":" + messageType);
+            session.createConsumer(destination);
+            session.createProducer(destination);
+
+            DefaultMQPushConsumer rmqPushConsumer =
+                (DefaultMQPushConsumer) getRMQPushConsumerExt(consumerId).getConsumer();
+            Assert.assertNotNull(rmqPushConsumer);
+            Assert.assertEquals(consumerId, rmqPushConsumer.getConsumerGroup());
+            Assert.assertEquals(INSTANCE_NAME, rmqPushConsumer.getInstanceName());
+            Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMax());
+            Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMin());
+            Assert.assertEquals(nameServer, rmqPushConsumer.getNamesrvAddr());
+
+            DefaultMQProducer mqProducer = (DefaultMQProducer) JmsTestUtil.getMQProducer(producerId);
+            Assert.assertNotNull(mqProducer);
+            Assert.assertEquals(producerId, mqProducer.getProducerGroup());
+            Assert.assertEquals(INSTANCE_NAME, mqProducer.getInstanceName());
+            Assert.assertEquals(SEND_TIMEOUT_MILLIS, mqProducer.getSendMsgTimeout());
+            Assert.assertEquals(nameServer, mqProducer.getNamesrvAddr());
+
+            // NOTE(review): pause kept from the original test; its purpose is not visible
+            // here - presumably it lets the clients settle before closing. Confirm.
+            Thread.sleep(2000);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Builds a connection whose URI carries the given producer and consumer groups together
+     * with the name server, consume thread number, send timeout and instance name.
+     *
+     * @param producerGroup value of the producer id URI parameter
+     * @param consumerGroup value of the consumer id URI parameter
+     * @return a new, not yet started connection
+     */
+    private Connection createConnection(String producerGroup, String consumerGroup) throws Exception {
+        String uri = String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s",
+            CommonConstant.PRODUCERID, producerGroup,
+            CommonConstant.CONSUMERID, consumerGroup,
+            CommonConstant.NAMESERVER, nameServer,
+            CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums,
+            CommonConstant.SEND_TIMEOUT_MILLIS, SEND_TIMEOUT_MILLIS,
+            CommonConstant.INSTANCE_NAME, INSTANCE_NAME);
+        return new JmsBaseConnectionFactory(new URI(uri)).createConnection();
+    }
+
+    /**
+     * Sends {@code count} text messages and checks that each message has no JMS message id
+     * before the send and has one afterwards.
+     *
+     * @param session session used to create the messages
+     * @param producer producer used to send them
+     * @param destination explicit destination, or {@code null} to send to the destination the
+     *     producer was created with
+     * @param count number of messages to send
+     */
+    private void sendTextMessages(Session session, MessageProducer producer, Destination destination,
+        int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage(text + i);
+            Assert.assertNull(message.getJMSMessageID());
+            if (destination == null) {
+                producer.send(message);
+            } else {
+                producer.send(destination, message);
+            }
+            Assert.assertNotNull(message.getJMSMessageID());
+        }
+    }
+
+    /**
+     * Waits up to {@link #CONSUME_TIMEOUT_SECONDS} for the latch; when the latch is released
+     * in time, sleeps two more seconds before returning.
+     */
+    private void awaitConsumption(CountDownLatch countDownLatch) throws InterruptedException {
+        if (countDownLatch.await(CONSUME_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+            // NOTE(review): extra pause kept from the original; presumably it gives late or
+            // unexpected deliveries a chance to show up in the counters. Confirm.
+            Thread.sleep(2000);
+        }
+    }
+
+    /**
+     * Registers one consumer on "TopicA" and one on "TopicB" in the same session, sends ten
+     * messages to each topic and expects each listener to have consumed ten.
+     */
+    @Test
+    public void testProducerAndConsume_TwoConsumer() throws Exception {
+        Connection connection = createConnection(producerId, consumerId);
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destinationA = session.createTopic("TopicA");
+            Destination destinationB = session.createTopic("TopicB");
+            final CountDownLatch countDownLatch = new CountDownLatch(2);
+            JmsTestListener listenerA = new JmsTestListener(10, countDownLatch);
+            JmsTestListener listenerB = new JmsTestListener(10, countDownLatch);
+
+            //two consumers
+            MessageConsumer messageConsumerA = session.createConsumer(destinationA);
+            messageConsumerA.setMessageListener(listenerA);
+            MessageConsumer messageConsumerB = session.createConsumer(destinationB);
+            messageConsumerB.setMessageListener(listenerB);
+            //producer
+            MessageProducer messageProducer = session.createProducer(destinationA);
+            connection.start();
+
+            sendTextMessages(session, messageProducer, null, 10);
+            sendTextMessages(session, messageProducer, destinationB, 10);
+
+            awaitConsumption(countDownLatch);
+            Assert.assertEquals(10, listenerA.getConsumedNum());
+            Assert.assertEquals(10, listenerB.getConsumedNum());
+        } finally {
+            //Close the connection
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends twenty messages to "topic:tagA" and twenty to "topic:tagB". The consumer on
+     * "topic:tagA" is expected to receive twenty messages, and the consumer of a second
+     * connection (different consumer group) on "topic" is expected to receive all forty.
+     */
+    @Test
+    public void testProducerAndConsume_TagFilter() throws Exception {
+        Connection connection = createConnection(producerId, consumerId);
+        Connection anotherConnection = null;
+        try {
+            anotherConnection = createConnection(producerId, consumerId + "other");
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Session anotherSession = anotherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Destination destinationA = session.createTopic("topic:tagA");
+            Destination destinationB = session.createTopic("topic:tagB");
+            final CountDownLatch countDownLatch = new CountDownLatch(2);
+            // The expected count matches the 20 messages asserted below (it used to be 10).
+            // NOTE(review): assumes JmsTestListener releases the latch once its expected
+            // count is reached - confirm against JmsTestListener.
+            JmsTestListener listenerForTagA = new JmsTestListener(20, countDownLatch);
+            JmsTestListener listenerForAll = new JmsTestListener(40, countDownLatch);
+
+            session.createConsumer(destinationA).setMessageListener(listenerForTagA);
+            anotherSession.createConsumer(session.createTopic("topic")).setMessageListener(listenerForAll);
+            //producer
+            MessageProducer messageProducer = session.createProducer(destinationA);
+            connection.start();
+            anotherConnection.start();
+
+            sendTextMessages(session, messageProducer, null, 20);
+            sendTextMessages(session, messageProducer, destinationB, 20);
+
+            awaitConsumption(countDownLatch);
+            Assert.assertEquals(20, listenerForTagA.getConsumedNum());
+            Assert.assertEquals(40, listenerForAll.getConsumedNum());
+        } finally {
+            //Close both connections; the second one is closed even if the first close fails
+            try {
+                connection.close();
+            } finally {
+                if (anotherConnection != null) {
+                    anotherConnection.close();
+                }
+            }
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
----------------------------------------------------------------------
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
new file mode 100644
index 0000000..6cbb7b1
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.net.URI;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer;
+import org.apache.rocketmq.jms.domain.RMQPushConsumerExt;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.jms.JmsTestUtil.checkConsumerState;
+import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt;
+
+public class JmsConsumerIT extends IntegrationTestBase {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+
+ private MessageListener listener = new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ try {
+ Assert.assertNotNull(message);
+ Assert.assertNotNull(message.getJMSMessageID());
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+
+ @Test
+ public void testStartIdempotency() throws Exception {
+ JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new
+ URI("rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + nameServer));
+ Connection connection = connectionFactory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ checkConsumerState(consumerId, true, false);
+ try {
+ Destination destination = session.createTopic(topic + ":" + messageType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(listener);
+
+ checkConsumerState(consumerId, false, false);
+
+ ((JmsBaseMessageConsumer) consumer).startConsumer();
+ checkConsumerState(consumerId, false, true);
+
+ Destination destination1 = session.createTopic(topic2 + ":" + messageType);
+ MessageConsumer consumer1 = session.createConsumer(destination1);
+ consumer1.setMessageListener(listener);
+
+ ((JmsBaseMessageConsumer) consumer1).startConsumer();
+ checkConsumerState(consumerId, false, true);
+
+ //the start is idempotent
+ connection.start();
+ connection.start();
+
+ Thread.sleep(5000);
+ }
+ finally {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void testReferenceCount() throws Exception {
+ JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new
+ URI("rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + nameServer));
+ Connection connection = connectionFactory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ connection.start();
+ try {
+ Destination destination = session.createTopic(topic + ":" + messageType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(listener);
+
+ RMQPushConsumerExt rmqPushConsumerExt = getRMQPushConsumerExt(consumerId);
+ Assert.assertNotNull(rmqPushConsumerExt);
+ Assert.assertEquals(1, rmqPushConsumerExt.getReferenceCount());
+
+
+ MessageConsumer consumer2 = session.createConsumer(destination);
+ Assert.assertEquals(2, rmqPushConsumerExt.getReferenceCount());
+
+ MessageConsumer consumer3 = session.createConsumer(session.createTopic(topic + ":" + messageType));
+
+ Assert.assertEquals(3, rmqPushConsumerExt.getReferenceCount());
+
+ session.close();
+
+ Assert.assertEquals(0, rmqPushConsumerExt.getReferenceCount());
+ Assert.assertEquals(false, rmqPushConsumerExt.isStarted());
+ Assert.assertNull(getRMQPushConsumerExt(consumerId));
+
+ Thread.sleep(5000);
+ }
+ finally {
+ connection.close();
+ }
+ }
+
+}