You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/05/29 16:14:25 UTC
activemq git commit: [AMQ-6688] catch throwable on add task such that
result error result is not lost on failed async send, fix and test
Repository: activemq
Updated Branches:
refs/heads/master 00ee9491d -> f4c11f73b
[AMQ-6688] catch throwable on add task such that result error result is not lost on failed async send, fix and test
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f4c11f73
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f4c11f73
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f4c11f73
Branch: refs/heads/master
Commit: f4c11f73bba4440fd976aa5a3db3b7ee119d4712
Parents: 00ee949
Author: gtully <ga...@gmail.com>
Authored: Mon May 29 17:14:05 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon May 29 17:14:05 2017 +0100
----------------------------------------------------------------------
.../activemq/store/kahadb/KahaDBStore.java | 12 +-
.../store/kahadb/ErrorOnFutureSendTest.java | 123 +++++++++++++++++++
2 files changed, 130 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/f4c11f73/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 2bf80a0..b5c3abd 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -1433,8 +1433,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
+ (this.store.canceledTasks / this.store.doneTasks) * 100);
this.store.canceledTasks = this.store.doneTasks = 0;
}
- } catch (Exception e) {
- this.future.setException(e);
+ } catch (Throwable t) {
+ this.future.setException(t);
+ removeQueueTask(this.store, this.message.getMessageId());
}
}
@@ -1450,7 +1451,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
super(runnable, null);
}
- public void setException(final Exception e) {
+ public void setException(final Throwable e) {
super.setException(e);
}
@@ -1551,8 +1552,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
+ (this.store.canceledTasks / this.store.doneTasks) * 100);
this.store.canceledTasks = this.store.doneTasks = 0;
}
- } catch (Exception e) {
- this.future.setException(e);
+ } catch (Throwable t) {
+ this.future.setException(t);
+ removeTopicTask(this.topicStore, this.message.getMessageId());
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/f4c11f73/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java
new file mode 100644
index 0000000..8f0a289
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.File;
+
+import static org.junit.Assert.fail;
+
+
+public class ErrorOnFutureSendTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ErrorOnFutureSendTest.class);
+
+ @Rule
+ public TemporaryFolder dataDir = new TemporaryFolder(new File("target"));
+
+ private BrokerService broker = null;
+ private final ActiveMQQueue destination = new ActiveMQQueue("Test");
+ private KahaDBPersistenceAdapter adapter;
+
+ protected void startBroker() throws Exception {
+ broker = new BrokerService();
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.setPersistent(true);
+ broker.setUseJmx(false);
+ broker.setAdvisorySupport(false);
+ broker.setDataDirectory(dataDir.getRoot().getAbsolutePath());
+ adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+ broker.start();
+ LOG.info("Starting broker..");
+ }
+
+ @Before
+ public void start() throws Exception {
+ startBroker();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
+
+ @Test(timeout = 30000)
+ public void testSendErrorBubblesBackFromStoreTask() throws Exception {
+
+ adapter.setTransactionIdTransformer(new TransactionIdTransformer() {
+ @Override
+ public TransactionId transform(TransactionId txid) {
+ throw new RuntimeException("Bla");
+ }
+ });
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+ connectionFactory.setWatchTopicAdvisories(false);
+ ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(destination);
+ Message message = session.createMessage();
+
+ try {
+ producer.send(message);
+ fail("Expect exaception");
+ } catch (JMSException expected) {
+ expected.printStackTrace();
+ }
+
+ adapter.setTransactionIdTransformer(new TransactionIdTransformer() {
+ @Override
+ public TransactionId transform(TransactionId txid) {
+ throw new java.lang.OutOfMemoryError("Bla");
+ }
+ });
+
+ try {
+ producer.send(message);
+ fail("Expect exaception");
+ } catch (JMSException expected) {
+ expected.printStackTrace();
+ }
+
+
+ connection.close();
+ }
+}
\ No newline at end of file