You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/02/18 21:47:51 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5596

Repository: activemq
Updated Branches:
  refs/heads/master e6597c460 -> 8858dc294


https://issues.apache.org/jira/browse/AMQ-5596

Remove the deprecated JMS Streams code.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8858dc29
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8858dc29
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8858dc29

Branch: refs/heads/master
Commit: 8858dc294cf3f992fc77053d9fc92f0422cf6605
Parents: e6597c4
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Feb 18 15:47:40 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Feb 18 15:47:40 2015 -0500

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQConnection.java | 150 +--------
 .../apache/activemq/ActiveMQInputStream.java    | 326 -------------------
 .../apache/activemq/ActiveMQOutputStream.java   | 222 -------------
 .../org/apache/activemq/StreamConnection.java   |  80 -----
 .../activemq/ActiveMQInputStreamTest.java       | 145 ---------
 .../org/apache/activemq/LargeStreamletTest.java | 162 ---------
 .../activemq/streams/JMSInputStreamTest.java    | 281 ----------------
 7 files changed, 1 insertion(+), 1365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index aef8513..51122ca 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -17,8 +17,6 @@
 package org.apache.activemq;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
@@ -38,7 +36,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
-import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
@@ -108,7 +105,7 @@ import org.apache.activemq.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
+public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
 
     public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
     public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
@@ -173,9 +170,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
     private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
     private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
     private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
-    // Stream are deprecated and will be removed in a later release.
-    private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
-    private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
 
     // Maps ConsumerIds to ActiveMQConsumer objects
     private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
@@ -183,7 +177,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
     private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
     private final SessionId connectionSessionId;
     private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
-    private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
 
@@ -681,15 +674,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
                         ActiveMQConnectionConsumer c = i.next();
                         c.dispose();
                     }
-                    // Stream are deprecated and will be removed in a later release.
-                    for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
-                        ActiveMQInputStream c = i.next();
-                        c.dispose();
-                    }
-                    for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
-                        ActiveMQOutputStream c = i.next();
-                        c.dispose();
-                    }
 
                     this.activeTempDestinations.clear();
 
@@ -1268,13 +1252,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
     }
 
     /**
-     * @return
-     */
-    private ProducerId createProducerId() {
-        return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
-    }
-
-    /**
      * Creates a <CODE>QueueSession</CODE> object.
      *
      * @param transacted indicates whether the session is transacted
@@ -1610,16 +1587,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
             c.dispose();
         }
 
-        // Stream are deprecated and will be removed in a later release.
-        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
-            ActiveMQInputStream c = i.next();
-            c.dispose();
-        }
-        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
-            ActiveMQOutputStream c = i.next();
-            c.dispose();
-        }
-
         if (isConnectionInfoSentToBroker) {
             if (!transportFailed.get() && !closing.get()) {
                 syncSendPacket(info.createRemoveCommand());
@@ -2199,100 +2166,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
         this.objectMessageSerializationDefered = objectMessageSerializationDefered;
     }
 
-    @Override
-    @Deprecated
-    public InputStream createInputStream(Destination dest) throws JMSException {
-        return createInputStream(dest, null);
-    }
-
-    @Override
-    @Deprecated
-    public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
-        return createInputStream(dest, messageSelector, false);
-    }
-
-    @Override
-    @Deprecated
-    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
-        return createInputStream(dest, messageSelector, noLocal, -1);
-    }
-
-    @Override
-    @Deprecated
-    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
-        return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
-    }
-
-    @Override
-    @Deprecated
-    public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
-        return createInputStream(dest, null, false);
-    }
-
-    @Override
-    @Deprecated
-    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
-        return createDurableInputStream(dest, name, messageSelector, false);
-    }
-
-    @Override
-    @Deprecated
-    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
-        return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
-    }
-
-    @Override
-    @Deprecated
-    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
-        return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
-    }
-
-    @Deprecated
-    private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
-        checkClosedOrFailed();
-        ensureConnectionInfoSent();
-        return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
-    }
-
-    /**
-     * Creates a persistent output stream; individual messages will be written
-     * to disk/database by the broker
-     */
-    @Override
-    @Deprecated
-    public OutputStream createOutputStream(Destination dest) throws JMSException {
-        return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
-    }
-
-    /**
-     * Creates a non persistent output stream; messages will not be written to
-     * disk
-     */
-    @Deprecated
-    public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
-        return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
-    }
-
-    /**
-     * Creates an output stream allowing full control over the delivery mode,
-     * the priority and time to live of the messages and the properties added to
-     * messages on the stream.
-     *
-     * @param streamProperties defines a map of key-value pairs where the keys
-     *                are strings and the values are primitive values (numbers
-     *                and strings) which are appended to the messages similarly
-     *                to using the
-     *                {@link javax.jms.Message#setObjectProperty(String, Object)}
-     *                method
-     */
-    @Override
-    @Deprecated
-    public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
-        checkClosedOrFailed();
-        ensureConnectionInfoSent();
-        return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
-    }
-
     /**
      * Unsubscribes a durable subscription that has been created by a client.
      * <P>
@@ -2312,7 +2185,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
      *                 specified.
      * @since 1.1
      */
