You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/11/27 13:49:42 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-4904 - ensure
transports are not restarted without a store lock
Updated Branches:
refs/heads/trunk 927be0ce7 -> 7d98b3205
https://issues.apache.org/jira/browse/AMQ-4904 - ensure transports are not restarted without a store lock
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7d98b320
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7d98b320
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7d98b320
Branch: refs/heads/trunk
Commit: 7d98b320509ab460e337e54afd8043297164777c
Parents: 927be0c
Author: gtully <ga...@gmail.com>
Authored: Wed Nov 27 12:48:16 2013 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Nov 27 12:49:18 2013 +0000
----------------------------------------------------------------------
.../util/DefaultIOExceptionHandler.java | 8 +-
.../store/jdbc/JDBCIOExceptionHandler.java | 4 +
.../jdbc/JDBCIOExceptionHandlerMockeryTest.java | 108 +++++++++++++++++++
3 files changed, 117 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/7d98b320/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
index ab35800..c65ec65 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
@@ -94,10 +94,12 @@ import org.slf4j.LoggerFactory;
LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
}
- broker.startAllConnectors();
- LOG.info("Successfully restarted transports on " + broker);
+ if (hasLockOwnership()) {
+ broker.startAllConnectors();
+ LOG.info("Successfully restarted transports on " + broker);
+ }
} catch (Exception e) {
- LOG.warn("Stopping " + broker + " due to failure while restarting transports", e);
+ LOG.warn("Stopping " + broker + " due to failure restarting transports", e);
stopBroker(e);
} finally {
handlingException.compareAndSet(true, false);
http://git-wip-us.apache.org/repos/asf/activemq/blob/7d98b320/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
index a92856c..d0ea276 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
@@ -21,11 +21,14 @@ import java.io.IOException;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.SuppressReplyException;
import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @org.apache.xbean.XBean
*/
public class JDBCIOExceptionHandler extends DefaultIOExceptionHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandler.class);
public JDBCIOExceptionHandler() {
setIgnoreSQLExceptions(false);
@@ -49,6 +52,7 @@ public class JDBCIOExceptionHandler extends DefaultIOExceptionHandler {
}
if (!hasLock) {
+ LOG.warn("Lock keepAlive failed, no longer lock owner with: {}", locker);
throw new IOException("Lock keepAlive failed, no longer lock owner with: " + locker);
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7d98b320/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java
new file mode 100644
index 0000000..dd5d506
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import java.io.IOException;
+import java.util.HashMap;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.Locker;
+import org.apache.activemq.broker.SuppressReplyException;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.Wait;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.States;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class JDBCIOExceptionHandlerMockeryTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandlerMockeryTest.class);
+ private HashMap<Thread, Throwable> exceptions = new HashMap<Thread, Throwable>();
+
+ @Test
+ public void testShutdownWithoutTransportRestart() throws Exception {
+
+ Mockery context = new Mockery() {{
+ setImposteriser(ClassImposteriser.INSTANCE);
+ }};
+
+ Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("unexpected exception {} on thread {}", e, t);
+ exceptions.put(t, e);
+ }
+ });
+
+ final BrokerService brokerService = context.mock(BrokerService.class);
+ final JDBCPersistenceAdapter jdbcPersistenceAdapter = context.mock(JDBCPersistenceAdapter.class);
+ final Locker locker = context.mock(Locker.class);
+
+ final States jdbcConn = context.states("jdbc").startsAs("down");
+ final States broker = context.states("broker").startsAs("started");
+
+ // simulate jdbc up between hasLock and checkpoint, so hasLock fails to verify
+ context.checking(new Expectations() {{
+ allowing(brokerService).isRestartAllowed();
+ will(returnValue(false));
+ allowing(brokerService).stopAllConnectors(with(any(ServiceStopper.class)));
+ allowing(brokerService).getPersistenceAdapter();
+ will(returnValue(jdbcPersistenceAdapter));
+ allowing(jdbcPersistenceAdapter).getLocker();
+ will(returnValue(locker));
+ allowing(locker).keepAlive();
+ when(jdbcConn.is("down"));
+ will(returnValue(true));
+ allowing(locker).keepAlive();
+ when(jdbcConn.is("up"));
+ will(returnValue(false));
+
+ allowing(jdbcPersistenceAdapter).checkpoint(with(true));
+ then(jdbcConn.is("up"));
+ allowing(brokerService).stop();
+ then(broker.is("stopped"));
+
+ }});
+
+ JDBCIOExceptionHandler underTest = new JDBCIOExceptionHandler();
+ underTest.setBrokerService(brokerService);
+
+ try {
+ underTest.handle(new IOException());
+ fail("except suppress reply ex");
+ } catch (SuppressReplyException expected) {
+ }
+
+ assertTrue("broker stopped state triggered", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("broker state {}", broker);
+ return broker.is("stopped").isActive();
+ }
+ }));
+ context.assertIsSatisfied();
+
+ assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+ }
+}