You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/01 20:37:54 UTC
svn commit: r780773 [2/31] - in /activemq/sandbox/activemq-flow:
activemq-client/ activemq-client/src/main/java/org/
activemq-client/src/main/java/org/apache/
activemq-client/src/main/java/org/apache/activemq/
activemq-client/src/main/java/org/apache/a...
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+
+public final class AdvisorySupport {
+
+ public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
+ public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Connection");
+ public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
+ public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
+ public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
+ public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
+ public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
+ public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
+ public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
+ public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
+ public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
+ public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
+ public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
+ public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
+ public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
+ public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
+ public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
+ public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastPorducer.";
+ public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
+ public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
+ public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
+ public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
+ public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
+ public static final String AGENT_TOPIC = "ActiveMQ.Agent";
+ public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
+ public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
+ public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME="originBrokerName";
+ public static final String MSG_PROPERTY_ORIGIN_BROKER_URL="originBrokerURL";
+ public static final String MSG_PROPERTY_USAGE_NAME="usageName";
+ public static final String MSG_PROPERTY_CONSUMER_ID="consumerId";
+ public static final String MSG_PROPERTY_PRODUCER_ID="producerId";
+ public static final String MSG_PROPERTY_MESSAGE_ID="orignalMessageId";
+ public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC + "," + TEMP_TOPIC_ADVISORY_TOPIC);
+ private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
+
+ private AdvisorySupport() {
+ }
+
+ public static ActiveMQTopic getConnectionAdvisoryTopic() {
+ return CONNECTION_ADVISORY_TOPIC;
+ }
+
+ public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isQueue()) {
+ return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+ } else {
+ return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+ }
+ }
+
+ public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isQueue()) {
+ return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+ } else {
+ return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
+ }
+ }
+
+ public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
+ if (destination.isQueue()) {
+ return getExpiredQueueMessageAdvisoryTopic(destination);
+ }
+ return getExpiredTopicMessageAdvisoryTopic(destination);
+ }
+
+ public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
+ String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
+ String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
+ String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
+ String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
+ String name = SLOW_CONSUMER_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
+ String name = FAST_PRODUCER_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
+ String name = MESSAGE_DISCAREDED_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
+ String name = MESSAGE_DELIVERED_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
+ String name = MESSAGE_CONSUMED_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
+ return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
+ }
+
+ public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
+ String name = FULL_TOPIC_PREFIX
+ + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
+
+ public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
+ switch (destination.getDestinationType()) {
+ case ActiveMQDestination.QUEUE_TYPE:
+ return QUEUE_ADVISORY_TOPIC;
+ case ActiveMQDestination.TOPIC_TYPE:
+ return TOPIC_ADVISORY_TOPIC;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ return TEMP_QUEUE_ADVISORY_TOPIC;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ return TEMP_TOPIC_ADVISORY_TOPIC;
+ default:
+ throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
+ }
+ }
+
+ public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC) || destination.equals(QUEUE_ADVISORY_TOPIC)
+ || destination.equals(TOPIC_ADVISORY_TOPIC);
+ }
+ }
+
+ public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.equals(CONNECTION_ADVISORY_TOPIC);
+ }
+ }
+
+ public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isProducerAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
+ }
+ }
+
+ public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
+ if (destination.isComposite()) {
+ ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+ for (int i = 0; i < compositeDestinations.length; i++) {
+ if (isFullAdvisoryTopic(compositeDestinations[i])) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
+ }
+ }
+
+ /**
+ * Returns the agent topic which is used to send commands to the broker
+ */
+ public static Destination getAgentDestination() {
+ return AGENT_TOPIC_DESTINATION;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/package.html?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/package.html (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/package.html Mon Jun 1 18:37:41 2009
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Support for JMS Advisory messages as well as some helper listeners to listen to the clients, producers and consumers available.
+
+</body>
+</html>
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/advisory/package.html
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+/**
+ * The policy for configuring how BLOBs (Binary Large OBjects) are transferred
+ * out of band between producers, brokers and consumers.
+ *
+ * @version $Revision: $
+ */
+public class BlobTransferPolicy {
+ private String defaultUploadUrl = "http://localhost:8080/uploads/";
+ private String brokerUploadUrl;
+ private String uploadUrl;
+ private int bufferSize = 128 * 1024;
+ private BlobUploadStrategy uploadStrategy;
+
+ /**
+ * Returns a copy of this policy object
+ */
+ public BlobTransferPolicy copy() {
+ BlobTransferPolicy that = new BlobTransferPolicy();
+ that.defaultUploadUrl = this.defaultUploadUrl;
+ that.brokerUploadUrl = this.brokerUploadUrl;
+ that.uploadUrl = this.uploadUrl;
+ that.uploadStrategy = this.uploadStrategy;
+ return that;
+ }
+
+ public String getUploadUrl() {
+ if (uploadUrl == null) {
+ uploadUrl = getBrokerUploadUrl();
+ if (uploadUrl == null) {
+ uploadUrl = getDefaultUploadUrl();
+ }
+ }
+ return uploadUrl;
+ }
+
+ /**
+ * Sets the upload URL to use explicitly on the client which will
+ * overload the default or the broker's URL. This allows the client to decide
+ * where to upload files to irrespective of the brokers configuration.
+ */
+ public void setUploadUrl(String uploadUrl) {
+ this.uploadUrl = uploadUrl;
+ }
+
+ public String getBrokerUploadUrl() {
+ return brokerUploadUrl;
+ }
+
+ /**
+ * Called by the JMS client when a broker advertises its upload URL
+ */
+ public void setBrokerUploadUrl(String brokerUploadUrl) {
+ this.brokerUploadUrl = brokerUploadUrl;
+ }
+
+ public String getDefaultUploadUrl() {
+ return defaultUploadUrl;
+ }
+
+ /**
+ * Sets the default upload URL to use if the broker does not
+ * have a configured upload URL
+ */
+ public void setDefaultUploadUrl(String defaultUploadUrl) {
+ this.defaultUploadUrl = defaultUploadUrl;
+ }
+
+ public BlobUploadStrategy getUploadStrategy() {
+ if (uploadStrategy == null) {
+ uploadStrategy = createUploadStrategy();
+ }
+ return uploadStrategy;
+ }
+
+ /**
+ * Sets the upload strategy to use for uploading BLOBs to some URL
+ */
+ public void setUploadStrategy(BlobUploadStrategy uploadStrategy) {
+ this.uploadStrategy = uploadStrategy;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ /**
+ * Sets the default buffer size used when uploading or downloading files
+ */
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ protected BlobUploadStrategy createUploadStrategy() {
+ return new DefaultBlobUploadStrategy(this);
+ }
+}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploadStrategy.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploadStrategy.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploadStrategy.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * Represents a strategy of uploading a file/stream to some remote
+ *
+ * @version $Revision: $
+ */
+public interface BlobUploadStrategy {
+
+ URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException;
+
+ URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException;
+}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploader.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploader.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploader.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/BlobUploader.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * A helper class to represent a required upload of a BLOB to some remote URL
+ *
+ * @version $Revision: $
+ */
+public class BlobUploader {
+
+ private BlobTransferPolicy blobTransferPolicy;
+ private File file;
+ private InputStream in;
+
+ public BlobUploader(BlobTransferPolicy blobTransferPolicy, InputStream in) {
+ this.blobTransferPolicy = blobTransferPolicy;
+ this.in = in;
+ }
+
+ public BlobUploader(BlobTransferPolicy blobTransferPolicy, File file) {
+ this.blobTransferPolicy = blobTransferPolicy;
+ this.file = file;
+ }
+
+ public URL upload(ActiveMQBlobMessage message) throws JMSException, IOException {
+ if (file != null) {
+ return getStrategy().uploadFile(message, file);
+ } else {
+ return getStrategy().uploadStream(message, in);
+ }
+ }
+
+ public BlobTransferPolicy getBlobTransferPolicy() {
+ return blobTransferPolicy;
+ }
+
+ public BlobUploadStrategy getStrategy() {
+ return getBlobTransferPolicy().getUploadStrategy();
+ }
+}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/DefaultBlobUploadStrategy.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * A default implementation of {@link BlobUploadStrategy} which uses the URL
+ * class to upload files or streams to a remote URL
+ */
+public class DefaultBlobUploadStrategy implements BlobUploadStrategy {
+ private BlobTransferPolicy transferPolicy;
+
+ public DefaultBlobUploadStrategy(BlobTransferPolicy transferPolicy) {
+ this.transferPolicy = transferPolicy;
+ }
+
+ public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
+ return uploadStream(message, new FileInputStream(file));
+ }
+
+ public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws JMSException, IOException {
+ URL url = createUploadURL(message);
+
+ HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+ connection.setRequestMethod("PUT");
+ connection.setDoOutput(true);
+
+ // use chunked mode or otherwise URLConnection loads everything into
+ // memory
+ // (chunked mode not supported before JRE 1.5)
+ connection.setChunkedStreamingMode(transferPolicy.getBufferSize());
+
+ OutputStream os = connection.getOutputStream();
+
+ byte[] buf = new byte[transferPolicy.getBufferSize()];
+ for (int c = fis.read(buf); c != -1; c = fis.read(buf)) {
+ os.write(buf, 0, c);
+ os.flush();
+ }
+ os.close();
+ fis.close();
+
+ if (!isSuccessfulCode(connection.getResponseCode())) {
+ throw new IOException("PUT was not successful: " + connection.getResponseCode() + " "
+ + connection.getResponseMessage());
+ }
+
+ return url;
+ }
+
+ public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
+ URL url = createUploadURL(message);
+
+ HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+ connection.setRequestMethod("DELETE");
+ connection.connect();
+ connection.disconnect();
+
+ if (!isSuccessfulCode(connection.getResponseCode())) {
+ throw new IOException("DELETE was not successful: " + connection.getResponseCode() + " "
+ + connection.getResponseMessage());
+ }
+ }
+
+ private boolean isSuccessfulCode(int responseCode) {
+ return responseCode >= 200 && responseCode < 300; // 2xx => successful
+ }
+
+ protected URL createUploadURL(ActiveMQBlobMessage message) throws JMSException, MalformedURLException {
+ return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString());
+ }
+}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/package.html?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/package.html (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/blob/package.html Mon Jun 1 18:37:41 2009
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Helper classes for dealing with out-of-band BLOB objects
+
+</body>
+</html>
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/Destination.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import org.apache.activemq.usage.MemoryUsage;
+
+
+/**
+ * @version $Revision: 1.12 $
+ */
+public interface Destination {
+
+ MemoryUsage getMemoryUsage();
+
+ int getMinimumMessageSize();
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/Destination.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/MessageReference.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/MessageReference.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/MessageReference.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+
+/**
+ * Keeps track of a message that is flowing through the Broker. This
+ * object may hold a hard reference to the message or only hold the
+ * id of the message if the message has been persisted on in a MessageStore.
+ *
+ * @version $Revision: 1.15 $
+ */
+public interface MessageReference {
+
+ MessageId getMessageId();
+ Message getMessageHardRef();
+ Message getMessage() throws IOException;
+ boolean isPersistent();
+
+ Destination getRegionDestination();
+
+ int getRedeliveryCounter();
+ void incrementRedeliveryCounter();
+
+ int getReferenceCount();
+
+ int incrementReferenceCount();
+ int decrementReferenceCount();
+ ConsumerId getTargetConsumerId();
+ int getSize();
+ long getExpiration();
+ String getGroupID();
+ int getGroupSequence();
+
+ /**
+ * Returns true if this message is expired
+ */
+ boolean isExpired();
+
+ /**
+ * Returns true if this message is dropped.
+ */
+ boolean isDropped();
+
+ /**
+ * @return true if the message is an advisory
+ */
+ boolean isAdvisory();
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/region/MessageReference.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.blob.BlobUploader;
+import org.apache.activemq.util.JMSExceptionSupport;
+
+/**
+ * An implementation of {@link BlobMessage} for out of band BLOB transfer
+ *
+ * @version $Revision: $
+ * @openwire:marshaller code="29"
+ */
+public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage {
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BLOB_MESSAGE;
+
+ public static final String BINARY_MIME_TYPE = "application/octet-stream";
+
+ private String remoteBlobUrl;
+ private String mimeType;
+ private String name;
+ private boolean deletedByBroker;
+
+ private transient BlobUploader blobUploader;
+ private transient URL url;
+
+ public Message copy() {
+ ActiveMQBlobMessage copy = new ActiveMQBlobMessage();
+ copy(copy);
+ return copy;
+ }
+
+ private void copy(ActiveMQBlobMessage copy) {
+ super.copy(copy);
+ copy.setRemoteBlobUrl(getRemoteBlobUrl());
+ copy.setMimeType(getMimeType());
+ copy.setDeletedByBroker(isDeletedByBroker());
+ copy.setBlobUploader(getBlobUploader());
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @openwire:property version=3 cache=false
+ */
+ public String getRemoteBlobUrl() {
+ return remoteBlobUrl;
+ }
+
+ public void setRemoteBlobUrl(String remoteBlobUrl) {
+ this.remoteBlobUrl = remoteBlobUrl;
+ url = null;
+ }
+
+ /**
+ * The MIME type of the BLOB which can be used to apply different content
+ * types to messages.
+ *
+ * @openwire:property version=3 cache=true
+ */
+ public String getMimeType() {
+ if (mimeType == null) {
+ return BINARY_MIME_TYPE;
+ }
+ return mimeType;
+ }
+
+ public void setMimeType(String mimeType) {
+ this.mimeType = mimeType;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * The name of the attachment which can be useful information if
+ * transmitting files over ActiveMQ
+ *
+ * @openwire:property version=3 cache=false
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * @openwire:property version=3 cache=false
+ */
+ public boolean isDeletedByBroker() {
+ return deletedByBroker;
+ }
+
+ public void setDeletedByBroker(boolean deletedByBroker) {
+ this.deletedByBroker = deletedByBroker;
+ }
+
+ public String getJMSXMimeType() {
+ return getMimeType();
+ }
+
+ public InputStream getInputStream() throws IOException, JMSException {
+ URL value = getURL();
+ if (value == null) {
+ return null;
+ }
+ return value.openStream();
+ }
+
+ public URL getURL() throws JMSException {
+ if (url == null && remoteBlobUrl != null) {
+ try {
+ url = new URL(remoteBlobUrl);
+ } catch (MalformedURLException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+ return url;
+ }
+
+ public void setURL(URL url) {
+ this.url = url;
+ remoteBlobUrl = url != null ? url.toExternalForm() : null;
+ }
+
+ public BlobUploader getBlobUploader() {
+ return blobUploader;
+ }
+
+ public void setBlobUploader(BlobUploader blobUploader) {
+ this.blobUploader = blobUploader;
+ }
+
+ public void onSend() throws JMSException {
+ super.onSend();
+
+ // lets ensure we upload the BLOB first out of band before we send the
+ // message
+ if (blobUploader != null) {
+ try {
+ URL value = blobUploader.upload(this);
+ setURL(value);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+ }
+}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,850 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+
+import org.apache.activemq.IConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.JMSExceptionSupport;
+
+/**
+ * A <CODE>BytesMessage</CODE> object is used to send a message containing a
+ * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>
+ * interface and adds a bytes message body. The receiver of the message supplies
+ * the interpretation of the bytes.
+ * <P>
+ * The <CODE>BytesMessage</CODE> methods are based largely on those found in
+ * <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>.
+ * <P>
+ * This message type is for client encoding of existing message formats. If
+ * possible, one of the other self-defining message types should be used
+ * instead.
+ * <P>
+ * Although the JMS API allows the use of message properties with byte messages,
+ * they are typically not used, since the inclusion of properties may affect the
+ * format.
+ * <P>
+ * The primitive types can be written explicitly using methods for each type.
+ * They may also be written generically as objects. For instance, a call to
+ * <CODE>BytesMessage.writeInt(6)</CODE> is equivalent to
+ * <CODE> BytesMessage.writeObject(new Integer(6))</CODE>. Both forms are
+ * provided, because the explicit form is convenient for static programming, and
+ * the object form is needed when types are not known at compile time.
+ * <P>
+ * When the message is first created, and when <CODE>clearBody</CODE> is
+ * called, the body of the message is in write-only mode. After the first call
+ * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
+ * After a message has been sent, the client that sent it can retain and modify
+ * it without affecting the message that has been sent. The same message object
+ * can be sent multiple times. When a message has been received, the provider
+ * has called <CODE>reset</CODE> so that the message body is in read-only mode
+ * for the client.
+ * <P>
+ * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
+ * message body is cleared and the message is in write-only mode.
+ * <P>
+ * If a client attempts to read a message in write-only mode, a
+ * <CODE>MessageNotReadableException</CODE> is thrown.
+ * <P>
+ * If a client attempts to write a message in read-only mode, a
+ * <CODE>MessageNotWriteableException</CODE> is thrown.
+ *
+ * @openwire:marshaller code=24
+ * @see javax.jms.Session#createBytesMessage()
+ * @see javax.jms.MapMessage
+ * @see javax.jms.Message
+ * @see javax.jms.ObjectMessage
+ * @see javax.jms.StreamMessage
+ * @see javax.jms.TextMessage
+ */
+public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;
+
+ protected transient DataOutputStream dataOut;
+ protected transient ByteArrayOutputStream bytesOut;
+ protected transient DataInputStream dataIn;
+ protected transient int length;
+
+ public Message copy() {
+ ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
+ copy(copy);
+ return copy;
+ }
+
+ private void copy(ActiveMQBytesMessage copy) {
+ storeContent();
+ super.copy(copy);
+ copy.dataOut = null;
+ copy.bytesOut = null;
+ copy.dataIn = null;
+ }
+
+ public void onSend() throws JMSException {
+ super.onSend();
+ storeContent();
+ }
+
+ private void storeContent() {
+ try {
+ if (dataOut != null) {
+ dataOut.close();
+ ByteSequence bs = bytesOut.toByteSequence();
+ if (compressed) {
+ int pos = bs.offset;
+ ByteSequenceData.writeIntBig(bs, length);
+ bs.offset = pos;
+ }
+ setContent(bs);
+ bytesOut = null;
+ dataOut = null;
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe.getMessage(), ioe); // TODO verify
+ // RuntimeException
+ }
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public String getJMSXMimeType() {
+ return "jms/bytes-message";
+ }
+
+ /**
+ * Clears out the message body. Clearing a message's body does not clear its
+ * header values or property entries.
+ * <P>
+ * If this message body was read-only, calling this method leaves the
+ * message body in the same state as an empty body in a newly created
+ * message.
+ *
+ * @throws JMSException if the JMS provider fails to clear the message body
+ * due to some internal error.
+ */
+ public void clearBody() throws JMSException {
+ super.clearBody();
+ this.dataOut = null;
+ this.dataIn = null;
+ this.bytesOut = null;
+ }
+
+ /**
+ * Gets the number of bytes of the message body when the message is in
+ * read-only mode. The value returned can be used to allocate a byte array.
+ * The value returned is the entire length of the message body, regardless
+ * of where the pointer for reading the message is currently located.
+ *
+ * @return number of bytes in the message
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ * @since 1.1
+ */
+
+ public long getBodyLength() throws JMSException {
+ initializeReading();
+ return length;
+ }
+
+ /**
+ * Reads a <code>boolean</code> from the bytes message stream.
+ *
+ * @return the <code>boolean</code> value read
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of bytes stream has been
+ * reached.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public boolean readBoolean() throws JMSException {
+ initializeReading();
+ try {
+ return this.dataIn.readBoolean();
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a signed 8-bit value from the bytes message stream.
+ *
+ * @return the next byte from the bytes message stream as a signed 8-bit
+ * <code>byte</code>
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of bytes stream has been
+ * reached.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public byte readByte() throws JMSException {
+ initializeReading();
+ try {
+ return this.dataIn.readByte();
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads an unsigned 8-bit number from the bytes message stream.
+ *
+ * @return the next byte from the bytes message stream, interpreted as an
+ * unsigned 8-bit number
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of bytes stream has been
+ * reached.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public int readUnsignedByte() throws JMSException {
+ initializeReading();
+ try {
+ return this.dataIn.readUnsignedByte();
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a signed 16-bit number from the bytes message stream.
+ *
+ * @return the next two bytes from the bytes message stream, interpreted as
+ * a signed 16-bit number
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of bytes stream has been
+ * reached.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public short readShort() throws JMSException {
+ initializeReading();
+ try {
+ return this.dataIn.readShort();
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads an unsigned 16-bit number from the bytes message stream.
+ *
+ * @return the next two bytes from the bytes message stream, interpreted as
+ * an unsigned 16-bit integer
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of bytes stream has been
+ * reached.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public int readUnsignedShort() throws JMSException {
+ initializeReading();
+ try {
+ return this.dataIn.readUnsignedShort();
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a Unicode character value from the bytes message stream.
+ *
+ * @return the next two bytes from the bytes message stream as a Unicode
+ * character
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of bytes stream has been
+ * reached.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public char readChar() throws JMSException {
+ initializeReading();
+ try {
+ return this.dataIn.readChar();
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a signed 32-bit integer from the bytes message stream.
+ *
+ * @return the next four bytes from the bytes message stream, interpreted as
+ * an <code>int</code>
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of bytes stream has been
+ * reached.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public int readInt() throws JMSException {
+ initializeReading();
+ try {
+ return this.dataIn.readInt();
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a signed 64-bit integer from the bytes message stream.
+ *
+ * @return the next eight bytes from the bytes message stream, interpreted
+ * as a <code>long</code>
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of bytes stream has been
+ * reached.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public long readLong() throws JMSException {
+ initializeReading();
+ try {
+ return this.dataIn.readLong();
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a <code>float</code> from the bytes message stream.
+ *
+ * @return the next four bytes from the bytes message stream, interpreted as
+ * a <code>float</code>
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of bytes stream has been
+ * reached.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public float readFloat() throws JMSException {
+ initializeReading();
+ try {
+ return this.dataIn.readFloat();
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a <code>double</code> from the bytes message stream.
+ *
+ * @return the next eight bytes from the bytes message stream, interpreted
+ * as a <code>double</code>
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of bytes stream has been
+ * reached.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public double readDouble() throws JMSException {
+ initializeReading();
+ try {
+ return this.dataIn.readDouble();
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a string that has been encoded using a modified UTF-8 format from
+ * the bytes message stream.
+ * <P>
+ * For more information on the UTF-8 format, see "File System Safe UCS
+ * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
+ * X/Open Company Ltd., Document Number: P316. This information also appears
+ * in ISO/IEC 10646, Annex P.
+ *
+ * @return a Unicode string from the bytes message stream
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of bytes stream has been
+ * reached.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public String readUTF() throws JMSException {
+ initializeReading();
+ try {
+ return this.dataIn.readUTF();
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a byte array from the bytes message stream.
+ * <P>
+ * If the length of array <code>value</code> is less than the number of
+ * bytes remaining to be read from the stream, the array should be filled. A
+ * subsequent call reads the next increment, and so on.
+ * <P>
+ * If the number of bytes remaining in the stream is less than the length of
+ * array <code>value</code>, the bytes should be read into the array. The
+ * return value of the total number of bytes read will be less than the
+ * length of the array, indicating that there are no more bytes left to be
+ * read from the stream. The next read of the stream returns -1.
+ *
+ * @param value the buffer into which the data is read
+ * @return the total number of bytes read into the buffer, or -1 if there is
+ * no more data because the end of the stream has been reached
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public int readBytes(byte[] value) throws JMSException {
+ return readBytes(value, value.length);
+ }
+
+ /**
+ * Reads a portion of the bytes message stream.
+ * <P>
+ * If the length of array <code>value</code> is less than the number of
+ * bytes remaining to be read from the stream, the array should be filled. A
+ * subsequent call reads the next increment, and so on.
+ * <P>
+ * If the number of bytes remaining in the stream is less than the length of
+ * array <code>value</code>, the bytes should be read into the array. The
+ * return value of the total number of bytes read will be less than the
+ * length of the array, indicating that there are no more bytes left to be
+ * read from the stream. The next read of the stream returns -1. <p/> If
+ * <code>length</code> is negative, or <code>length</code> is greater
+ * than the length of the array <code>value</code>, then an
+ * <code>IndexOutOfBoundsException</code> is thrown. No bytes will be read
+ * from the stream for this exception case.
+ *
+ * @param value the buffer into which the data is read
+ * @param length the number of bytes to read; must be less than or equal to
+ * <code>value.length</code>
+ * @return the total number of bytes read into the buffer, or -1 if there is
+ * no more data because the end of the stream has been reached
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+ public int readBytes(byte[] value, int length) throws JMSException {
+ initializeReading();
+ try {
+ int n = 0;
+ while (n < length) {
+ int count = this.dataIn.read(value, n, length - n);
+ if (count < 0) {
+ break;
+ }
+ n += count;
+ }
+ if (n == 0 && length > 0) {
+ n = -1;
+ }
+ return n;
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Writes a <code>boolean</code> to the bytes message stream as a 1-byte
+ * value. The value <code>true</code> is written as the value
+ * <code>(byte)1</code>; the value <code>false</code> is written as the
+ * value <code>(byte)0</code>.
+ *
+ * @param value the <code>boolean</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+ public void writeBoolean(boolean value) throws JMSException {
+ initializeWriting();
+ try {
+ this.dataOut.writeBoolean(value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a <code>byte</code> to the bytes message stream as a 1-byte
+ * value.
+ *
+ * @param value the <code>byte</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+ public void writeByte(byte value) throws JMSException {
+ initializeWriting();
+ try {
+ this.dataOut.writeByte(value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a <code>short</code> to the bytes message stream as two bytes,
+ * high byte first.
+ *
+ * @param value the <code>short</code> to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+ public void writeShort(short value) throws JMSException {
+ initializeWriting();
+ try {
+ this.dataOut.writeShort(value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a <code>char</code> to the bytes message stream as a 2-byte
+ * value, high byte first.
+ *
+ * @param value the <code>char</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+ public void writeChar(char value) throws JMSException {
+ initializeWriting();
+ try {
+ this.dataOut.writeChar(value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes an <code>int</code> to the bytes message stream as four bytes,
+ * high byte first.
+ *
+ * @param value the <code>int</code> to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+ public void writeInt(int value) throws JMSException {
+ initializeWriting();
+ try {
+ this.dataOut.writeInt(value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a <code>long</code> to the bytes message stream as eight bytes,
+ * high byte first.
+ *
+ * @param value the <code>long</code> to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+ public void writeLong(long value) throws JMSException {
+ initializeWriting();
+ try {
+ this.dataOut.writeLong(value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Converts the <code>float</code> argument to an <code>int</code> using
+ * the <code>floatToIntBits</code> method in class <code>Float</code>,
+ * and then writes that <code>int</code> value to the bytes message stream
+ * as a 4-byte quantity, high byte first.
+ *
+ * @param value the <code>float</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+ public void writeFloat(float value) throws JMSException {
+ initializeWriting();
+ try {
+ this.dataOut.writeFloat(value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Converts the <code>double</code> argument to a <code>long</code>
+ * using the <code>doubleToLongBits</code> method in class
+ * <code>Double</code>, and then writes that <code>long</code> value to
+ * the bytes message stream as an 8-byte quantity, high byte first.
+ *
+ * @param value the <code>double</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+ public void writeDouble(double value) throws JMSException {
+ initializeWriting();
+ try {
+ this.dataOut.writeDouble(value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a string to the bytes message stream using UTF-8 encoding in a
+ * machine-independent manner.
+ * <P>
+ * For more information on the UTF-8 format, see "File System Safe UCS
+ * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
+ * X/Open Company Ltd., Document Number: P316. This information also appears
+ * in ISO/IEC 10646, Annex P.
+ *
+ * @param value the <code>String</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+ public void writeUTF(String value) throws JMSException {
+ initializeWriting();
+ try {
+ this.dataOut.writeUTF(value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a byte array to the bytes message stream.
+ *
+ * @param value the byte array to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+ public void writeBytes(byte[] value) throws JMSException {
+ initializeWriting();
+ try {
+ this.dataOut.write(value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a portion of a byte array to the bytes message stream.
+ *
+ * @param value the byte array value to be written
+ * @param offset the initial offset within the byte array
+ * @param length the number of bytes to use
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+ public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+ initializeWriting();
+ try {
+ this.dataOut.write(value, offset, length);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes an object to the bytes message stream.
+ * <P>
+ * This method works only for the objectified primitive object types (<code>Integer</code>,<code>Double</code>,
+ * <code>Long</code> ...), <code>String</code> objects, and byte
+ * arrays.
+ *
+ * @param value the object in the Java programming language ("Java object")
+ * to be written; it must not be null
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageFormatException if the object is of an invalid type.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ * @throws java.lang.NullPointerException if the parameter
+ * <code>value</code> is null.
+ */
+ public void writeObject(Object value) throws JMSException {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ initializeWriting();
+ if (value instanceof Boolean) {
+ writeBoolean(((Boolean)value).booleanValue());
+ } else if (value instanceof Character) {
+ writeChar(((Character)value).charValue());
+ } else if (value instanceof Byte) {
+ writeByte(((Byte)value).byteValue());
+ } else if (value instanceof Short) {
+ writeShort(((Short)value).shortValue());
+ } else if (value instanceof Integer) {
+ writeInt(((Integer)value).intValue());
+ } else if (value instanceof Long) {
+ writeLong(((Long)value).longValue());
+ } else if (value instanceof Float) {
+ writeFloat(((Float)value).floatValue());
+ } else if (value instanceof Double) {
+ writeDouble(((Double)value).doubleValue());
+ } else if (value instanceof String) {
+ writeUTF(value.toString());
+ } else if (value instanceof byte[]) {
+ writeBytes((byte[])value);
+ } else {
+ throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
+ }
+ }
+
+ /**
+ * Puts the message body in read-only mode and repositions the stream of
+ * bytes to the beginning.
+ *
+ * @throws JMSException if an internal error occurs
+ */
+ public void reset() throws JMSException {
+ storeContent();
+ this.bytesOut = null;
+ this.dataIn = null;
+ this.dataOut = null;
+ setReadOnlyBody(true);
+ }
+
+ private void initializeWriting() throws JMSException {
+ checkReadOnlyBody();
+ if (this.dataOut == null) {
+ this.bytesOut = new ByteArrayOutputStream();
+ OutputStream os = bytesOut;
+ IConnection connection = getConnection();
+ if (connection != null && connection.isUseCompression()) {
+ // keep track of the real length of the content if
+ // we are compressed.
+ try {
+ os.write(new byte[4]);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ length = 0;
+ compressed = true;
+ Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+ os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
+ public void write(byte[] arg0) throws IOException {
+ length += arg0.length;
+ out.write(arg0);
+ }
+
+ public void write(byte[] arg0, int arg1, int arg2) throws IOException {
+ length += arg2;
+ out.write(arg0, arg1, arg2);
+ }
+
+ public void write(int arg0) throws IOException {
+ length++;
+ out.write(arg0);
+ }
+ };
+ }
+ this.dataOut = new DataOutputStream(os);
+ }
+ }
+
+ protected void checkWriteOnlyBody() throws MessageNotReadableException {
+ if (!readOnlyBody) {
+ throw new MessageNotReadableException("Message body is write-only");
+ }
+ }
+
+ private void initializeReading() throws JMSException {
+ checkWriteOnlyBody();
+ if (dataIn == null) {
+ ByteSequence data = getContent();
+ if (data == null) {
+ data = new ByteSequence(new byte[] {}, 0, 0);
+ }
+ InputStream is = new ByteArrayInputStream(data);
+ if (isCompressed()) {
+ // keep track of the real length of the content if
+ // we are compressed.
+ try {
+ DataInputStream dis = new DataInputStream(is);
+ length = dis.readInt();
+ dis.close();
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ is = new InflaterInputStream(is);
+ } else {
+ length = data.getLength();
+ }
+ dataIn = new DataInputStream(is);
+ }
+ }
+
+ public void setObjectProperty(String name, Object value) throws JMSException {
+ initializeWriting();
+ super.setObjectProperty(name, value);
+ }
+
+ public String toString() {
+ return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
------------------------------------------------------------------------------
svn:executable = *