You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/19 21:44:23 UTC
svn commit: r786636 - in /activemq/sandbox/activemq-flow:
activemq-client/src/test/java/org/apache/activemq/apollo/test1/
activemq-client/src/test/java/org/apache/activemq/legacy/test1/
activemq-client/src/test/java/org/apache/activemq/legacy/test3/ ac...
Author: chirino
Date: Fri Jun 19 19:44:23 2009
New Revision: 786636
URL: http://svn.apache.org/viewvc?rev=786636&view=rev
Log:
Making the JMSMessageTest pass.
- Had to fix a few issues with pipe transport marshalling options.
Added:
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/JMSMessageTest.java
- copied, changed from r786631, activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java
Removed:
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java
Modified:
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsResourceProvider.java
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsTransactionTestSupport.java
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/JmsMultipleClientsTestSupport.java
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/NioQueueSubscriptionTest.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
Copied: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/JMSMessageTest.java (from r786631, activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/JMSMessageTest.java?p2=activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/JMSMessageTest.java&p1=activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java&r1=786631&r2=786636&rev=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/JMSMessageTest.java Fri Jun 19 19:44:23 2009
@@ -14,8 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.legacy.test1;
+package org.apache.activemq.apollo.test1;
+import java.net.URI;
import java.net.URISyntaxException;
import java.util.Enumeration;
import java.util.HashMap;
@@ -39,7 +40,11 @@
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.BrokerFactory;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.legacy.test1.JmsTestSupport;
+import org.apache.activemq.transport.TransportFactory;
/**
* Test cases used to test the JMS message consumer.
@@ -56,12 +61,19 @@
public boolean durableConsumer;
public String connectURL;
+ @Override
+ protected Broker createBroker() throws Exception {
+ Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
+ broker.addTransportServer(TransportFactory.bind(new URI(connectURL)));
+ return broker;
+ }
+
/**
* Run all these tests in both marshaling and non-marshaling mode.
*/
public void initCombos() {
- addCombinationValues("connectURL", new Object[] {"vm://localhost?marshal=false",
- "vm://localhost?marshal=true"});
+ addCombinationValues("connectURL", new Object[] {"pipe://localhost?marshal=false",
+ "pipe://localhost?marshal=true"});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java Fri Jun 19 19:44:23 2009
@@ -39,6 +39,7 @@
import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.BrokerFactory;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.TransportFactory;
/**
* Test cases used to test the JMS message consumer.
@@ -105,11 +106,13 @@
}
protected ConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory("vm://localhost");
+ return new ActiveMQConnectionFactory("pipe://localhost");
}
protected Broker createBroker() throws Exception {
- return BrokerFactory.createBroker(new URI("vm://localhost?broker=jaxb:classpath:non-persistent-activemq.xml"));
+ Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
+ broker.addTransportServer(TransportFactory.bind(new URI("pipe://localhost")));
+ return broker;
}
protected void setUp() throws Exception {
Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsResourceProvider.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsResourceProvider.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsResourceProvider.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsResourceProvider.java Fri Jun 19 19:44:23 2009
@@ -35,7 +35,7 @@
*/
public class JmsResourceProvider {
- private String serverUri = "vm://localhost?broker=jaxb:classpath:non-persistent-activemq.xml";
+ private String serverUri = "pipe://localhost";
private boolean transacted;
private int ackMode = Session.AUTO_ACKNOWLEDGE;
private boolean isTopic;
Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsTransactionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsTransactionTestSupport.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsTransactionTestSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsTransactionTestSupport.java Fri Jun 19 19:44:23 2009
@@ -35,8 +35,9 @@
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.legacy.broker.BrokerFactory;
-import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.BrokerFactory;
+import org.apache.activemq.transport.TransportFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,7 +59,7 @@
protected Destination destination;
protected int batchCount = 10;
protected int batchSize = 20;
- protected BrokerService broker;
+ protected Broker broker;
// for message listener test
private List<Message> unackMessages = new ArrayList<Message>(MESSAGE_COUNT);
@@ -112,8 +113,10 @@
/**
*/
- protected BrokerService createBroker() throws Exception, URISyntaxException {
- return BrokerFactory.createBroker(new URI("vm://localhost?broker=jaxb:classpath:non-persistent-activemq.xml"));
+ protected Broker createBroker() throws Exception, URISyntaxException {
+ Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
+ broker.addTransportServer(TransportFactory.bind(new URI("pipe://localhost")));
+ return broker;
}
/*
Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/JmsMultipleClientsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/JmsMultipleClientsTestSupport.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/JmsMultipleClientsTestSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/JmsMultipleClientsTestSupport.java Fri Jun 19 19:44:23 2009
@@ -39,11 +39,12 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.apollo.CombinationTestSupport;
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.BrokerFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.legacy.broker.BrokerFactory;
-import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.MessageIdList;
/**
@@ -66,12 +67,12 @@
protected boolean topic;
protected boolean persistent;
- protected BrokerService broker;
protected Destination destination;
protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
protected MessageIdList allMessagesList = new MessageIdList();
private AtomicInteger producerLock;
+ private Broker broker;
protected void startProducers(Destination dest, int msgCount) throws Exception {
startProducers(createConnectionFactory(), dest, msgCount);
@@ -207,11 +208,13 @@
}
protected ConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory("vm://localhost");
+ return new ActiveMQConnectionFactory("pipe://localhost");
}
- protected BrokerService createBroker() throws Exception {
- return BrokerFactory.createBroker(new URI("vm://localhost?broker=jaxb:classpath:non-persistent-activemq.xml"));
+ protected Broker createBroker() throws Exception {
+ Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
+ broker.addTransportServer(TransportFactory.bind(new URI("pipe://localhost")));
+ return broker;
}
protected void setUp() throws Exception {
Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/NioQueueSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/NioQueueSubscriptionTest.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/NioQueueSubscriptionTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/NioQueueSubscriptionTest.java Fri Jun 19 19:44:23 2009
@@ -17,10 +17,8 @@
package org.apache.activemq.legacy.test6;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -32,10 +30,9 @@
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.legacy.broker.BrokerFactory;
-import org.apache.activemq.legacy.broker.BrokerService;
-import org.apache.activemq.legacy.broker.region.policy.PolicyEntry;
-import org.apache.activemq.legacy.broker.region.policy.PolicyMap;
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.BrokerFactory;
+import org.apache.activemq.transport.TransportFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,20 +55,23 @@
}
@Override
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = BrokerFactory.createBroker(new URI("broker://nio://localhost:62621?useQueueForAccept=false&persistent=false&wiewformat.maxInactivityDuration=0"));
- answer.setUseJmx(false);
- answer.setDeleteAllMessagesOnStartup(true);
- final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
- final PolicyEntry entry = new PolicyEntry();
- entry.setQueue(">");
- entry.setOptimizedDispatch(true);
- policyEntries.add(entry);
-
- final PolicyMap policyMap = new PolicyMap();
- policyMap.setPolicyEntries(policyEntries);
- answer.setDestinationPolicy(policyMap);
- return answer;
+ protected Broker createBroker() throws Exception {
+ Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
+ broker.addTransportServer(TransportFactory.bind(new URI("nio://localhost:62621?useQueueForAccept=false&persistent=false&wiewformat.maxInactivityDuration=0")));
+
+// TODO:
+// answer.setUseJmx(false);
+// answer.setDeleteAllMessagesOnStartup(true);
+// final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+// final PolicyEntry entry = new PolicyEntry();
+// entry.setQueue(">");
+// entry.setOptimizedDispatch(true);
+// policyEntries.add(entry);
+//
+// final PolicyMap policyMap = new PolicyMap();
+// policyMap.setPolicyEntries(policyEntries);
+// answer.setDestinationPolicy(policyMap);
+ return broker;
}
public void testLotsOfConcurrentConnections() throws Exception {
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Jun 19 19:44:23 2009
@@ -344,21 +344,24 @@
}
public void onCommand(Object o) {
-
- Command command = (Command) o;
- boolean responseRequired = command.isResponseRequired();
+ boolean responseRequired=false;
+ int commandId=0;
try {
+ Command command = (Command) o;
+ commandId = command.getCommandId();
+ responseRequired = command.isResponseRequired();
//System.out.println(o);
command.visit(visitor);
} catch (Exception e) {
if (responseRequired) {
ExceptionResponse response = new ExceptionResponse(e);
- response.setCorrelationId(command.getCommandId());
+ response.setCorrelationId(commandId);
connection.write(response);
} else {
connection.onException(e);
}
-
+ } catch (Throwable e) {
+ connection.onException(new Exception(e));
}
}
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Fri Jun 19 19:44:23 2009
@@ -47,7 +47,7 @@
private DispatchContext readContext;
private String name;
private WireFormat wireFormat;
- private boolean marshall;
+ private boolean marshal;
private boolean trace;
public PipeTransport(Pipe<Object> pipe) {
@@ -93,7 +93,7 @@
public void oneway(Object command) throws IOException {
try {
- if (wireFormat != null && marshall) {
+ if (wireFormat != null && marshal) {
pipe.write(wireFormat.marshal(command));
} else {
pipe.write(command);
@@ -119,7 +119,7 @@
if(o == EOF_TOKEN) {
throw new EOFException();
}
- if (wireFormat != null && marshall) {
+ if (wireFormat != null && marshal) {
listener.onCommand(wireFormat.unmarshal((ByteSequence) o));
} else {
listener.onCommand(o);
@@ -141,7 +141,11 @@
if(value == EOF_TOKEN) {
throw new EOFException();
} else {
- listener.onCommand(value);
+ if (wireFormat != null && marshal) {
+ listener.onCommand(wireFormat.unmarshal((ByteSequence)value));
+ } else {
+ listener.onCommand(value);
+ }
}
}
}
@@ -228,6 +232,14 @@
public void setTrace(boolean trace) {
this.trace = trace;
}
+
+ public boolean isMarshal() {
+ return marshal;
+ }
+
+ public void setMarshal(boolean marshall) {
+ this.marshal = marshall;
+ }
}
protected class PipeTransportServer implements TransportServer {
@@ -235,6 +247,7 @@
protected TransportAcceptListener listener;
protected String name;
protected WireFormatFactory wireFormatFactory;
+ protected boolean marshal;
protected final AtomicInteger connectionCounter = new AtomicInteger();
public URI getConnectURI() {
@@ -276,6 +289,7 @@
PipeTransport rc = createClientTransport(pipe);
rc.setRemoteAddress(remoteAddress);
PipeTransport serverSide = cerateServerTransport(pipe);
+ serverSide.setMarshal(marshal);
serverSide.setRemoteAddress(remoteAddress);
if (wireFormatFactory != null) {
rc.setWireFormat(wireFormatFactory.createWireFormat());
@@ -296,6 +310,14 @@
public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
this.wireFormatFactory = wireFormatFactory;
}
+
+ public boolean isMarshal() {
+ return marshal;
+ }
+
+ public void setMarshal(boolean marshal) {
+ this.marshal = marshal;
+ }
}
@Override
@@ -311,6 +333,13 @@
server.setConnectURI(uri);
server.setName(node);
server.setWireFormatFactory(createWireFormatFactory(options));
+ IntrospectionSupport.setProperties(server, options);
+
+ if (!options.isEmpty()) {
+ throw new IllegalArgumentException("Invalid bind parameters: " + options);
+ }
+
+
servers.put(node, server);
return server;
}