You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/05/29 16:14:25 UTC

activemq git commit: [AMQ-6688] catch throwable on add task such that result error result is not lost on failed async send, fix and test

Repository: activemq
Updated Branches:
  refs/heads/master 00ee9491d -> f4c11f73b


[AMQ-6688] catch throwable on add task such that result error result is not lost on failed async send, fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f4c11f73
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f4c11f73
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f4c11f73

Branch: refs/heads/master
Commit: f4c11f73bba4440fd976aa5a3db3b7ee119d4712
Parents: 00ee949
Author: gtully <ga...@gmail.com>
Authored: Mon May 29 17:14:05 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon May 29 17:14:05 2017 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/KahaDBStore.java      |  12 +-
 .../store/kahadb/ErrorOnFutureSendTest.java     | 123 +++++++++++++++++++
 2 files changed, 130 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f4c11f73/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 2bf80a0..b5c3abd 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -1433,8 +1433,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                             + (this.store.canceledTasks / this.store.doneTasks) * 100);
                     this.store.canceledTasks = this.store.doneTasks = 0;
                 }
-            } catch (Exception e) {
-                this.future.setException(e);
+            } catch (Throwable t) {
+                this.future.setException(t);
+                removeQueueTask(this.store, this.message.getMessageId());
             }
         }
 
@@ -1450,7 +1451,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                 super(runnable, null);
             }
 
-            public void setException(final Exception e) {
+            public void setException(final Throwable e) {
                 super.setException(e);
             }
 
@@ -1551,8 +1552,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                             + (this.store.canceledTasks / this.store.doneTasks) * 100);
                     this.store.canceledTasks = this.store.doneTasks = 0;
                 }
-            } catch (Exception e) {
-                this.future.setException(e);
+            } catch (Throwable t) {
+                this.future.setException(t);
+                removeTopicTask(this.topicStore, this.message.getMessageId());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f4c11f73/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java
new file mode 100644
index 0000000..8f0a289
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.File;
+
+import static org.junit.Assert.fail;
+
+
+public class ErrorOnFutureSendTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ErrorOnFutureSendTest.class);
+
+    @Rule
+    public TemporaryFolder dataDir = new TemporaryFolder(new File("target"));
+
+    private BrokerService broker = null;
+    private final ActiveMQQueue destination = new ActiveMQQueue("Test");
+    private KahaDBPersistenceAdapter adapter;
+
+    protected void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+        broker.setDataDirectory(dataDir.getRoot().getAbsolutePath());
+        adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    @Before
+    public void start() throws Exception {
+        startBroker();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+
+    @Test(timeout = 30000)
+    public void testSendErrorBubblesBackFromStoreTask() throws Exception {
+
+        adapter.setTransactionIdTransformer(new TransactionIdTransformer() {
+            @Override
+            public TransactionId transform(TransactionId txid) {
+                throw new RuntimeException("Bla");
+            }
+        });
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+        connectionFactory.setWatchTopicAdvisories(false);
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(destination);
+        Message message = session.createMessage();
+
+        try {
+            producer.send(message);
+            fail("Expect exaception");
+        } catch (JMSException expected) {
+            expected.printStackTrace();
+        }
+
+        adapter.setTransactionIdTransformer(new TransactionIdTransformer() {
+            @Override
+            public TransactionId transform(TransactionId txid) {
+                throw new java.lang.OutOfMemoryError("Bla");
+            }
+        });
+
+        try {
+            producer.send(message);
+            fail("Expect exaception");
+        } catch (JMSException expected) {
+            expected.printStackTrace();
+        }
+
+
+        connection.close();
+    }
+}
\ No newline at end of file