You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/02/18 21:47:51 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5596
Repository: activemq
Updated Branches:
refs/heads/master e6597c460 -> 8858dc294
https://issues.apache.org/jira/browse/AMQ-5596
Remove the deprecated JMS Streams code.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8858dc29
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8858dc29
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8858dc29
Branch: refs/heads/master
Commit: 8858dc294cf3f992fc77053d9fc92f0422cf6605
Parents: e6597c4
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Feb 18 15:47:40 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Feb 18 15:47:40 2015 -0500
----------------------------------------------------------------------
.../org/apache/activemq/ActiveMQConnection.java | 150 +--------
.../apache/activemq/ActiveMQInputStream.java | 326 -------------------
.../apache/activemq/ActiveMQOutputStream.java | 222 -------------
.../org/apache/activemq/StreamConnection.java | 80 -----
.../activemq/ActiveMQInputStreamTest.java | 145 ---------
.../org/apache/activemq/LargeStreamletTest.java | 162 ---------
.../activemq/streams/JMSInputStreamTest.java | 281 ----------------
7 files changed, 1 insertion(+), 1365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index aef8513..51122ca 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -17,8 +17,6 @@
package org.apache.activemq;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
@@ -38,7 +36,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
-import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
@@ -108,7 +105,7 @@ import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
+public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
@@ -173,9 +170,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
- // Stream are deprecated and will be removed in a later release.
- private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
- private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
// Maps ConsumerIds to ActiveMQConsumer objects
private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
@@ -183,7 +177,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
private final SessionId connectionSessionId;
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
- private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
@@ -681,15 +674,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
ActiveMQConnectionConsumer c = i.next();
c.dispose();
}
- // Stream are deprecated and will be removed in a later release.
- for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
- ActiveMQInputStream c = i.next();
- c.dispose();
- }
- for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
- ActiveMQOutputStream c = i.next();
- c.dispose();
- }
this.activeTempDestinations.clear();
@@ -1268,13 +1252,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
/**
- * @return
- */
- private ProducerId createProducerId() {
- return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
- }
-
- /**
* Creates a <CODE>QueueSession</CODE> object.
*
* @param transacted indicates whether the session is transacted
@@ -1610,16 +1587,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
c.dispose();
}
- // Stream are deprecated and will be removed in a later release.
- for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
- ActiveMQInputStream c = i.next();
- c.dispose();
- }
- for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
- ActiveMQOutputStream c = i.next();
- c.dispose();
- }
-
if (isConnectionInfoSentToBroker) {
if (!transportFailed.get() && !closing.get()) {
syncSendPacket(info.createRemoveCommand());
@@ -2199,100 +2166,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.objectMessageSerializationDefered = objectMessageSerializationDefered;
}
- @Override
- @Deprecated
- public InputStream createInputStream(Destination dest) throws JMSException {
- return createInputStream(dest, null);
- }
-
- @Override
- @Deprecated
- public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
- return createInputStream(dest, messageSelector, false);
- }
-
- @Override
- @Deprecated
- public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
- return createInputStream(dest, messageSelector, noLocal, -1);
- }
-
- @Override
- @Deprecated
- public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
- return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
- }
-
- @Override
- @Deprecated
- public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
- return createInputStream(dest, null, false);
- }
-
- @Override
- @Deprecated
- public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
- return createDurableInputStream(dest, name, messageSelector, false);
- }
-
- @Override
- @Deprecated
- public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
- return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
- }
-
- @Override
- @Deprecated
- public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
- return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
- }
-
- @Deprecated
- private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
- checkClosedOrFailed();
- ensureConnectionInfoSent();
- return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
- }
-
- /**
- * Creates a persistent output stream; individual messages will be written
- * to disk/database by the broker
- */
- @Override
- @Deprecated
- public OutputStream createOutputStream(Destination dest) throws JMSException {
- return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
- }
-
- /**
- * Creates a non persistent output stream; messages will not be written to
- * disk
- */
- @Deprecated
- public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
- return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
- }
-
- /**
- * Creates an output stream allowing full control over the delivery mode,
- * the priority and time to live of the messages and the properties added to
- * messages on the stream.
- *
- * @param streamProperties defines a map of key-value pairs where the keys
- * are strings and the values are primitive values (numbers
- * and strings) which are appended to the messages similarly
- * to using the
- * {@link javax.jms.Message#setObjectProperty(String, Object)}
- * method
- */
- @Override
- @Deprecated
- public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
- checkClosedOrFailed();
- ensureConnectionInfoSent();
- return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
- }
-
/**
* Unsubscribes a durable subscription that has been created by a client.
* <P>
@@ -2312,7 +2185,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* specified.
* @since 1.1
*/
- @Override
public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
checkClosedOrFailed();
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
@@ -2364,26 +2236,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
- @Deprecated
- public void addOutputStream(ActiveMQOutputStream stream) {
- outputStreams.add(stream);
- }
-
- @Deprecated
- public void removeOutputStream(ActiveMQOutputStream stream) {
- outputStreams.remove(stream);
- }
-
- @Deprecated
- public void addInputStream(ActiveMQInputStream stream) {
- inputStreams.add(stream);
- }
-
- @Deprecated
- public void removeInputStream(ActiveMQInputStream stream) {
- inputStreams.remove(stream);
- }
-
protected void onControlCommand(ControlCommand command) {
String text = command.getCommand();
if (text != null) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
deleted file mode 100644
index a895421..0000000
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.JMSExceptionSupport;
-
-/**
- *
- */
-@Deprecated
-public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
-
- private final ActiveMQConnection connection;
- private final ConsumerInfo info;
- // These are the messages waiting to be delivered to the client
- private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel();
-
- private int deliveredCounter;
- private MessageDispatch lastDelivered;
- private boolean eosReached;
- private byte buffer[];
- private int pos;
- private Map<String, Object> jmsProperties;
-
- private ProducerId producerId;
- private long nextSequenceId;
- private final long timeout;
- private boolean firstReceived;
-
- public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch, long timeout)
- throws JMSException {
- this.connection = connection;
-
- if (dest == null) {
- throw new InvalidDestinationException("Don't understand null destinations");
- } else if (dest.isTemporary()) {
- String physicalName = dest.getPhysicalName();
-
- if (physicalName == null) {
- throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
- }
-
- String connectionID = connection.getConnectionInfo().getConnectionId().getValue();
-
- if (physicalName.indexOf(connectionID) < 0) {
- throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
- }
-
- if (connection.isDeleted(dest)) {
- throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
- }
- }
-
- if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
- this.timeout = timeout;
-
- this.info = new ConsumerInfo(consumerId);
- this.info.setSubscriptionName(name);
-
- if (selector != null && selector.trim().length() != 0) {
- selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) ";
- } else {
- selector = "JMSType='org.apache.activemq.Stream'";
- }
-
- SelectorParser.parse(selector);
- this.info.setSelector(selector);
-
- this.info.setPrefetchSize(prefetch);
- this.info.setNoLocal(noLocal);
- this.info.setBrowser(false);
- this.info.setDispatchAsync(false);
-
- // Allows the options on the destination to configure the consumerInfo
- if (dest.getOptions() != null) {
- Map<String, String> options = new HashMap<String, String>(dest.getOptions());
- IntrospectionSupport.setProperties(this.info, options, "consumer.");
- }
-
- this.info.setDestination(dest);
-
- this.connection.addInputStream(this);
- this.connection.addDispatcher(info.getConsumerId(), this);
- this.connection.syncSendPacket(info);
- unconsumedMessages.start();
- }
-
- @Override
- public void close() throws IOException {
- if (!unconsumedMessages.isClosed()) {
- try {
- if (lastDelivered != null) {
- MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
- connection.asyncSendPacket(ack);
- }
- dispose();
- this.connection.syncSendPacket(info.createRemoveCommand());
- } catch (JMSException e) {
- throw IOExceptionSupport.create(e);
- }
- }
- }
-
- public void dispose() {
- if (!unconsumedMessages.isClosed()) {
- unconsumedMessages.close();
- this.connection.removeDispatcher(info.getConsumerId());
- this.connection.removeInputStream(this);
- }
- }
-
- /**
- * Return the JMS Properties which where used to send the InputStream
- *
- * @return jmsProperties
- * @throws IOException
- */
- public Map<String, Object> getJMSProperties() throws IOException {
- if (jmsProperties == null) {
- fillBuffer();
- }
- return jmsProperties;
- }
-
- /**
- * This method allows the client to receive the Stream data as unaltered ActiveMQMessage
- * object which is how the split stream data is sent. Each message will contains one
- * chunk of the written bytes as well as a valid message group sequence id. The EOS
- * message will have a message group sequence id of -1.
- *
- * This method is useful for testing, but should never be mixed with calls to the
- * normal stream receive methods as it will break the normal stream processing flow
- * and can lead to loss of data.
- *
- * @return an ActiveMQMessage object that either contains byte data or an end of strem
- * marker.
- * @throws JMSException
- * @throws ReadTimeoutException
- */
- public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
- checkClosed();
- MessageDispatch md;
- try {
- if (firstReceived || timeout == -1) {
- md = unconsumedMessages.dequeue(-1);
- firstReceived = true;
- } else {
- md = unconsumedMessages.dequeue(timeout);
- if (md == null) throw new ReadTimeoutException();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw JMSExceptionSupport.create(e);
- }
-
- if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) {
- return null;
- }
-
- deliveredCounter++;
- if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
- MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
- connection.asyncSendPacket(ack);
- deliveredCounter = 0;
- lastDelivered = null;
- } else {
- lastDelivered = md;
- }
-
- return (ActiveMQMessage)md.getMessage();
- }
-
- /**
- * @throws IllegalStateException
- */
- protected void checkClosed() throws IllegalStateException {
- if (unconsumedMessages.isClosed()) {
- throw new IllegalStateException("The Consumer is closed");
- }
- }
-
- /**
- *
- * @see InputStream#read()
- * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
- */
- @Override
- public int read() throws IOException {
- fillBuffer();
- if (eosReached || buffer.length == 0) {
- return -1;
- }
-
- return buffer[pos++] & 0xff;
- }
-
- /**
- *
- * @see InputStream#read(byte[], int, int)
- * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
- */
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- fillBuffer();
- if (eosReached || buffer.length == 0) {
- return -1;
- }
-
- int max = Math.min(len, buffer.length - pos);
- System.arraycopy(buffer, pos, b, off, max);
-
- pos += max;
- return max;
- }
-
- private void fillBuffer() throws IOException {
- if (eosReached || (buffer != null && buffer.length > pos)) {
- return;
- }
- try {
- while (true) {
- ActiveMQMessage m = receive();
- if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
- // First message.
- long producerSequenceId = m.getMessageId().getProducerSequenceId();
- if (producerId == null) {
- // We have to start a stream at sequence id = 0
- if (producerSequenceId != 0) {
- continue;
- }
- nextSequenceId++;
- producerId = m.getMessageId().getProducerId();
- } else {
- // Verify it's the next message of the sequence.
- if (!m.getMessageId().getProducerId().equals(producerId)) {
- throw new IOException("Received an unexpected message: invalid producer: " + m);
- }
- if (producerSequenceId != nextSequenceId++) {
- throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m);
- }
- }
-
- // Read the buffer in.
- ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m;
- buffer = new byte[(int)bm.getBodyLength()];
- bm.readBytes(buffer);
- pos = 0;
- if (jmsProperties == null) {
- jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties()));
- }
- } else {
- eosReached = true;
- if (jmsProperties == null) {
- // no properties found
- jmsProperties = Collections.emptyMap();
- }
- }
- return;
- }
- } catch (JMSException e) {
- eosReached = true;
- if (jmsProperties == null) {
- // no properties found
- jmsProperties = Collections.emptyMap();
- }
- throw IOExceptionSupport.create(e);
- }
- }
-
- @Override
- public void dispatch(MessageDispatch md) {
- unconsumedMessages.enqueue(md);
- }
-
- @Override
- public String toString() {
- return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
- }
-
- /**
- * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout
- */
- public class ReadTimeoutException extends IOException {
- private static final long serialVersionUID = -3217758894326719909L;
-
- public ReadTimeoutException() {
- super();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
deleted file mode 100644
index c6e400d..0000000
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class ActiveMQOutputStream extends OutputStream implements Disposable {
-
- private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class);
-
- protected int count;
-
- final byte buffer[];
-
- private final ActiveMQConnection connection;
- private final Map<String, Object> properties;
- private final ProducerInfo info;
-
- private long messageSequence;
- private boolean closed;
- private final int deliveryMode;
- private final int priority;
- private final long timeToLive;
- private boolean alwaysSyncSend = false;
- private boolean addPropertiesOnFirstMsgOnly = false;
-
- /**
- * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
- */
- public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
-
- public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
- long timeToLive) throws JMSException {
- this.connection = connection;
- this.deliveryMode = deliveryMode;
- this.priority = priority;
- this.timeToLive = timeToLive;
- this.properties = properties == null ? null : new HashMap<String, Object>(properties);
-
- Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
- if (chunkSize == null) {
- chunkSize = 64 * 1024;
- } else {
- if (chunkSize < 1) {
- throw new IllegalArgumentException("Chunk size must be greater then 0");
- } else {
- chunkSize *= 1024;
- }
- }
-
- buffer = new byte[chunkSize];
-
- if (destination == null) {
- throw new InvalidDestinationException("Don't understand null destinations");
- }
-
- this.info = new ProducerInfo(producerId);
-
- // Allows the options on the destination to configure the stream
- if (destination.getOptions() != null) {
- Map<String, String> options = new HashMap<String, String>(destination.getOptions());
- IntrospectionSupport.setProperties(this, options, "producer.");
- IntrospectionSupport.setProperties(this.info, options, "producer.");
- if (options.size() > 0) {
- String msg = "There are " + options.size()
- + " producer options that couldn't be set on the producer."
- + " Check the options are spelled correctly."
- + " Unknown parameters=[" + options + "]."
- + " This producer cannot be started.";
- LOG.warn(msg);
- throw new ConfigurationException(msg);
- }
- }
-
- this.info.setDestination(destination);
-
- this.connection.addOutputStream(this);
- this.connection.asyncSendPacket(info);
- }
-
- @Override
- public void close() throws IOException {
- if (!closed) {
- flushBuffer();
- try {
- // Send an EOS style empty message to signal EOS.
- send(new ActiveMQMessage(), true);
- dispose();
- this.connection.asyncSendPacket(info.createRemoveCommand());
- } catch (JMSException e) {
- IOExceptionSupport.create(e);
- }
- }
- }
-
- @Override
- public void dispose() {
- if (!closed) {
- this.connection.removeOutputStream(this);
- closed = true;
- }
- }
-
- @Override
- public synchronized void write(int b) throws IOException {
- buffer[count++] = (byte) b;
- if (count == buffer.length) {
- flushBuffer();
- }
- }
-
- @Override
- public synchronized void write(byte b[], int off, int len) throws IOException {
- while (len > 0) {
- int max = Math.min(len, buffer.length - count);
- System.arraycopy(b, off, buffer, count, max);
-
- len -= max;
- count += max;
- off += max;
-
- if (count == buffer.length) {
- flushBuffer();
- }
- }
- }
-
- @Override
- public synchronized void flush() throws IOException {
- flushBuffer();
- }
-
- private void flushBuffer() throws IOException {
- if (count != 0) {
- try {
- ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
- msg.writeBytes(buffer, 0, count);
- send(msg, false);
- } catch (JMSException e) {
- throw IOExceptionSupport.create(e);
- }
- count = 0;
- }
- }
-
- /**
- * @param msg
- * @throws JMSException
- */
- private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
- if (properties != null && (messageSequence == 0 || !addPropertiesOnFirstMsgOnly)) {
- for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
- String key = iter.next();
- Object value = properties.get(key);
- msg.setObjectProperty(key, value);
- }
- }
- msg.setType("org.apache.activemq.Stream");
- msg.setGroupID(info.getProducerId().toString());
- if (eosMessage) {
- msg.setGroupSequence(-1);
- } else {
- msg.setGroupSequence((int) messageSequence);
- }
- MessageId id = new MessageId(info.getProducerId(), messageSequence++);
- connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
- }
-
- @Override
- public String toString() {
- return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
- }
-
- public boolean isAlwaysSyncSend() {
- return alwaysSyncSend;
- }
-
- public void setAlwaysSyncSend(boolean alwaysSyncSend) {
- this.alwaysSyncSend = alwaysSyncSend;
- }
-
- public boolean isAddPropertiesOnFirstMsgOnly() {
- return addPropertiesOnFirstMsgOnly;
- }
-
- public void setAddPropertiesOnFirstMsgOnly(boolean propertiesOnFirstMsgOnly) {
- this.addPropertiesOnFirstMsgOnly = propertiesOnFirstMsgOnly;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java b/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java
deleted file mode 100644
index 2f75b4f..0000000
--- a/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Topic;
-
-/**
- * The StreamConnection interface allows you to send and receive data from a
- * Destination in using standard java InputStream and OutputStream objects. It's
- * best use case is to send and receive large amounts of data that would be to
- * large to hold in a single JMS message.
- *
- *
- */
-@Deprecated
-public interface StreamConnection extends Connection {
-
- InputStream createInputStream(Destination dest) throws JMSException;
-
- InputStream createInputStream(Destination dest, String messageSelector) throws JMSException;
-
- InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException;
-
- InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException;
-
- InputStream createDurableInputStream(Topic dest, String name) throws JMSException;
-
- InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException;
-
- InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException;
-
- InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException;
-
- OutputStream createOutputStream(Destination dest) throws JMSException;
-
- OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException;
-
- /**
- * Unsubscribes a durable subscription that has been created by a client.
- * <P>
- * This method deletes the state being maintained on behalf of the
- * subscriber by its provider.
- * <P>
- * It is erroneous for a client to delete a durable subscription while there
- * is an active <CODE>MessageConsumer </CODE> or
- * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
- * message is part of a pending transaction or has not been acknowledged in
- * the session.
- *
- * @param name the name used to identify this subscription
- * @throws JMSException if the session fails to unsubscribe to the durable
- * subscription due to some internal error.
- * @throws InvalidDestinationException if an invalid subscription name is
- * specified.
- * @since 1.1
- */
- void unsubscribe(String name) throws JMSException;
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
deleted file mode 100644
index 77f422e..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class ActiveMQInputStreamTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class);
-
- private static final String BROKER_URL = "tcp://localhost:0";
- private static final String DESTINATION = "destination";
- private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash
-
- private BrokerService broker;
- private String connectionUri;
-
- @Override
- public void setUp() throws Exception {
- broker = new BrokerService();
- broker.setUseJmx(false);
- broker.setPersistent(false);
- broker.setDestinations(new ActiveMQDestination[] {
- ActiveMQDestination.createDestination(DESTINATION, ActiveMQDestination.QUEUE_TYPE),
- });
- broker.addConnector(BROKER_URL);
- broker.start();
- broker.waitUntilStarted();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- @Override
- public void tearDown() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- public void testInputStreamSetSyncSendOption() throws Exception {
-
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue destination = session.createQueue(DESTINATION + "?producer.alwaysSyncSend=true");
-
- OutputStream out = null;
- try {
- out = connection.createOutputStream(destination);
-
- assertTrue(((ActiveMQOutputStream)out).isAlwaysSyncSend());
-
- LOG.debug("writing...");
- for (int i = 0; i < STREAM_LENGTH; ++i) {
- out.write(0);
- }
- LOG.debug("wrote " + STREAM_LENGTH + " bytes");
- } finally {
- if (out != null) {
- out.close();
- }
- }
-
- InputStream in = null;
- try {
- in = connection.createInputStream(destination);
- LOG.debug("reading...");
- int count = 0;
- while (-1 != in.read()) {
- ++count;
- }
- LOG.debug("read " + count + " bytes");
- } finally {
- if (in != null) {
- in.close();
- }
- }
-
- connection.close();
- }
-
- public void testInputStreamMatchesDefaultChuckSize() throws Exception {
-
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue destination = session.createQueue(DESTINATION);
-
- OutputStream out = null;
- try {
- out = connection.createOutputStream(destination);
- LOG.debug("writing...");
- for (int i = 0; i < STREAM_LENGTH; ++i) {
- out.write(0);
- }
- LOG.debug("wrote " + STREAM_LENGTH + " bytes");
- } finally {
- if (out != null) {
- out.close();
- }
- }
-
- InputStream in = null;
- try {
- in = connection.createInputStream(destination);
- LOG.debug("reading...");
- int count = 0;
- while (-1 != in.read()) {
- ++count;
- }
- LOG.debug("read " + count + " bytes");
- } finally {
- if (in != null) {
- in.close();
- }
- }
-
- connection.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
deleted file mode 100644
index d624d36..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Destination;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author rnewson
- */
-public final class LargeStreamletTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(LargeStreamletTest.class);
- private static final String BROKER_URL = "vm://localhost?broker.persistent=false";
- private static final int BUFFER_SIZE = 1 * 1024;
- private static final int MESSAGE_COUNT = 10 * 1024;
-
- protected Exception writerException;
- protected Exception readerException;
-
- private final AtomicInteger totalRead = new AtomicInteger();
- private final AtomicInteger totalWritten = new AtomicInteger();
- private final AtomicBoolean stopThreads = new AtomicBoolean(false);
-
- public void testStreamlets() throws Exception {
- final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
-
- final ActiveMQConnection connection = (ActiveMQConnection)factory.createConnection();
- connection.start();
- try {
- final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try {
- final Destination destination = session.createQueue("wibble");
- final Thread readerThread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- totalRead.set(0);
- try {
- final InputStream inputStream = connection.createInputStream(destination);
- try {
- int read;
- final byte[] buf = new byte[BUFFER_SIZE];
- while (!stopThreads.get() && (read = inputStream.read(buf)) != -1) {
- totalRead.addAndGet(read);
- }
- } finally {
- inputStream.close();
- }
- } catch (Exception e) {
- readerException = e;
- e.printStackTrace();
- } finally {
- LOG.info(totalRead + " total bytes read.");
- }
- }
- });
-
- final Thread writerThread = new Thread(new Runnable() {
- private final Random random = new Random();
-
- @Override
- public void run() {
- totalWritten.set(0);
- int count = MESSAGE_COUNT;
- try {
- final OutputStream outputStream = connection.createOutputStream(destination);
- try {
- final byte[] buf = new byte[BUFFER_SIZE];
- random.nextBytes(buf);
- while (count > 0 && !stopThreads.get()) {
- outputStream.write(buf);
- totalWritten.addAndGet(buf.length);
- count--;
- }
- } finally {
- outputStream.close();
- }
- } catch (Exception e) {
- writerException = e;
- e.printStackTrace();
- } finally {
- LOG.info(totalWritten + " total bytes written.");
- }
- }
- });
-
- readerThread.start();
- writerThread.start();
-
- // Wait till reader is has finished receiving all the messages
- // or he has stopped
- // receiving messages.
- Thread.sleep(1000);
- int lastRead = totalRead.get();
- while (readerThread.isAlive()) {
- readerThread.join(1000);
- // No progress?? then stop waiting..
- if (lastRead == totalRead.get()) {
- break;
- }
- lastRead = totalRead.get();
- }
-
- stopThreads.set(true);
-
- assertTrue("Should not have received a reader exception", readerException == null);
- assertTrue("Should not have received a writer exception", writerException == null);
-
- assertEquals("Not all messages accounted for", totalWritten.get(), totalRead.get());
-
- } finally {
- session.close();
- }
- } finally {
- connection.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
deleted file mode 100755
index f392662..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.streams;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQInputStream;
-import org.apache.activemq.ActiveMQOutputStream;
-import org.apache.activemq.JmsTestSupport;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-
-/**
- * JMSInputStreamTest
- */
-@Deprecated
-public class JMSInputStreamTest extends JmsTestSupport {
-
- public Destination destination;
- protected DataOutputStream out;
- protected DataInputStream in;
- private ActiveMQConnection connection2;
-
- private ActiveMQInputStream amqIn;
- private ActiveMQOutputStream amqOut;
-
- public static Test suite() {
- return suite(JMSInputStreamTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
- public void initCombos() {
- addCombinationValues("destination", new Object[] { new ActiveMQQueue("TEST.QUEUE"), new ActiveMQTopic("TEST.TOPIC") });
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setAutoFail(true);
- super.setUp();
- }
-
- private void setUpConnection(Map<String, Object> props, long timeout) throws JMSException {
- connection2 = (ActiveMQConnection) factory.createConnection(userName, password);
- connections.add(connection2);
- if (props != null) {
- amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
- } else {
- amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination);
- }
-
- out = new DataOutputStream(amqOut);
- if (timeout == -1) {
- amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
- } else {
- amqIn = (ActiveMQInputStream) connection2.createInputStream(destination, null, false, timeout);
- }
- in = new DataInputStream(amqIn);
- }
-
- /*
- * @see TestCase#tearDown()
- */
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- }
-
- /**
- * Test for AMQ-3010
- */
- public void testInputStreamTimeout() throws Exception {
- long timeout = 500;
-
- setUpConnection(null, timeout);
- try {
- in.read();
- fail();
- } catch (ActiveMQInputStream.ReadTimeoutException e) {
- // timeout reached, everything ok
- }
- in.close();
- }
-
- // Test for AMQ-2988
- public void testStreamsWithProperties() throws Exception {
- String name1 = "PROPERTY_1";
- String name2 = "PROPERTY_2";
- String value1 = "VALUE_1";
- String value2 = "VALUE_2";
- Map<String, Object> jmsProperties = new HashMap<String, Object>();
- jmsProperties.put(name1, value1);
- jmsProperties.put(name2, value2);
- setUpConnection(jmsProperties, -1);
-
- out.writeInt(4);
- out.flush();
- assertTrue(in.readInt() == 4);
- out.writeFloat(2.3f);
- out.flush();
- assertTrue(in.readFloat() == 2.3f);
- String str = "this is a test string";
- out.writeUTF(str);
- out.flush();
- assertTrue(in.readUTF().equals(str));
- for (int i = 0; i < 100; i++) {
- out.writeLong(i);
- }
- out.flush();
-
- // check properties before we try to read the stream
- checkProperties(jmsProperties);
-
- for (int i = 0; i < 100; i++) {
- assertTrue(in.readLong() == i);
- }
-
- // check again after read was done
- checkProperties(jmsProperties);
- }
-
- public void testStreamsWithPropertiesOnlyOnFirstMessage() throws Exception {
- String name1 = "PROPERTY_1";
- String name2 = "PROPERTY_2";
- String value1 = "VALUE_1";
- String value2 = "VALUE_2";
- Map<String, Object> jmsProperties = new HashMap<String, Object>();
- jmsProperties.put(name1, value1);
- jmsProperties.put(name2, value2);
-
- ActiveMQDestination dest = (ActiveMQDestination) destination;
-
- if (dest.isQueue()) {
- destination = new ActiveMQQueue(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
- } else {
- destination = new ActiveMQTopic(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
- }
-
- setUpConnection(jmsProperties, -1);
-
- assertTrue(amqOut.isAddPropertiesOnFirstMsgOnly());
-
- out.writeInt(4);
- out.flush();
- assertTrue(in.readInt() == 4);
- out.writeFloat(2.3f);
- out.flush();
- assertTrue(in.readFloat() == 2.3f);
- String str = "this is a test string";
- out.writeUTF(str);
- out.flush();
- assertTrue(in.readUTF().equals(str));
- for (int i = 0; i < 100; i++) {
- out.writeLong(i);
- }
- out.flush();
-
- // check properties before we try to read the stream
- checkProperties(jmsProperties);
-
- for (int i = 0; i < 100; i++) {
- assertTrue(in.readLong() == i);
- }
-
- // check again after read was done
- checkProperties(jmsProperties);
- }
-
- // check if the received stream has the properties set
- // Test for AMQ-2988
- private void checkProperties(Map<String, Object> jmsProperties) throws IOException {
- Map<String, Object> receivedJmsProps = amqIn.getJMSProperties();
-
- // we should at least have the same amount or more properties
- assertTrue(jmsProperties.size() <= receivedJmsProps.size());
-
- // check the properties to see if we have everything in there
- Iterator<String> propsIt = jmsProperties.keySet().iterator();
- while (propsIt.hasNext()) {
- String key = propsIt.next();
- assertTrue(receivedJmsProps.containsKey(key));
- assertEquals(jmsProperties.get(key), receivedJmsProps.get(key));
- }
- }
-
- public void testLarge() throws Exception {
- setUpConnection(null, -1);
-
- final int testData = 23;
- final int dataLength = 4096;
- final int count = 1024;
- byte[] data = new byte[dataLength];
- for (int i = 0; i < data.length; i++) {
- data[i] = testData;
- }
- final AtomicBoolean complete = new AtomicBoolean(false);
- Thread runner = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- for (int x = 0; x < count; x++) {
- byte[] b = new byte[2048];
- in.readFully(b);
- for (int i = 0; i < b.length; i++) {
- assertTrue(b[i] == testData);
- }
- }
- complete.set(true);
- synchronized (complete) {
- complete.notify();
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- });
- runner.start();
- for (int i = 0; i < count; i++) {
- out.write(data);
- }
- out.flush();
- synchronized (complete) {
- if (!complete.get()) {
- complete.wait(30000);
- }
- }
- assertTrue(complete.get());
- }
-
- public void testStreams() throws Exception {
- setUpConnection(null, -1);
- out.writeInt(4);
- out.flush();
- assertTrue(in.readInt() == 4);
- out.writeFloat(2.3f);
- out.flush();
- assertTrue(in.readFloat() == 2.3f);
- String str = "this is a test string";
- out.writeUTF(str);
- out.flush();
- assertTrue(in.readUTF().equals(str));
- for (int i = 0; i < 100; i++) {
- out.writeLong(i);
- }
- out.flush();
-
- for (int i = 0; i < 100; i++) {
- assertTrue(in.readLong() == i);
- }
- }
-}