You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/04/22 09:09:55 UTC
svn commit: r650392 - in /activemq/trunk/activemq-ra: ./
src/main/java/org/apache/activemq/ra/ src/test/java/org/apache/activemq/ra/
Author: rajdavies
Date: Tue Apr 22 00:09:52 2008
New Revision: 650392
URL: http://svn.apache.org/viewvc?rev=650392&view=rev
Log:
Applied patch from https://issues.apache.org/activemq/browse/AMQ-1200
Modified:
activemq/trunk/activemq-ra/pom.xml
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointActivationKey.java
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQAsfEndpointWorkerTest.java
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
Modified: activemq/trunk/activemq-ra/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/pom.xml?rev=650392&r1=650391&r2=650392&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/pom.xml (original)
+++ activemq/trunk/activemq-ra/pom.xml Tue Apr 22 00:09:52 2008
@@ -54,11 +54,13 @@
<dependency>
<groupId>jmock</groupId>
<artifactId>jmock</artifactId>
+ <version>1.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jmock</groupId>
<artifactId>jmock-cglib</artifactId>
+ <version>1.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointActivationKey.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointActivationKey.java?rev=650392&r1=650391&r2=650392&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointActivationKey.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointActivationKey.java Tue Apr 22 00:09:52 2008
@@ -26,7 +26,7 @@
/**
* For testing
*/
- ActiveMQEndpointActivationKey() {
+ protected ActiveMQEndpointActivationKey() {
this(null, null);
}
Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java?rev=650392&r1=650391&r2=650392&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java Tue Apr 22 00:09:52 2008
@@ -159,9 +159,14 @@
while (true) {
log.debug("run loop start");
try {
+ if ( session.isRunning() ) {
InboundContextSupport.register(this);
currentBatchSize = 0;
session.run();
+ } else {
+ log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale");
+ stale = true;
+ }
} catch (Throwable e) {
stale = true;
log.debug("Endpoint failed to process message.", e);
Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQAsfEndpointWorkerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQAsfEndpointWorkerTest.java?rev=650392&r1=650391&r2=650392&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQAsfEndpointWorkerTest.java (original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQAsfEndpointWorkerTest.java Tue Apr 22 00:09:52 2008
@@ -20,7 +20,7 @@
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.endpoint.MessageEndpointFactory;
-import org.jmock.cglib.Mock;
+import org.jmock.Mock;
import org.jmock.cglib.MockObjectTestCase;
/**
@@ -40,14 +40,15 @@
}
public void testTopicSubscriberDurableNoDups() throws Exception {
-
- /*
- * Constraint[] args = {isA(Topic.class),
- * eq(stubActivationSpec.getSubscriptionId()), NULL, ANYTHING,
- * ANYTHING}; mockConnection.expects(once())
- * .method("createDurableConnectionConsumer") .with(args)
- * .will(returnValue(null)); worker.start(); verifyMocks();
- */
+// Constraint[] args = {isA(Topic.class),
+// eq(stubActivationSpec.getSubscriptionId()),
+// NULL,
+// ANYTHING,
+// ANYTHING};
+// mockConnection.expects(once()).method("createDurableConnectionConsumer").with(args)
+// .will(returnValue(null));
+// worker.start();
+// verifyMocks();
}
protected void setUp() throws Exception {
@@ -66,11 +67,11 @@
}
private void setupMocks() {
- mockResourceAdapter = new Mock(ActiveMQResourceAdapter.class);
- mockActivationKey = new Mock(ActiveMQEndpointActivationKey.class);
- mockEndpointFactory = new Mock(MessageEndpointFactory.class);
- mockBootstrapContext = new Mock(BootstrapContext.class);
-// mockConnection = new Mock(Connection.class);
+ mockResourceAdapter = mock(ActiveMQResourceAdapter.class);
+ mockActivationKey = mock(ActiveMQEndpointActivationKey.class);
+ mockEndpointFactory = mock(MessageEndpointFactory.class);
+ mockBootstrapContext = mock(BootstrapContext.class);
+// mockConnection = mock(Connection.class);
mockActivationKey.expects(atLeastOnce()).method("getMessageEndpointFactory").will(returnValue((MessageEndpointFactory)mockEndpointFactory.proxy()));
Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java?rev=650392&r1=650391&r2=650392&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java (original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java Tue Apr 22 00:09:52 2008
@@ -16,12 +16,74 @@
*/
package org.apache.activemq.ra;
-import org.jmock.MockObjectTestCase;
+import java.lang.reflect.Method;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.work.WorkManager;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+import org.jmock.Mock;
+import org.jmock.cglib.MockObjectTestCase;
/**
* @version $Revision: 1.1.1.1 $
*/
public class ServerSessionImplTest extends MockObjectTestCase {
+ private static final String BROKER_URL = "vm://localhost";
+ private ServerSessionImpl serverSession;
+ private Mock pool;
+ private Mock workManager;
+ private MessageEndpoint messageEndpoint;
+ private ActiveMQConnection con;
+ private ActiveMQSession session;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ org.apache.activemq.ActiveMQConnectionFactory factory =
+ new org.apache.activemq.ActiveMQConnectionFactory(BROKER_URL);
+ con = (ActiveMQConnection) factory.createConnection();
+ session = (ActiveMQSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ pool = mock(ServerSessionPoolImpl.class, new Class[]{ActiveMQEndpointWorker.class, int.class}, new Object[]{null, 10});
+ workManager = mock(WorkManager.class);
+ messageEndpoint = new MockMessageEndpoint();
+
+ serverSession = new ServerSessionImpl(
+ (ServerSessionPoolImpl) pool.proxy(),
+ session,
+ (WorkManager) workManager.proxy(),
+ messageEndpoint,
+ false,
+ 10);
+ }
+
+ private class MockMessageEndpoint implements MessageEndpoint, MessageListener {
+
+ public void afterDelivery() throws ResourceException
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public void beforeDelivery(Method arg0) throws NoSuchMethodException, ResourceException
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public void release()
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public void onMessage(Message msg)
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ }
/**
* Need to re-work this test case, it broke since the amq4 internals changed and
@@ -90,4 +152,11 @@
return (TransportChannel) tc.proxy();
}
*/
+
+ public void testRunDetectsStoppedSession() throws Exception {
+ con.close();
+ pool.expects(once()).method("removeFromPool").with(eq(serverSession));
+ serverSession.run();
+ pool.verify();
+}
}