You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/11 21:02:12 UTC

svn commit: r1145295 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQOutputStream.java test/java/org/apache/activemq/ActiveMQInputStreamTest.java

Author: tabish
Date: Mon Jul 11 19:02:12 2011
New Revision: 1145295

URL: http://svn.apache.org/viewvc?rev=1145295&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3227

ActiveMQInputStream will read the destination options and can be made to sync send via producer.alwaysSyncSend=true
Also remove the test cases dependency on port 61616.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java?rev=1145295&r1=1145294&r2=1145295&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java Mon Jul 11 19:02:12 2011
@@ -32,9 +32,10 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
 
 /**
- * 
+ *
  */
 public class ActiveMQOutputStream extends OutputStream implements Disposable {
 
@@ -51,6 +52,7 @@ public class ActiveMQOutputStream extend
     private final int deliveryMode;
     private final int priority;
     private final long timeToLive;
+    private boolean alwaysSyncSend = false;
 
     /**
      * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
@@ -83,6 +85,14 @@ public class ActiveMQOutputStream extend
         }
 
         this.info = new ProducerInfo(producerId);
+
+        // Allows the options on the destination to configure the stream
+        if (destination.getOptions() != null) {
+            Map<String, String> options = new HashMap<String, String>(destination.getOptions());
+            IntrospectionSupport.setProperties(this, options, "producer.");
+            IntrospectionSupport.setProperties(this.info, options, "producer.");
+        }
+
         this.info.setDestination(destination);
 
         this.connection.addOutputStream(this);
@@ -155,8 +165,8 @@ public class ActiveMQOutputStream extend
      */
     private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
         if (properties != null) {
-            for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) {
-                String key = (String) iter.next();
+            for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
+                String key = iter.next();
                 Object value = properties.get(key);
                 msg.setObjectProperty(key, value);
             }
@@ -169,11 +179,19 @@ public class ActiveMQOutputStream extend
             msg.setGroupSequence((int) messageSequence);
         }
         MessageId id = new MessageId(info.getProducerId(), messageSequence++);
-        connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage);
+        connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
     }
 
     public String toString() {
         return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
     }
 
+    public boolean isAlwaysSyncSend() {
+        return alwaysSyncSend;
+    }
+
+    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
+        this.alwaysSyncSend = alwaysSyncSend;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java?rev=1145295&r1=1145294&r2=1145295&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java Mon Jul 11 19:02:12 2011
@@ -32,12 +32,15 @@ public class ActiveMQInputStreamTest ext
 
     private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class);
 
-    private static final String BROKER_URL = "tcp://localhost:61616";
+    private static final String BROKER_URL = "tcp://localhost:0";
     private static final String DESTINATION = "destination";
     private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash
 
-    public void testInputStreamMatchesDefaultChuckSize() throws Exception {
-        BrokerService broker = new BrokerService();
+    private BrokerService broker;
+    private String connectionUri;
+
+    public void setUp() throws Exception {
+        broker = new BrokerService();
         broker.setUseJmx(false);
         broker.setPersistent(false);
         broker.setDestinations(new ActiveMQDestination[] {
@@ -45,8 +48,61 @@ public class ActiveMQInputStreamTest ext
         });
         broker.addConnector(BROKER_URL);
         broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    public void tearDown() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    public void testInputStreamSetSyncSendOption() throws Exception {
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(DESTINATION + "?producer.alwaysSyncSend=true");
+
+        OutputStream out = null;
+        try {
+            out = connection.createOutputStream(destination);
+
+            assertTrue(((ActiveMQOutputStream)out).isAlwaysSyncSend());
+
+            LOG.debug("writing...");
+            for (int i = 0; i < STREAM_LENGTH; ++i) {
+                out.write(0);
+            }
+            LOG.debug("wrote " + STREAM_LENGTH + " bytes");
+        } finally {
+            if (out != null) {
+                out.close();
+            }
+        }
+
+        InputStream in = null;
+        try {
+            in = connection.createInputStream(destination);
+            LOG.debug("reading...");
+            int count = 0;
+            while (-1 != in.read()) {
+                ++count;
+            }
+            LOG.debug("read " + count + " bytes");
+        } finally {
+            if (in != null) {
+                in.close();
+            }
+        }
 
-        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
+        connection.close();
+    }
+
+    public void testInputStreamMatchesDefaultChuckSize() throws Exception {
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
         ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue destination = session.createQueue(DESTINATION);
@@ -81,6 +137,5 @@ public class ActiveMQInputStreamTest ext
         }
 
         connection.close();
-        broker.stop();
     }
 }