You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/02 07:17:33 UTC

[camel] branch bean-backport created (now b840259d83a)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch bean-backport
in repository https://gitbox.apache.org/repos/asf/camel.git


      at b840259d83a CAMEL-19487: camel-bean - Fix concurrency issue in BeanInfo cache when EIPs are using an existing bean instance. (#10950)

This branch includes the following new commits:

     new b840259d83a CAMEL-19487: camel-bean - Fix concurrency issue in BeanInfo cache when EIPs are using an existing bean instance. (#10950)

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/01: CAMEL-19487: camel-bean - Fix concurrency issue in BeanInfo cache when EIPs are using an existing bean instance. (#10950)

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch bean-backport
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b840259d83a38807a89ff46c5d7b36b9f43b3f69
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Aug 2 08:46:52 2023 +0200

    CAMEL-19487: camel-bean - Fix concurrency issue in BeanInfo cache when EIPs are using an existing bean instance. (#10950)
---
 .../org/apache/camel/component/bean/BeanInfo.java  |  11 +-
 .../camel/component/bean/BeanInfoCacheKey.java     |  15 ++-
 .../camel/component/bean/ConstantBeanHolder.java   |   2 +-
 .../bean/DefaultBeanProcessorFactory.java          |   3 +-
 .../DynamicRouterConcurrentEIPManualTest.java      | 128 +++++++++++++++++++++
 .../DynamicRouterConcurrentPOJOManualTest.java     | 127 ++++++++++++++++++++
 .../DynamicRouterConcurrentPOJOTest.java           |  82 -------------
 7 files changed, 274 insertions(+), 94 deletions(-)

diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfo.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfo.java
index 24e2e9c6ce5..f53439b6b75 100644
--- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfo.java
+++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfo.java
@@ -73,6 +73,7 @@ public class BeanInfo {
     private final CamelContext camelContext;
     private final BeanComponent component;
     private final Class<?> type;
+    private final Object instance;
     private final ParameterMappingStrategy strategy;
     private final MethodInfo defaultMethod;
     // shared state with details of operations introspected from the bean, created during the constructor
@@ -92,22 +93,24 @@ public class BeanInfo {
 
     public BeanInfo(CamelContext camelContext, Method explicitMethod, ParameterMappingStrategy parameterMappingStrategy,
                     BeanComponent beanComponent) {
-        this(camelContext, explicitMethod.getDeclaringClass(), explicitMethod, parameterMappingStrategy, beanComponent);
+        this(camelContext, explicitMethod.getDeclaringClass(), null, explicitMethod, parameterMappingStrategy, beanComponent);
     }
 
     public BeanInfo(CamelContext camelContext, Class<?> type, ParameterMappingStrategy strategy, BeanComponent beanComponent) {
-        this(camelContext, type, null, strategy, beanComponent);
+        this(camelContext, type, null, null, strategy, beanComponent);
     }
 
-    public BeanInfo(CamelContext camelContext, Class<?> type, Method explicitMethod, ParameterMappingStrategy strategy,
+    public BeanInfo(CamelContext camelContext, Class<?> type, Object instance, Method explicitMethod,
+                    ParameterMappingStrategy strategy,
                     BeanComponent beanComponent) {
 
         this.camelContext = camelContext;
         this.type = type;
+        this.instance = instance;
         this.strategy = strategy;
         this.component = beanComponent;
 
-        final BeanInfoCacheKey key = new BeanInfoCacheKey(type, explicitMethod);
+        final BeanInfoCacheKey key = new BeanInfoCacheKey(type, instance, explicitMethod);
 
         // lookup if we have a bean info cache
         BeanInfo beanInfo = component.getBeanInfoFromCache(key);
diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfoCacheKey.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfoCacheKey.java
index f29438189ff..3edabbdfb05 100644
--- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfoCacheKey.java
+++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfoCacheKey.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.bean;
 
 import java.lang.reflect.Method;
+import java.util.Objects;
 
 /**
  * A key used for caching {@link BeanInfo} by the {@link BeanComponent}
@@ -24,10 +25,12 @@ import java.lang.reflect.Method;
 public final class BeanInfoCacheKey {
 
     private final Class<?> type;
+    private final Object instance;
     private final Method explicitMethod;
 
-    public BeanInfoCacheKey(Class<?> type, Method explicitMethod) {
+    public BeanInfoCacheKey(Class<?> type, Object instance, Method explicitMethod) {
         this.type = type;
+        this.instance = instance;
         this.explicitMethod = explicitMethod;
     }
 
@@ -42,19 +45,19 @@ public final class BeanInfoCacheKey {
 
         BeanInfoCacheKey that = (BeanInfoCacheKey) o;
 
-        if (explicitMethod != null ? !explicitMethod.equals(that.explicitMethod) : that.explicitMethod != null) {
+        if (!Objects.equals(type, that.type)) {
             return false;
         }
-        if (!type.equals(that.type)) {
+        if (!Objects.equals(instance, that.instance)) {
             return false;
         }
-
-        return true;
+        return Objects.equals(explicitMethod, that.explicitMethod);
     }
 
     @Override
     public int hashCode() {
-        int result = type.hashCode();
+        int result = type != null ? type.hashCode() : 0;
+        result = 31 * result + (instance != null ? instance.hashCode() : 0);
         result = 31 * result + (explicitMethod != null ? explicitMethod.hashCode() : 0);
         return result;
     }
diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/ConstantBeanHolder.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/ConstantBeanHolder.java
index 31b00171a17..dd9fd2ec615 100644
--- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/ConstantBeanHolder.java
+++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/ConstantBeanHolder.java
@@ -47,7 +47,7 @@ public class ConstantBeanHolder implements BeanHolder {
         ObjectHelper.notNull(bean, "bean");
 
         this.bean = bean;
-        this.beanInfo = new BeanInfo(context, bean.getClass(), parameterMappingStrategy, beanComponent);
+        this.beanInfo = new BeanInfo(context, bean.getClass(), bean, null, parameterMappingStrategy, beanComponent);
     }
 
     @Override
diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/DefaultBeanProcessorFactory.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/DefaultBeanProcessorFactory.java
index e3affca6948..9ff2ee574b6 100644
--- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/DefaultBeanProcessorFactory.java
+++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/DefaultBeanProcessorFactory.java
@@ -58,7 +58,8 @@ public final class DefaultBeanProcessorFactory extends ServiceSupport
 
     @Override
     public Processor createBeanProcessor(CamelContext camelContext, Object bean, Method method) throws Exception {
-        BeanInfo info = new BeanInfo(camelContext, method, parameterMappingStrategy, beanComponent);
+        BeanInfo info
+                = new BeanInfo(camelContext, method.getDeclaringClass(), bean, method, parameterMappingStrategy, beanComponent);
         return new BeanProcessor(bean, info);
     }
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentEIPManualTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentEIPManualTest.java
new file mode 100644
index 00000000000..653f5612283
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentEIPManualTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.dynamicrouter;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Header;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
+
+/**
+ * {@link DynamicRouterConcurrentPOJOManualTest}
+ */
+@Disabled("Manual test together with DynamicRouterConcurrentPOJOTest")
+public class DynamicRouterConcurrentEIPManualTest extends ContextTestSupport {
+
+    private static final int COUNT = 100;
+
+    @RepeatedTest(100)
+    public void testConcurrentDynamicRouter() throws Exception {
+        final MockEndpoint mockA = getMockEndpoint("mock:a");
+        mockA.expectedMessageCount(COUNT);
+        final MockEndpoint mockB = getMockEndpoint("mock:b");
+        mockB.expectedMessageCount(COUNT);
+
+        Thread sendToSedaA = createSedaSenderThread("seda:a", context.createProducerTemplate());
+        Thread sendToSedaB = createSedaSenderThread("seda:b", context.createProducerTemplate());
+
+        sendToSedaA.start();
+        sendToSedaB.start();
+
+        sendToSedaA.join(10000);
+        sendToSedaB.join(10000);
+
+        /*
+         * Awaiting the sum of the two mocks to be 200 makes demonstrating CAMEL-19487
+         * a bit faster: the problem is that sometimes messages for mock:a land in mock:b or vice versa
+         * but the sum is always 200
+         */
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> mockA.getReceivedCounter() + mockB.getReceivedCounter() == 200);
+
+        /* Now that all messages were delivered, let's make sure that messages for mock:a did not land in mock:b or vice versa */
+        Assertions.assertThat(mockA.getReceivedExchanges())
+                .map(Exchange::getMessage)
+                .map(m -> m.getBody(String.class))
+                .filteredOn(body -> body.contains("Message from seda:b"))
+                .as(
+                        "Expected mock:a to contain only messages from seda:a, but there were also messages from seda:b")
+                .isEmpty();
+
+        Assertions.assertThat(mockB.getReceivedExchanges())
+                .map(Exchange::getMessage)
+                .map(m -> m.getBody(String.class))
+                .filteredOn(body -> body.contains("Message from seda:a"))
+                .as(
+                        "Expected mock:b to contain only messages from seda:b, but there were also messages from seda:a")
+                .isEmpty();
+
+        Assertions.assertThat(mockA.getReceivedCounter()).isEqualTo(100);
+        Assertions.assertThat(mockB.getReceivedCounter()).isEqualTo(100);
+
+    }
+
+    private Thread createSedaSenderThread(final String seda, final ProducerTemplate perThreadtemplate) {
+        return new Thread(new Runnable() {
+            @Override
+            public void run() {
+                for (int i = 0; i < COUNT; i++) {
+                    perThreadtemplate.sendBody(seda, "Message from " + seda + " " + i);
+                }
+            }
+        });
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                MyDynamicRouterPojo a = new MyDynamicRouterPojo("mock:a");
+                MyDynamicRouterPojo b = new MyDynamicRouterPojo("mock:b");
+
+                from("seda:a")
+                        .dynamicRouter(method(a, "route"));
+
+                from("seda:b")
+                        .dynamicRouter(method(b, "route"));
+            }
+        };
+    }
+
+    public static class MyDynamicRouterPojo {
+
+        private final String target;
+
+        public MyDynamicRouterPojo(String target) {
+            this.target = target;
+        }
+
+        public String route(@Header(Exchange.SLIP_ENDPOINT) String previous) {
+            if (previous == null) {
+                return target;
+            } else {
+                return null;
+            }
+        }
+    }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOManualTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOManualTest.java
new file mode 100644
index 00000000000..a2eeeeb3f29
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOManualTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.dynamicrouter;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.DynamicRouter;
+import org.apache.camel.Exchange;
+import org.apache.camel.Header;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
+
+/**
+ * {@link DynamicRouterConcurrentEIPManualTest}
+ */
+@Disabled("Manual test together with DynamicRouterConcurrentEIPTest")
+public class DynamicRouterConcurrentPOJOManualTest extends ContextTestSupport {
+
+    private static final int COUNT = 100;
+
+    @RepeatedTest(100)
+    public void testConcurrentDynamicRouter() throws Exception {
+        final MockEndpoint mockA = getMockEndpoint("mock:a");
+        mockA.expectedMessageCount(COUNT);
+        final MockEndpoint mockB = getMockEndpoint("mock:b");
+        mockB.expectedMessageCount(COUNT);
+
+        Thread sendToSedaA = createSedaSenderThread("seda:a", context.createProducerTemplate());
+        Thread sendToSedaB = createSedaSenderThread("seda:b", context.createProducerTemplate());
+
+        sendToSedaA.start();
+        sendToSedaB.start();
+
+        sendToSedaA.join(10000);
+        sendToSedaB.join(10000);
+
+        /*
+         * Awaiting the sum of the two mocks to be 200 makes demonstrating CAMEL-19487
+         * a bit faster: the problem is that sometimes messages for mock:a land in mock:b or vice versa
+         * but the sum is always 200
+         */
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> mockA.getReceivedCounter() + mockB.getReceivedCounter() == 200);
+
+        /* Now that all messages were delivered, let's make sure that messages for mock:a did not land in mock:b or vice versa */
+        Assertions.assertThat(mockA.getReceivedExchanges())
+                .map(Exchange::getMessage)
+                .map(m -> m.getBody(String.class))
+                .filteredOn(body -> body.contains("Message from seda:b"))
+                .as(
+                        "Expected mock:a to contain only messages from seda:a, but there were also messages from seda:b")
+                .isEmpty();
+
+        Assertions.assertThat(mockB.getReceivedExchanges())
+                .map(Exchange::getMessage)
+                .map(m -> m.getBody(String.class))
+                .filteredOn(body -> body.contains("Message from seda:a"))
+                .as(
+                        "Expected mock:b to contain only messages from seda:b, but there were also messages from seda:a")
+                .isEmpty();
+
+        Assertions.assertThat(mockA.getReceivedCounter()).isEqualTo(100);
+        Assertions.assertThat(mockB.getReceivedCounter()).isEqualTo(100);
+
+    }
+
+    private Thread createSedaSenderThread(final String seda, final ProducerTemplate perThreadtemplate) {
+        return new Thread(new Runnable() {
+            @Override
+            public void run() {
+                for (int i = 0; i < COUNT; i++) {
+                    perThreadtemplate.sendBody(seda, "Message from " + seda + " " + i);
+                }
+            }
+        });
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("seda:a")
+                        .bean(new MyDynamicRouterPojo("mock:a"));
+
+                from("seda:b")
+                        .bean(new MyDynamicRouterPojo("mock:b"));
+            }
+        };
+    }
+
+    public static class MyDynamicRouterPojo {
+
+        private final String target;
+
+        public MyDynamicRouterPojo(String target) {
+            this.target = target;
+        }
+
+        @DynamicRouter
+        public String route(@Header(Exchange.SLIP_ENDPOINT) String previous) {
+            if (previous == null) {
+                return target;
+            } else {
+                return null;
+            }
+        }
+    }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java
deleted file mode 100644
index 3849a08f30c..00000000000
--- a/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor.dynamicrouter;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.DynamicRouter;
-import org.apache.camel.Exchange;
-import org.apache.camel.Header;
-import org.apache.camel.builder.RouteBuilder;
-import org.junit.jupiter.api.Test;
-
-public class DynamicRouterConcurrentPOJOTest extends ContextTestSupport {
-
-    private static final int COUNT = 100;
-
-    @Test
-    public void testConcurrentDynamicRouter() throws Exception {
-        getMockEndpoint("mock:a").expectedMessageCount(COUNT);
-        getMockEndpoint("mock:b").expectedMessageCount(COUNT);
-
-        Thread sendToSedaA = createSedaSenderThread("seda:a");
-        Thread sendToSedaB = createSedaSenderThread("seda:b");
-
-        sendToSedaA.start();
-        sendToSedaB.start();
-
-        assertMockEndpointsSatisfied();
-    }
-
-    private Thread createSedaSenderThread(final String seda) {
-        return new Thread(new Runnable() {
-            @Override
-            public void run() {
-                for (int i = 0; i < COUNT; i++) {
-                    template.sendBody(seda, "Message from " + seda);
-                }
-            }
-        });
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() {
-        return new RouteBuilder() {
-            public void configure() {
-                from("seda:a").bean(new MyDynamicRouterPojo("mock:a"));
-                from("seda:b").bean(new MyDynamicRouterPojo("mock:b"));
-            }
-        };
-    }
-
-    public class MyDynamicRouterPojo {
-
-        private final String target;
-
-        public MyDynamicRouterPojo(String target) {
-            this.target = target;
-        }
-
-        @DynamicRouter
-        public String route(@Header(Exchange.SLIP_ENDPOINT) String previous) {
-            if (previous == null) {
-                return target;
-            } else {
-                return null;
-            }
-        }
-    }
-}