You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/11 21:02:12 UTC
svn commit: r1145295 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ActiveMQOutputStream.java
test/java/org/apache/activemq/ActiveMQInputStreamTest.java
Author: tabish
Date: Mon Jul 11 19:02:12 2011
New Revision: 1145295
URL: http://svn.apache.org/viewvc?rev=1145295&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3227
ActiveMQInputStream will read the destination options and can be made to sync send via producer.alwaysSyncSend=true
Also remove the test cases dependency on port 61616.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java?rev=1145295&r1=1145294&r2=1145295&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java Mon Jul 11 19:02:12 2011
@@ -32,9 +32,10 @@ import org.apache.activemq.command.Messa
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
/**
- *
+ *
*/
public class ActiveMQOutputStream extends OutputStream implements Disposable {
@@ -51,6 +52,7 @@ public class ActiveMQOutputStream extend
private final int deliveryMode;
private final int priority;
private final long timeToLive;
+ private boolean alwaysSyncSend = false;
/**
* JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
@@ -83,6 +85,14 @@ public class ActiveMQOutputStream extend
}
this.info = new ProducerInfo(producerId);
+
+ // Allows the options on the destination to configure the stream
+ if (destination.getOptions() != null) {
+ Map<String, String> options = new HashMap<String, String>(destination.getOptions());
+ IntrospectionSupport.setProperties(this, options, "producer.");
+ IntrospectionSupport.setProperties(this.info, options, "producer.");
+ }
+
this.info.setDestination(destination);
this.connection.addOutputStream(this);
@@ -155,8 +165,8 @@ public class ActiveMQOutputStream extend
*/
private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
if (properties != null) {
- for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) {
- String key = (String) iter.next();
+ for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
+ String key = iter.next();
Object value = properties.get(key);
msg.setObjectProperty(key, value);
}
@@ -169,11 +179,19 @@ public class ActiveMQOutputStream extend
msg.setGroupSequence((int) messageSequence);
}
MessageId id = new MessageId(info.getProducerId(), messageSequence++);
- connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage);
+ connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
}
public String toString() {
return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
}
+ public boolean isAlwaysSyncSend() {
+ return alwaysSyncSend;
+ }
+
+ public void setAlwaysSyncSend(boolean alwaysSyncSend) {
+ this.alwaysSyncSend = alwaysSyncSend;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java?rev=1145295&r1=1145294&r2=1145295&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java Mon Jul 11 19:02:12 2011
@@ -32,12 +32,15 @@ public class ActiveMQInputStreamTest ext
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class);
- private static final String BROKER_URL = "tcp://localhost:61616";
+ private static final String BROKER_URL = "tcp://localhost:0";
private static final String DESTINATION = "destination";
private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash
- public void testInputStreamMatchesDefaultChuckSize() throws Exception {
- BrokerService broker = new BrokerService();
+ private BrokerService broker;
+ private String connectionUri;
+
+ public void setUp() throws Exception {
+ broker = new BrokerService();
broker.setUseJmx(false);
broker.setPersistent(false);
broker.setDestinations(new ActiveMQDestination[] {
@@ -45,8 +48,61 @@ public class ActiveMQInputStreamTest ext
});
broker.addConnector(BROKER_URL);
broker.start();
+ broker.waitUntilStarted();
+
+ connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+ }
+
+ public void tearDown() throws Exception {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ public void testInputStreamSetSyncSendOption() throws Exception {
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+ ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(DESTINATION + "?producer.alwaysSyncSend=true");
+
+ OutputStream out = null;
+ try {
+ out = connection.createOutputStream(destination);
+
+ assertTrue(((ActiveMQOutputStream)out).isAlwaysSyncSend());
+
+ LOG.debug("writing...");
+ for (int i = 0; i < STREAM_LENGTH; ++i) {
+ out.write(0);
+ }
+ LOG.debug("wrote " + STREAM_LENGTH + " bytes");
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+
+ InputStream in = null;
+ try {
+ in = connection.createInputStream(destination);
+ LOG.debug("reading...");
+ int count = 0;
+ while (-1 != in.read()) {
+ ++count;
+ }
+ LOG.debug("read " + count + " bytes");
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
+ connection.close();
+ }
+
+ public void testInputStreamMatchesDefaultChuckSize() throws Exception {
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(DESTINATION);
@@ -81,6 +137,5 @@ public class ActiveMQInputStreamTest ext
}
connection.close();
- broker.stop();
}
}