-    @Override
     public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
         checkClosedOrFailed();
         RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
@@ -2364,26 +2236,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
         }
     }
 
-    @Deprecated
-    public void addOutputStream(ActiveMQOutputStream stream) {
-        outputStreams.add(stream);
-    }
-
-    @Deprecated
-    public void removeOutputStream(ActiveMQOutputStream stream) {
-        outputStreams.remove(stream);
-    }
-
-    @Deprecated
-    public void addInputStream(ActiveMQInputStream stream) {
-        inputStreams.add(stream);
-    }
-
-    @Deprecated
-    public void removeInputStream(ActiveMQInputStream stream) {
-        inputStreams.remove(stream);
-    }
-
     protected void onControlCommand(ControlCommand command) {
         String text = command.getCommand();
         if (text != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
deleted file mode 100644
index a895421..0000000
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.JMSExceptionSupport;
-
-/**
- *
- */
-@Deprecated
-public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
-
-    private final ActiveMQConnection connection;
-    private final ConsumerInfo info;
-    // These are the messages waiting to be delivered to the client
-    private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel();
-
-    private int deliveredCounter;
-    private MessageDispatch lastDelivered;
-    private boolean eosReached;
-    private byte buffer[];
-    private int pos;
-    private Map<String, Object> jmsProperties;
-
-    private ProducerId producerId;
-    private long nextSequenceId;
-    private final long timeout;
-    private boolean firstReceived;
-
-    public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch,  long timeout)
-        throws JMSException {
-        this.connection = connection;
-
-        if (dest == null) {
-            throw new InvalidDestinationException("Don't understand null destinations");
-        } else if (dest.isTemporary()) {
-            String physicalName = dest.getPhysicalName();
-
-            if (physicalName == null) {
-                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
-            }
-
-            String connectionID = connection.getConnectionInfo().getConnectionId().getValue();
-
-            if (physicalName.indexOf(connectionID) < 0) {
-                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
-            }
-
-            if (connection.isDeleted(dest)) {
-                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
-            }
-        }
-
-        if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
-        this.timeout = timeout;
-
-        this.info = new ConsumerInfo(consumerId);
-        this.info.setSubscriptionName(name);
-
-        if (selector != null && selector.trim().length() != 0) {
-            selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) ";
-        } else {
-            selector = "JMSType='org.apache.activemq.Stream'";
-        }
-
-        SelectorParser.parse(selector);
-        this.info.setSelector(selector);
-
-        this.info.setPrefetchSize(prefetch);
-        this.info.setNoLocal(noLocal);
-        this.info.setBrowser(false);
-        this.info.setDispatchAsync(false);
-
-        // Allows the options on the destination to configure the consumerInfo
-        if (dest.getOptions() != null) {
-            Map<String, String> options = new HashMap<String, String>(dest.getOptions());
-            IntrospectionSupport.setProperties(this.info, options, "consumer.");
-        }
-
-        this.info.setDestination(dest);
-
-        this.connection.addInputStream(this);
-        this.connection.addDispatcher(info.getConsumerId(), this);
-        this.connection.syncSendPacket(info);
-        unconsumedMessages.start();
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (!unconsumedMessages.isClosed()) {
-            try {
-                if (lastDelivered != null) {
-                    MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
-                    connection.asyncSendPacket(ack);
-                }
-                dispose();
-                this.connection.syncSendPacket(info.createRemoveCommand());
-            } catch (JMSException e) {
-                throw IOExceptionSupport.create(e);
-            }
-        }
-    }
-
-    public void dispose() {
-        if (!unconsumedMessages.isClosed()) {
-            unconsumedMessages.close();
-            this.connection.removeDispatcher(info.getConsumerId());
-            this.connection.removeInputStream(this);
-        }
-    }
-
-    /**
-     * Return the JMS Properties which where used to send the InputStream
-     *
-     * @return jmsProperties
-     * @throws IOException
-     */
-    public Map<String, Object> getJMSProperties() throws IOException {
-        if (jmsProperties == null) {
-            fillBuffer();
-        }
-        return jmsProperties;
-    }
-
-    /**
-     * This method allows the client to receive the Stream data as unaltered ActiveMQMessage
-     * object which is how the split stream data is sent.  Each message will contains one
-     * chunk of the written bytes as well as a valid message group sequence id.  The EOS
-     * message will have a message group sequence id of -1.
-     *
-     * This method is useful for testing, but should never be mixed with calls to the
-     * normal stream receive methods as it will break the normal stream processing flow
-     * and can lead to loss of data.
-     *
-     * @return an ActiveMQMessage object that either contains byte data or an end of strem
-     *         marker.
-     * @throws JMSException
-     * @throws ReadTimeoutException
-     */
-    public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
-        checkClosed();
-        MessageDispatch md;
-        try {
-            if (firstReceived || timeout == -1) {
-                md = unconsumedMessages.dequeue(-1);
-                firstReceived = true;
-            } else {
-                md = unconsumedMessages.dequeue(timeout);
-                if (md == null) throw new ReadTimeoutException();
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw JMSExceptionSupport.create(e);
-        }
-
-        if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) {
-            return null;
-        }
-
-        deliveredCounter++;
-        if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
-            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
-            connection.asyncSendPacket(ack);
-            deliveredCounter = 0;
-            lastDelivered = null;
-        } else {
-            lastDelivered = md;
-        }
-
-        return (ActiveMQMessage)md.getMessage();
-    }
-
-    /**
-     * @throws IllegalStateException
-     */
-    protected void checkClosed() throws IllegalStateException {
-        if (unconsumedMessages.isClosed()) {
-            throw new IllegalStateException("The Consumer is closed");
-        }
-    }
-
-    /**
-     *
-     * @see InputStream#read()
-     * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
-     */
-    @Override
-    public int read() throws IOException {
-        fillBuffer();
-        if (eosReached || buffer.length == 0) {
-            return -1;
-        }
-
-        return buffer[pos++] & 0xff;
-    }
-
-    /**
-     *
-     * @see InputStream#read(byte[], int, int)
-     * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
-     */
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        fillBuffer();
-        if (eosReached || buffer.length == 0) {
-            return -1;
-        }
-
-        int max = Math.min(len, buffer.length - pos);
-        System.arraycopy(buffer, pos, b, off, max);
-
-        pos += max;
-        return max;
-    }
-
-    private void fillBuffer() throws IOException {
-        if (eosReached || (buffer != null && buffer.length > pos)) {
-            return;
-        }
-        try {
-            while (true) {
-                ActiveMQMessage m = receive();
-                if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
-                    // First message.
-                    long producerSequenceId = m.getMessageId().getProducerSequenceId();
-                    if (producerId == null) {
-                        // We have to start a stream at sequence id = 0
-                        if (producerSequenceId != 0) {
-                            continue;
-                        }
-                        nextSequenceId++;
-                        producerId = m.getMessageId().getProducerId();
-                    } else {
-                        // Verify it's the next message of the sequence.
-                        if (!m.getMessageId().getProducerId().equals(producerId)) {
-                            throw new IOException("Received an unexpected message: invalid producer: " + m);
-                        }
-                        if (producerSequenceId != nextSequenceId++) {
-                            throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m);
-                        }
-                    }
-
-                    // Read the buffer in.
-                    ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m;
-                    buffer = new byte[(int)bm.getBodyLength()];
-                    bm.readBytes(buffer);
-                    pos = 0;
-                    if (jmsProperties == null) {
-                        jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties()));
-                    }
-                } else {
-                    eosReached = true;
-                    if (jmsProperties == null) {
-                        // no properties found
-                        jmsProperties = Collections.emptyMap();
-                    }
-                }
-                return;
-            }
-        } catch (JMSException e) {
-            eosReached = true;
-            if (jmsProperties == null) {
-                // no properties found
-                jmsProperties = Collections.emptyMap();
-            }
-            throw IOExceptionSupport.create(e);
-        }
-    }
-
-    @Override
-    public void dispatch(MessageDispatch md) {
-        unconsumedMessages.enqueue(md);
-    }
-
-    @Override
-    public String toString() {
-        return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
-    }
-
-    /**
-     * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout
-     */
-    public class ReadTimeoutException extends IOException {
-        private static final long serialVersionUID = -3217758894326719909L;
-
-        public ReadTimeoutException() {
-            super();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
deleted file mode 100644
index c6e400d..0000000
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class ActiveMQOutputStream extends OutputStream implements Disposable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class);
-
-    protected int count;
-
-    final byte buffer[];
-
-    private final ActiveMQConnection connection;
-    private final Map<String, Object> properties;
-    private final ProducerInfo info;
-
-    private long messageSequence;
-    private boolean closed;
-    private final int deliveryMode;
-    private final int priority;
-    private final long timeToLive;
-    private boolean alwaysSyncSend = false;
-    private boolean addPropertiesOnFirstMsgOnly = false;
-
-    /**
-     * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
-     */
-    public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
-
-    public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
-                                long timeToLive) throws JMSException {
-        this.connection = connection;
-        this.deliveryMode = deliveryMode;
-        this.priority = priority;
-        this.timeToLive = timeToLive;
-        this.properties = properties == null ? null : new HashMap<String, Object>(properties);
-
-        Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
-        if (chunkSize == null) {
-            chunkSize = 64 * 1024;
-        } else {
-            if (chunkSize < 1) {
-                throw new IllegalArgumentException("Chunk size must be greater then 0");
-            } else {
-                chunkSize *= 1024;
-            }
-        }
-
-        buffer = new byte[chunkSize];
-
-        if (destination == null) {
-            throw new InvalidDestinationException("Don't understand null destinations");
-        }
-
-        this.info = new ProducerInfo(producerId);
-
-        // Allows the options on the destination to configure the stream
-        if (destination.getOptions() != null) {
-            Map<String, String> options = new HashMap<String, String>(destination.getOptions());
-            IntrospectionSupport.setProperties(this, options, "producer.");
-            IntrospectionSupport.setProperties(this.info, options, "producer.");
-            if (options.size() > 0) {
-                String msg = "There are " + options.size()
-                    + " producer options that couldn't be set on the producer."
-                    + " Check the options are spelled correctly."
-                    + " Unknown parameters=[" + options + "]."
-                    + " This producer cannot be started.";
-                LOG.warn(msg);
-                throw new ConfigurationException(msg);
-            }
-        }
-
-        this.info.setDestination(destination);
-
-        this.connection.addOutputStream(this);
-        this.connection.asyncSendPacket(info);
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (!closed) {
-            flushBuffer();
-            try {
-                // Send an EOS style empty message to signal EOS.
-                send(new ActiveMQMessage(), true);
-                dispose();
-                this.connection.asyncSendPacket(info.createRemoveCommand());
-            } catch (JMSException e) {
-                IOExceptionSupport.create(e);
-            }
-        }
-    }
-
-    @Override
-    public void dispose() {
-        if (!closed) {
-            this.connection.removeOutputStream(this);
-            closed = true;
-        }
-    }
-
-    @Override
-    public synchronized void write(int b) throws IOException {
-        buffer[count++] = (byte) b;
-        if (count == buffer.length) {
-            flushBuffer();
-        }
-    }
-
-    @Override
-    public synchronized void write(byte b[], int off, int len) throws IOException {
-        while (len > 0) {
-            int max = Math.min(len, buffer.length - count);
-            System.arraycopy(b, off, buffer, count, max);
-
-            len -= max;
-            count += max;
-            off += max;
-
-            if (count == buffer.length) {
-                flushBuffer();
-            }
-        }
-    }
-
-    @Override
-    public synchronized void flush() throws IOException {
-        flushBuffer();
-    }
-
-    private void flushBuffer() throws IOException {
-        if (count != 0) {
-            try {
-                ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
-                msg.writeBytes(buffer, 0, count);
-                send(msg, false);
-            } catch (JMSException e) {
-                throw IOExceptionSupport.create(e);
-            }
-            count = 0;
-        }
-    }
-
-    /**
-     * @param msg
-     * @throws JMSException
-     */
-    private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
-        if (properties != null && (messageSequence == 0 || !addPropertiesOnFirstMsgOnly)) {
-            for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
-                String key = iter.next();
-                Object value = properties.get(key);
-                msg.setObjectProperty(key, value);
-            }
-        }
-        msg.setType("org.apache.activemq.Stream");
-        msg.setGroupID(info.getProducerId().toString());
-        if (eosMessage) {
-            msg.setGroupSequence(-1);
-        } else {
-            msg.setGroupSequence((int) messageSequence);
-        }
-        MessageId id = new MessageId(info.getProducerId(), messageSequence++);
-        connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
-    }
-
-    @Override
-    public String toString() {
-        return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
-    }
-
-    public boolean isAlwaysSyncSend() {
-        return alwaysSyncSend;
-    }
-
-    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
-        this.alwaysSyncSend = alwaysSyncSend;
-    }
-
-    public boolean isAddPropertiesOnFirstMsgOnly() {
-        return addPropertiesOnFirstMsgOnly;
-    }
-
-    public void setAddPropertiesOnFirstMsgOnly(boolean propertiesOnFirstMsgOnly) {
-        this.addPropertiesOnFirstMsgOnly = propertiesOnFirstMsgOnly;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java b/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java
deleted file mode 100644
index 2f75b4f..0000000
--- a/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Topic;
-
-/**
- * The StreamConnection interface allows you to send and receive data from a
- * Destination in using standard java InputStream and OutputStream objects. It's
- * best use case is to send and receive large amounts of data that would be to
- * large to hold in a single JMS message.
- *
- *
- */
-@Deprecated
-public interface StreamConnection extends Connection {
-
-    InputStream createInputStream(Destination dest) throws JMSException;
-
-    InputStream createInputStream(Destination dest, String messageSelector) throws JMSException;
-
-    InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException;
-
-    InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException;
-
-    InputStream createDurableInputStream(Topic dest, String name) throws JMSException;
-
-    InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException;
-
-    InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException;
-
-    InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException;
-
-    OutputStream createOutputStream(Destination dest) throws JMSException;
-
-    OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException;
-
-    /**
-     * Unsubscribes a durable subscription that has been created by a client.
-     * <P>
-     * This method deletes the state being maintained on behalf of the
-     * subscriber by its provider.
-     * <P>
-     * It is erroneous for a client to delete a durable subscription while there
-     * is an active <CODE>MessageConsumer </CODE> or
-     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
-     * message is part of a pending transaction or has not been acknowledged in
-     * the session.
-     *
-     * @param name the name used to identify this subscription
-     * @throws JMSException if the session fails to unsubscribe to the durable
-     *                 subscription due to some internal error.
-     * @throws InvalidDestinationException if an invalid subscription name is
-     *                 specified.
-     * @since 1.1
-     */
-    void unsubscribe(String name) throws JMSException;
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
deleted file mode 100644
index 77f422e..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class ActiveMQInputStreamTest extends TestCase {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class);
-
-    private static final String BROKER_URL = "tcp://localhost:0";
-    private static final String DESTINATION = "destination";
-    private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash
-
-    private BrokerService broker;
-    private String connectionUri;
-
-    @Override
-    public void setUp() throws Exception {
-        broker = new BrokerService();
-        broker.setUseJmx(false);
-        broker.setPersistent(false);
-        broker.setDestinations(new ActiveMQDestination[] {
-            ActiveMQDestination.createDestination(DESTINATION, ActiveMQDestination.QUEUE_TYPE),
-        });
-        broker.addConnector(BROKER_URL);
-        broker.start();
-        broker.waitUntilStarted();
-
-        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-    }
-
-    @Override
-    public void tearDown() throws Exception {
-        broker.stop();
-        broker.waitUntilStopped();
-    }
-
-    public void testInputStreamSetSyncSendOption() throws Exception {
-
-        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
-        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue destination = session.createQueue(DESTINATION + "?producer.alwaysSyncSend=true");
-
-        OutputStream out = null;
-        try {
-            out = connection.createOutputStream(destination);
-
-            assertTrue(((ActiveMQOutputStream)out).isAlwaysSyncSend());
-
-            LOG.debug("writing...");
-            for (int i = 0; i < STREAM_LENGTH; ++i) {
-                out.write(0);
-            }
-            LOG.debug("wrote " + STREAM_LENGTH + " bytes");
-        } finally {
-            if (out != null) {
-                out.close();
-            }
-        }
-
-        InputStream in = null;
-        try {
-            in = connection.createInputStream(destination);
-            LOG.debug("reading...");
-            int count = 0;
-            while (-1 != in.read()) {
-                ++count;
-            }
-            LOG.debug("read " + count + " bytes");
-        } finally {
-            if (in != null) {
-                in.close();
-            }
-        }
-
-        connection.close();
-    }
-
-    public void testInputStreamMatchesDefaultChuckSize() throws Exception {
-
-        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
-        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue destination = session.createQueue(DESTINATION);
-
-        OutputStream out = null;
-        try {
-            out = connection.createOutputStream(destination);
-            LOG.debug("writing...");
-            for (int i = 0; i < STREAM_LENGTH; ++i) {
-                out.write(0);
-            }
-            LOG.debug("wrote " + STREAM_LENGTH + " bytes");
-        } finally {
-            if (out != null) {
-                out.close();
-            }
-        }
-
-        InputStream in = null;
-        try {
-            in = connection.createInputStream(destination);
-            LOG.debug("reading...");
-            int count = 0;
-            while (-1 != in.read()) {
-                ++count;
-            }
-            LOG.debug("read " + count + " bytes");
-        } finally {
-            if (in != null) {
-                in.close();
-            }
-        }
-
-        connection.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
deleted file mode 100644
index d624d36..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Destination;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author rnewson
- */
-public final class LargeStreamletTest extends TestCase {
-
-    private static final Logger LOG = LoggerFactory.getLogger(LargeStreamletTest.class);
-    private static final String BROKER_URL = "vm://localhost?broker.persistent=false";
-    private static final int BUFFER_SIZE = 1 * 1024;
-    private static final int MESSAGE_COUNT = 10 * 1024;
-
-    protected Exception writerException;
-    protected Exception readerException;
-
-    private final AtomicInteger totalRead = new AtomicInteger();
-    private final AtomicInteger totalWritten = new AtomicInteger();
-    private final AtomicBoolean stopThreads = new AtomicBoolean(false);
-
-    public void testStreamlets() throws Exception {
-        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
-
-        final ActiveMQConnection connection = (ActiveMQConnection)factory.createConnection();
-        connection.start();
-        try {
-            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            try {
-                final Destination destination = session.createQueue("wibble");
-                final Thread readerThread = new Thread(new Runnable() {
-
-                    @Override
-                    public void run() {
-                        totalRead.set(0);
-                        try {
-                            final InputStream inputStream = connection.createInputStream(destination);
-                            try {
-                                int read;
-                                final byte[] buf = new byte[BUFFER_SIZE];
-                                while (!stopThreads.get() && (read = inputStream.read(buf)) != -1) {
-                                    totalRead.addAndGet(read);
-                                }
-                            } finally {
-                                inputStream.close();
-                            }
-                        } catch (Exception e) {
-                            readerException = e;
-                            e.printStackTrace();
-                        } finally {
-                            LOG.info(totalRead + " total bytes read.");
-                        }
-                    }
-                });
-
-                final Thread writerThread = new Thread(new Runnable() {
-                    private final Random random = new Random();
-
-                    @Override
-                    public void run() {
-                        totalWritten.set(0);
-                        int count = MESSAGE_COUNT;
-                        try {
-                            final OutputStream outputStream = connection.createOutputStream(destination);
-                            try {
-                                final byte[] buf = new byte[BUFFER_SIZE];
-                                random.nextBytes(buf);
-                                while (count > 0 && !stopThreads.get()) {
-                                    outputStream.write(buf);
-                                    totalWritten.addAndGet(buf.length);
-                                    count--;
-                                }
-                            } finally {
-                                outputStream.close();
-                            }
-                        } catch (Exception e) {
-                            writerException = e;
-                            e.printStackTrace();
-                        } finally {
-                            LOG.info(totalWritten + " total bytes written.");
-                        }
-                    }
-                });
-
-                readerThread.start();
-                writerThread.start();
-
-                // Wait till reader is has finished receiving all the messages
-                // or he has stopped
-                // receiving messages.
-                Thread.sleep(1000);
-                int lastRead = totalRead.get();
-                while (readerThread.isAlive()) {
-                    readerThread.join(1000);
-                    // No progress?? then stop waiting..
-                    if (lastRead == totalRead.get()) {
-                        break;
-                    }
-                    lastRead = totalRead.get();
-                }
-
-                stopThreads.set(true);
-
-                assertTrue("Should not have received a reader exception", readerException == null);
-                assertTrue("Should not have received a writer exception", writerException == null);
-
-                assertEquals("Not all messages accounted for", totalWritten.get(), totalRead.get());
-
-            } finally {
-                session.close();
-            }
-        } finally {
-            connection.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8858dc29/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
deleted file mode 100755
index f392662..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.streams;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQInputStream;
-import org.apache.activemq.ActiveMQOutputStream;
-import org.apache.activemq.JmsTestSupport;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-
-/**
- * JMSInputStreamTest
- */
-@Deprecated
-public class JMSInputStreamTest extends JmsTestSupport {
-
-    public Destination destination;
-    protected DataOutputStream out;
-    protected DataInputStream in;
-    private ActiveMQConnection connection2;
-
-    private ActiveMQInputStream amqIn;
-    private ActiveMQOutputStream amqOut;
-
-    public static Test suite() {
-        return suite(JMSInputStreamTest.class);
-    }
-
-    public static void main(String[] args) {
-        junit.textui.TestRunner.run(suite());
-    }
-
-    public void initCombos() {
-        addCombinationValues("destination", new Object[] { new ActiveMQQueue("TEST.QUEUE"), new ActiveMQTopic("TEST.TOPIC") });
-    }
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setAutoFail(true);
-        super.setUp();
-    }
-
-    private void setUpConnection(Map<String, Object> props, long timeout) throws JMSException {
-        connection2 = (ActiveMQConnection) factory.createConnection(userName, password);
-        connections.add(connection2);
-        if (props != null) {
-            amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
-        } else {
-            amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination);
-        }
-
-        out = new DataOutputStream(amqOut);
-        if (timeout == -1) {
-            amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
-        } else {
-            amqIn = (ActiveMQInputStream) connection2.createInputStream(destination, null, false, timeout);
-        }
-        in = new DataInputStream(amqIn);
-    }
-
-    /*
-     * @see TestCase#tearDown()
-     */
-    @Override
-    protected void tearDown() throws Exception {
-        super.tearDown();
-    }
-
-    /**
-     * Test for AMQ-3010
-     */
-    public void testInputStreamTimeout() throws Exception {
-        long timeout = 500;
-
-        setUpConnection(null, timeout);
-        try {
-            in.read();
-            fail();
-        } catch (ActiveMQInputStream.ReadTimeoutException e) {
-            // timeout reached, everything ok
-        }
-        in.close();
-    }
-
-    // Test for AMQ-2988
-    public void testStreamsWithProperties() throws Exception {
-        String name1 = "PROPERTY_1";
-        String name2 = "PROPERTY_2";
-        String value1 = "VALUE_1";
-        String value2 = "VALUE_2";
-        Map<String, Object> jmsProperties = new HashMap<String, Object>();
-        jmsProperties.put(name1, value1);
-        jmsProperties.put(name2, value2);
-        setUpConnection(jmsProperties, -1);
-
-        out.writeInt(4);
-        out.flush();
-        assertTrue(in.readInt() == 4);
-        out.writeFloat(2.3f);
-        out.flush();
-        assertTrue(in.readFloat() == 2.3f);
-        String str = "this is a test string";
-        out.writeUTF(str);
-        out.flush();
-        assertTrue(in.readUTF().equals(str));
-        for (int i = 0; i < 100; i++) {
-            out.writeLong(i);
-        }
-        out.flush();
-
-        // check properties before we try to read the stream
-        checkProperties(jmsProperties);
-
-        for (int i = 0; i < 100; i++) {
-            assertTrue(in.readLong() == i);
-        }
-
-        // check again after read was done
-        checkProperties(jmsProperties);
-    }
-
-    public void testStreamsWithPropertiesOnlyOnFirstMessage() throws Exception {
-        String name1 = "PROPERTY_1";
-        String name2 = "PROPERTY_2";
-        String value1 = "VALUE_1";
-        String value2 = "VALUE_2";
-        Map<String, Object> jmsProperties = new HashMap<String, Object>();
-        jmsProperties.put(name1, value1);
-        jmsProperties.put(name2, value2);
-
-        ActiveMQDestination dest = (ActiveMQDestination) destination;
-
-        if (dest.isQueue()) {
-            destination = new ActiveMQQueue(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
-        } else {
-            destination = new ActiveMQTopic(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
-        }
-
-        setUpConnection(jmsProperties, -1);
-
-        assertTrue(amqOut.isAddPropertiesOnFirstMsgOnly());
-
-        out.writeInt(4);
-        out.flush();
-        assertTrue(in.readInt() == 4);
-        out.writeFloat(2.3f);
-        out.flush();
-        assertTrue(in.readFloat() == 2.3f);
-        String str = "this is a test string";
-        out.writeUTF(str);
-        out.flush();
-        assertTrue(in.readUTF().equals(str));
-        for (int i = 0; i < 100; i++) {
-            out.writeLong(i);
-        }
-        out.flush();
-
-        // check properties before we try to read the stream
-        checkProperties(jmsProperties);
-
-        for (int i = 0; i < 100; i++) {
-            assertTrue(in.readLong() == i);
-        }
-
-        // check again after read was done
-        checkProperties(jmsProperties);
-    }
-
-    // check if the received stream has the properties set
-    // Test for AMQ-2988
-    private void checkProperties(Map<String, Object> jmsProperties) throws IOException {
-        Map<String, Object> receivedJmsProps = amqIn.getJMSProperties();
-
-        // we should at least have the same amount or more properties
-        assertTrue(jmsProperties.size() <= receivedJmsProps.size());
-
-        // check the properties to see if we have everything in there
-        Iterator<String> propsIt = jmsProperties.keySet().iterator();
-        while (propsIt.hasNext()) {
-            String key = propsIt.next();
-            assertTrue(receivedJmsProps.containsKey(key));
-            assertEquals(jmsProperties.get(key), receivedJmsProps.get(key));
-        }
-    }
-
-    public void testLarge() throws Exception {
-        setUpConnection(null, -1);
-
-        final int testData = 23;
-        final int dataLength = 4096;
-        final int count = 1024;
-        byte[] data = new byte[dataLength];
-        for (int i = 0; i < data.length; i++) {
-            data[i] = testData;
-        }
-        final AtomicBoolean complete = new AtomicBoolean(false);
-        Thread runner = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    for (int x = 0; x < count; x++) {
-                        byte[] b = new byte[2048];
-                        in.readFully(b);
-                        for (int i = 0; i < b.length; i++) {
-                            assertTrue(b[i] == testData);
-                        }
-                    }
-                    complete.set(true);
-                    synchronized (complete) {
-                        complete.notify();
-                    }
-                } catch (Exception ex) {
-                    ex.printStackTrace();
-                }
-            }
-        });
-        runner.start();
-        for (int i = 0; i < count; i++) {
-            out.write(data);
-        }
-        out.flush();
-        synchronized (complete) {
-            if (!complete.get()) {
-                complete.wait(30000);
-            }
-        }
-        assertTrue(complete.get());
-    }
-
-    public void testStreams() throws Exception {
-        setUpConnection(null, -1);
-        out.writeInt(4);
-        out.flush();
-        assertTrue(in.readInt() == 4);
-        out.writeFloat(2.3f);
-        out.flush();
-        assertTrue(in.readFloat() == 2.3f);
-        String str = "this is a test string";
-        out.writeUTF(str);
-        out.flush();
-        assertTrue(in.readUTF().equals(str));
-        for (int i = 0; i < 100; i++) {
-            out.writeLong(i);
-        }
-        out.flush();
-
-        for (int i = 0; i < 100; i++) {
-            assertTrue(in.readLong() == i);
-        }
-    }
-}