You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:54 UTC

[44/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java
deleted file mode 100644
index 15b75ee..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.message;
-
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.message.header.MetadataProcessor;
-import org.apache.hedwig.protocol.PubSubProtocol;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageFormatException;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-/**
- * Bunch of simple util methods to reduce code in the implementation.
- */
-public class MessageUtil {
-
-    // The various message types supported.
-    public enum SupportedMessageTypes {
-      ONLY_MESSAGE((byte) 0),
-      TEXT((byte) 1),
-      BYTES((byte) 2),
-      MAP((byte) 3),
-      STREAM((byte) 4),
-      OBJECT((byte) 5);
-
-      private final byte type;
-      private SupportedMessageTypes(byte type){
-        this.type = type;
-      }
-
-      public byte getType() {
-        return type;
-      }
-    }
-
-    private static final Map<Byte, SupportedMessageTypes> valueToSupportedMessageType;
-    static {
-        SupportedMessageTypes[] arr = SupportedMessageTypes.values();
-        Map<Byte, SupportedMessageTypes> map = new HashMap<Byte, SupportedMessageTypes>(arr.length);
-        for (SupportedMessageTypes type : arr){
-            map.put(type.getType(), type);
-        }
-        valueToSupportedMessageType = Collections.unmodifiableMap(map);
-    }
-
-    public static boolean asBoolean(Object value) throws MessageFormatException {
-        // The JMS spec wants null handled exactly like Boolean.valueOf((String) null), which yields false (no NPE here).
-        // if (null == value) return false;
-        if (null == value) return Boolean.valueOf((String) value);
-
-        if (value instanceof Boolean) return (Boolean) value;
-        if (value instanceof String) return Boolean.valueOf((String) value);
-        throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
-    }
-
-    public static byte asByte(Object value) throws MessageFormatException {
-        // The JMS spec explicitly wants us to raise NPE !
-        // if (null == value) return 0;
-        if (null == value) return Byte.valueOf((String) value);
-
-        if (value instanceof Byte) return (Byte) value;
-        if (value instanceof String) return Byte.valueOf((String) value);
-        throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
-    }
-
-    public static short asShort(Object value) throws MessageFormatException {
-        // The JMS spec explicitly wants us to raise NPE !
-        // if (null == value) return 0;
-        if (null == value) return Short.valueOf((String) value);
-
-        if (value instanceof Byte) return (Byte) value;
-        if (value instanceof Short) return (Short) value;
-        if (value instanceof String) return Short.valueOf((String) value);
-        throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
-    }
-
-    public static int asInteger(Object value) throws MessageFormatException {
-        // The JMS spec explicitly wants us to raise NPE !
-        // if (null == value) return 0;
-        if (null == value) return Integer.valueOf((String) value);
-
-        if (value instanceof Byte) return (Byte) value;
-        if (value instanceof Short) return (Short) value;
-        if (value instanceof Integer) return (Integer) value;
-        if (value instanceof String) return Integer.valueOf((String) value);
-        throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
-    }
-
-    public static long asLong(Object value) throws MessageFormatException {
-        // The JMS spec explicitly wants us to raise NPE !
-        // if (null == value) return 0;
-        if (null == value) return Long.valueOf((String) value);
-
-        if (value instanceof Byte) return (Byte) value;
-        if (value instanceof Short) return (Short) value;
-        if (value instanceof Integer) return (Integer) value;
-        if (value instanceof Long) return (Long) value;
-        if (value instanceof String) return Long.valueOf((String) value);
-        throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
-    }
-
-    public static float asFloat(Object value) throws MessageFormatException {
-        // The JMS spec explicitly wants us to raise NPE !
-        // if (null == value) return 0.0f;
-        if (null == value) return Float.valueOf((String) value);
-
-        if (value instanceof Float) return (Float) value;
-        if (value instanceof String) return Float.valueOf((String) value);
-        throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
-    }
-
-    public static double asDouble (Object value) throws MessageFormatException {
-        // The JMS spec explicitly wants us to raise NPE !
-        // if (null == value) return 0.0;
-        if (null == value) return Double.valueOf((String) value);
-
-        if (value instanceof Float) return (Float) value;
-        if (value instanceof Double ) return (Double) value;
-        if (value instanceof String) return Double.valueOf((String) value);
-        throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
-    }
-
-    public static Double asDoubleSelectorProcessing(Object value) throws MessageFormatException {
-        if (null == value) return null;
-
-        if (value instanceof Float) return (double) (Float) value;
-        if (value instanceof Double ) return (Double) value;
-
-        if (value instanceof Long) return (double) (Long) value;
-        if (value instanceof Integer) return (double) (Integer) value;
-        if (value instanceof Short) return (double) (Short) value;
-        if (value instanceof Byte) return (double) (Byte) value;
-
-        return null;
-    }
-
-    public static Integer asIntegerSelectorProcessing(Object value) throws MessageFormatException {
-        if (null == value) return null;
-
-        if (value instanceof Float) return (int) (float) (Float) value;
-        if (value instanceof Double ) return (int) (double) (Double) value;
-
-        if (value instanceof Long) return (int) (long) (Long) value;
-        if (value instanceof Integer) return (Integer) value;
-        if (value instanceof Short) return (int) (Short) value;
-        if (value instanceof Byte) return (int) (Byte) value;
-
-        return null;
-    }
-
-    public static String asString(Object value) {
-        if (null == value) return null;
-
-        if (value instanceof String) return (String) value;
-        // converts from boolean, byte, short, char, int, long, float and double to String.
-        return "" + value;
-    }
-
-    public static char asChar(Object value) throws MessageFormatException {
-        // treat it as integer with null
-        if (null == value) return (char) 0;
-
-        // only from/to char
-        if (value instanceof Character) return (Character) value;
-        throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
-    }
-
-    public static byte[] asBytes(Object value) throws MessageFormatException {
-        if (null == value || value instanceof byte[]) return (byte[]) value;
-        throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value);
-    }
-
-    public static boolean isValidKey(String key) {
-        return null != key && 0 != key.length();
-    }
-
-    public static byte[] objectToBytes(Object obj) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
-        try {
-            oos.writeObject(obj);
-            oos.flush();
-        } finally {
-            try { oos.close(); } catch (IOException ioEx) { /* ignore */ }
-        }
-
-        return baos.toByteArray();
-    }
-
-    public static Object bytesToObject(byte[] data) throws IOException {
-        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
-        try {
-            return ois.readObject();
-        } catch (ClassNotFoundException  cnfEx){
-            // unexpected !
-            throw new IllegalStateException("Unexpected", cnfEx);
-        } finally {
-            try { ois.close(); } catch(IOException ioEx) { /* ignore */ }
-        }
-    }
-
-
-
-    public static MessageImpl processHedwigMessage(SessionImpl session, PubSubProtocol.Message message,
-                                                   String sourceTopicName, String subscriberId,
-                                                   Runnable ackRunnable) throws JMSException {
-        Map<String, Object> map = MetadataProcessor.parseHeaders(message);
-
-        Object jmsBodyTypeValue = map.get(MessageImpl.JMS_MESSAGE_TYPE_KEY);
-        // Should we treat these as bytes message by default ?
-        // if (! (jmsBodyTypeValue instanceof Byte) )
-        //    throw new JMSException("Unsupported message : " + message + ", unable to determine jms message type " +
-        //      jmsBodyTypeValue);
-        if (! (jmsBodyTypeValue instanceof Byte) ) jmsBodyTypeValue = (Byte) SupportedMessageTypes.BYTES.getType();
-
-        SupportedMessageTypes type = valueToSupportedMessageType.get((Byte) jmsBodyTypeValue);
-        switch (type){
-            case STREAM:
-                return new StreamMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
-            case MAP:
-                return new MapMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
-            case TEXT:
-                return new TextMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
-            case OBJECT:
-                return new ObjectMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
-            case BYTES:
-                return new BytesMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
-            case ONLY_MESSAGE:
-                return new MessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable);
-            default:
-                throw new JMSException("Unsupported message type : " + type + " for message " + message);
-        }
-    }
-
-    public static MessageImpl createMessageCopy(SessionImpl session, Message message) throws JMSException {
-        if (message instanceof MessageImpl) {
-            return createMessageImplCopy(session, (MessageImpl) message);
-        }
-
-        if (message instanceof BytesMessage) {
-            return new BytesMessageImpl((BytesMessage) message, session);
-        }
-        if (message instanceof MapMessage) {
-            return new MapMessageImpl((MapMessage) message, session);
-        }
-        if (message instanceof ObjectMessage) {
-            return new ObjectMessageImpl((ObjectMessage) message, session);
-        }
-        if (message instanceof StreamMessage) {
-            return new StreamMessageImpl((StreamMessage) message, session);
-        }
-        if (message instanceof TextMessage) {
-            return new TextMessageImpl((TextMessage) message, session);
-        }
-
-        return new MessageImpl(message, session);
-    }
-
-    private static MessageImpl createMessageImplCopy(SessionImpl session, MessageImpl message)
-        throws JMSException {
-
-        if (message instanceof BytesMessageImpl) {
-            return new BytesMessageImpl(session, (BytesMessageImpl) message, message.getSourceName(),
-                message.getSubscriberId());
-        }
-        if (message instanceof MapMessageImpl) {
-            return new MapMessageImpl(session, (MapMessageImpl) message, message.getSourceName(),
-                message.getSubscriberId());
-        }
-        if (message instanceof ObjectMessageImpl) {
-            return new ObjectMessageImpl(session, (ObjectMessageImpl) message, message.getSourceName(),
-                message.getSubscriberId());
-        }
-        if (message instanceof StreamMessageImpl) {
-            return new StreamMessageImpl(session, (StreamMessageImpl) message, message.getSourceName(),
-                message.getSubscriberId());
-        }
-        if (message instanceof TextMessageImpl) {
-            return new TextMessageImpl(session, (TextMessageImpl) message, message.getSourceName(),
-                message.getSubscriberId());
-        }
-
-        return new MessageImpl(session, message, message.getSourceName(), message.getSubscriberId());
-    }
-
-    private static final String JMS_MESSAGE_ID_PREFIX = "ID:";
-    private static final String LOCAL_PREFIX = "LOCAL(";
-    private static final String REMOTE_PREFIX = "REMOTE(";
-    private static final char SEQ_ID_SUFFIX = ')';
-    private static final char REMOTE_RECORD_SEPARATOR = ',';
-    private static final char REMOTE_RECORD_SEQ_ID_PREFIX = '[';
-    private static final char REMOTE_RECORD_SEQ_ID_SUFFIX = ']';
-    private static final Pattern remoteMessageIdSplitPattern = Pattern.compile("" + REMOTE_RECORD_SEPARATOR);
-
-    /**
-     * Based on
-     * {@link org.apache.hedwig.admin.console.ReadTopic#formatMessage(PubSubProtocol.Message)}
-     *
-     * This is tightly coupled with
-     * @see #generateSeqIdFromJMSMessageId(String)
-     *
-     * @param seqId The sequence id to convert to string.
-     * @return The string representation of the seq-id.
-     */
-    public static String generateJMSMessageIdFromSeqId(final PubSubProtocol.MessageSeqId seqId) {
-        StringBuilder sb = new StringBuilder();
-        // mandatory prefix for system generated id's.
-        sb.append(JMS_MESSAGE_ID_PREFIX);
-
-        if (seqId.hasLocalComponent()) {
-            sb.append(LOCAL_PREFIX).append(seqId.getLocalComponent()).append(SEQ_ID_SUFFIX);
-        } else {
-            List<PubSubProtocol.RegionSpecificSeqId> remoteIds = seqId.getRemoteComponentsList();
-            boolean first = true;
-
-            sb.append(REMOTE_PREFIX);
-            for (PubSubProtocol.RegionSpecificSeqId rssid : remoteIds) {
-                if (!first) sb.append(REMOTE_RECORD_SEPARATOR);
-                first = false;
-                sb.append(rssid.getRegion().toStringUtf8());
-                sb.append(REMOTE_RECORD_SEQ_ID_PREFIX);
-                sb.append(rssid.getSeqId());
-                sb.append(REMOTE_RECORD_SEQ_ID_SUFFIX);
-            }
-            sb.append(SEQ_ID_SUFFIX);
-        }
-
-        return sb.toString();
-    }
-
-    /**
-     * Based on
-     * {@link org.apache.hedwig.admin.console.ReadTopic#formatMessage(PubSubProtocol.Message)}
-     *
-     * This is tightly coupled with
-     * @see #generateJMSMessageIdFromSeqId(org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId)
-     * @param messageId The JMS message id to convert back to a seq-id.
-     * @return The seq-id
-     * @throws javax.jms.JMSException In case of exceptions doing the conversion.
-     */
-    public static PubSubProtocol.MessageSeqId generateSeqIdFromJMSMessageId(final String messageId)
-        throws JMSException {
-        if (null == messageId || !messageId.startsWith(JMS_MESSAGE_ID_PREFIX)) {
-            throw new JMSException("Invalid messageId specified '" + messageId + "'");
-        }
-
-        PubSubProtocol.MessageSeqId.Builder builder = PubSubProtocol.MessageSeqId.newBuilder();
-        // local ?
-        if (messageId.regionMatches(JMS_MESSAGE_ID_PREFIX.length(), LOCAL_PREFIX, 0, LOCAL_PREFIX.length())){
-            try {
-                long seqId = Long.parseLong(messageId.substring(JMS_MESSAGE_ID_PREFIX.length() +
-                    LOCAL_PREFIX.length(), messageId.length() - 1));
-                builder.setLocalComponent(seqId);
-            } catch (NumberFormatException nfEx){
-                JMSException jEx = new JMSException("Unable to parse local seq id from '" +
-                    messageId + "' .. " + nfEx);
-                jEx.setLinkedException(nfEx);
-                throw jEx;
-            }
-        }
-        else {
-            assert messageId.regionMatches(JMS_MESSAGE_ID_PREFIX.length(), REMOTE_PREFIX, 0,
-                REMOTE_PREFIX.length());
-
-            final String[] remoteParts;
-            {
-                final String remoteMessageId = messageId.substring(JMS_MESSAGE_ID_PREFIX.length() +
-                    REMOTE_PREFIX.length(), messageId.length() - 1);
-                // Should we stop using the pattern and move to using indexOf's ?
-                remoteParts = remoteMessageIdSplitPattern.split(remoteMessageId);
-            }
-
-            for (String remote : remoteParts){
-                if (REMOTE_RECORD_SEQ_ID_SUFFIX != remote.charAt(remote.length() - 1))
-                  throw new JMSException("Invalid remote region snippet (no seq suffix) '" +
-                      remote + "' within '" + messageId);
-                final int regionIndx = remote.indexOf(REMOTE_RECORD_SEQ_ID_PREFIX);
-                if (-1 == regionIndx)
-                  throw new JMSException("Invalid remote region snippet (no region) '" + remote +
-                      "' within '" + messageId);
-                final String region = remote.substring(0, regionIndx);
-                final long seqId;
-
-
-                try {
-                    seqId = Long.parseLong(remote.substring(regionIndx + 1, remote.length() - 1));
-                } catch (NumberFormatException nfEx){
-                    JMSException jEx = new JMSException("Unable to parse remote seq id from '" +
-                        remote + "' within '" + messageId + "' .. " + nfEx);
-                    jEx.setLinkedException(nfEx);
-                    throw jEx;
-                }
-
-                PubSubProtocol.RegionSpecificSeqId.Builder rbuilder =
-                    PubSubProtocol.RegionSpecificSeqId.newBuilder();
-                rbuilder.setRegion(ByteString.copyFromUtf8(region));
-                rbuilder.setSeqId(seqId);
-                builder.addRemoteComponents(rbuilder);
-            }
-        }
-
-        return builder.build();
-    }
-
-    public static MessageImpl createCloneForDispatch(SessionImpl session, MessageImpl msg,
-                                                     String sourceTopicName, String subscriberId) throws JMSException {
-        MessageImpl retval = msg.createClone(session, sourceTopicName, subscriberId);
-        retval.reset();
-        return retval;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java
deleted file mode 100644
index ba26c4c..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.message;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.protocol.PubSubProtocol;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.ObjectMessage;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * read/write serializable java object ...
- *
- */
-public class ObjectMessageImpl extends MessageImpl implements ObjectMessage {
-    private Serializable payload;
-    private boolean readMode;
-
-    public ObjectMessageImpl(SessionImpl session, Serializable payload) {
-        super(session);
-        this.payload = payload;
-        this.readMode = false;
-    }
-
-    public ObjectMessageImpl(SessionImpl session, ObjectMessageImpl message, String sourceTopicName,
-                             String subscriberId) throws JMSException {
-        super(session, (MessageImpl) message, sourceTopicName, subscriberId);
-
-        this.payload = copySerializable(message.getObject());
-        this.readMode = message.readMode;
-    }
-
-    private Serializable copySerializable(Serializable object) throws JMSException {
-        try {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(object);
-            oos.flush();
-            oos.close();
-            baos.flush();
-            baos.close();
-
-            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
-            return (Serializable) ois.readObject();
-        } catch (IOException e){
-            JMSException jmsEx = new javax.jms.IllegalStateException("Unexpected exception");
-            jmsEx.setLinkedException(e);
-            throw jmsEx;
-        } catch (ClassNotFoundException e) {
-            JMSException jmsEx = new javax.jms.IllegalStateException("Unexpected exception");
-            jmsEx.setLinkedException(e);
-            throw jmsEx;
-        }
-    }
-
-    // To clone a message from a ObjectMessage which is NOT ObjectMessageImpl
-    // Changing order of parameter to NOT accidentally clash with the constructor above.
-    // This is mildly confusing, but helps a lot in preventing accidental bugs !
-    public ObjectMessageImpl(ObjectMessage message, SessionImpl session) throws JMSException {
-        super((Message) message, session);
-
-        if (message instanceof ObjectMessageImpl) {
-            throw new JMSException("Coding bug - should use this constructor ONLY for non ObjectMessageImpl messages");
-        }
-
-
-        this.payload = message.getObject();
-        this.readMode = false;
-    }
-
-    public ObjectMessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties,
-                             String sourceTopicName, String subscriberId, Runnable ackRunnable) throws JMSException {
-        super(session, message, properties, sourceTopicName, subscriberId, ackRunnable);
-
-        try {
-            this.payload = hasBodyFromProperties() ?
-                (Serializable) MessageUtil.bytesToObject(message.getBody().toByteArray()) : null;
-        } catch (IOException e) {
-            JMSException ex = new JMSException("Unable to read message data .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-        this.readMode = true;
-    }
-
-    @Override
-    protected MessageUtil.SupportedMessageTypes getJmsMessageType() {
-        return MessageUtil.SupportedMessageTypes.OBJECT;
-    }
-
-    @Override
-    public PubSubProtocol.Message generateHedwigMessage() throws JMSException {
-        PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder();
-        super.populateBuilderWithHeaders(builder);
-
-        // Now set body and type.
-        try {
-            if (! isBodyEmpty()) builder.setBody(ByteString.copyFrom(MessageUtil.objectToBytes(this.payload)));
-        } catch (IOException e) {
-            JMSException ex = new JMSException("Unable to read message data .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-
-        return builder.build();
-    }
-
-    protected boolean isBodyEmpty(){
-        return null == this.payload;
-    }
-
-    @Override
-    public void setObject(Serializable payload) throws JMSException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-        this.payload = payload;
-    }
-
-    @Override
-    public Serializable getObject() throws JMSException {
-        return payload;
-    }
-
-    @Override
-    public void clearBody() throws JMSException {
-        super.clearBody();
-        // allow read and write.
-        this.payload = null;
-        this.readMode = false;
-    }
-
-    @Override
-    public void reset() throws JMSException {
-        if (this.readMode) return ;
-        this.readMode = true;
-    }
-
-    @Override
-    ObjectMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId)
-        throws JMSException {
-
-        return new ObjectMessageImpl(session, this, sourceTopicName, subscriberId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java
deleted file mode 100644
index 2aa74a2..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java
+++ /dev/null
@@ -1,752 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.message;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.jms.Mutable;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.protocol.PubSubProtocol;
-
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.StreamMessage;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.Map;
-
-/**
- * {@link StreamMessage} implementation whose body is buffered in memory rather than being
- * a true open stream to the server.
- *
- * Though similar to BytesMessageImpl, the difference is that a BytesMessage user is expected
- * to know the schema, while a StreamMessage user expects type conversion, etc. To support
- * that, every value is stored as a serialized Java object (see {@link WriteOnlyMessage}) and
- * converted on read (see {@link ReadOnlyMessage}).
- *
- * The message is always in exactly one of two modes: write mode ({@code writeOnlyMessage}
- * set, {@code readOnlyMessage} null) or read mode (the reverse).
- */
-public class StreamMessageImpl extends MessageImpl implements StreamMessage {
-    /** Reader over the serialized body; non-null only while in read mode. */
-    private ReadOnlyMessage readOnlyMessage;
-    /** Writer accumulating the body; non-null only while in write mode. */
-    private WriteOnlyMessage writeOnlyMessage;
-    /** {@code true} when the body may be read, {@code false} when it may be written. */
-    private boolean readMode;
-
-    /**
-     * Creates an empty, writable message.
-     *
-     * @param session owning session
-     * @throws JMSException if the empty body cannot be initialised
-     */
-    public StreamMessageImpl(SessionImpl session) throws JMSException {
-        super(session);
-        clearBody();
-    }
-
-    /**
-     * Clones another {@code StreamMessageImpl}, keeping its read/write mode.
-     *
-     * @param session         session for the copy
-     * @param message         message to copy the body and mode from
-     * @param sourceTopicName topic name passed to the superclass
-     * @param subscriberId    subscriber id passed to the superclass
-     * @throws JMSException if the body of {@code message} cannot be copied
-     */
-    public StreamMessageImpl(SessionImpl session, StreamMessageImpl message, String sourceTopicName,
-                             String subscriberId) throws JMSException {
-        super(session, (MessageImpl) message, sourceTopicName, subscriberId);
-        try {
-            if (message.readMode) {
-                this.readOnlyMessage = new ReadOnlyMessage(message.getPayloadData());
-                this.writeOnlyMessage = null;
-            } else {
-                this.readOnlyMessage = null;
-                this.writeOnlyMessage = new WriteOnlyMessage(message.getPayloadData());
-            }
-        } catch (IOException e) {
-            JMSException ex = new JMSException("Unable to clone/copy input message " + message + " .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-
-        this.readMode = message.readMode;
-    }
-
-    /**
-     * Clones a {@link StreamMessage} which is NOT a {@code StreamMessageImpl}; the result is
-     * in write mode.
-     *
-     * The parameter order is deliberately swapped so this cannot accidentally clash with the
-     * constructor above. This is mildly confusing, but helps a lot in preventing accidental bugs !
-     *
-     * Copying stops at the first {@code null} element or when the source reports end of stream.
-     *
-     * @param message foreign message to copy from
-     * @param session session for the copy
-     * @throws JMSException if {@code message} is a {@code StreamMessageImpl}, or on copy failure
-     */
-    public StreamMessageImpl(StreamMessage message, SessionImpl session) throws JMSException {
-        super((Message) message, session);
-
-        if (message instanceof StreamMessageImpl) {
-            throw new JMSException("Coding bug - should use this constructor ONLY for non StreamMessageImpl messages");
-        }
-
-        final byte[] data;
-        try {
-            WriteOnlyMessage wom = new WriteOnlyMessage();
-            try {
-                Object obj;
-                while (null != (obj = message.readObject())) {
-                    wom.writeObject(obj);
-                }
-            } catch (MessageEOFException eof) {
-                // A StreamMessage signals end of stream with MessageEOFException (a JMSException,
-                // not an IOException): this is the normal termination of the copy loop.
-            } catch (EOFException eof) {
-                // ignore ...
-            }
-            data = wom.getPayloadAsBytes(null);
-        } catch (IOException e) {
-            JMSException jEx = new JMSException("Unable to write to internal message .. " + e);
-            jEx.setLinkedException(e);
-            throw jEx;
-        }
-
-        // getPayloadAsBytes closed the temporary writer, so continue from its bytes.
-        this.writeOnlyMessage = new WriteOnlyMessage(data);
-
-        this.readOnlyMessage  = null;
-        this.readMode = false;
-    }
-
-    /**
-     * Builds a read-mode message from a received protocol message.
-     *
-     * @throws JMSException if the body of {@code message} cannot be opened for reading
-     */
-    StreamMessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties,
-                      String sourceTopicName, String subscriberId, Runnable ackRunnable) throws JMSException {
-        super(session, message, properties, sourceTopicName, subscriberId, ackRunnable);
-
-        final byte[] data = message.getBody().toByteArray();
-        try {
-            this.readOnlyMessage = new ReadOnlyMessage(data);
-        } catch (IOException e) {
-            JMSException ex = new JMSException("Unable to clone/copy input message " + message + " .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-
-        this.writeOnlyMessage = null;
-        this.readMode = true;
-    }
-
-    @Override
-    protected MessageUtil.SupportedMessageTypes getJmsMessageType() {
-        return MessageUtil.SupportedMessageTypes.STREAM;
-    }
-
-    /** Always reports a non-empty body. */
-    protected boolean isBodyEmpty() {
-        return false;
-    }
-
-    /**
-     * Builds the protocol message: headers from the superclass plus the serialized body.
-     *
-     * @throws JMSException if the body bytes cannot be obtained
-     */
-    @Override
-    public PubSubProtocol.Message generateHedwigMessage() throws JMSException {
-        PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder();
-        super.populateBuilderWithHeaders(builder);
-
-        // Now set body and type.
-        try {
-            byte[] data = getPayloadData();
-            builder.setBody(ByteString.copyFrom(data));
-        } catch (IOException e) {
-            JMSException ex = new JMSException("Unable to read message data .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-
-        return builder.build();
-    }
-
-    /** Throws {@link MessageNotReadableException} unless the message is in read mode. */
-    private void checkReadable() throws MessageNotReadableException {
-        if (!readMode) throw new MessageNotReadableException("Message not readable");
-    }
-
-    /** Throws {@link MessageNotWriteableException} unless the message is in write mode. */
-    private void checkWritable() throws MessageNotWriteableException {
-        if (readMode) throw new MessageNotWriteableException("Message not writable");
-    }
-
-    /** Wraps an I/O failure raised while reading into a {@link JMSException}. */
-    private static JMSException readFailure(IOException ioEx) {
-        JMSException jmsEx = new JMSException("ioEx ?");
-        jmsEx.setLinkedException(ioEx);
-        return jmsEx;
-    }
-
-    /** Wraps an I/O failure raised while writing into a {@link JMSException}. */
-    private static JMSException writeFailure(IOException ioEx) {
-        JMSException jmsEx = new JMSException("Unexpected ioex : " + ioEx);
-        jmsEx.setLinkedException(ioEx);
-        return jmsEx;
-    }
-
-    @Override
-    public boolean readBoolean() throws JMSException {
-        checkReadable();
-        try {
-            return readOnlyMessage.readBoolean();
-        } catch (IOException ioEx) {
-            throw readFailure(ioEx);
-        }
-    }
-
-    @Override
-    public byte readByte() throws JMSException {
-        checkReadable();
-        try {
-            return readOnlyMessage.readByte();
-        } catch (IOException ioEx) {
-            throw readFailure(ioEx);
-        }
-    }
-
-    @Override
-    public short readShort() throws JMSException {
-        checkReadable();
-        try {
-            return readOnlyMessage.readShort();
-        } catch (IOException ioEx) {
-            throw readFailure(ioEx);
-        }
-    }
-
-    @Override
-    public char readChar() throws JMSException {
-        checkReadable();
-        try {
-            return readOnlyMessage.readChar();
-        } catch (IOException ioEx) {
-            throw readFailure(ioEx);
-        }
-    }
-
-    @Override
-    public int readInt() throws JMSException {
-        checkReadable();
-        try {
-            return readOnlyMessage.readInt();
-        } catch (IOException ioEx) {
-            throw readFailure(ioEx);
-        }
-    }
-
-    @Override
-    public long readLong() throws JMSException {
-        checkReadable();
-        try {
-            return readOnlyMessage.readLong();
-        } catch (IOException ioEx) {
-            throw readFailure(ioEx);
-        }
-    }
-
-    @Override
-    public float readFloat() throws JMSException {
-        checkReadable();
-        try {
-            return readOnlyMessage.readFloat();
-        } catch (IOException ioEx) {
-            throw readFailure(ioEx);
-        }
-    }
-
-    @Override
-    public double readDouble() throws JMSException {
-        checkReadable();
-        try {
-            return readOnlyMessage.readDouble();
-        } catch (IOException ioEx) {
-            throw readFailure(ioEx);
-        }
-    }
-
-    @Override
-    public String readString() throws JMSException {
-        checkReadable();
-        try {
-            return readOnlyMessage.readString();
-        } catch (IOException ioEx) {
-            throw readFailure(ioEx);
-        }
-    }
-
-    /** Not supported: byte arrays are stored as single objects, so use {@link #readObject()}. */
-    @Override
-    public int readBytes(byte[] data) throws JMSException {
-        throw new UnsupportedOperationException("Please use readObject - this method is not supported");
-    }
-
-    @Override
-    public Object readObject() throws JMSException {
-        checkReadable();
-        try {
-            return readOnlyMessage.readObject();
-        } catch (IOException ioEx) {
-            throw readFailure(ioEx);
-        }
-    }
-
-    @Override
-    public void writeBoolean(boolean val) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeBoolean(val);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    @Override
-    public void writeByte(byte val) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeByte(val);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    @Override
-    public void writeShort(short val) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeShort(val);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    @Override
-    public void writeChar(char val) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeChar(val);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    @Override
-    public void writeInt(int val) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeInt(val);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    @Override
-    public void writeLong(long val) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeLong(val);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    @Override
-    public void writeFloat(float val) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeFloat(val);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    @Override
-    public void writeDouble(double val) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeDouble(val);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    @Override
-    public void writeString(String val) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeString(val);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    @Override
-    public void writeBytes(byte[] data) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeBytes(data);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    @Override
-    public void writeBytes(byte[] data, int offset, int length) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeBytes(data, offset, length);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    // This method is ONLY supposed to be used for object form of primitive types !
-    @Override
-    public void writeObject(Object obj) throws JMSException {
-        checkWritable();
-        try {
-            writeOnlyMessage.writeObject(obj);
-        } catch (IOException ioEx) {
-            throw writeFailure(ioEx);
-        }
-    }
-
-    /**
-     * Puts the body in read mode positioned at its beginning.
-     *
-     * In write mode the accumulated bytes become the readable body. In read mode the reader
-     * is rebuilt over the same bytes so that reading restarts from the first element.
-     * The mode fields are only updated once the new reader has been created.
-     *
-     * @throws JMSException if the reader cannot be created
-     */
-    @Override
-    public void reset() throws JMSException {
-        try {
-            final byte[] data = this.readMode
-                ? readOnlyMessage.getDataCopy()
-                : writeOnlyMessage.getPayloadAsBytes(null);
-            this.readOnlyMessage = new ReadOnlyMessage(data);
-        } catch (IOException e) {
-            JMSException ex = new JMSException("cant convert to read only message ... unexpected actually .. " + e);
-            ex.setLinkedException(e);
-            throw ex;
-        }
-        this.writeOnlyMessage = null;
-        this.readMode = true;
-    }
-
-    /** Clears the body and returns the message to an empty write mode. */
-    @Override
-    public void clearBody() throws JMSException {
-        super.clearBody();
-        this.writeOnlyMessage = new WriteOnlyMessage();
-        this.readOnlyMessage = null;
-        this.readMode = false;
-    }
-
-    /**
-     * Returns the serialized body without changing the read/write mode.
-     *
-     * In write mode, obtaining the bytes closes the current writer, so it is replaced by a
-     * new writer seeded with the bytes captured just before the close.
-     * NOTE(review): the replacement writer is a fresh ObjectOutputStream appended to the
-     * earlier bytes; whether serialization back-references stay valid across that boundary
-     * should be confirmed.
-     */
-    private byte[] getPayloadData() throws IOException, IllegalStateException {
-        if (readMode) return readOnlyMessage.getDataCopy();
-
-        Mutable<byte[]> preCloseData = new Mutable<byte[]>(null);
-        byte[] data = writeOnlyMessage.getPayloadAsBytes(preCloseData);
-
-        writeOnlyMessage = new WriteOnlyMessage(preCloseData.getValue());
-        return data;
-    }
-
-
-    @Override
-    StreamMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId)
-        throws JMSException {
-        return new StreamMessageImpl(session, this, sourceTopicName, subscriberId);
-    }
-
-    /**
-     * Reader over a serialized body.
-     *
-     * Using java objects instead of primitives to avoid having to store schema separately.
-     * Each typed read takes the next object and converts it via {@link MessageUtil}; if the
-     * conversion throws, the object is pushed back so that the next read sees it again.
-     */
-    private static class ReadOnlyMessage {
-
-        private final ObjectInputStream ois;
-        private final byte[] data;
-        /** Objects pushed back after a failed conversion; consumed before the stream. */
-        private final Deque<Object> unreadObjects = new ArrayDeque<Object>(4);
-
-        public ReadOnlyMessage(byte[] data) throws IOException {
-            this.data = data;
-            this.ois = new ObjectInputStream(new ByteArrayInputStream(data));
-        }
-
-        /** Returns a copy of the full serialized body, regardless of the read position. */
-        public byte[] getDataCopy() {
-            return Arrays.copyOf(data, data.length);
-        }
-
-        private void unreadObject(Object obj) {
-            unreadObjects.push(obj);
-        }
-
-        /**
-         * Returns the next object: a pushed-back one if any, otherwise the next from the stream.
-         *
-         * @throws JMSException {@link MessageEOFException} at end of stream, or
-         *                      {@link javax.jms.IllegalStateException} if a class cannot be resolved
-         */
-        private Object readNextObject() throws IOException, JMSException {
-            try {
-                if (! unreadObjects.isEmpty()) return unreadObjects.pop();
-
-                return ois.readObject();
-            } catch (ClassNotFoundException e) {
-                // unexpected !
-                javax.jms.IllegalStateException jEx =
-                    new javax.jms.IllegalStateException("Unexpected not to be able to resolve class");
-                jEx.setLinkedException(e);
-                throw jEx;
-            } catch (EOFException eof) {
-                throw new MessageEOFException("eof");
-            }
-        }
-
-        public boolean readBoolean() throws IOException, JMSException {
-            Object obj = readNextObject();
-            boolean failed = true;
-            try {
-                Boolean value = MessageUtil.asBoolean(obj);
-                failed = false;
-                return value;
-            } finally {
-                if (failed) unreadObject(obj);
-            }
-        }
-
-        public byte readByte() throws IOException, JMSException {
-            Object obj = readNextObject();
-            boolean failed = true;
-            try {
-                Byte value = MessageUtil.asByte(obj);
-                failed = false;
-                return value;
-            } finally {
-                if (failed) unreadObject(obj);
-            }
-        }
-
-        public short readShort() throws IOException, JMSException {
-            Object obj = readNextObject();
-            boolean failed = true;
-            try {
-                Short value = MessageUtil.asShort(obj);
-                failed = false;
-                return value;
-            } finally {
-                if (failed) unreadObject(obj);
-            }
-        }
-
-        public char readChar() throws IOException, JMSException {
-            Object obj = readNextObject();
-            boolean failed = true;
-            try {
-                Character value = MessageUtil.asChar(obj);
-                failed = false;
-                return value;
-            } finally {
-                if (failed) unreadObject(obj);
-            }
-        }
-
-        public int readInt() throws IOException, JMSException {
-            Object obj = readNextObject();
-            boolean failed = true;
-            try {
-                Integer value = MessageUtil.asInteger(obj);
-                failed = false;
-                return value;
-            } finally {
-                if (failed) unreadObject(obj);
-            }
-        }
-
-        public long readLong() throws IOException, JMSException {
-            Object obj = readNextObject();
-            boolean failed = true;
-            try {
-                Long value = MessageUtil.asLong(obj);
-                failed = false;
-                return value;
-            } finally {
-                if (failed) unreadObject(obj);
-            }
-        }
-
-        public float readFloat() throws IOException, JMSException {
-            Object obj = readNextObject();
-            boolean failed = true;
-            try {
-                Float value = MessageUtil.asFloat(obj);
-                failed = false;
-                return value;
-            } finally {
-                if (failed) unreadObject(obj);
-            }
-        }
-
-        public double readDouble() throws IOException, JMSException {
-            Object obj = readNextObject();
-            boolean failed = true;
-            try {
-                Double value = MessageUtil.asDouble(obj);
-                failed = false;
-                return value;
-            } finally {
-                if (failed) unreadObject(obj);
-            }
-        }
-
-        public String readString() throws IOException, JMSException {
-            Object obj = readNextObject();
-            boolean failed = true;
-            try {
-                String value = MessageUtil.asString(obj);
-                failed = false;
-                return value;
-            } finally {
-                if (failed) unreadObject(obj);
-            }
-        }
-
-        public Object readObject() throws IOException, JMSException {
-            return readNextObject();
-        }
-    }
-
-    /**
-     * Writer that serializes every value as a Java object into an in-memory buffer.
-     */
-    private static class WriteOnlyMessage {
-
-        private final ByteArrayOutputStream baos;
-        private final ObjectOutputStream oos;
-
-        /** Creates an empty writer (the serialization stream header is written). */
-        public WriteOnlyMessage() throws JMSException {
-            baos = new ByteArrayOutputStream();
-            try {
-                oos = new ObjectOutputStream(baos);
-            } catch (IOException e) {
-                IllegalStateException jEx =
-                    new IllegalStateException("Unexpected to not be able to create empty write only message");
-                jEx.setLinkedException(e);
-                throw jEx;
-            }
-        }
-
-        /**
-         * Creates a writer that continues after previously serialized bytes.
-         *
-         * @param data bytes of an already materialized stream; may be {@code null} or empty,
-         *             in which case a normal stream header is written
-         */
-        private WriteOnlyMessage(final byte[] data) throws IllegalStateException {
-            baos = new ByteArrayOutputStream();
-            try {
-                if (null != data) baos.write(data);
-                baos.flush();
-                oos = new ObjectOutputStream(baos){
-                    // Do not write the header if data is based on already materialized stream.
-                    protected void writeStreamHeader() throws IOException {
-                        if (null == data || 0 == data.length) super.writeStreamHeader();
-                    }
-                };
-            } catch (IOException e) {
-                IllegalStateException jEx =
-                    new IllegalStateException("Unexpected to not be able to create empty write only message");
-                jEx.setLinkedException(e);
-                throw jEx;
-            }
-        }
-
-        /**
-         * Closes this writer and returns everything written.
-         *
-         * @param preCloseData if non-null, receives a snapshot of the buffer taken after
-         *                     flushing but before the streams are closed
-         * @return the buffer contents after the close
-         */
-        public byte[] getPayloadAsBytes(Mutable<byte[]> preCloseData) throws IOException {
-            oos.flush();
-            baos.flush();
-            if (null != preCloseData) preCloseData.setValue(baos.toByteArray());
-            oos.close();
-            baos.flush();
-            baos.close();
-            return baos.toByteArray();
-        }
-
-        public void writeBoolean(boolean val) throws IOException {
-            oos.writeObject(val);
-        }
-
-        public void writeByte(byte val) throws IOException {
-            oos.writeObject(val);
-        }
-
-        public void writeShort(short val) throws IOException {
-            oos.writeObject(val);
-        }
-
-        public void writeChar(char val) throws IOException {
-            oos.writeObject(val);
-        }
-
-        public void writeInt(int val) throws IOException {
-            oos.writeObject(val);
-        }
-
-        public void writeLong(long val) throws IOException {
-            oos.writeObject(val);
-        }
-
-        public void writeFloat(float val) throws IOException {
-            oos.writeObject(val);
-        }
-
-        public void writeDouble(double val) throws IOException {
-            oos.writeObject(val);
-        }
-
-        public void writeString(String val) throws IOException {
-            oos.writeObject(val);
-        }
-
-        public void writeBytes(byte[] data) throws IOException {
-            oos.writeObject(data);
-        }
-
-        // copy and write as a single byte array.
-        public void writeBytes(byte[] data, int offset, int length) throws IOException {
-            byte[] arr = new byte[length];
-            System.arraycopy(data, offset, arr, 0, length);
-            writeBytes(arr);
-        }
-
-        /**
-         * Writes a boxed primitive, String or byte[] by dispatching to the typed writer.
-         *
-         * @throws JMSException for any other type, including {@code null}
-         */
-        public void writeObject(Object obj) throws JMSException, IOException {
-            // unrolling it
-            if (obj instanceof Boolean) {
-                writeBoolean((Boolean) obj);
-            }
-            else if (obj instanceof Byte) {
-                writeByte((Byte) obj);
-            }
-            else if (obj instanceof Short) {
-                writeShort((Short) obj);
-            }
-            else if (obj instanceof Character) {
-                writeChar((Character) obj);
-            }
-            else if (obj instanceof Integer) {
-                writeInt((Integer) obj);
-            }
-            else if (obj instanceof Long) {
-                writeLong((Long) obj);
-            }
-            else if (obj instanceof Float) {
-                writeFloat((Float) obj);
-            }
-            else if (obj instanceof Double) {
-                writeDouble((Double) obj);
-            }
-            else if (obj instanceof String) {
-                writeString((String) obj);
-            }
-            else if (obj instanceof byte[]) {
-                writeBytes((byte[]) obj);
-            }
-            else {
-                // null matches no instanceof test; report it instead of failing on getClass().
-                throw new JMSException("Unsupported type for obj : " + (null == obj ? "null" : obj.getClass()));
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java
deleted file mode 100644
index dc3a3ca..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.message;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.protocol.PubSubProtocol;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.TextMessage;
-import java.util.Map;
-
-/**
- * {@link TextMessage} implementation that keeps its body as a single String.
- *
- * The instance tracks a read-only flag: while it is set, {@link #setText(String)} is rejected.
- * {@link #reset()} sets the flag and {@link #clearBody()} clears it again.
- */
-public class TextMessageImpl extends MessageImpl implements TextMessage {
-    /** Text body of the message; null means the message has no body. */
-    private String payload;
-    /** True while the body may only be read, false while it may be written. */
-    private boolean readMode;
-
-    /**
-     * Creates an empty, writable message.
-     *
-     * @param session the session handed to the parent constructor.
-     */
-    public TextMessageImpl(SessionImpl session) {
-        super(session);
-        this.readMode = false;
-    }
-
-    /**
-     * Creates a writable message with the given text as its body.
-     *
-     * @param session the session handed to the parent constructor.
-     * @param payload the initial text; may be null.
-     */
-    public TextMessageImpl(SessionImpl session, String payload) {
-        super(session);
-        this.payload = payload;
-        this.readMode = false;
-    }
-
-    /**
-     * Copies another TextMessageImpl, including its text and its read-only flag.
-     *
-     * @param session the session handed to the parent constructor.
-     * @param message the message to copy from.
-     * @param sourceTopicName handed to the parent constructor.
-     * @param subscriberId handed to the parent constructor.
-     * @throws JMSException if the parent constructor or reading the text fails.
-     */
-    public TextMessageImpl(SessionImpl session, TextMessageImpl message, String sourceTopicName,
-                           String subscriberId) throws JMSException {
-        super(session, (MessageImpl) message, sourceTopicName, subscriberId);
-
-        this.payload = message.getText();
-        this.readMode = message.readMode;
-    }
-
-
-    /**
-     * Copies a TextMessage that is NOT a TextMessageImpl; the result is writable.
-     *
-     * The parameter order is deliberately swapped relative to the copy constructor above so the
-     * two cannot be picked by accident. Mildly confusing, but it prevents accidental bugs.
-     *
-     * @param message the foreign message to copy from.
-     * @param session the session handed to the parent constructor.
-     * @throws JMSException if message is a TextMessageImpl, or if reading its text fails.
-     */
-    public TextMessageImpl(TextMessage message, SessionImpl session) throws JMSException {
-        super((Message) message, session);
-
-        if (message instanceof TextMessageImpl) {
-            throw new JMSException("Coding bug - should use this constructor ONLY for non TextMessageImpl messages");
-        }
-
-        this.payload = message.getText();
-        this.readMode = false;
-    }
-
-    /**
-     * Builds a read-only message from a received protocol message.
-     *
-     * The body is decoded as UTF-8 only when hasBodyFromProperties() reports a body; otherwise
-     * the text is null.
-     *
-     * @param session handed to the parent constructor.
-     * @param message the protocol message whose body supplies the text.
-     * @param properties handed to the parent constructor.
-     * @param sourceTopicName handed to the parent constructor.
-     * @param subscriberId handed to the parent constructor.
-     * @param ackRunnable handed to the parent constructor.
-     * @throws JMSException if the parent constructor fails.
-     */
-    public TextMessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties,
-                           String sourceTopicName, String subscriberId, Runnable ackRunnable) throws JMSException {
-        super(session, message, properties, sourceTopicName, subscriberId, ackRunnable);
-
-        if (hasBodyFromProperties()) {
-            this.payload = message.getBody().toStringUtf8();
-        } else {
-            this.payload = null;
-        }
-        this.readMode = true;
-    }
-
-    /** Identifies this implementation as the TEXT message type. */
-    @Override
-    protected MessageUtil.SupportedMessageTypes getJmsMessageType() {
-        return MessageUtil.SupportedMessageTypes.TEXT;
-    }
-
-    /**
-     * Builds the protocol message: headers are filled in by the parent, and the body is set to
-     * the UTF-8 bytes of the text unless the text is null.
-     *
-     * @return the built protocol message.
-     * @throws JMSException if populating the headers fails.
-     */
-    @Override
-    public PubSubProtocol.Message generateHedwigMessage() throws JMSException {
-        final PubSubProtocol.Message.Builder messageBuilder = PubSubProtocol.Message.newBuilder();
-        super.populateBuilderWithHeaders(messageBuilder);
-        if (!isBodyEmpty()) {
-            messageBuilder.setBody(ByteString.copyFromUtf8(this.payload));
-        }
-        return messageBuilder.build();
-    }
-
-    /** Returns true when no text is set (the payload is null). */
-    protected boolean isBodyEmpty(){
-        return this.payload == null;
-    }
-
-    /**
-     * Replaces the text of this message.
-     *
-     * @param payload the new text; may be null.
-     * @throws MessageNotWriteableException if the message is currently read-only.
-     */
-    @Override
-    public void setText(String payload) throws JMSException {
-        if (readMode) {
-            throw new MessageNotWriteableException("Message not writable");
-        }
-        this.payload = payload;
-    }
-
-    /** Returns the current text, or null when none is set. */
-    @Override
-    public String getText() throws JMSException {
-        return payload;
-    }
-
-    /** Clears the body via the parent, drops the text and makes the message writable again. */
-    @Override
-    public void clearBody() throws JMSException {
-        super.clearBody();
-        this.payload = null;
-        this.readMode = false;
-    }
-
-    /** Switches the message to read-only; does nothing if it already is. */
-    @Override
-    public void reset() throws JMSException {
-        if (!this.readMode) {
-            this.readMode = true;
-        }
-    }
-
-    /** Creates a copy of this message through the TextMessageImpl copy constructor. */
-    @Override
-    TextMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId) throws JMSException {
-        return new TextMessageImpl(session, this, sourceTopicName, subscriberId);
-    }
-
-    /** Renders the text, the read-only flag and the parent's string form. */
-    @Override
-    public String toString() {
-        return "TextMessageImpl"
-                + "{payload='" + payload + "'"
-                + ", readMode=" + readMode
-                + ", parent=" + super.toString()
-                + "}";
-    }
-}