You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/06/17 20:03:31 UTC

[2/3] camel git commit: Move all Hazelcast instance interaction inside try-catch, and sleep 100 ms when exception occurred to avoid spamming log with errors e.g. when Hazelcast instance is unreachable for a period of time.

Move all Hazelcast instance interaction inside try-catch, and sleep 100 ms when exception occurred to avoid spamming log with errors e.g. when Hazelcast instance is unreachable for a period of time.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/12d5182c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/12d5182c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/12d5182c

Branch: refs/heads/master
Commit: 12d5182c5ab93f6e364ad0539ed7e3792bfd02c5
Parents: 1dd8bf3
Author: Ann Katrin Gagnat <an...@gmail.com>
Authored: Thu Jun 15 00:08:53 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Jun 17 21:57:14 2017 +0200

----------------------------------------------------------------------
 .../hazelcast/seda/HazelcastSedaConsumer.java   | 26 +++++---
 ...daRecoverableConsumerNewTransactionTest.java | 41 ++++++++++++
 ...castSedaRecoverableConsumerRollbackTest.java | 47 ++++++++++++++
 .../HazelcastSedaRecoverableConsumerTest.java   | 67 ++++++++++++++++++++
 4 files changed, 172 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/12d5182c/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
index 4dd99a1..54bb208 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
@@ -77,16 +77,17 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable {
             final Exchange exchange = this.getEndpoint().createExchange();
 
             TransactionContext transactionCtx = null;
-            if (endpoint.getConfiguration().isTransacted()) {
-                // Get and begin transaction if exist
-                transactionCtx = endpoint.getHazelcastInstance().newTransactionContext();
+            try {
+                if (endpoint.getConfiguration().isTransacted()) {
+                    // Get and begin transaction if exist
+                    transactionCtx = endpoint.getHazelcastInstance().newTransactionContext();
 
-                if (transactionCtx != null) {
-                    log.trace("Begin transaction: {}", transactionCtx.getTxnId());
-                    transactionCtx.beginTransaction();
+                    if (transactionCtx != null) {
+                        log.trace("Begin transaction: {}", transactionCtx.getTxnId());
+                        transactionCtx.beginTransaction();
+                    }
                 }
-            }
-            try {
+
                 final Object body = queue.poll(endpoint.getConfiguration().getPollTimeout(), TimeUnit.MILLISECONDS);
 
                 if (body != null) {
@@ -134,9 +135,16 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable {
                 // Rollback
                 if (transactionCtx != null) {
                     log.trace("Rollback transaction: {}", transactionCtx.getTxnId());
-                    transactionCtx.rollbackTransaction();
+                    try {
+                        transactionCtx.rollbackTransaction();
+                    } catch (Throwable ignore) {
+                    }
                 }
                 getExceptionHandler().handleException("Error processing exchange", exchange, e);
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ignore) {
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/12d5182c/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java
new file mode 100644
index 0000000..d9f5eec
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import com.hazelcast.core.HazelcastException;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.transaction.TransactionContext;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.*;
+
+public class HazelcastSedaRecoverableConsumerNewTransactionTest extends HazelcastSedaRecoverableConsumerTest {
+
+    protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        TransactionContext transactionContext = Mockito.mock(TransactionContext.class);
+        when(hazelcastInstance.newTransactionContext())
+                .thenThrow(new HazelcastException("Could not obtain Connection!!!"))
+                .thenReturn(transactionContext);
+        when(hazelcastInstance.getQueue("foo")).thenReturn(queue);
+    }
+
+    protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        verify(hazelcastInstance).getQueue("foo");
+        verify(hazelcastInstance, atLeastOnce()).newTransactionContext();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/12d5182c/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java
new file mode 100644
index 0000000..43d0bcc
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import com.hazelcast.core.HazelcastException;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.transaction.TransactionContext;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.*;
+
+
+public class HazelcastSedaRecoverableConsumerRollbackTest extends HazelcastSedaRecoverableConsumerTest {
+
+    protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        TransactionContext transactionContext = Mockito.mock(TransactionContext.class);
+        HazelcastException hazelcastException = new HazelcastException("Could not obtain Connection!!!");
+        doThrow(hazelcastException)
+                .doNothing()
+                .when(transactionContext).beginTransaction();
+        doThrow(hazelcastException)
+                .doNothing()
+                .when(transactionContext).rollbackTransaction();
+        when(hazelcastInstance.newTransactionContext()).thenReturn(transactionContext);
+        when(hazelcastInstance.getQueue("foo")).thenReturn(queue);
+    }
+
+    protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        verify(hazelcastInstance).getQueue("foo");
+        verify(hazelcastInstance, atLeastOnce()).newTransactionContext();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/12d5182c/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java
new file mode 100644
index 0000000..45b6f89
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import com.hazelcast.core.IQueue;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+public abstract class HazelcastSedaRecoverableConsumerTest extends HazelcastCamelTestSupport {
+
+    @Mock
+    protected IQueue<Object> queue;
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint mock;
+
+    @Test
+    public void testRecovery() throws InterruptedException {
+        when(queue.poll(any(Long.class), any(TimeUnit.class)))
+                .thenReturn("bar")
+                .thenReturn(null);
+
+        mock.expectedMessageCount(1);
+
+        assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);
+    }
+
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("hazelcast-seda:foo?transacted=true").to("mock:result");
+            }
+        };
+    }
+
+    @After
+    public final void stopContext() throws Exception {
+        context.stop();
+    }
+
+}