You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/06/17 20:03:31 UTC
[2/3] camel git commit: Move all Hazelcast instance interaction
inside try-catch,
and sleep 100 ms when an exception occurs to avoid spamming the log with errors,
e.g. when the Hazelcast instance is unreachable for a period of time.
Move all Hazelcast instance interaction inside the try-catch block, and sleep 100 ms when an exception occurs to avoid spamming the log with errors, e.g. when the Hazelcast instance is unreachable for a period of time.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/12d5182c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/12d5182c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/12d5182c
Branch: refs/heads/master
Commit: 12d5182c5ab93f6e364ad0539ed7e3792bfd02c5
Parents: 1dd8bf3
Author: Ann Katrin Gagnat <an...@gmail.com>
Authored: Thu Jun 15 00:08:53 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Jun 17 21:57:14 2017 +0200
----------------------------------------------------------------------
.../hazelcast/seda/HazelcastSedaConsumer.java | 26 +++++---
...daRecoverableConsumerNewTransactionTest.java | 41 ++++++++++++
...castSedaRecoverableConsumerRollbackTest.java | 47 ++++++++++++++
.../HazelcastSedaRecoverableConsumerTest.java | 67 ++++++++++++++++++++
4 files changed, 172 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/12d5182c/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
index 4dd99a1..54bb208 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
@@ -77,16 +77,17 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable {
final Exchange exchange = this.getEndpoint().createExchange();
TransactionContext transactionCtx = null;
- if (endpoint.getConfiguration().isTransacted()) {
- // Get and begin transaction if exist
- transactionCtx = endpoint.getHazelcastInstance().newTransactionContext();
+ try {
+ if (endpoint.getConfiguration().isTransacted()) {
+ // Get and begin transaction if exist
+ transactionCtx = endpoint.getHazelcastInstance().newTransactionContext();
- if (transactionCtx != null) {
- log.trace("Begin transaction: {}", transactionCtx.getTxnId());
- transactionCtx.beginTransaction();
+ if (transactionCtx != null) {
+ log.trace("Begin transaction: {}", transactionCtx.getTxnId());
+ transactionCtx.beginTransaction();
+ }
}
- }
- try {
+
final Object body = queue.poll(endpoint.getConfiguration().getPollTimeout(), TimeUnit.MILLISECONDS);
if (body != null) {
@@ -134,9 +135,16 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable {
// Rollback
if (transactionCtx != null) {
log.trace("Rollback transaction: {}", transactionCtx.getTxnId());
- transactionCtx.rollbackTransaction();
+ try {
+ transactionCtx.rollbackTransaction();
+ } catch (Throwable ignore) {
+ }
}
getExceptionHandler().handleException("Error processing exchange", exchange, e);
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignore) {
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/12d5182c/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java
new file mode 100644
index 0000000..d9f5eec
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerNewTransactionTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import com.hazelcast.core.HazelcastException;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.transaction.TransactionContext;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.*;
+
+public class HazelcastSedaRecoverableConsumerNewTransactionTest extends HazelcastSedaRecoverableConsumerTest {
+
+ protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
+ TransactionContext transactionContext = Mockito.mock(TransactionContext.class);
+ when(hazelcastInstance.newTransactionContext())
+ .thenThrow(new HazelcastException("Could not obtain Connection!!!"))
+ .thenReturn(transactionContext);
+ when(hazelcastInstance.getQueue("foo")).thenReturn(queue);
+ }
+
+ protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) {
+ verify(hazelcastInstance).getQueue("foo");
+ verify(hazelcastInstance, atLeastOnce()).newTransactionContext();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/12d5182c/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java
new file mode 100644
index 0000000..43d0bcc
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerRollbackTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import com.hazelcast.core.HazelcastException;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.transaction.TransactionContext;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.*;
+
+
+public class HazelcastSedaRecoverableConsumerRollbackTest extends HazelcastSedaRecoverableConsumerTest {
+
+ protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
+ TransactionContext transactionContext = Mockito.mock(TransactionContext.class);
+ HazelcastException hazelcastException = new HazelcastException("Could not obtain Connection!!!");
+ doThrow(hazelcastException)
+ .doNothing()
+ .when(transactionContext).beginTransaction();
+ doThrow(hazelcastException)
+ .doNothing()
+ .when(transactionContext).rollbackTransaction();
+ when(hazelcastInstance.newTransactionContext()).thenReturn(transactionContext);
+ when(hazelcastInstance.getQueue("foo")).thenReturn(queue);
+ }
+
+ protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) {
+ verify(hazelcastInstance).getQueue("foo");
+ verify(hazelcastInstance, atLeastOnce()).newTransactionContext();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/12d5182c/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java
new file mode 100644
index 0000000..45b6f89
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaRecoverableConsumerTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import com.hazelcast.core.IQueue;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+public abstract class HazelcastSedaRecoverableConsumerTest extends HazelcastCamelTestSupport {
+
+ @Mock
+ protected IQueue<Object> queue;
+
+ @EndpointInject(uri = "mock:result")
+ protected MockEndpoint mock;
+
+ @Test
+ public void testRecovery() throws InterruptedException {
+ when(queue.poll(any(Long.class), any(TimeUnit.class)))
+ .thenReturn("bar")
+ .thenReturn(null);
+
+ mock.expectedMessageCount(1);
+
+ assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);
+ }
+
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("hazelcast-seda:foo?transacted=true").to("mock:result");
+ }
+ };
+ }
+
+ @After
+ public final void stopContext() throws Exception {
+ context.stop();
+ }
+
+}