You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/06/15 19:17:07 UTC

svn commit: r954972 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/ activemq-ra/src/test/java/org/apache/activemq/ra/

Author: gtully
Date: Tue Jun 15 17:17:07 2010
New Revision: 954972

URL: http://svn.apache.org/viewvc?rev=954972&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2765 - rework fix for https://issues.apache.org/activemq/browse/AMQ-2772 - each consumer needs to indicate when interuption processing is complete. till there is a need to do other wise, the connection consumers (used by the RA managed connection) immediatly allow redelivery

Added:
    activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=954972&r1=954971&r2=954972&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Tue Jun 15 17:17:07 2010
@@ -1853,7 +1853,7 @@ public class ActiveMQConnection implemen
 	}
 
     public void transportInterupted() {
-        this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0) - connectionConsumers.size());
+        this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
         if (LOG.isDebugEnabled()) {
             LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
         }
@@ -1861,6 +1861,11 @@ public class ActiveMQConnection implemen
             ActiveMQSession s = i.next();
             s.clearMessagesInProgress();
         }
+        
+        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
+            connectionConsumer.clearMessagesInProgress();    
+        }
+        
         for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
             TransportListener listener = iter.next();
             listener.transportInterupted();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java?rev=954972&r1=954971&r2=954972&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java Tue Jun 15 17:17:07 2010
@@ -153,4 +153,11 @@ public class ActiveMQConnectionConsumer 
     public String toString() {
         return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
     }
+
+    public void clearMessagesInProgress() {
+        // future: may want to deal with rollback of in progress messages to track re deliveries
+        // before indicating that all is complete.        
+        // Till there is a need, lets immediately allow dispatch
+        this.connection.transportInterruptionProcessingComplete();
+    }
 }

Added: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java?rev=954972&view=auto
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java (added)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java Tue Jun 15 17:17:07 2010
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Timer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.resource.ResourceException;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.UnavailableException;
+import javax.resource.spi.XATerminator;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkListener;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class FailoverManagedClusterTest extends TestCase {
+
+    long txGenerator = System.currentTimeMillis();
+    
+    private static final String MASTER_BIND_ADDRESS = "tcp://0.0.0.0:61616";
+    private static final String SLAVE_BIND_ADDRESS = "tcp://0.0.0.0:61617";
+
+    private static final String BROKER_URL = "failover://(" + MASTER_BIND_ADDRESS + "," + SLAVE_BIND_ADDRESS + ")?randomize=false";
+    
+    private BrokerService master;
+    private BrokerService slave;
+
+    protected void setUp() throws Exception {
+        createAndStartMaster();
+        createAndStartSlave();    
+    }
+    
+
+    private void createAndStartMaster() throws Exception {
+        master = new BrokerService();
+        master.setDeleteAllMessagesOnStartup(true);
+        master.setUseJmx(false);
+        master.setBrokerName("BROKER");
+        master.addConnector(MASTER_BIND_ADDRESS);
+        master.start();
+        master.waitUntilStarted();
+    }
+
+    private void createAndStartSlave() throws Exception {
+        slave = new BrokerService();
+        slave.setUseJmx(false);
+        slave.setBrokerName("BROKER");
+        slave.addConnector(SLAVE_BIND_ADDRESS);
+
+        // Start the slave asynchronously, since this will block
+        new Thread(new Runnable() {
+            public void run() {
+                try {
+                    slave.start();
+                    System.out.println("slave has started");
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+        }).start();
+    }
+
+    public void testFailover() throws Exception {
+        
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
+        adapter.setServerUrl(BROKER_URL);
+        adapter.start(new StubBootstrapContext());
+
+        final CountDownLatch messageDelivered = new CountDownLatch(1);
+
+        final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+            public void onMessage(Message message) {
+                System.out.println("Received message " + message);
+                super.onMessage(message);
+                messageDelivered.countDown();
+            };
+        };
+
+        ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
+        activationSpec.setDestinationType(Queue.class.getName());
+        activationSpec.setDestination("TEST");
+        activationSpec.setResourceAdapter(adapter);
+        activationSpec.validate();
+
+        MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
+            public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
+                endpoint.xaresource = resource;
+                return endpoint;
+            }
+
+            public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
+                return true;
+            }
+        };
+
+        // Activate an Endpoint
+        adapter.endpointActivation(messageEndpointFactory, activationSpec);
+
+        // Give endpoint a moment to setup and register its listeners
+        try {
+            Thread.sleep(2000);
+        } catch (InterruptedException e) {
+        }
+
+        // Send the broker a message to that endpoint
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
+
+        // force a failover
+        master.stop();
+        slave.waitUntilStarted();
+
+        try {
+            Thread.sleep(2000);
+        } catch (InterruptedException ie) {
+            // ignore
+        }
+
+        producer.send(session.createTextMessage("Hello, again!"));
+
+        // Wait for the message to be delivered.
+        assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS));                
+    }
+
+
+    private static final class StubBootstrapContext implements BootstrapContext {
+        public WorkManager getWorkManager() {
+            return new WorkManager() {
+                public void doWork(Work work) throws WorkException {
+                    new Thread(work).start();
+                }
+
+                public void doWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
+                    new Thread(work).start();
+                }
+
+                public long startWork(Work work) throws WorkException {
+                    new Thread(work).start();
+                    return 0;
+                }
+
+                public long startWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
+                    new Thread(work).start();
+                    return 0;
+                }
+
+                public void scheduleWork(Work work) throws WorkException {
+                    new Thread(work).start();
+                }
+
+                public void scheduleWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
+                    new Thread(work).start();
+                }
+            };
+        }
+
+        public XATerminator getXATerminator() {
+            return null;
+        }
+
+        public Timer createTimer() throws UnavailableException {
+            return null;
+        }
+    }
+
+    public class StubMessageEndpoint implements MessageEndpoint, MessageListener {
+        public int messageCount;
+        public XAResource xaresource;
+        public Xid xid;
+
+        public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
+            try {
+                if (xid == null) {
+                    xid = createXid();
+                }
+                xaresource.start(xid, 0);
+            } catch (Throwable e) {
+                throw new ResourceException(e);
+            }
+        }
+
+        public void afterDelivery() throws ResourceException {
+            try {
+                xaresource.end(xid, XAResource.TMSUCCESS);
+                xaresource.prepare(xid);
+                xaresource.commit(xid, false);
+            } catch (Throwable e) {
+                e.printStackTrace();
+                throw new ResourceException(e);
+            }
+        }
+
+        public void release() {
+        }
+
+        public void onMessage(Message message) {
+            messageCount++;
+        }
+
+    }
+    
+    public Xid createXid() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(++txGenerator);
+        os.close();
+        final byte[] bs = baos.toByteArray();
+
+        return new Xid() {
+            public int getFormatId() {
+                return 86;
+            }
+
+            public byte[] getGlobalTransactionId() {
+                return bs;
+            }
+
+            public byte[] getBranchQualifier() {
+                return bs;
+            }
+        };
+    }
+
+}

Propchange: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date