You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/11/27 13:49:42 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4904 - ensure transports are not restarted without a store lock

Updated Branches:
  refs/heads/trunk 927be0ce7 -> 7d98b3205


https://issues.apache.org/jira/browse/AMQ-4904 - ensure transports are not restarted without a store lock


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7d98b320
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7d98b320
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7d98b320

Branch: refs/heads/trunk
Commit: 7d98b320509ab460e337e54afd8043297164777c
Parents: 927be0c
Author: gtully <ga...@gmail.com>
Authored: Wed Nov 27 12:48:16 2013 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Nov 27 12:49:18 2013 +0000

----------------------------------------------------------------------
 .../util/DefaultIOExceptionHandler.java         |   8 +-
 .../store/jdbc/JDBCIOExceptionHandler.java      |   4 +
 .../jdbc/JDBCIOExceptionHandlerMockeryTest.java | 108 +++++++++++++++++++
 3 files changed, 117 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7d98b320/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
index ab35800..c65ec65 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
@@ -94,10 +94,12 @@ import org.slf4j.LoggerFactory;
                                             LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
                                             TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
                                         }
-                                        broker.startAllConnectors();
-                                        LOG.info("Successfully restarted transports on " + broker);
+                                        if (hasLockOwnership()) {
+                                            broker.startAllConnectors();
+                                            LOG.info("Successfully restarted transports on " + broker);
+                                        }
                                     } catch (Exception e) {
-                                        LOG.warn("Stopping " + broker + " due to failure while restarting transports", e);
+                                        LOG.warn("Stopping " + broker + " due to failure restarting transports", e);
                                         stopBroker(e);
                                     } finally {
                                         handlingException.compareAndSet(true, false);

http://git-wip-us.apache.org/repos/asf/activemq/blob/7d98b320/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
index a92856c..d0ea276 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
@@ -21,11 +21,14 @@ import java.io.IOException;
 import org.apache.activemq.broker.Locker;
 import org.apache.activemq.broker.SuppressReplyException;
 import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @org.apache.xbean.XBean
  */
 public class JDBCIOExceptionHandler extends DefaultIOExceptionHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandler.class);
 
     public JDBCIOExceptionHandler() {
         setIgnoreSQLExceptions(false);
@@ -49,6 +52,7 @@ public class JDBCIOExceptionHandler extends DefaultIOExceptionHandler {
                 }
 
                 if (!hasLock) {
+                    LOG.warn("Lock keepAlive failed, no longer lock owner with: {}", locker);
                     throw new IOException("Lock keepAlive failed, no longer lock owner with: " + locker);
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7d98b320/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java
new file mode 100644
index 0000000..dd5d506
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import java.io.IOException;
+import java.util.HashMap;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.Locker;
+import org.apache.activemq.broker.SuppressReplyException;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.Wait;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.States;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class JDBCIOExceptionHandlerMockeryTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandlerMockeryTest.class);
+    private HashMap<Thread, Throwable> exceptions = new HashMap<Thread, Throwable>();
+
+    @Test
+    public void testShutdownWithoutTransportRestart() throws Exception {
+
+        Mockery context = new Mockery() {{
+            setImposteriser(ClassImposteriser.INSTANCE);
+        }};
+
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                LOG.error("unexpected exception {} on thread {}", e, t);
+                exceptions.put(t, e);
+            }
+        });
+
+        final BrokerService brokerService = context.mock(BrokerService.class);
+        final JDBCPersistenceAdapter jdbcPersistenceAdapter = context.mock(JDBCPersistenceAdapter.class);
+        final Locker locker = context.mock(Locker.class);
+
+        final States jdbcConn = context.states("jdbc").startsAs("down");
+        final States broker = context.states("broker").startsAs("started");
+
+        // simulate jdbc coming back up at checkpoint: keepAlive returns true before the checkpoint and false after it, so the lock check made after the checkpoint fails
+        context.checking(new Expectations() {{
+            allowing(brokerService).isRestartAllowed();
+            will(returnValue(false));
+            allowing(brokerService).stopAllConnectors(with(any(ServiceStopper.class)));
+            allowing(brokerService).getPersistenceAdapter();
+            will(returnValue(jdbcPersistenceAdapter));
+            allowing(jdbcPersistenceAdapter).getLocker();
+            will(returnValue(locker));
+            allowing(locker).keepAlive();
+            when(jdbcConn.is("down"));
+            will(returnValue(true));
+            allowing(locker).keepAlive();
+            when(jdbcConn.is("up"));
+            will(returnValue(false));
+
+            allowing(jdbcPersistenceAdapter).checkpoint(with(true));
+            then(jdbcConn.is("up"));
+            allowing(brokerService).stop();
+            then(broker.is("stopped"));
+
+        }});
+
+        JDBCIOExceptionHandler underTest = new JDBCIOExceptionHandler();
+        underTest.setBrokerService(brokerService);
+
+        try {
+            underTest.handle(new IOException());
+            fail("except suppress reply ex");
+        } catch (SuppressReplyException expected) {
+        }
+
+        assertTrue("broker stopped state triggered", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("broker state {}", broker);
+                return broker.is("stopped").isActive();
+            }
+        }));
+        context.assertIsSatisfied();
+
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+    }
+}