You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/06/15 19:17:07 UTC
svn commit: r954972 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/
activemq-ra/src/test/java/org/apache/activemq/ra/
Author: gtully
Date: Tue Jun 15 17:17:07 2010
New Revision: 954972
URL: http://svn.apache.org/viewvc?rev=954972&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2765 - rework fix for https://issues.apache.org/activemq/browse/AMQ-2772 - each consumer needs to indicate when interuption processing is complete. till there is a need to do other wise, the connection consumers (used by the RA managed connection) immediatly allow redelivery
Added:
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=954972&r1=954971&r2=954972&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Tue Jun 15 17:17:07 2010
@@ -1853,7 +1853,7 @@ public class ActiveMQConnection implemen
}
public void transportInterupted() {
- this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0) - connectionConsumers.size());
+ this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
if (LOG.isDebugEnabled()) {
LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
}
@@ -1861,6 +1861,11 @@ public class ActiveMQConnection implemen
ActiveMQSession s = i.next();
s.clearMessagesInProgress();
}
+
+ for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
+ connectionConsumer.clearMessagesInProgress();
+ }
+
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.transportInterupted();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java?rev=954972&r1=954971&r2=954972&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java Tue Jun 15 17:17:07 2010
@@ -153,4 +153,11 @@ public class ActiveMQConnectionConsumer
public String toString() {
return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
}
+
+ public void clearMessagesInProgress() {
+ // future: may want to deal with rollback of in progress messages to track re deliveries
+ // before indicating that all is complete.
+ // Till there is a need, lets immediately allow dispatch
+ this.connection.transportInterruptionProcessingComplete();
+ }
}
Added: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java?rev=954972&view=auto
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java (added)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java Tue Jun 15 17:17:07 2010
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Timer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.resource.ResourceException;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.UnavailableException;
+import javax.resource.spi.XATerminator;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkListener;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class FailoverManagedClusterTest extends TestCase {
+
+ long txGenerator = System.currentTimeMillis();
+
+ private static final String MASTER_BIND_ADDRESS = "tcp://0.0.0.0:61616";
+ private static final String SLAVE_BIND_ADDRESS = "tcp://0.0.0.0:61617";
+
+ private static final String BROKER_URL = "failover://(" + MASTER_BIND_ADDRESS + "," + SLAVE_BIND_ADDRESS + ")?randomize=false";
+
+ private BrokerService master;
+ private BrokerService slave;
+
+ protected void setUp() throws Exception {
+ createAndStartMaster();
+ createAndStartSlave();
+ }
+
+
+ private void createAndStartMaster() throws Exception {
+ master = new BrokerService();
+ master.setDeleteAllMessagesOnStartup(true);
+ master.setUseJmx(false);
+ master.setBrokerName("BROKER");
+ master.addConnector(MASTER_BIND_ADDRESS);
+ master.start();
+ master.waitUntilStarted();
+ }
+
+ private void createAndStartSlave() throws Exception {
+ slave = new BrokerService();
+ slave.setUseJmx(false);
+ slave.setBrokerName("BROKER");
+ slave.addConnector(SLAVE_BIND_ADDRESS);
+
+ // Start the slave asynchronously, since this will block
+ new Thread(new Runnable() {
+ public void run() {
+ try {
+ slave.start();
+ System.out.println("slave has started");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ }).start();
+ }
+
+ public void testFailover() throws Exception {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
+ adapter.setServerUrl(BROKER_URL);
+ adapter.start(new StubBootstrapContext());
+
+ final CountDownLatch messageDelivered = new CountDownLatch(1);
+
+ final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+ public void onMessage(Message message) {
+ System.out.println("Received message " + message);
+ super.onMessage(message);
+ messageDelivered.countDown();
+ };
+ };
+
+ ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
+ activationSpec.setDestinationType(Queue.class.getName());
+ activationSpec.setDestination("TEST");
+ activationSpec.setResourceAdapter(adapter);
+ activationSpec.validate();
+
+ MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
+ public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
+ endpoint.xaresource = resource;
+ return endpoint;
+ }
+
+ public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
+ return true;
+ }
+ };
+
+ // Activate an Endpoint
+ adapter.endpointActivation(messageEndpointFactory, activationSpec);
+
+ // Give endpoint a moment to setup and register its listeners
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ }
+
+ // Send the broker a message to that endpoint
+ MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
+
+ // force a failover
+ master.stop();
+ slave.waitUntilStarted();
+
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ie) {
+ // ignore
+ }
+
+ producer.send(session.createTextMessage("Hello, again!"));
+
+ // Wait for the message to be delivered.
+ assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
+ }
+
+
+ private static final class StubBootstrapContext implements BootstrapContext {
+ public WorkManager getWorkManager() {
+ return new WorkManager() {
+ public void doWork(Work work) throws WorkException {
+ new Thread(work).start();
+ }
+
+ public void doWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
+ new Thread(work).start();
+ }
+
+ public long startWork(Work work) throws WorkException {
+ new Thread(work).start();
+ return 0;
+ }
+
+ public long startWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
+ new Thread(work).start();
+ return 0;
+ }
+
+ public void scheduleWork(Work work) throws WorkException {
+ new Thread(work).start();
+ }
+
+ public void scheduleWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
+ new Thread(work).start();
+ }
+ };
+ }
+
+ public XATerminator getXATerminator() {
+ return null;
+ }
+
+ public Timer createTimer() throws UnavailableException {
+ return null;
+ }
+ }
+
+ public class StubMessageEndpoint implements MessageEndpoint, MessageListener {
+ public int messageCount;
+ public XAResource xaresource;
+ public Xid xid;
+
+ public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
+ try {
+ if (xid == null) {
+ xid = createXid();
+ }
+ xaresource.start(xid, 0);
+ } catch (Throwable e) {
+ throw new ResourceException(e);
+ }
+ }
+
+ public void afterDelivery() throws ResourceException {
+ try {
+ xaresource.end(xid, XAResource.TMSUCCESS);
+ xaresource.prepare(xid);
+ xaresource.commit(xid, false);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw new ResourceException(e);
+ }
+ }
+
+ public void release() {
+ }
+
+ public void onMessage(Message message) {
+ messageCount++;
+ }
+
+ }
+
+ public Xid createXid() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream os = new DataOutputStream(baos);
+ os.writeLong(++txGenerator);
+ os.close();
+ final byte[] bs = baos.toByteArray();
+
+ return new Xid() {
+ public int getFormatId() {
+ return 86;
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return bs;
+ }
+
+ public byte[] getBranchQualifier() {
+ return bs;
+ }
+ };
+ }
+
+}
Propchange: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date