You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/19 21:44:23 UTC

svn commit: r786636 - in /activemq/sandbox/activemq-flow: activemq-client/src/test/java/org/apache/activemq/apollo/test1/ activemq-client/src/test/java/org/apache/activemq/legacy/test1/ activemq-client/src/test/java/org/apache/activemq/legacy/test3/ ac...

Author: chirino
Date: Fri Jun 19 19:44:23 2009
New Revision: 786636

URL: http://svn.apache.org/viewvc?rev=786636&view=rev
Log:
Making the JMSMessageTest pass.
 - Had to fix a few issues with pipe transport marshalling options.


Added:
    activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/
    activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/JMSMessageTest.java
      - copied, changed from r786631, activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java
Removed:
    activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java
Modified:
    activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java
    activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsResourceProvider.java
    activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsTransactionTestSupport.java
    activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/JmsMultipleClientsTestSupport.java
    activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/NioQueueSubscriptionTest.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Copied: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/JMSMessageTest.java (from r786631, activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/JMSMessageTest.java?p2=activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/JMSMessageTest.java&p1=activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java&r1=786631&r2=786636&rev=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test1/JMSMessageTest.java Fri Jun 19 19:44:23 2009
@@ -14,8 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.legacy.test1;
+package org.apache.activemq.apollo.test1;
 
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -39,7 +40,11 @@
 import junit.framework.Test;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.BrokerFactory;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.legacy.test1.JmsTestSupport;
+import org.apache.activemq.transport.TransportFactory;
 
 /**
  * Test cases used to test the JMS message consumer.
@@ -56,12 +61,19 @@
     public boolean durableConsumer;
     public String connectURL;
 
+    @Override
+    protected Broker createBroker() throws Exception {
+        Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
+        broker.addTransportServer(TransportFactory.bind(new URI(connectURL)));
+        return broker;
+    }
+    
     /**
      * Run all these tests in both marshaling and non-marshaling mode.
      */
     public void initCombos() {
-        addCombinationValues("connectURL", new Object[] {"vm://localhost?marshal=false",
-                                                         "vm://localhost?marshal=true"});
+        addCombinationValues("connectURL", new Object[] {"pipe://localhost?marshal=false",
+                                                         "pipe://localhost?marshal=true"});
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
                                                            Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});

Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java Fri Jun 19 19:44:23 2009
@@ -39,6 +39,7 @@
 import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerFactory;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.TransportFactory;
 
 /**
  * Test cases used to test the JMS message consumer.
@@ -105,11 +106,13 @@
     }
 
     protected ConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory("vm://localhost");
+        return new ActiveMQConnectionFactory("pipe://localhost");
     }
 
     protected Broker createBroker() throws Exception {
-        return BrokerFactory.createBroker(new URI("vm://localhost?broker=jaxb:classpath:non-persistent-activemq.xml"));
+        Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
+        broker.addTransportServer(TransportFactory.bind(new URI("pipe://localhost")));
+        return broker;
     }
 
     protected void setUp() throws Exception {

Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsResourceProvider.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsResourceProvider.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsResourceProvider.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsResourceProvider.java Fri Jun 19 19:44:23 2009
@@ -35,7 +35,7 @@
  */
 public class JmsResourceProvider {
 
-    private String serverUri = "vm://localhost?broker=jaxb:classpath:non-persistent-activemq.xml";
+    private String serverUri = "pipe://localhost";
     private boolean transacted;
     private int ackMode = Session.AUTO_ACKNOWLEDGE;
     private boolean isTopic;

Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsTransactionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsTransactionTestSupport.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsTransactionTestSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsTransactionTestSupport.java Fri Jun 19 19:44:23 2009
@@ -35,8 +35,9 @@
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.legacy.broker.BrokerFactory;
-import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.BrokerFactory;
+import org.apache.activemq.transport.TransportFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -58,7 +59,7 @@
     protected Destination destination;
     protected int batchCount = 10;
     protected int batchSize = 20;
-    protected BrokerService broker;
+    protected Broker broker;
 
     // for message listener test
     private List<Message> unackMessages = new ArrayList<Message>(MESSAGE_COUNT);
@@ -112,8 +113,10 @@
 
     /**
      */
-    protected BrokerService createBroker() throws Exception, URISyntaxException {
-        return BrokerFactory.createBroker(new URI("vm://localhost?broker=jaxb:classpath:non-persistent-activemq.xml"));
+    protected Broker createBroker() throws Exception, URISyntaxException {
+        Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
+        broker.addTransportServer(TransportFactory.bind(new URI("pipe://localhost")));
+        return broker;
     }
 
     /*

Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/JmsMultipleClientsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/JmsMultipleClientsTestSupport.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/JmsMultipleClientsTestSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/JmsMultipleClientsTestSupport.java Fri Jun 19 19:44:23 2009
@@ -39,11 +39,12 @@
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.apollo.CombinationTestSupport;
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.BrokerFactory;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.legacy.broker.BrokerFactory;
-import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.MessageIdList;
 
 /**
@@ -66,12 +67,12 @@
     protected boolean topic;
     protected boolean persistent;
 
-    protected BrokerService broker;
     protected Destination destination;
     protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
     protected MessageIdList allMessagesList = new MessageIdList();
 
     private AtomicInteger producerLock;
+	private Broker broker;
 
     protected void startProducers(Destination dest, int msgCount) throws Exception {
         startProducers(createConnectionFactory(), dest, msgCount);
@@ -207,11 +208,13 @@
     }
 
     protected ConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory("vm://localhost");
+        return new ActiveMQConnectionFactory("pipe://localhost");
     }
 
-    protected BrokerService createBroker() throws Exception {
-        return BrokerFactory.createBroker(new URI("vm://localhost?broker=jaxb:classpath:non-persistent-activemq.xml"));
+    protected Broker createBroker() throws Exception {
+        Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
+        broker.addTransportServer(TransportFactory.bind(new URI("pipe://localhost")));
+        return broker;
     }
 
     protected void setUp() throws Exception {

Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/NioQueueSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/NioQueueSubscriptionTest.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/NioQueueSubscriptionTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test6/NioQueueSubscriptionTest.java Fri Jun 19 19:44:23 2009
@@ -17,10 +17,8 @@
 package org.apache.activemq.legacy.test6;
 
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -32,10 +30,9 @@
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.legacy.broker.BrokerFactory;
-import org.apache.activemq.legacy.broker.BrokerService;
-import org.apache.activemq.legacy.broker.region.policy.PolicyEntry;
-import org.apache.activemq.legacy.broker.region.policy.PolicyMap;
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.BrokerFactory;
+import org.apache.activemq.transport.TransportFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -58,20 +55,23 @@
     }
     
     @Override
-    protected BrokerService createBroker() throws Exception {
-        BrokerService answer = BrokerFactory.createBroker(new URI("broker://nio://localhost:62621?useQueueForAccept=false&persistent=false&wiewformat.maxInactivityDuration=0"));
-        answer.setUseJmx(false);
-        answer.setDeleteAllMessagesOnStartup(true);
-        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
-        final PolicyEntry entry = new PolicyEntry();
-        entry.setQueue(">");
-        entry.setOptimizedDispatch(true);
-        policyEntries.add(entry);
-
-        final PolicyMap policyMap = new PolicyMap();
-        policyMap.setPolicyEntries(policyEntries);
-        answer.setDestinationPolicy(policyMap);
-        return answer;
+    protected Broker createBroker() throws Exception {
+        Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
+        broker.addTransportServer(TransportFactory.bind(new URI("nio://localhost:62621?useQueueForAccept=false&persistent=false&wiewformat.maxInactivityDuration=0")));
+        
+// TODO:    	
+//        answer.setUseJmx(false);
+//        answer.setDeleteAllMessagesOnStartup(true);
+//        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+//        final PolicyEntry entry = new PolicyEntry();
+//        entry.setQueue(">");
+//        entry.setOptimizedDispatch(true);
+//        policyEntries.add(entry);
+//
+//        final PolicyMap policyMap = new PolicyMap();
+//        policyMap.setPolicyEntries(policyEntries);
+//        answer.setDestinationPolicy(policyMap);
+        return broker;
     }
     
     public void testLotsOfConcurrentConnections() throws Exception {

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Jun 19 19:44:23 2009
@@ -344,21 +344,24 @@
     }
 
     public void onCommand(Object o) {
-
-        Command command = (Command) o;
-        boolean responseRequired = command.isResponseRequired();
+    	boolean responseRequired=false;
+    	int commandId=0;
         try {
+            Command command = (Command) o;
+            commandId = command.getCommandId();
+            responseRequired = command.isResponseRequired();
             //System.out.println(o);
             command.visit(visitor);
         } catch (Exception e) {
             if (responseRequired) {
                 ExceptionResponse response = new ExceptionResponse(e);
-                response.setCorrelationId(command.getCommandId());
+                response.setCorrelationId(commandId);
                 connection.write(response);
             } else {
                 connection.onException(e);
             }
-
+        } catch (Throwable e) {
+            connection.onException(new Exception(e));
         }
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=786636&r1=786635&r2=786636&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Fri Jun 19 19:44:23 2009
@@ -47,7 +47,7 @@
         private DispatchContext readContext;
         private String name;
         private WireFormat wireFormat;
-        private boolean marshall;
+        private boolean marshal;
         private boolean trace;
 
         public PipeTransport(Pipe<Object> pipe) {
@@ -93,7 +93,7 @@
         public void oneway(Object command) throws IOException {
 
             try {
-                if (wireFormat != null && marshall) {
+                if (wireFormat != null && marshal) {
                     pipe.write(wireFormat.marshal(command));
                 } else {
                     pipe.write(command);
@@ -119,7 +119,7 @@
                     	if(o == EOF_TOKEN) {
                     		throw new EOFException();
                     	}                    	
-                        if (wireFormat != null && marshall) {
+                        if (wireFormat != null && marshal) {
                             listener.onCommand(wireFormat.unmarshal((ByteSequence) o));
                         } else {
                             listener.onCommand(o);
@@ -141,7 +141,11 @@
                     	if(value == EOF_TOKEN) {
                     		throw new EOFException();
                     	} else {
-                    		listener.onCommand(value);
+                            if (wireFormat != null && marshal) {
+                                listener.onCommand(wireFormat.unmarshal((ByteSequence)value));
+                            } else {
+                                listener.onCommand(value);
+                            }
                     	}
                     }
                 }
@@ -228,6 +232,14 @@
 		public void setTrace(boolean trace) {
 			this.trace = trace;
 		}
+
+		public boolean isMarshal() {
+			return marshal;
+		}
+
+		public void setMarshal(boolean marshall) {
+			this.marshal = marshall;
+		}
     }
 
     protected class PipeTransportServer implements TransportServer {
@@ -235,6 +247,7 @@
         protected TransportAcceptListener listener;
         protected String name;
         protected WireFormatFactory wireFormatFactory;
+        protected boolean marshal;
         protected final AtomicInteger connectionCounter = new AtomicInteger();
 
         public URI getConnectURI() {
@@ -276,6 +289,7 @@
             PipeTransport rc = createClientTransport(pipe);
             rc.setRemoteAddress(remoteAddress);
             PipeTransport serverSide = cerateServerTransport(pipe);
+            serverSide.setMarshal(marshal);
             serverSide.setRemoteAddress(remoteAddress);
             if (wireFormatFactory != null) {
                 rc.setWireFormat(wireFormatFactory.createWireFormat());
@@ -296,6 +310,14 @@
         public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
             this.wireFormatFactory = wireFormatFactory;
         }
+
+		public boolean isMarshal() {
+			return marshal;
+		}
+
+		public void setMarshal(boolean marshal) {
+			this.marshal = marshal;
+		}
     }
 
     @Override
@@ -311,6 +333,13 @@
 	            server.setConnectURI(uri);
 	            server.setName(node);
                 server.setWireFormatFactory(createWireFormatFactory(options));
+                IntrospectionSupport.setProperties(server, options);
+                
+                if (!options.isEmpty()) {
+                    throw new IllegalArgumentException("Invalid bind parameters: " + options);
+                }
+                
+                
 	            servers.put(node, server);
 	            return server;
     		}