You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/01 20:37:54 UTC

svn commit: r780773 [2/31] - in /activemq/sandbox/activemq-flow: activemq-client/ activemq-client/src/main/java/org/ activemq-client/src/main/java/org/apache/ activemq-client/src/main/java/org/apache/activemq/ activemq-client/src/main/java/org/apache/a...

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+
+public final class AdvisorySupport {
+
+    public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
+    public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Connection");
+    public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
+    public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
+    public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
+    public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
+    public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
+    public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
+    public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
+    public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
+    public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
+    public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
+    public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
+    public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
+    public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
+    public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
+    public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
+    public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastPorducer.";
+    public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
+    public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
+    public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
+    public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
+    public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
+    public static final String AGENT_TOPIC = "ActiveMQ.Agent";
+    public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
+    public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
+    public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME="originBrokerName";
+    public static final String MSG_PROPERTY_ORIGIN_BROKER_URL="originBrokerURL";
+    public static final String MSG_PROPERTY_USAGE_NAME="usageName";
+    public static final String MSG_PROPERTY_CONSUMER_ID="consumerId";
+    public static final String MSG_PROPERTY_PRODUCER_ID="producerId";
+    public static final String MSG_PROPERTY_MESSAGE_ID="orignalMessageId";
+    public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC + "," + TEMP_TOPIC_ADVISORY_TOPIC);
+    private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
+
+    private AdvisorySupport() {        
+    }
+    
+    public static ActiveMQTopic getConnectionAdvisoryTopic() {
+        return CONNECTION_ADVISORY_TOPIC;
+    }
+
+    public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isQueue()) {
+            return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+        } else {
+            return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+        }
+    }
+
+    public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isQueue()) {
+            return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+        } else {
+            return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+        }
+    }
+
+    public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
+        if (destination.isQueue()) {
+            return getExpiredQueueMessageAdvisoryTopic(destination);
+        }
+        return getExpiredTopicMessageAdvisoryTopic(destination);
+    }
+
+    public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
+        String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+
+    public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
+        String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+
+    public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
+        String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+
+    public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
+        String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        String name = SLOW_CONSUMER_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
+        String name = FAST_PRODUCER_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
+        String name = MESSAGE_DISCAREDED_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
+        String name = MESSAGE_DELIVERED_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
+        String name = MESSAGE_CONSUMED_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
+        return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
+    }
+    
+    public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
+        String name = FULL_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+
+    public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
+        switch (destination.getDestinationType()) {
+        case ActiveMQDestination.QUEUE_TYPE:
+            return QUEUE_ADVISORY_TOPIC;
+        case ActiveMQDestination.TOPIC_TYPE:
+            return TOPIC_ADVISORY_TOPIC;
+        case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            return TEMP_QUEUE_ADVISORY_TOPIC;
+        case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            return TEMP_TOPIC_ADVISORY_TOPIC;
+        default:
+            throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
+        }
+    }
+
+    public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC) || destination.equals(QUEUE_ADVISORY_TOPIC)
+                   || destination.equals(TOPIC_ADVISORY_TOPIC);
+        }
+    }
+
+    public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
+        }
+    }
+
+    public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.equals(CONNECTION_ADVISORY_TOPIC);
+        }
+    }
+
+    public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isProducerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
+        }
+    }
+
+    public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isFullAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
+        }
+    }
+
+    /**
+     * Returns the agent topic which is used to send commands to the broker
+     */
+    public static Destination getAgentDestination() {
+        return AGENT_TOPIC_DESTINATION;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/package.html?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/package.html (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/package.html Mon Jun  1 18:37:41 2009
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Support for JMS Advisory messages as well as some helper listeners to listen to the clients, producers and consumers available.
+
+</body>
+</html>

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+/**
+ * The policy for configuring how BLOBs (Binary Large OBjects) are transferred
+ * out of band between producers, brokers and consumers.
+ *
+ * @version $Revision: $
+ */
+public class BlobTransferPolicy {
+    private String defaultUploadUrl = "http://localhost:8080/uploads/";
+    private String brokerUploadUrl;
+    private String uploadUrl;
+    private int bufferSize = 128 * 1024;
+    private BlobUploadStrategy uploadStrategy;
+
+    /**
+     * Returns a copy of this policy object
+     */
+    public BlobTransferPolicy copy() {
+        BlobTransferPolicy that = new BlobTransferPolicy();
+        that.defaultUploadUrl = this.defaultUploadUrl;
+        that.brokerUploadUrl = this.brokerUploadUrl;
+        that.uploadUrl = this.uploadUrl;
+        that.uploadStrategy = this.uploadStrategy;
+        return that;
+    }
+
+    public String getUploadUrl() {
+        if (uploadUrl == null) {
+            uploadUrl = getBrokerUploadUrl();
+            if (uploadUrl == null) {
+                uploadUrl = getDefaultUploadUrl();
+            }
+        }
+        return uploadUrl;
+    }
+
+    /**
+     * Sets the upload URL to use explicitly on the client which will
+     * overload the default or the broker's URL. This allows the client to decide
+     * where to upload files to irrespective of the brokers configuration.
+     */
+    public void setUploadUrl(String uploadUrl) {
+        this.uploadUrl = uploadUrl;
+    }
+
+    public String getBrokerUploadUrl() {
+        return brokerUploadUrl;
+    }
+
+    /**
+     * Called by the JMS client when a broker advertises its upload URL
+     */
+    public void setBrokerUploadUrl(String brokerUploadUrl) {
+        this.brokerUploadUrl = brokerUploadUrl;
+    }
+
+    public String getDefaultUploadUrl() {
+        return defaultUploadUrl;
+    }
+
+    /**
+     * Sets the default upload URL to use if the broker does not
+     * have a configured upload URL
+     */
+    public void setDefaultUploadUrl(String defaultUploadUrl) {
+        this.defaultUploadUrl = defaultUploadUrl;
+    }
+
+    public BlobUploadStrategy getUploadStrategy() {
+        if (uploadStrategy == null) {
+            uploadStrategy = createUploadStrategy();
+        }
+        return uploadStrategy;
+    }
+
+    /**
+     * Sets the upload strategy to use for uploading BLOBs to some URL
+     */
+    public void setUploadStrategy(BlobUploadStrategy uploadStrategy) {
+        this.uploadStrategy = uploadStrategy;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    /**
+     * Sets the default buffer size used when uploading or downloading files
+     */
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    protected BlobUploadStrategy createUploadStrategy() {
+        return new DefaultBlobUploadStrategy(this);
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploadStrategy.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploadStrategy.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploadStrategy.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * Represents a strategy of uploading a file/stream to some remote
+ *
+ * @version $Revision: $
+ */
+public interface BlobUploadStrategy {
+
+    URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException;
+
+    URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException;
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploader.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploader.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploader.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploader.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * A helper class to represent a required upload of a BLOB to some remote URL
+ * 
+ * @version $Revision: $
+ */
+public class BlobUploader {
+
+    private BlobTransferPolicy blobTransferPolicy;
+    private File file;
+    private InputStream in;
+
+    public BlobUploader(BlobTransferPolicy blobTransferPolicy, InputStream in) {
+        this.blobTransferPolicy = blobTransferPolicy;
+        this.in = in;
+    }
+
+    public BlobUploader(BlobTransferPolicy blobTransferPolicy, File file) {
+        this.blobTransferPolicy = blobTransferPolicy;
+        this.file = file;
+    }
+
+    public URL upload(ActiveMQBlobMessage message) throws JMSException, IOException {
+        if (file != null) {
+            return getStrategy().uploadFile(message, file);
+        } else {
+            return getStrategy().uploadStream(message, in);
+        }
+    }
+
+    public BlobTransferPolicy getBlobTransferPolicy() {
+        return blobTransferPolicy;
+    }
+
+    public BlobUploadStrategy getStrategy() {
+        return getBlobTransferPolicy().getUploadStrategy();
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * A default implementation of {@link BlobUploadStrategy} which uses the URL
+ * class to upload files or streams to a remote URL
+ */
+public class DefaultBlobUploadStrategy implements BlobUploadStrategy {
+    private BlobTransferPolicy transferPolicy;
+
+    public DefaultBlobUploadStrategy(BlobTransferPolicy transferPolicy) {
+        this.transferPolicy = transferPolicy;
+    }
+
+    public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
+        return uploadStream(message, new FileInputStream(file));
+    }
+
+    public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws JMSException, IOException {
+        URL url = createUploadURL(message);
+
+        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+        connection.setRequestMethod("PUT");
+        connection.setDoOutput(true);
+
+        // use chunked mode or otherwise URLConnection loads everything into
+        // memory
+        // (chunked mode not supported before JRE 1.5)
+        connection.setChunkedStreamingMode(transferPolicy.getBufferSize());
+
+        OutputStream os = connection.getOutputStream();
+
+        byte[] buf = new byte[transferPolicy.getBufferSize()];
+        for (int c = fis.read(buf); c != -1; c = fis.read(buf)) {
+            os.write(buf, 0, c);
+            os.flush();
+        }
+        os.close();
+        fis.close();
+
+        if (!isSuccessfulCode(connection.getResponseCode())) {
+            throw new IOException("PUT was not successful: " + connection.getResponseCode() + " "
+                                  + connection.getResponseMessage());
+        }
+
+        return url;
+    }
+
+    public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
+        URL url = createUploadURL(message);
+
+        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+        connection.setRequestMethod("DELETE");
+        connection.connect();
+        connection.disconnect();
+
+        if (!isSuccessfulCode(connection.getResponseCode())) {
+            throw new IOException("DELETE was not successful: " + connection.getResponseCode() + " "
+                                  + connection.getResponseMessage());
+        }
+    }
+
+    private boolean isSuccessfulCode(int responseCode) {
+        return responseCode >= 200 && responseCode < 300; // 2xx => successful
+    }
+
+    protected URL createUploadURL(ActiveMQBlobMessage message) throws JMSException, MalformedURLException {
+        return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString());
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/package.html?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/package.html (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/package.html Mon Jun  1 18:37:41 2009
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Helper classes for dealing with out-of-band BLOB objects
+
+</body>
+</html>

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/Destination.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import org.apache.activemq.usage.MemoryUsage;
+
+
+/**
+ * @version $Revision: 1.12 $
+ */
+public interface Destination {
+
+    MemoryUsage getMemoryUsage();
+
+    int getMinimumMessageSize();
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/Destination.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/MessageReference.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/MessageReference.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/MessageReference.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+
+/**
+ * Keeps track of a message that is flowing through the Broker.  This 
+ * object may hold a hard reference to the message or only hold the
+ * id of the message if the message has been persisted on in a MessageStore.
+ * 
+ * @version $Revision: 1.15 $
+ */
+public interface MessageReference {
+    
+    MessageId getMessageId();
+    Message getMessageHardRef();
+    Message getMessage() throws IOException;
+    boolean isPersistent();
+    
+    Destination getRegionDestination();
+    
+    int getRedeliveryCounter();
+    void incrementRedeliveryCounter();
+    
+    int getReferenceCount();
+    
+    int incrementReferenceCount();
+    int decrementReferenceCount();
+    ConsumerId getTargetConsumerId();
+    int getSize();
+    long getExpiration();
+    String getGroupID();
+    int getGroupSequence();
+    
+    /**
+     * Returns true if this message is expired
+     */
+    boolean isExpired();
+
+    /**
+     * Returns true if this message is dropped.
+     */
+    boolean isDropped();
+    
+    /**
+     * @return true if the message is an advisory
+     */
+    boolean isAdvisory();
+    
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/MessageReference.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.blob.BlobUploader;
+import org.apache.activemq.util.JMSExceptionSupport;
+
+/**
+ * An implementation of {@link BlobMessage} for out of band BLOB transfer
+ * 
+ * @version $Revision: $
+ * @openwire:marshaller code="29"
+ */
+public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage {
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BLOB_MESSAGE;
+
+    public static final String BINARY_MIME_TYPE = "application/octet-stream";
+
+    private String remoteBlobUrl;
+    private String mimeType;
+    private String name;
+    private boolean deletedByBroker;
+
+    private transient BlobUploader blobUploader;
+    private transient URL url;
+
+    public Message copy() {
+        ActiveMQBlobMessage copy = new ActiveMQBlobMessage();
+        copy(copy);
+        return copy;
+    }
+
+    private void copy(ActiveMQBlobMessage copy) {
+        super.copy(copy);
+        copy.setRemoteBlobUrl(getRemoteBlobUrl());
+        copy.setMimeType(getMimeType());
+        copy.setDeletedByBroker(isDeletedByBroker());
+        copy.setBlobUploader(getBlobUploader());
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=3 cache=false
+     */
+    public String getRemoteBlobUrl() {
+        return remoteBlobUrl;
+    }
+
+    public void setRemoteBlobUrl(String remoteBlobUrl) {
+        this.remoteBlobUrl = remoteBlobUrl;
+        url = null;
+    }
+
+    /**
+     * The MIME type of the BLOB which can be used to apply different content
+     * types to messages.
+     * 
+     * @openwire:property version=3 cache=true
+     */
+    public String getMimeType() {
+        if (mimeType == null) {
+            return BINARY_MIME_TYPE;
+        }
+        return mimeType;
+    }
+
+    public void setMimeType(String mimeType) {
+        this.mimeType = mimeType;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * The name of the attachment which can be useful information if
+     * transmitting files over ActiveMQ
+     * 
+     * @openwire:property version=3 cache=false
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @openwire:property version=3 cache=false
+     */
+    public boolean isDeletedByBroker() {
+        return deletedByBroker;
+    }
+
+    public void setDeletedByBroker(boolean deletedByBroker) {
+        this.deletedByBroker = deletedByBroker;
+    }
+
+    public String getJMSXMimeType() {
+        return getMimeType();
+    }
+
+    public InputStream getInputStream() throws IOException, JMSException {
+        URL value = getURL();
+        if (value == null) {
+            return null;
+        }
+        return value.openStream();
+    }
+
+    public URL getURL() throws JMSException {
+        if (url == null && remoteBlobUrl != null) {
+            try {
+                url = new URL(remoteBlobUrl);
+            } catch (MalformedURLException e) {
+                throw JMSExceptionSupport.create(e);
+            }
+        }
+        return url;
+    }
+
+    public void setURL(URL url) {
+        this.url = url;
+        remoteBlobUrl = url != null ? url.toExternalForm() : null;
+    }
+
+    public BlobUploader getBlobUploader() {
+        return blobUploader;
+    }
+
+    public void setBlobUploader(BlobUploader blobUploader) {
+        this.blobUploader = blobUploader;
+    }
+
+    public void onSend() throws JMSException {
+        super.onSend();
+
+        // lets ensure we upload the BLOB first out of band before we send the
+        // message
+        if (blobUploader != null) {
+            try {
+                URL value = blobUploader.upload(this);
+                setURL(value);
+            } catch (IOException e) {
+                throw JMSExceptionSupport.create(e);
+            }
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,850 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+
+import org.apache.activemq.IConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.JMSExceptionSupport;
+
+/**
+ * A <CODE>BytesMessage</CODE> object is used to send a message containing a
+ * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>
+ * interface and adds a bytes message body. The receiver of the message supplies
+ * the interpretation of the bytes.
+ * <P>
+ * The <CODE>BytesMessage</CODE> methods are based largely on those found in
+ * <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>.
+ * <P>
+ * This message type is for client encoding of existing message formats. If
+ * possible, one of the other self-defining message types should be used
+ * instead.
+ * <P>
+ * Although the JMS API allows the use of message properties with byte messages,
+ * they are typically not used, since the inclusion of properties may affect the
+ * format.
+ * <P>
+ * The primitive types can be written explicitly using methods for each type.
+ * They may also be written generically as objects. For instance, a call to
+ * <CODE>BytesMessage.writeInt(6)</CODE> is equivalent to
+ * <CODE> BytesMessage.writeObject(new Integer(6))</CODE>. Both forms are
+ * provided, because the explicit form is convenient for static programming, and
+ * the object form is needed when types are not known at compile time.
+ * <P>
+ * When the message is first created, and when <CODE>clearBody</CODE> is
+ * called, the body of the message is in write-only mode. After the first call
+ * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
+ * After a message has been sent, the client that sent it can retain and modify
+ * it without affecting the message that has been sent. The same message object
+ * can be sent multiple times. When a message has been received, the provider
+ * has called <CODE>reset</CODE> so that the message body is in read-only mode
+ * for the client.
+ * <P>
+ * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
+ * message body is cleared and the message is in write-only mode.
+ * <P>
+ * If a client attempts to read a message in write-only mode, a
+ * <CODE>MessageNotReadableException</CODE> is thrown.
+ * <P>
+ * If a client attempts to write a message in read-only mode, a
+ * <CODE>MessageNotWriteableException</CODE> is thrown.
+ * 
+ * @openwire:marshaller code=24
+ * @see javax.jms.Session#createBytesMessage()
+ * @see javax.jms.MapMessage
+ * @see javax.jms.Message
+ * @see javax.jms.ObjectMessage
+ * @see javax.jms.StreamMessage
+ * @see javax.jms.TextMessage
+ */
+public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;
+
+    protected transient DataOutputStream dataOut;
+    protected transient ByteArrayOutputStream bytesOut;
+    protected transient DataInputStream dataIn;
+    protected transient int length;
+
+    public Message copy() {
+        ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
+        copy(copy);
+        return copy;
+    }
+
+    private void copy(ActiveMQBytesMessage copy) {
+        storeContent();
+        super.copy(copy);
+        copy.dataOut = null;
+        copy.bytesOut = null;
+        copy.dataIn = null;
+    }
+
+    public void onSend() throws JMSException {
+        super.onSend();
+        storeContent();
+    }
+
+    private void storeContent() {
+        try {
+            if (dataOut != null) {
+                dataOut.close();
+                ByteSequence bs = bytesOut.toByteSequence();
+                if (compressed) {
+                    int pos = bs.offset;
+                    ByteSequenceData.writeIntBig(bs, length);
+                    bs.offset = pos;
+                }
+                setContent(bs);
+                bytesOut = null;
+                dataOut = null;
+            }
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe.getMessage(), ioe); // TODO verify
+                                                                // RuntimeException
+        }
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String getJMSXMimeType() {
+        return "jms/bytes-message";
+    }
+
+    /**
+     * Clears out the message body. Clearing a message's body does not clear its
+     * header values or property entries.
+     * <P>
+     * If this message body was read-only, calling this method leaves the
+     * message body in the same state as an empty body in a newly created
+     * message.
+     * 
+     * @throws JMSException if the JMS provider fails to clear the message body
+     *                 due to some internal error.
+     */
+    public void clearBody() throws JMSException {
+        super.clearBody();
+        this.dataOut = null;
+        this.dataIn = null;
+        this.bytesOut = null;
+    }
+
+    /**
+     * Gets the number of bytes of the message body when the message is in
+     * read-only mode. The value returned can be used to allocate a byte array.
+     * The value returned is the entire length of the message body, regardless
+     * of where the pointer for reading the message is currently located.
+     * 
+     * @return number of bytes in the message
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     * @since 1.1
+     */
+
+    public long getBodyLength() throws JMSException {
+        initializeReading();
+        return length;
+    }
+
+    /**
+     * Reads a <code>boolean</code> from the bytes message stream.
+     * 
+     * @return the <code>boolean</code> value read
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of bytes stream has been
+     *                 reached.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public boolean readBoolean() throws JMSException {
+        initializeReading();
+        try {
+            return this.dataIn.readBoolean();
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a signed 8-bit value from the bytes message stream.
+     * 
+     * @return the next byte from the bytes message stream as a signed 8-bit
+     *         <code>byte</code>
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of bytes stream has been
+     *                 reached.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public byte readByte() throws JMSException {
+        initializeReading();
+        try {
+            return this.dataIn.readByte();
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads an unsigned 8-bit number from the bytes message stream.
+     * 
+     * @return the next byte from the bytes message stream, interpreted as an
+     *         unsigned 8-bit number
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of bytes stream has been
+     *                 reached.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public int readUnsignedByte() throws JMSException {
+        initializeReading();
+        try {
+            return this.dataIn.readUnsignedByte();
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a signed 16-bit number from the bytes message stream.
+     * 
+     * @return the next two bytes from the bytes message stream, interpreted as
+     *         a signed 16-bit number
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of bytes stream has been
+     *                 reached.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public short readShort() throws JMSException {
+        initializeReading();
+        try {
+            return this.dataIn.readShort();
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads an unsigned 16-bit number from the bytes message stream.
+     * 
+     * @return the next two bytes from the bytes message stream, interpreted as
+     *         an unsigned 16-bit integer
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of bytes stream has been
+     *                 reached.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public int readUnsignedShort() throws JMSException {
+        initializeReading();
+        try {
+            return this.dataIn.readUnsignedShort();
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a Unicode character value from the bytes message stream.
+     * 
+     * @return the next two bytes from the bytes message stream as a Unicode
+     *         character
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of bytes stream has been
+     *                 reached.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public char readChar() throws JMSException {
+        initializeReading();
+        try {
+            return this.dataIn.readChar();
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a signed 32-bit integer from the bytes message stream.
+     * 
+     * @return the next four bytes from the bytes message stream, interpreted as
+     *         an <code>int</code>
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of bytes stream has been
+     *                 reached.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public int readInt() throws JMSException {
+        initializeReading();
+        try {
+            return this.dataIn.readInt();
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a signed 64-bit integer from the bytes message stream.
+     * 
+     * @return the next eight bytes from the bytes message stream, interpreted
+     *         as a <code>long</code>
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of bytes stream has been
+     *                 reached.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public long readLong() throws JMSException {
+        initializeReading();
+        try {
+            return this.dataIn.readLong();
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a <code>float</code> from the bytes message stream.
+     * 
+     * @return the next four bytes from the bytes message stream, interpreted as
+     *         a <code>float</code>
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of bytes stream has been
+     *                 reached.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public float readFloat() throws JMSException {
+        initializeReading();
+        try {
+            return this.dataIn.readFloat();
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a <code>double</code> from the bytes message stream.
+     * 
+     * @return the next eight bytes from the bytes message stream, interpreted
+     *         as a <code>double</code>
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of bytes stream has been
+     *                 reached.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public double readDouble() throws JMSException {
+        initializeReading();
+        try {
+            return this.dataIn.readDouble();
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a string that has been encoded using a modified UTF-8 format from
+     * the bytes message stream.
+     * <P>
+     * For more information on the UTF-8 format, see "File System Safe UCS
+     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
+     * X/Open Company Ltd., Document Number: P316. This information also appears
+     * in ISO/IEC 10646, Annex P.
+     * 
+     * @return a Unicode string from the bytes message stream
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of bytes stream has been
+     *                 reached.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public String readUTF() throws JMSException {
+        initializeReading();
+        try {
+            return this.dataIn.readUTF();
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a byte array from the bytes message stream.
+     * <P>
+     * If the length of array <code>value</code> is less than the number of
+     * bytes remaining to be read from the stream, the array should be filled. A
+     * subsequent call reads the next increment, and so on.
+     * <P>
+     * If the number of bytes remaining in the stream is less than the length of
+     * array <code>value</code>, the bytes should be read into the array. The
+     * return value of the total number of bytes read will be less than the
+     * length of the array, indicating that there are no more bytes left to be
+     * read from the stream. The next read of the stream returns -1.
+     * 
+     * @param value the buffer into which the data is read
+     * @return the total number of bytes read into the buffer, or -1 if there is
+     *         no more data because the end of the stream has been reached
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public int readBytes(byte[] value) throws JMSException {
+        return readBytes(value, value.length);
+    }
+
+    /**
+     * Reads a portion of the bytes message stream.
+     * <P>
+     * If the length of array <code>value</code> is less than the number of
+     * bytes remaining to be read from the stream, the array should be filled. A
+     * subsequent call reads the next increment, and so on.
+     * <P>
+     * If the number of bytes remaining in the stream is less than the length of
+     * array <code>value</code>, the bytes should be read into the array. The
+     * return value of the total number of bytes read will be less than the
+     * length of the array, indicating that there are no more bytes left to be
+     * read from the stream. The next read of the stream returns -1. <p/> If
+     * <code>length</code> is negative, or <code>length</code> is greater
+     * than the length of the array <code>value</code>, then an
+     * <code>IndexOutOfBoundsException</code> is thrown. No bytes will be read
+     * from the stream for this exception case.
+     * 
+     * @param value the buffer into which the data is read
+     * @param length the number of bytes to read; must be less than or equal to
+     *                <code>value.length</code>
+     * @return the total number of bytes read into the buffer, or -1 if there is
+     *         no more data because the end of the stream has been reached
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+    public int readBytes(byte[] value, int length) throws JMSException {
+        initializeReading();
+        try {
+            int n = 0;
+            while (n < length) {
+                int count = this.dataIn.read(value, n, length - n);
+                if (count < 0) {
+                    break;
+                }
+                n += count;
+            }
+            if (n == 0 && length > 0) {
+                n = -1;
+            }
+            return n;
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Writes a <code>boolean</code> to the bytes message stream as a 1-byte
+     * value. The value <code>true</code> is written as the value
+     * <code>(byte)1</code>; the value <code>false</code> is written as the
+     * value <code>(byte)0</code>.
+     * 
+     * @param value the <code>boolean</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void writeBoolean(boolean value) throws JMSException {
+        initializeWriting();
+        try {
+            this.dataOut.writeBoolean(value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a <code>byte</code> to the bytes message stream as a 1-byte
+     * value.
+     * 
+     * @param value the <code>byte</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void writeByte(byte value) throws JMSException {
+        initializeWriting();
+        try {
+            this.dataOut.writeByte(value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a <code>short</code> to the bytes message stream as two bytes,
+     * high byte first.
+     * 
+     * @param value the <code>short</code> to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void writeShort(short value) throws JMSException {
+        initializeWriting();
+        try {
+            this.dataOut.writeShort(value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a <code>char</code> to the bytes message stream as a 2-byte
+     * value, high byte first.
+     * 
+     * @param value the <code>char</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void writeChar(char value) throws JMSException {
+        initializeWriting();
+        try {
+            this.dataOut.writeChar(value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes an <code>int</code> to the bytes message stream as four bytes,
+     * high byte first.
+     * 
+     * @param value the <code>int</code> to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void writeInt(int value) throws JMSException {
+        initializeWriting();
+        try {
+            this.dataOut.writeInt(value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a <code>long</code> to the bytes message stream as eight bytes,
+     * high byte first.
+     * 
+     * @param value the <code>long</code> to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void writeLong(long value) throws JMSException {
+        initializeWriting();
+        try {
+            this.dataOut.writeLong(value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Converts the <code>float</code> argument to an <code>int</code> using
+     * the <code>floatToIntBits</code> method in class <code>Float</code>,
+     * and then writes that <code>int</code> value to the bytes message stream
+     * as a 4-byte quantity, high byte first.
+     * 
+     * @param value the <code>float</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void writeFloat(float value) throws JMSException {
+        initializeWriting();
+        try {
+            this.dataOut.writeFloat(value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Converts the <code>double</code> argument to a <code>long</code>
+     * using the <code>doubleToLongBits</code> method in class
+     * <code>Double</code>, and then writes that <code>long</code> value to
+     * the bytes message stream as an 8-byte quantity, high byte first.
+     * 
+     * @param value the <code>double</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void writeDouble(double value) throws JMSException {
+        initializeWriting();
+        try {
+            this.dataOut.writeDouble(value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a string to the bytes message stream using UTF-8 encoding in a
+     * machine-independent manner.
+     * <P>
+     * For more information on the UTF-8 format, see "File System Safe UCS
+     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
+     * X/Open Company Ltd., Document Number: P316. This information also appears
+     * in ISO/IEC 10646, Annex P.
+     * 
+     * @param value the <code>String</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void writeUTF(String value) throws JMSException {
+        initializeWriting();
+        try {
+            this.dataOut.writeUTF(value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a byte array to the bytes message stream.
+     * 
+     * @param value the byte array to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void writeBytes(byte[] value) throws JMSException {
+        initializeWriting();
+        try {
+            this.dataOut.write(value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a portion of a byte array to the bytes message stream.
+     * 
+     * @param value the byte array value to be written
+     * @param offset the initial offset within the byte array
+     * @param length the number of bytes to use
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+        initializeWriting();
+        try {
+            this.dataOut.write(value, offset, length);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes an object to the bytes message stream.
+     * <P>
+     * This method works only for the objectified primitive object types (<code>Integer</code>,<code>Double</code>,
+     * <code>Long</code> &nbsp;...), <code>String</code> objects, and byte
+     * arrays.
+     * 
+     * @param value the object in the Java programming language ("Java object")
+     *                to be written; it must not be null
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageFormatException if the object is of an invalid type.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     * @throws java.lang.NullPointerException if the parameter
+     *                 <code>value</code> is null.
+     */
+    public void writeObject(Object value) throws JMSException {
+        if (value == null) {
+            throw new NullPointerException();
+        }
+        initializeWriting();
+        if (value instanceof Boolean) {
+            writeBoolean(((Boolean)value).booleanValue());
+        } else if (value instanceof Character) {
+            writeChar(((Character)value).charValue());
+        } else if (value instanceof Byte) {
+            writeByte(((Byte)value).byteValue());
+        } else if (value instanceof Short) {
+            writeShort(((Short)value).shortValue());
+        } else if (value instanceof Integer) {
+            writeInt(((Integer)value).intValue());
+        } else if (value instanceof Long) {
+            writeLong(((Long)value).longValue());
+        } else if (value instanceof Float) {
+            writeFloat(((Float)value).floatValue());
+        } else if (value instanceof Double) {
+            writeDouble(((Double)value).doubleValue());
+        } else if (value instanceof String) {
+            writeUTF(value.toString());
+        } else if (value instanceof byte[]) {
+            writeBytes((byte[])value);
+        } else {
+            throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
+        }
+    }
+
+    /**
+     * Puts the message body in read-only mode and repositions the stream of
+     * bytes to the beginning.
+     * 
+     * @throws JMSException if an internal error occurs
+     */
+    public void reset() throws JMSException {
+        storeContent();
+        this.bytesOut = null;
+        this.dataIn = null;
+        this.dataOut = null;
+        setReadOnlyBody(true);
+    }
+
+    private void initializeWriting() throws JMSException {
+        checkReadOnlyBody();
+        if (this.dataOut == null) {
+            this.bytesOut = new ByteArrayOutputStream();
+            OutputStream os = bytesOut;
+            IConnection connection = getConnection();
+            if (connection != null && connection.isUseCompression()) {
+                // keep track of the real length of the content if
+                // we are compressed.
+                try {
+                    os.write(new byte[4]);
+                } catch (IOException e) {
+                    throw JMSExceptionSupport.create(e);
+                }
+                length = 0;
+                compressed = true;
+                Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+                os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
+                    public void write(byte[] arg0) throws IOException {
+                        length += arg0.length;
+                        out.write(arg0);
+                    }
+
+                    public void write(byte[] arg0, int arg1, int arg2) throws IOException {
+                        length += arg2;
+                        out.write(arg0, arg1, arg2);
+                    }
+
+                    public void write(int arg0) throws IOException {
+                        length++;
+                        out.write(arg0);
+                    }
+                };
+            }
+            this.dataOut = new DataOutputStream(os);
+        }
+    }
+
+    protected void checkWriteOnlyBody() throws MessageNotReadableException {
+        if (!readOnlyBody) {
+            throw new MessageNotReadableException("Message body is write-only");
+        }
+    }
+
+    private void initializeReading() throws JMSException {
+        checkWriteOnlyBody();
+        if (dataIn == null) {
+            ByteSequence data = getContent();
+            if (data == null) {
+                data = new ByteSequence(new byte[] {}, 0, 0);
+            }
+            InputStream is = new ByteArrayInputStream(data);
+            if (isCompressed()) {
+                // keep track of the real length of the content if
+                // we are compressed.
+                try {
+                    DataInputStream dis = new DataInputStream(is);
+                    length = dis.readInt();
+                    dis.close();
+                } catch (IOException e) {
+                    throw JMSExceptionSupport.create(e);
+                }
+                is = new InflaterInputStream(is);
+            } else {
+                length = data.getLength();
+            }
+            dataIn = new DataInputStream(is);
+        }
+    }
+
+    public void setObjectProperty(String name, Object value) throws JMSException {
+        initializeWriting();
+        super.setObjectProperty(name, value);
+    }
+
+    public String toString() {
+        return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
------------------------------------------------------------------------------
    svn:executable = *