You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:54 UTC
[44/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java
deleted file mode 100644
index 15b75ee..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.message;
-
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.message.header.MetadataProcessor;
-import org.apache.hedwig.protocol.PubSubProtocol;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageFormatException;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-/**
- * Bunch of simple util methods to reduce code in the implementation.
- */
-public class MessageUtil {
-
- // The various message types supported.
- public enum SupportedMessageTypes {
- ONLY_MESSAGE((byte) 0),
- TEXT((byte) 1),
- BYTES((byte) 2),
- MAP((byte) 3),
- STREAM((byte) 4),
- OBJECT((byte) 5);
-
- private final byte type;
- private SupportedMessageTypes(byte type){
- this.type = type;
- }
-
- public byte getType() {
- return type;
- }
- }
-
- private static final Map<Byte, SupportedMessageTypes> valueToSupportedMessageType;
- static {
- SupportedMessageTypes[] arr = SupportedMessageTypes.values();
- Map<Byte, SupportedMessageTypes> map = new HashMap<Byte, SupportedMessageTypes>(arr.length);
- for (SupportedMessageTypes type : arr){
- map.put(type.getType(), type);
- }
- valueToSupportedMessageType = Collections.unmodifiableMap(map);
- }
-
- public static boolean asBoolean(Object value) throws MessageFormatException {
- // The JMS spec explicitly wants us to raise NPE !
- // if (null == value) return false;
- if (null == value) return Boolean.valueOf((String) value);
-
- if (value instanceof Boolean) return (Boolean) value;
- if (value instanceof String) return Boolean.valueOf((String) value);
- throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
- }
-
- public static byte asByte(Object value) throws MessageFormatException {
- // The JMS spec explicitly wants us to raise NPE !
- // if (null == value) return 0;
- if (null == value) return Byte.valueOf((String) value);
-
- if (value instanceof Byte) return (Byte) value;
- if (value instanceof String) return Byte.valueOf((String) value);
- throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
- }
-
- public static short asShort(Object value) throws MessageFormatException {
- // The JMS spec explicitly wants us to raise NPE !
- // if (null == value) return 0;
- if (null == value) return Short.valueOf((String) value);
-
- if (value instanceof Byte) return (Byte) value;
- if (value instanceof Short) return (Short) value;
- if (value instanceof String) return Short.valueOf((String) value);
- throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
- }
-
- public static int asInteger(Object value) throws MessageFormatException {
- // The JMS spec explicitly wants us to raise NPE !
- // if (null == value) return 0;
- if (null == value) return Integer.valueOf((String) value);
-
- if (value instanceof Byte) return (Byte) value;
- if (value instanceof Short) return (Short) value;
- if (value instanceof Integer) return (Integer) value;
- if (value instanceof String) return Integer.valueOf((String) value);
- throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
- }
-
- public static long asLong(Object value) throws MessageFormatException {
- // The JMS spec explicitly wants us to raise NPE !
- // if (null == value) return 0;
- if (null == value) return Long.valueOf((String) value);
-
- if (value instanceof Byte) return (Byte) value;
- if (value instanceof Short) return (Short) value;
- if (value instanceof Integer) return (Integer) value;
- if (value instanceof Long) return (Long) value;
- if (value instanceof String) return Long.valueOf((String) value);
- throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
- }
-
- public static float asFloat(Object value) throws MessageFormatException {
- // The JMS spec explicitly wants us to raise NPE !
- // if (null == value) return 0.0f;
- if (null == value) return Float.valueOf((String) value);
-
- if (value instanceof Float) return (Float) value;
- if (value instanceof String) return Float.valueOf((String) value);
- throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
- }
-
- public static double asDouble (Object value) throws MessageFormatException {
- // The JMS spec explicitly wants us to raise NPE !
- // if (null == value) return 0.0;
- if (null == value) return Double.valueOf((String) value);
-
- if (value instanceof Float) return (Float) value;
- if (value instanceof Double ) return (Double) value;
- if (value instanceof String) return Double.valueOf((String) value);
- throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
- }
-
- public static Double asDoubleSelectorProcessing(Object value) throws MessageFormatException {
- if (null == value) return null;
-
- if (value instanceof Float) return (double) (Float) value;
- if (value instanceof Double ) return (Double) value;
-
- if (value instanceof Long) return (double) (Long) value;
- if (value instanceof Integer) return (double) (Integer) value;
- if (value instanceof Short) return (double) (Short) value;
- if (value instanceof Byte) return (double) (Byte) value;
-
- return null;
- }
-
- public static Integer asIntegerSelectorProcessing(Object value) throws MessageFormatException {
- if (null == value) return null;
-
- if (value instanceof Float) return (int) (float) (Float) value;
- if (value instanceof Double ) return (int) (double) (Double) value;
-
- if (value instanceof Long) return (int) (long) (Long) value;
- if (value instanceof Integer) return (Integer) value;
- if (value instanceof Short) return (int) (Short) value;
- if (value instanceof Byte) return (int) (Byte) value;
-
- return null;
- }
-
- public static String asString(Object value) {
- if (null == value) return null;
-
- if (value instanceof String) return (String) value;
- // converts from boolean, byte, short, char, int, long, float and double to String.
- return "" + value;
- }
-
- public static char asChar(Object value) throws MessageFormatException {
- // treat it as integer with null
- if (null == value) return (char) 0;
-
- // only from/to char
- if (value instanceof Character) return (Character) value;
- throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
- }
-
- public static byte[] asBytes(Object value) throws MessageFormatException {
- if (null == value || value instanceof byte[]) return (byte[]) value;
- throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
- }
-
- public static boolean isValidKey(String key) {
- return null != key && 0 != key.length();
- }
-
- public static byte[] objectToBytes(Object obj) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- try {
- oos.writeObject(obj);
- oos.flush();
- } finally {
- try { oos.close(); } catch (IOException ioEx) { /* ignore */ }
- }
-
- return baos.toByteArray();
- }
-
- public static Object bytesToObject(byte[] data) throws IOException {
- ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
- try {
- return ois.readObject();
- } catch (ClassNotFoundException cnfEx){
- // unexpected !
- throw new IllegalStateException("Unexpected", cnfEx);
- } finally {
- try { ois.close(); } catch(IOException ioEx) { /* ignore */ }
- }
- }
-
-
-
- public static MessageImpl processHedwigMessage(SessionImpl session, PubSubProtocol.Message message,
- String sourceTopicName, String subscriberId,
- Runnable ackRunnable) throws JMSException {
- Map<String, Object> map = MetadataProcessor.parseHeaders(message);
-
- Object jmsBodyTypeValue = map.get(MessageImpl.JMS_MESSAGE_TYPE_KEY);
- // Should we treat these as bytes message by default ?
- // if (! (jmsBodyTypeValue instanceof Byte) )
- // throw new JMSException("Unsupported message : " + message + ", unable to determine jms message type " +
- // jmsBodyTypeValue);
- if (! (jmsBodyTypeValue instanceof Byte) ) jmsBodyTypeValue = (Byte) SupportedMessageTypes.BYTES.getType();
-
- SupportedMessageTypes type = valueToSupportedMessageType.get((Byte) jmsBodyTypeValue);
- switch (type){
- case STREAM:
- return new StreamMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
- case MAP:
- return new MapMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
- case TEXT:
- return new TextMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
- case OBJECT:
- return new ObjectMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
- case BYTES:
- return new BytesMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
- case ONLY_MESSAGE:
- return new MessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
- default:
- throw new JMSException("Unsupported message type : " + type + " for message " + message);
- }
- }
-
- public static MessageImpl createMessageCopy(SessionImpl session, Message message) throws JMSException {
- if (message instanceof MessageImpl) {
- return createMessageImplCopy(session, (MessageImpl) message);
- }
-
- if (message instanceof BytesMessage) {
- return new BytesMessageImpl((BytesMessage) message, session);
- }
- if (message instanceof MapMessage) {
- return new MapMessageImpl((MapMessage) message, session);
- }
- if (message instanceof ObjectMessage) {
- return new ObjectMessageImpl((ObjectMessage) message, session);
- }
- if (message instanceof StreamMessage) {
- return new StreamMessageImpl((StreamMessage) message, session);
- }
- if (message instanceof TextMessage) {
- return new TextMessageImpl((TextMessage) message, session);
- }
-
- return new MessageImpl(message, session);
- }
-
- private static MessageImpl createMessageImplCopy(SessionImpl session, MessageImpl message)
- throws JMSException {
-
- if (message instanceof BytesMessageImpl) {
- return new BytesMessageImpl(session, (BytesMessageImpl) message, message.getSourceName(),
- message.getSubscriberId());
- }
- if (message instanceof MapMessageImpl) {
- return new MapMessageImpl(session, (MapMessageImpl) message, message.getSourceName(),
- message.getSubscriberId());
- }
- if (message instanceof ObjectMessageImpl) {
- return new ObjectMessageImpl(session, (ObjectMessageImpl) message, message.getSourceName(),
- message.getSubscriberId());
- }
- if (message instanceof StreamMessageImpl) {
- return new StreamMessageImpl(session, (StreamMessageImpl) message, message.getSourceName(),
- message.getSubscriberId());
- }
- if (message instanceof TextMessageImpl) {
- return new TextMessageImpl(session, (TextMessageImpl) message, message.getSourceName(),
- message.getSubscriberId());
- }
-
- return new MessageImpl(session, message, message.getSourceName(), message.getSubscriberId());
- }
-
- private static final String JMS_MESSAGE_ID_PREFIX = "ID:";
- private static final String LOCAL_PREFIX = "LOCAL(";
- private static final String REMOTE_PREFIX = "REMOTE(";
- private static final char SEQ_ID_SUFFIX = ')';
- private static final char REMOTE_RECORD_SEPARATOR = ',';
- private static final char REMOTE_RECORD_SEQ_ID_PREFIX = '[';
- private static final char REMOTE_RECORD_SEQ_ID_SUFFIX = ']';
- private static final Pattern remoteMessageIdSplitPattern = Pattern.compile("" + REMOTE_RECORD_SEPARATOR);
-
- /**
- * Based on
- * {@link org.apache.hedwig.admin.console.ReadTopic#formatMessage(PubSubProtocol.Message)}
- *
- * This is tightly coupled with
- * @see #generateSeqIdFromJMSMessageId(String)
- *
- * @param seqId The sequence id to convert to string.
- * @return The string representation of the seq-id.
- */
- public static String generateJMSMessageIdFromSeqId(final PubSubProtocol.MessageSeqId seqId) {
- StringBuilder sb = new StringBuilder();
- // mandatory prefix for system generated id's.
- sb.append(JMS_MESSAGE_ID_PREFIX);
-
- if (seqId.hasLocalComponent()) {
- sb.append(LOCAL_PREFIX).append(seqId.getLocalComponent()).append(SEQ_ID_SUFFIX);
- } else {
- List<PubSubProtocol.RegionSpecificSeqId> remoteIds = seqId.getRemoteComponentsList();
- boolean first = true;
-
- sb.append(REMOTE_PREFIX);
- for (PubSubProtocol.RegionSpecificSeqId rssid : remoteIds) {
- if (!first) sb.append(REMOTE_RECORD_SEPARATOR);
- first = false;
- sb.append(rssid.getRegion().toStringUtf8());
- sb.append(REMOTE_RECORD_SEQ_ID_PREFIX);
- sb.append(rssid.getSeqId());
- sb.append(REMOTE_RECORD_SEQ_ID_SUFFIX);
- }
- sb.append(SEQ_ID_SUFFIX);
- }
-
- return sb.toString();
- }
-
- /**
- * Based on
- * {@link org.apache.hedwig.admin.console.ReadTopic#formatMessage(PubSubProtocol.Message)}
- *
- * This is tightly coupled with
- * @see #generateJMSMessageIdFromSeqId(org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId)
- * @param messageId The message id to convert to string.
- * @return The seq-id
- * @throws javax.jms.JMSException In case of exceptions doing the conversion.
- */
- public static PubSubProtocol.MessageSeqId generateSeqIdFromJMSMessageId(final String messageId)
- throws JMSException {
- if (null == messageId || !messageId.startsWith(JMS_MESSAGE_ID_PREFIX)) {
- throw new JMSException("Invalid messageId specified '" + messageId + "'");
- }
-
- PubSubProtocol.MessageSeqId.Builder builder = PubSubProtocol.MessageSeqId.newBuilder();
- // local ?
- if (messageId.regionMatches(JMS_MESSAGE_ID_PREFIX.length(), LOCAL_PREFIX, 0, LOCAL_PREFIX.length())){
- try {
- long seqId = Long.parseLong(messageId.substring(JMS_MESSAGE_ID_PREFIX.length() +
- LOCAL_PREFIX.length(), messageId.length() - 1));
- builder.setLocalComponent(seqId);
- } catch (NumberFormatException nfEx){
- JMSException jEx = new JMSException("Unable to parse local seq id from '" +
- messageId + "' .. " + nfEx);
- jEx.setLinkedException(nfEx);
- throw jEx;
- }
- }
- else {
- assert messageId.regionMatches(JMS_MESSAGE_ID_PREFIX.length(), REMOTE_PREFIX, 0,
- REMOTE_PREFIX.length());
-
- final String[] remoteParts;
- {
- final String remoteMessageId = messageId.substring(JMS_MESSAGE_ID_PREFIX.length() +
- REMOTE_PREFIX.length(), messageId.length() - 1);
- // Should ew stop using pattern and move to using indexOf's ?
- remoteParts = remoteMessageIdSplitPattern.split(remoteMessageId);
- }
-
- for (String remote : remoteParts){
- if (REMOTE_RECORD_SEQ_ID_SUFFIX != remote.charAt(remote.length() - 1))
- throw new JMSException("Invalid remote region snippet (no seq suffix) '" +
- remote + "' within '" + messageId);
- final int regionIndx = remote.indexOf(REMOTE_RECORD_SEQ_ID_PREFIX);
- if (-1 == regionIndx)
- throw new JMSException("Invalid remote region snippet (no region) '" + remote +
- "' within '" + messageId);
- final String region = remote.substring(0, regionIndx);
- final long seqId;
-
-
- try {
- seqId = Long.parseLong(remote.substring(regionIndx + 1, remote.length() - 1));
- } catch (NumberFormatException nfEx){
- JMSException jEx = new JMSException("Unable to parse remote seq id from '" +
- remote + "' within '" + messageId + "' .. " + nfEx);
- jEx.setLinkedException(nfEx);
- throw jEx;
- }
-
- PubSubProtocol.RegionSpecificSeqId.Builder rbuilder =
- PubSubProtocol.RegionSpecificSeqId.newBuilder();
- rbuilder.setRegion(ByteString.copyFromUtf8(region));
- rbuilder.setSeqId(seqId);
- builder.addRemoteComponents(rbuilder);
- }
- }
-
- return builder.build();
- }
-
- public static MessageImpl createCloneForDispatch(SessionImpl session, MessageImpl msg,
- String sourceTopicName, String subscriberId) throws JMSException {
- MessageImpl retval = msg.createClone(session, sourceTopicName, subscriberId);
- retval.reset();
- return retval;
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java
deleted file mode 100644
index ba26c4c..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.message;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.protocol.PubSubProtocol;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.ObjectMessage;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * read/write serializable java object ...
- *
- */
-public class ObjectMessageImpl extends MessageImpl implements ObjectMessage {
- private Serializable payload;
- private boolean readMode;
-
- public ObjectMessageImpl(SessionImpl session, Serializable payload) {
- super(session);
- this.payload = payload;
- this.readMode = false;
- }
-
- public ObjectMessageImpl(SessionImpl session, ObjectMessageImpl message, String sourceTopicName,
- String subscriberId) throws JMSException {
- super(session, (MessageImpl) message, sourceTopicName, subscriberId);
-
- this.payload = copySerializable(message.getObject());
- this.readMode = message.readMode;
- }
-
- private Serializable copySerializable(Serializable object) throws JMSException {
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(object);
- oos.flush();
- oos.close();
- baos.flush();
- baos.close();
-
- ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
- return (Serializable) ois.readObject();
- } catch (IOException e){
- JMSException jmsEx = new javax.jms.IllegalStateException("Unexpected exception");
- jmsEx.setLinkedException(e);
- throw jmsEx;
- } catch (ClassNotFoundException e) {
- JMSException jmsEx = new javax.jms.IllegalStateException("Unexpected exception");
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
- }
-
- // To clone a message from a ObjectMessage which is NOT ObjectMessageImpl
- // Changing order of parameter to NOT accidentally clash with the constructor above.
- // This is midly confusing, but helps a lot in preventing accidental bugs !
- public ObjectMessageImpl(ObjectMessage message, SessionImpl session) throws JMSException {
- super((Message) message, session);
-
- if (message instanceof ObjectMessageImpl) {
- throw new JMSException("Coding bug - should use this constructor ONLY for non ObjectMessageImpl messages");
- }
-
-
- this.payload = message.getObject();
- this.readMode = false;
- }
-
- public ObjectMessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties,
- String sourceTopicName, String subscriberId, Runnable ackRunnable) throws JMSException {
- super(session, message, properties, sourceTopicName, subscriberId, ackRunnable);
-
- try {
- this.payload = hasBodyFromProperties() ?
- (Serializable) MessageUtil.bytesToObject(message.getBody().toByteArray()) : null;
- } catch (IOException e) {
- JMSException ex = new JMSException("Unable to read message data .. " + e);
- ex.setLinkedException(e);
- throw ex;
- }
- this.readMode = true;
- }
-
- @Override
- protected MessageUtil.SupportedMessageTypes getJmsMessageType() {
- return MessageUtil.SupportedMessageTypes.OBJECT;
- }
-
- @Override
- public PubSubProtocol.Message generateHedwigMessage() throws JMSException {
- PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder();
- super.populateBuilderWithHeaders(builder);
-
- // Now set body and type.
- try {
- if (! isBodyEmpty()) builder.setBody(ByteString.copyFrom(MessageUtil.objectToBytes(this.payload)));
- } catch (IOException e) {
- JMSException ex = new JMSException("Unable to read message data .. " + e);
- ex.setLinkedException(e);
- throw ex;
- }
-
- return builder.build();
- }
-
- protected boolean isBodyEmpty(){
- return null == this.payload;
- }
-
- @Override
- public void setObject(Serializable payload) throws JMSException {
- if (readMode) throw new MessageNotWriteableException("Message not writable");
- this.payload = payload;
- }
-
- @Override
- public Serializable getObject() throws JMSException {
- return payload;
- }
-
- @Override
- public void clearBody() throws JMSException {
- super.clearBody();
- // allow read and write.
- this.payload = null;
- this.readMode = false;
- }
-
- @Override
- public void reset() throws JMSException {
- if (this.readMode) return ;
- this.readMode = true;
- }
-
- @Override
- ObjectMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId)
- throws JMSException {
-
- return new ObjectMessageImpl(session, this, sourceTopicName, subscriberId);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java
deleted file mode 100644
index 2aa74a2..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java
+++ /dev/null
@@ -1,752 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.message;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.jms.Mutable;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.protocol.PubSubProtocol;
-
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.StreamMessage;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.Map;
-
-/**
- * Though similar to BytesMessageImpl, the difference is that BytesMessage expects the user to know
- * the schema while
- * StreamMessage user expects type conversion, etc.
- *
- * In our case, the stream is not a true open stream to the server; it is buffered in memory.
- */
-public class StreamMessageImpl extends MessageImpl implements StreamMessage {
- private ReadOnlyMessage readOnlyMessage;
- private WriteOnlyMessage writeOnlyMessage;
- private boolean readMode;
-
- public StreamMessageImpl(SessionImpl session) throws JMSException {
- super(session);
- clearBody();
- }
-
- // To clone a message
- public StreamMessageImpl(SessionImpl session, StreamMessageImpl message, String sourceTopicName,
- String subscriberId) throws JMSException {
- super(session, (MessageImpl) message, sourceTopicName, subscriberId);
- try {
- if (message.readMode){
- this.readOnlyMessage = new ReadOnlyMessage(message.getPayloadData());
- this.writeOnlyMessage = null;
- }
- else {
- this.readOnlyMessage = null;
- this.writeOnlyMessage = new WriteOnlyMessage(message.getPayloadData());
- }
- } catch (IOException e) {
- JMSException ex = new JMSException("Unable to clone/copy input message " + message + " .. " + e);
- ex.setLinkedException(e);
- throw ex;
- }
-
- this.readMode = message.readMode;
- }
-
- // To clone a message from a StreamMessage which is NOT StreamMessageImpl
- // Changing order of parameter to NOT accidentally clash with the constructor above.
- // This is midly confusing, but helps a lot in preventing accidental bugs !
- public StreamMessageImpl(StreamMessage message, SessionImpl session) throws JMSException {
- super((Message) message, session);
-
- if (message instanceof StreamMessageImpl) {
- throw new JMSException("Coding bug - should use this constructor ONLY for non StreamMessageImpl messages");
- }
-
- final byte[] data;
- try {
- WriteOnlyMessage wom = new WriteOnlyMessage();
- try {
- Object obj;
- while (null != (obj = message.readObject())){
- wom.writeObject(obj);
- }
- } catch (EOFException eof){
- // ignore ...
- }
- data = wom.getPayloadAsBytes(null);
- } catch (IOException e) {
- JMSException jEx = new JMSException("Unable to write to internal message .. " + e);
- jEx.setLinkedException(e);
- throw jEx;
- }
-
- this.writeOnlyMessage = new WriteOnlyMessage(data);
-
- this.readOnlyMessage = null;
- this.readMode = false;
- }
-
- StreamMessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties,
- String sourceTopicName, String subscriberId, Runnable ackRunnable) throws JMSException {
- super(session, message, properties, sourceTopicName, subscriberId, ackRunnable);
-
- final byte[] data = message.getBody().toByteArray();
- try {
- this.readOnlyMessage = new ReadOnlyMessage(data);
- } catch (IOException e) {
- JMSException ex = new JMSException("Unable to clone/copy input message " + message + " .. " + e);
- ex.setLinkedException(e);
- throw ex;
- }
-
- this.writeOnlyMessage = null;
- this.readMode = true;
- }
-
- @Override
- protected MessageUtil.SupportedMessageTypes getJmsMessageType() {
- return MessageUtil.SupportedMessageTypes.STREAM;
- }
-
- protected boolean isBodyEmpty(){
- return false;
- }
-
- @Override
- public PubSubProtocol.Message generateHedwigMessage() throws JMSException {
- PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder();
- super.populateBuilderWithHeaders(builder);
-
- // Now set body and type.
- try {
- byte[] data = getPayloadData();
- builder.setBody(ByteString.copyFrom(data));
- } catch (IOException e) {
- JMSException ex = new JMSException("Unable to read message data .. " + e);
- ex.setLinkedException(e);
- throw ex;
- }
-
- return builder.build();
- }
-
-
- @Override
- public boolean readBoolean() throws JMSException {
- if (!readMode) throw new MessageNotReadableException("Message not readable");
- try {
- return readOnlyMessage.readBoolean();
- } catch (IOException ioEx){
- JMSException eofEx = new JMSException("ioEx ?");
- eofEx.setLinkedException(ioEx);
- throw eofEx;
- }
- }
-
- @Override
- public byte readByte() throws JMSException {
- if (!readMode) throw new MessageNotReadableException("Message not readable");
- try {
- return readOnlyMessage.readByte();
- } catch (IOException ioEx){
- JMSException eofEx = new JMSException("ioEx ?");
- eofEx.setLinkedException(ioEx);
- throw eofEx;
- }
- }
-
- @Override
- public short readShort() throws JMSException {
- if (!readMode) throw new MessageNotReadableException("Message not readable");
- try {
- return readOnlyMessage.readShort();
- } catch (IOException ioEx){
- JMSException eofEx = new JMSException("ioEx ?");
- eofEx.setLinkedException(ioEx);
- throw eofEx;
- }
- }
-
- @Override
- public char readChar() throws JMSException {
- if (!readMode) throw new MessageNotReadableException("Message not readable");
- try {
- return readOnlyMessage.readChar();
- } catch (IOException ioEx){
- JMSException eofEx = new JMSException("ioEx ?");
- eofEx.setLinkedException(ioEx);
- throw eofEx;
- }
- }
-
- @Override
- public int readInt() throws JMSException {
- if (!readMode) throw new MessageNotReadableException("Message not readable");
- try {
- return readOnlyMessage.readInt();
- } catch (IOException ioEx){
- JMSException eofEx = new JMSException("ioEx ?");
- eofEx.setLinkedException(ioEx);
- throw eofEx;
- }
- }
-
- @Override
- public long readLong() throws JMSException {
- if (!readMode) throw new MessageNotReadableException("Message not readable");
- try {
- return readOnlyMessage.readLong();
- } catch (IOException ioEx){
- JMSException eofEx = new JMSException("ioEx ?");
- eofEx.setLinkedException(ioEx);
- throw eofEx;
- }
- }
-
- /**
-  * Reads the next stream element as a float.
-  *
-  * @throws JMSException if the message is not in read mode, or if the underlying stream fails.
-  */
- @Override
- public float readFloat() throws JMSException {
-     if (!readMode) {
-         throw new MessageNotReadableException("Message not readable");
-     }
-     try {
-         return readOnlyMessage.readFloat();
-     } catch (IOException cause) {
-         // Translate the stream failure into a JMSException and keep the cause linked.
-         final JMSException wrapped = new JMSException("ioEx ?");
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Reads the next stream element as a double.
-  *
-  * @throws JMSException if the message is not in read mode, or if the underlying stream fails.
-  */
- @Override
- public double readDouble() throws JMSException {
-     if (!readMode) {
-         throw new MessageNotReadableException("Message not readable");
-     }
-     try {
-         return readOnlyMessage.readDouble();
-     } catch (IOException cause) {
-         // Translate the stream failure into a JMSException and keep the cause linked.
-         final JMSException wrapped = new JMSException("ioEx ?");
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Reads the next stream element as a String.
-  *
-  * @throws JMSException if the message is not in read mode, or if the underlying stream fails.
-  */
- @Override
- public String readString() throws JMSException {
-     if (!readMode) {
-         throw new MessageNotReadableException("Message not readable");
-     }
-     try {
-         return readOnlyMessage.readString();
-     } catch (IOException cause) {
-         // Translate the stream failure into a JMSException and keep the cause linked.
-         final JMSException wrapped = new JMSException("ioEx ?");
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Not supported by this implementation: the call always fails, regardless of the
-  * current read/write mode. Callers are directed to {@code readObject} instead.
-  *
-  * @throws UnsupportedOperationException always.
-  */
- @Override
- public int readBytes(byte[] data) throws JMSException {
-     final String reason = "Please use readObject - this method is not supported";
-     throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Reads the next stream element without converting it to a specific type.
-  *
-  * @throws JMSException if the message is not in read mode, or if the underlying stream fails.
-  */
- @Override
- public Object readObject() throws JMSException {
-     if (!readMode) {
-         throw new MessageNotReadableException("Message not readable");
-     }
-     try {
-         return readOnlyMessage.readObject();
-     } catch (IOException cause) {
-         // Translate the stream failure into a JMSException and keep the cause linked.
-         final JMSException wrapped = new JMSException("ioEx ?");
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends a boolean to the message body.
-  *
-  * @throws JMSException if the message is in read mode, or if the underlying stream fails.
-  */
- @Override
- public void writeBoolean(boolean val) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeBoolean(val);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends a byte to the message body.
-  *
-  * @throws JMSException if the message is in read mode, or if the underlying stream fails.
-  */
- @Override
- public void writeByte(byte val) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeByte(val);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends a short to the message body.
-  *
-  * @throws JMSException if the message is in read mode, or if the underlying stream fails.
-  */
- @Override
- public void writeShort(short val) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeShort(val);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends a char to the message body.
-  *
-  * @throws JMSException if the message is in read mode, or if the underlying stream fails.
-  */
- @Override
- public void writeChar(char val) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeChar(val);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends an int to the message body.
-  *
-  * @throws JMSException if the message is in read mode, or if the underlying stream fails.
-  */
- @Override
- public void writeInt(int val) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeInt(val);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends a long to the message body.
-  *
-  * @throws JMSException if the message is in read mode, or if the underlying stream fails.
-  */
- @Override
- public void writeLong(long val) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeLong(val);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends a float to the message body.
-  *
-  * @throws JMSException if the message is in read mode, or if the underlying stream fails.
-  */
- @Override
- public void writeFloat(float val) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeFloat(val);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends a double to the message body.
-  *
-  * @throws JMSException if the message is in read mode, or if the underlying stream fails.
-  */
- @Override
- public void writeDouble(double val) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeDouble(val);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends a String to the message body.
-  *
-  * @throws JMSException if the message is in read mode, or if the underlying stream fails.
-  */
- @Override
- public void writeString(String val) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeString(val);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends a whole byte array to the message body.
-  *
-  * @throws JMSException if the message is in read mode, or if the underlying stream fails.
-  */
- @Override
- public void writeBytes(byte[] data) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeBytes(data);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends the {@code length} bytes of {@code data} starting at {@code offset}
-  * to the message body.
-  *
-  * @throws JMSException if the message is in read mode, or if the underlying stream fails.
-  */
- @Override
- public void writeBytes(byte[] data, int offset, int length) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeBytes(data, offset, length);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Appends a value given in object form to the message body.
-  * This method is ONLY supposed to be used for the object form of primitive types;
-  * the type dispatch itself is done by the write-only delegate.
-  *
-  * @throws JMSException if the message is in read mode, if the delegate rejects the
-  *         object, or if the underlying stream fails.
-  */
- @Override
- public void writeObject(Object obj) throws JMSException {
-     if (readMode) {
-         throw new MessageNotWriteableException("Message not writable");
-     }
-     try {
-         writeOnlyMessage.writeObject(obj);
-     } catch (IOException cause) {
-         // Report the stream failure as a JMSException, keeping the cause linked.
-         final JMSException wrapped = new JMSException("Unexpected ioex : " + cause);
-         wrapped.setLinkedException(cause);
-         throw wrapped;
-     }
- }
-
- /**
-  * Puts the message body into read mode and positions the reader at the first element.
-  *
-  * In write mode the bytes written so far are materialized and wrapped in a fresh reader.
-  * In read mode the reader is rebuilt over the same bytes, so a partially consumed stream
-  * is rewound (the JMS contract for reset() is to reposition the stream to the beginning).
-  *
-  * State is only committed after the new reader has been built, so a failure does not
-  * leave the message flagged as readable without a reader.
-  *
-  * @throws JMSException if the payload cannot be turned into a readable stream.
-  */
- @Override
- public void reset() throws JMSException {
-     final ReadOnlyMessage rewound;
-     try {
-         final byte[] data;
-         if (this.readMode) {
-             // Nothing to rewind if no reader was ever attached.
-             if (null == this.readOnlyMessage) {
-                 return;
-             }
-             data = this.readOnlyMessage.getDataCopy();
-         } else {
-             data = writeOnlyMessage.getPayloadAsBytes(null);
-         }
-         rewound = new ReadOnlyMessage(data);
-     } catch (IOException e) {
-         JMSException ex = new JMSException("cant convert to read only message ... unexpected actually .. " + e);
-         ex.setLinkedException(e);
-         throw ex;
-     }
-     // Commit only once the conversion has succeeded.
-     this.readOnlyMessage = rewound;
-     this.writeOnlyMessage = null;
-     this.readMode = true;
- }
-
- /**
-  * Discards the current body and returns the message to write mode with an empty writer.
-  *
-  * @throws JMSException if the parent fails to clear, or the empty writer cannot be created.
-  */
- @Override
- public void clearBody() throws JMSException {
-     super.clearBody();
-     // Create the new writer first: if this throws, the remaining state is left untouched.
-     final WriteOnlyMessage emptyWriter = new WriteOnlyMessage();
-     this.writeOnlyMessage = emptyWriter;
-     this.readMode = false;
-     this.readOnlyMessage = null;
- }
-
- /**
-  * Returns the serialized body.
-  *
-  * In read mode this is a copy of the bytes backing the reader. In write mode the writer
-  * is materialized (which closes it) and then replaced by a new writer seeded with the
-  * bytes captured just before the close, so that writing can continue afterwards.
-  */
- private byte[] getPayloadData() throws IOException, IllegalStateException {
-     if (readMode) {
-         return readOnlyMessage.getDataCopy();
-     }
-
-     final Mutable<byte[]> snapshotBeforeClose = new Mutable<byte[]>(null);
-     final byte[] payload = writeOnlyMessage.getPayloadAsBytes(snapshotBeforeClose);
-
-     // The old writer is closed now; continue on a new one that starts from the snapshot.
-     writeOnlyMessage = new WriteOnlyMessage(snapshotBeforeClose.getValue());
-     return payload;
- }
-
-
- /**
-  * Creates a copy of this message via the copy constructor, bound to the given session,
-  * source topic and subscriber id.
-  */
- @Override
- StreamMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId)
-         throws JMSException {
-     final StreamMessageImpl copy = new StreamMessageImpl(session, this, sourceTopicName, subscriberId);
-     return copy;
- }
-
- /**
-  * Sequential reader over a serialized stream-message body.
-  *
-  * Using java object's instead of primitives to avoid having to store schema separately:
-  * every element is read back with {@link ObjectInputStream#readObject()} and then
-  * converted to the requested type by {@code MessageUtil}. If the conversion throws, the
-  * element is pushed back so that the next read sees it again.
-  */
- private static class ReadOnlyMessage {
-
-     // ArrayDeque rejects null elements, so a pushed-back null value is stored as this marker.
-     private static final Object NULL_MARKER = new Object();
-
-     private final ObjectInputStream ois;
-     private final byte[] data;
-     private final Deque<Object> unreadObjects = new ArrayDeque<Object>(4);
-
-     /**
-      * @param data serialized body, including the object stream header.
-      * @throws IOException if the stream header cannot be read from {@code data}.
-      */
-     public ReadOnlyMessage(byte[] data) throws IOException {
-         this.data = data;
-         this.ois = new ObjectInputStream(new ByteArrayInputStream(data));
-     }
-
-     /** Returns a copy of the bytes this reader was created from. */
-     public byte[] getDataCopy(){
-         return Arrays.copyOf(data, data.length);
-     }
-
-     // Pushes an element back so that the next read returns it again.
-     private void unreadObject(Object obj) {
-         // A null element must be wrapped: pushing null into an ArrayDeque throws
-         // NullPointerException, which would mask the conversion failure being reported.
-         unreadObjects.push(null == obj ? NULL_MARKER : obj);
-     }
-
-     // Returns the most recently pushed-back element if any, else the next one from the stream.
-     private Object readNextObject() throws IOException, JMSException {
-         try {
-             if (! unreadObjects.isEmpty()) {
-                 Object pending = unreadObjects.pop();
-                 return NULL_MARKER == pending ? null : pending;
-             }
-
-             return ois.readObject();
-         } catch (ClassNotFoundException e) {
-             // unexpected !
-             javax.jms.IllegalStateException jEx =
-                 new javax.jms.IllegalStateException("Unexpected not to be able to resolve class");
-             jEx.setLinkedException(e);
-             throw jEx;
-         } catch (EOFException eof) {
-             MessageEOFException eofEx = new MessageEOFException("eof");
-             eofEx.setLinkedException(eof);
-             throw eofEx;
-         }
-     }
-
-     public boolean readBoolean() throws IOException, JMSException {
-         Object obj = readNextObject();
-         boolean failed = true;
-         try {
-             Boolean value = MessageUtil.asBoolean(obj);
-             failed = false;
-             return value;
-         } finally {
-             if (failed) unreadObject(obj);
-         }
-     }
-
-     public byte readByte() throws IOException, JMSException {
-         Object obj = readNextObject();
-         boolean failed = true;
-         try {
-             Byte value = MessageUtil.asByte(obj);
-             failed = false;
-             return value;
-         } finally {
-             if (failed) unreadObject(obj);
-         }
-     }
-
-     public short readShort() throws IOException, JMSException {
-         Object obj = readNextObject();
-         boolean failed = true;
-         try {
-             Short value = MessageUtil.asShort(obj);
-             failed = false;
-             return value;
-         } finally {
-             if (failed) unreadObject(obj);
-         }
-     }
-
-     public char readChar() throws IOException, JMSException {
-         Object obj = readNextObject();
-         boolean failed = true;
-         try {
-             Character value = MessageUtil.asChar(obj);
-             failed = false;
-             return value;
-         } finally {
-             if (failed) unreadObject(obj);
-         }
-     }
-
-     public int readInt() throws IOException, JMSException {
-         Object obj = readNextObject();
-         boolean failed = true;
-         try {
-             Integer value = MessageUtil.asInteger(obj);
-             failed = false;
-             return value;
-         } finally {
-             if (failed) unreadObject(obj);
-         }
-     }
-
-     public long readLong() throws IOException, JMSException {
-         Object obj = readNextObject();
-         boolean failed = true;
-         try {
-             Long value = MessageUtil.asLong(obj);
-             failed = false;
-             return value;
-         } finally {
-             if (failed) unreadObject(obj);
-         }
-     }
-
-     public float readFloat() throws IOException, JMSException {
-         Object obj = readNextObject();
-         boolean failed = true;
-         try {
-             Float value = MessageUtil.asFloat(obj);
-             failed = false;
-             return value;
-         } finally {
-             if (failed) unreadObject(obj);
-         }
-     }
-
-     public double readDouble() throws IOException, JMSException {
-         Object obj = readNextObject();
-         boolean failed = true;
-         try {
-             Double value = MessageUtil.asDouble(obj);
-             failed = false;
-             return value;
-         } finally {
-             if (failed) unreadObject(obj);
-         }
-     }
-
-     public String readString() throws IOException, JMSException {
-         Object obj = readNextObject();
-         boolean failed = true;
-         try {
-             String value = MessageUtil.asString(obj);
-             failed = false;
-             return value;
-         } finally {
-             if (failed) unreadObject(obj);
-         }
-     }
-
-     /** Returns the next element as-is, without any type conversion. */
-     public Object readObject() throws IOException, JMSException {
-         return readNextObject();
-     }
- }
-
- /**
-  * Append-only writer for a stream-message body.
-  *
-  * Every value is written in object form through an {@link ObjectOutputStream} backed by
-  * an in-memory buffer; {@link #getPayloadAsBytes} materializes (and closes) the stream.
-  */
- private static class WriteOnlyMessage {
-
-     private final ByteArrayOutputStream baos;
-     private final ObjectOutputStream oos;
-
-     /** Creates an empty writer; the object stream header is written immediately. */
-     public WriteOnlyMessage() throws JMSException {
-         baos = new ByteArrayOutputStream();
-         try {
-             oos = new ObjectOutputStream(baos);
-         } catch (IOException e) {
-             IllegalStateException jEx =
-                 new IllegalStateException("Unexpected to not be able to create empty write only message");
-             jEx.setLinkedException(e);
-             throw jEx;
-         }
-     }
-
-     /**
-      * Creates a writer that continues an already materialized stream.
-      *
-      * @param data bytes of the earlier stream (header included); null or empty starts afresh.
-      */
-     private WriteOnlyMessage(final byte[] data) throws IllegalStateException {
-         final boolean resuming = null != data && 0 != data.length;
-         baos = new ByteArrayOutputStream();
-         try {
-             if (null != data) baos.write(data);
-             baos.flush();
-             oos = new ObjectOutputStream(baos){
-                 // Do not write the header if data is based on already materialized stream.
-                 @Override
-                 protected void writeStreamHeader() throws IOException {
-                     if (null == data || 0 == data.length) super.writeStreamHeader();
-                 }
-             };
-             // The new stream numbers its back-reference handles from zero again, while a
-             // reader of the combined bytes keeps the handles of the earlier segment. Emit a
-             // reset marker so the reader clears its handle table at the segment boundary;
-             // otherwise back-references written from here on can resolve to the wrong object.
-             if (resuming) oos.reset();
-         } catch (IOException e) {
-             IllegalStateException jEx =
-                 new IllegalStateException("Unexpected to not be able to create empty write only message");
-             jEx.setLinkedException(e);
-             throw jEx;
-         }
-     }
-
-     /**
-      * Flushes and closes the stream and returns everything written.
-      *
-      * @param preCloseData if non-null, receives the buffer content captured after the
-      *        flush but before the stream is closed.
-      * @return the buffer content after the close.
-      */
-     public byte[] getPayloadAsBytes(Mutable<byte[]> preCloseData) throws IOException {
-         oos.flush();
-         baos.flush();
-         if (null != preCloseData) preCloseData.setValue(baos.toByteArray());
-         oos.close();
-         baos.flush();
-         baos.close();
-         return baos.toByteArray();
-     }
-
-     public void writeBoolean(boolean val) throws IOException {
-         oos.writeObject(val);
-     }
-
-     public void writeByte(byte val) throws IOException {
-         oos.writeObject(val);
-     }
-
-     public void writeShort(short val) throws IOException {
-         oos.writeObject(val);
-     }
-
-     public void writeChar(char val) throws IOException {
-         oos.writeObject(val);
-     }
-
-     public void writeInt(int val) throws IOException {
-         oos.writeObject(val);
-     }
-
-     public void writeLong(long val) throws IOException {
-         oos.writeObject(val);
-     }
-
-     public void writeFloat(float val) throws IOException {
-         oos.writeObject(val);
-     }
-
-     public void writeDouble(double val) throws IOException {
-         oos.writeObject(val);
-     }
-
-     public void writeString(String val) throws IOException {
-         oos.writeObject(val);
-     }
-
-     public void writeBytes(byte[] data) throws IOException {
-         oos.writeObject(data);
-     }
-
-     // copy and write as a single byte array.
-     public void writeBytes(byte[] data, int offset, int length) throws IOException {
-         byte[] arr = new byte[length];
-         System.arraycopy(data, offset, arr, 0, length);
-         writeBytes(arr);
-     }
-
-     /**
-      * Writes a value given in object form by dispatching on its runtime type.
-      *
-      * @throws JMSException if the object is not a boxed primitive, String or byte[].
-      */
-     public void writeObject(Object obj) throws JMSException, IOException {
-         // A null has no runtime type to dispatch on (obj.getClass() below would throw
-         // NullPointerException); write it as a null element, as writeString(null) does.
-         if (null == obj) {
-             oos.writeObject(null);
-             return;
-         }
-         // unrolling it
-         if (obj instanceof Boolean) {
-             writeBoolean((Boolean) obj);
-         }
-         else if (obj instanceof Byte) {
-             writeByte((Byte) obj);
-         }
-         else if (obj instanceof Short) {
-             writeShort((Short) obj);
-         }
-         else if (obj instanceof Character) {
-             writeChar((Character) obj);
-         }
-         else if (obj instanceof Integer) {
-             writeInt((Integer) obj);
-         }
-         else if (obj instanceof Long) {
-             writeLong((Long) obj);
-         }
-         else if (obj instanceof Float) {
-             writeFloat((Float) obj);
-         }
-         else if (obj instanceof Double) {
-             writeDouble((Double) obj);
-         }
-         else if (obj instanceof String) {
-             writeString((String) obj);
-         }
-         else if (obj instanceof byte[]) {
-             writeBytes((byte[]) obj);
-         }
-         else{
-             throw new JMSException("Unsupported type for obj : " + obj.getClass());
-         }
-     }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java
deleted file mode 100644
index dc3a3ca..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.message;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.protocol.PubSubProtocol;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.TextMessage;
-import java.util.Map;
-
-/**
- * JMS text message whose body is a single String.
- * A {@code readMode} flag decides whether {@link #setText(String)} is currently allowed.
- */
-public class TextMessageImpl extends MessageImpl implements TextMessage {
-    private String payload;
-    private boolean readMode;
-
-    /** Creates an empty, writable message. */
-    public TextMessageImpl(SessionImpl session) {
-        super(session);
-        this.readMode = false;
-    }
-
-    /** Creates a writable message with the given text as its body. */
-    public TextMessageImpl(SessionImpl session, String payload) {
-        super(session);
-        this.readMode = false;
-        this.payload = payload;
-    }
-
-    /** Copy constructor: takes over the text and the read/write mode of {@code message}. */
-    public TextMessageImpl(SessionImpl session, TextMessageImpl message, String sourceTopicName,
-                           String subscriberId) throws JMSException {
-        super(session, (MessageImpl) message, sourceTopicName, subscriberId);
-
-        this.readMode = message.readMode;
-        this.payload = message.getText();
-    }
-
-
-    // To clone a message from a TextMessage which is NOT TextMessageImpl.
-    // The parameter order is deliberately swapped so this cannot accidentally clash with
-    // the constructor above. This is mildly confusing, but helps a lot in preventing
-    // accidental bugs !
-    public TextMessageImpl(TextMessage message, SessionImpl session) throws JMSException {
-        super((Message) message, session);
-
-        final boolean alreadyOurType = message instanceof TextMessageImpl;
-        if (alreadyOurType) {
-            throw new JMSException("Coding bug - should use this constructor ONLY for non TextMessageImpl messages");
-        }
-
-        this.payload = message.getText();
-        this.readMode = false;
-    }
-
-    /**
-     * Builds a read-only message from a received protocol message. The body is decoded
-     * as UTF-8 when {@code hasBodyFromProperties()} reports a body, else the text is null.
-     */
-    public TextMessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties,
-                           String sourceTopicName, String subscriberId, Runnable ackRunnable) throws JMSException {
-        super(session, message, properties, sourceTopicName, subscriberId, ackRunnable);
-
-        if (hasBodyFromProperties()) {
-            this.payload = message.getBody().toStringUtf8();
-        } else {
-            this.payload = null;
-        }
-        this.readMode = true;
-    }
-
-    @Override
-    protected MessageUtil.SupportedMessageTypes getJmsMessageType() {
-        return MessageUtil.SupportedMessageTypes.TEXT;
-    }
-
-    /** Serializes headers and, when a text is set, the UTF-8 encoded body. */
-    @Override
-    public PubSubProtocol.Message generateHedwigMessage() throws JMSException {
-        final PubSubProtocol.Message.Builder out = PubSubProtocol.Message.newBuilder();
-        super.populateBuilderWithHeaders(out);
-        if (!isBodyEmpty()) {
-            out.setBody(ByteString.copyFromUtf8(this.payload));
-        }
-        return out.build();
-    }
-
-    /** The body counts as empty only when no text is set; an empty string is not empty. */
-    protected boolean isBodyEmpty(){
-        return this.payload == null;
-    }
-
-    @Override
-    public void setText(String payload) throws JMSException {
-        if (readMode) {
-            throw new MessageNotWriteableException("Message not writable");
-        }
-        this.payload = payload;
-    }
-
-    @Override
-    public String getText() throws JMSException {
-        return payload;
-    }
-
-    /** Drops the text and makes the message writable again. */
-    @Override
-    public void clearBody() throws JMSException {
-        super.clearBody();
-        this.readMode = false;
-        this.payload = null;
-    }
-
-    /** Switches the message to read mode; a no-op when it already is. */
-    @Override
-    public void reset() throws JMSException {
-        if (!this.readMode) {
-            this.readMode = true;
-        }
-    }
-
-    @Override
-    TextMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId) throws JMSException {
-        return new TextMessageImpl(session, this, sourceTopicName, subscriberId);
-    }
-
-    @Override
-    public String toString() {
-        return "TextMessageImpl"
-            + "{payload='" + payload + '\''
-            + ", readMode=" + readMode
-            + ", parent=" + super.toString()
-            + '}';
-    }
-}