You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/09/08 13:23:36 UTC

svn commit: r994990 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/command/ActiveMQObjectMessage.java test/java/org/apache/activemq/usecases/MyObject.java test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java

Author: gtully
Date: Wed Sep  8 11:23:35 2010
New Revision: 994990

URL: http://svn.apache.org/viewvc?rev=994990&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2622 - additional test for the hybrid case of vm and non vm consuimers of deferred serilizable object messages

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MyObject.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=994990&r1=994989&r2=994990&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java Wed Sep  8 11:23:35 2010
@@ -36,6 +36,7 @@ import org.apache.activemq.util.ByteArra
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
 import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
 
 /**
  * An <CODE>ObjectMessage</CODE> object is used to send a message that
@@ -193,6 +194,13 @@ public class ActiveMQObjectMessage exten
         return this.object;
     }
 
+    @Override
+    public void beforeMarshall(WireFormat wireFormat) throws IOException {
+        super.beforeMarshall(wireFormat);
+        // may have initiated on vm transport with deferred marshalling
+        storeContent();
+    }
+
     public void onMessageRolledBack() {
         super.onMessageRolledBack();
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MyObject.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MyObject.java?rev=994990&r1=994989&r2=994990&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MyObject.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MyObject.java Wed Sep  8 11:23:35 2010
@@ -22,11 +22,14 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.IOException;
 import java.io.ObjectStreamException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class MyObject implements Serializable {
 
     private String message;
-    private boolean writeObjectCalled, readObjectCalled, readObjectNoDataCalled;
+    private AtomicInteger writeObjectCalled = new AtomicInteger(0);
+    private AtomicInteger readObjectCalled = new AtomicInteger(0);
+    private AtomicInteger readObjectNoDataCalled = new AtomicInteger(0);
 
     public MyObject(String message) {
         this.setMessage(message);
@@ -41,32 +44,29 @@ public class MyObject implements Seriali
     }
 
     private void writeObject(java.io.ObjectOutputStream out) throws IOException {
-        writeObjectCalled = true;
-        Thread.dumpStack();
+        writeObjectCalled.incrementAndGet();
         out.defaultWriteObject();
     }
 
     private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
-        readObjectCalled = true;
-        Thread.dumpStack();
         in.defaultReadObject();
+        readObjectCalled.incrementAndGet();        
     }
 
     private void readObjectNoData() throws ObjectStreamException {
-        Thread.dumpStack();
-        readObjectNoDataCalled = true;
+        readObjectNoDataCalled.incrementAndGet();
     }
 
-    public boolean getWriteObjectCalled() {
-        return writeObjectCalled;
+    public int getWriteObjectCalled() {
+        return writeObjectCalled.get();
     }
 
-    public boolean getReadObjectCalled() {
-        return readObjectCalled;
+    public int getReadObjectCalled() {
+        return readObjectCalled.get();
     }
 
-    public boolean getReadObjectNoDataCalled() {
-        return readObjectNoDataCalled;
+    public int getReadObjectNoDataCalled() {
+        return readObjectNoDataCalled.get();
     }
 }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java?rev=994990&r1=994989&r2=994990&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java Wed Sep  8 11:23:35 2010
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.usecases;
 
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Connection;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -30,6 +34,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -39,14 +44,8 @@ public class ObjectMessageNotSerializabl
     private static final Log LOG = LogFactory.getLog(ObjectMessageNotSerializableTest.class);
     
     BrokerService broker;
-    Connection connection;
-    ActiveMQSession session;
-    MessageProducer producer;
-    MessageConsumer consumer;
-    public ActiveMQDestination destination = new ActiveMQQueue("test");
-
-    int numReceived = 0;
-    boolean writeObjectCalled, readObjectCalled, readObjectNoDataCalled;
+    AtomicInteger numReceived = new AtomicInteger(0);
+    final Vector<Throwable> exceptions = new Vector<Throwable>();
 
     public static Test suite() {
         return suite(ObjectMessageNotSerializableTest.class);
@@ -57,74 +56,210 @@ public class ObjectMessageNotSerializabl
     }
 	
 	protected void setUp() throws Exception {
+        exceptions.clear();
         broker = createBroker();
     }
 	
 	public void testSendNotSerializeableObjectMessage() throws Exception {
-		
-		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-        factory.setOptimizedMessageDispatch(true);
-        factory.setObjectMessageSerializationDefered(true);
-        factory.setCopyMessageOnSend(false);
 
+        final  ActiveMQDestination destination = new ActiveMQQueue("testQ");
+        final MyObject obj = new MyObject("A message");
 
-		connection = factory.createConnection();
-		session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-		producer = session.createProducer(destination);
+        final CountDownLatch consumerStarted = new CountDownLatch(1);
 
-		consumer = session.createConsumer(destination);
-		connection.start();
-		
+		Thread vmConsumerThread = new Thread("Consumer Thread") {
+			public void run() {
+				try {
+                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+                    factory.setOptimizedMessageDispatch(true);
+                    factory.setObjectMessageSerializationDefered(true);
+                    factory.setCopyMessageOnSend(false);
+
+                    Connection connection = factory.createConnection();
+		            Session session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		            MessageConsumer consumer = session.createConsumer(destination);
+		            connection.start();
+                    consumerStarted.countDown();
+                    ActiveMQObjectMessage message = (ActiveMQObjectMessage)consumer.receive(30000);
+                    if ( message != null ) {
+                        MyObject object = (MyObject)message.getObject();
+                        LOG.info("Got message " + object.getMessage());
+                        numReceived.incrementAndGet();
+                    }
+					consumer.close();
+				} catch (Throwable ex) {
+					exceptions.add(ex);
+				}
+			}
+		};
+        vmConsumerThread.start();
+
+		Thread producingThread = new Thread("Producing Thread") {
+            public void run() {
+                try {
+                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+                    factory.setOptimizedMessageDispatch(true);
+                    factory.setObjectMessageSerializationDefered(true);
+                    factory.setCopyMessageOnSend(false);
+
+                    Connection connection = factory.createConnection();
+		            Session session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		            MessageProducer producer = session.createProducer(destination);
+                    ActiveMQObjectMessage message = (ActiveMQObjectMessage)session.createObjectMessage();
+                    message.setObject(obj);
+                    producer.send(message);
+                	producer.close();
+                } catch (Throwable ex) {
+                    exceptions.add(ex);
+                }
+            }
+		};
+
+        assertTrue("consumers started", consumerStarted.await(10, TimeUnit.SECONDS));
+		producingThread.start();
+
+        vmConsumerThread.join();
+        producingThread.join();
+
+        assertEquals("writeObject called", 0, obj.getWriteObjectCalled());
+        assertEquals("readObject called", 0, obj.getReadObjectCalled());
+        assertEquals("readObjectNoData called", 0, obj.getReadObjectNoDataCalled());
+
+        assertEquals("Got expected messages", 1, numReceived.get());
+        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
+	}
+
+    public void testSendNotSerializeableObjectMessageOverTcp() throws Exception {
+        final  ActiveMQDestination destination = new ActiveMQTopic("testTopic");
         final MyObject obj = new MyObject("A message");
 
-		Thread consumerThread = new Thread("Consumer Thread") {
+        final CountDownLatch consumerStarted = new CountDownLatch(3);
+        final Vector<Throwable> exceptions = new Vector<Throwable>();
+		Thread vmConsumerThread = new Thread("Consumer Thread") {
+			public void run() {
+				try {
+                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+                    factory.setOptimizedMessageDispatch(true);
+                    factory.setObjectMessageSerializationDefered(true);
+                    factory.setCopyMessageOnSend(false);
+
+                    Connection connection = factory.createConnection();
+		            Session session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		            MessageConsumer consumer = session.createConsumer(destination);
+		            connection.start();
+                    consumerStarted.countDown();
+                    ActiveMQObjectMessage message = (ActiveMQObjectMessage)consumer.receive(30000);
+                    if ( message != null ) {                  
+                        MyObject object = (MyObject)message.getObject();
+                        LOG.info("Got message " + object.getMessage());
+                        numReceived.incrementAndGet();
+                    }
+					consumer.close();
+				} catch (Throwable ex) {
+					exceptions.add(ex);
+				}
+			}
+		};
+        vmConsumerThread.start();
+
+        Thread tcpConsumerThread = new Thread("Consumer Thread") {
 			public void run() {
 				try {
-                    ActiveMQObjectMessage message = (ActiveMQObjectMessage)consumer.receive();
+
+                    ActiveMQConnectionFactory factory =
+                            new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
+                    factory.setOptimizedMessageDispatch(true);
+
+                    Connection connection = factory.createConnection();
+		            Session session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		            MessageConsumer consumer = session.createConsumer(destination);
+		            connection.start();
+                    consumerStarted.countDown();
+                    ActiveMQObjectMessage message = (ActiveMQObjectMessage)consumer.receive(30000);
                     if ( message != null ) {
-                        numReceived++;
                         MyObject object = (MyObject)message.getObject();
                         LOG.info("Got message " + object.getMessage());
+                        numReceived.incrementAndGet();
+                        assertEquals("readObject called", 1, object.getReadObjectCalled());
                     }
 					consumer.close();
 				} catch (Throwable ex) {
-					ex.printStackTrace();
+					exceptions.add(ex);
 				}
 			}
 		};
-		
-        consumerThread.start();
-		
+        tcpConsumerThread.start();
+
+
+        Thread notherVmConsumerThread = new Thread("Consumer Thread") {
+            public void run() {
+                try {
+                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+                    factory.setOptimizedMessageDispatch(true);
+                    factory.setObjectMessageSerializationDefered(true);
+                    factory.setCopyMessageOnSend(false);
+
+                    Connection connection = factory.createConnection();
+                    Session session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageConsumer consumer = session.createConsumer(destination);
+                    connection.start();
+                    consumerStarted.countDown();
+                    ActiveMQObjectMessage message = (ActiveMQObjectMessage)consumer.receive(30000);
+                    if ( message != null ) {
+                        MyObject object = (MyObject)message.getObject();
+                        LOG.info("Got message " + object.getMessage());
+                        numReceived.incrementAndGet();
+                    }
+                    consumer.close();
+                } catch (Throwable ex) {
+                    exceptions.add(ex);
+                }
+            }
+        };
+        notherVmConsumerThread.start();
+
 		Thread producingThread = new Thread("Producing Thread") {
             public void run() {
                 try {
+                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+                    factory.setOptimizedMessageDispatch(true);
+                    factory.setObjectMessageSerializationDefered(true);
+                    factory.setCopyMessageOnSend(false);
+
+                    Connection connection = factory.createConnection();
+		            Session session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		            MessageProducer producer = session.createProducer(destination);
                     ActiveMQObjectMessage message = (ActiveMQObjectMessage)session.createObjectMessage();
                     message.setObject(obj);
                     producer.send(message);
                 	producer.close();
                 } catch (Throwable ex) {
-                    ex.printStackTrace();
+                    exceptions.add(ex);
                 }
             }
 		};
-		
+
+        assertTrue("consumers started", consumerStarted.await(10, TimeUnit.SECONDS));
 		producingThread.start();
 		
-        consumerThread.join();
+        vmConsumerThread.join();
+        tcpConsumerThread.join();
+        notherVmConsumerThread.join();
         producingThread.join();
-        session.close();
 
-        assertFalse("writeObject called", obj.getWriteObjectCalled());
-        assertFalse("readObject called", obj.getReadObjectCalled());
-        assertFalse("readObjectNoData called", obj.getReadObjectNoDataCalled());
+        assertEquals("writeObject called", 1, obj.getWriteObjectCalled());
+        assertEquals("readObject called", 0, obj.getReadObjectCalled());
+        assertEquals("readObjectNoData called", 0, obj.getReadObjectNoDataCalled());
 
+        assertEquals("Got expected messages", 3, numReceived.get());
+        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
 	}
 
 	private BrokerService createBroker() throws Exception {
 	    BrokerService broker = new BrokerService();
         broker.setPersistent(false);
         broker.setUseJmx(false);
-        broker.addConnector("vm://localhost");
+        broker.addConnector("tcp://localhost:0");
         
         broker.start();
         broker.waitUntilStarted();
@@ -132,7 +267,6 @@ public class ObjectMessageNotSerializabl
 	}
 
 	protected void tearDown() throws Exception {
-		connection.stop();
 		broker.stop();
 		broker.waitUntilStopped();
 	}