You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2023/05/25 17:56:44 UTC

[camel] 01/03: CAMEL-19295: Experiment with sync LRUCache and test case that otherwise will OOME

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-19295/backport-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3d827c2935a31e18f007b630c175f7fae337faaf
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri May 19 11:13:28 2023 +0200

    CAMEL-19295: Experiment with sync LRUCache and test case that otherwise will OOME
---
 .../camel/impl/DefaultEndpointRegistryTest.java    | 54 ++++++++++++++++++++++
 .../camel/support/DefaultLRUCacheFactory.java      | 21 +++++----
 2 files changed, 65 insertions(+), 10 deletions(-)

diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointRegistryTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointRegistryTest.java
index 2d8cdc8e986..9ac3db65b5e 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointRegistryTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointRegistryTest.java
@@ -16,9 +16,17 @@
  */
 package org.apache.camel.impl;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.engine.DefaultEndpointRegistry;
+import org.apache.camel.impl.engine.SimpleCamelContext;
 import org.apache.camel.spi.EndpointRegistry;
+import org.apache.camel.support.NormalizedUri;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -68,4 +76,50 @@ public class DefaultEndpointRegistryTest {
         assertTrue(reg.isStatic("file:error"));
     }
 
+    // Regression test for https://issues.apache.org/jira/browse/CAMEL-19295
+    @Test
+    public void testConcurrency() throws InterruptedException {
+
+        SimpleCamelContext context = new SimpleCamelContext();
+        context.start();
+
+        ProducerTemplate producerTemplate = context.createProducerTemplate();
+        EndpointRegistry<NormalizedUri> endpointRegistry = context.getEndpointRegistry();
+
+        int nThreads = 4;
+        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+        int iterations = 500;
+        // each iteration submits nThreads tasks and waits for all of them before inspecting the registry
+        for (int j = 0; j < iterations; j++) {
+            CountDownLatch allThreadCompletionSemaphore = new CountDownLatch(nThreads);
+            for (int i = 0; i < nThreads; i++) {
+
+                executorService.submit(() -> {
+                    // every task requests the same five controlbus URIs, so the pool threads overlap on them
+                    producerTemplate.requestBody("controlbus:route?routeId=route1&action=ACTION_STATUS&loggingLevel=off", null,
+                            ServiceStatus.class);
+                    producerTemplate.requestBody("controlbus:route?routeId=route2&action=ACTION_STATUS&loggingLevel=off", null,
+                            ServiceStatus.class);
+                    producerTemplate.requestBody("controlbus:route?routeId=route3&action=ACTION_STATUS&loggingLevel=off", null,
+                            ServiceStatus.class);
+                    producerTemplate.requestBody("controlbus:route?routeId=route4&action=ACTION_STATUS&loggingLevel=off", null,
+                            ServiceStatus.class);
+                    producerTemplate.requestBody("controlbus:route?routeId=route5&action=ACTION_STATUS&loggingLevel=off", null,
+                            ServiceStatus.class);
+                    // NOTE(review): skipped if a requestBody call above throws; await() below has no timeout
+                    allThreadCompletionSemaphore.countDown();
+
+                });
+            }
+
+            allThreadCompletionSemaphore.await();
+            // passes as long as values().toArray() returns; the array contents are not inspected
+            assertTrue(endpointRegistry.values().toArray() != null);
+
+        }
+        // NOTE(review): only the executor is shut down here; the context started above is not stopped
+        executorService.shutdown();
+
+    }
+
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java
index 995a1405a2d..eefc3acbe7a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultLRUCacheFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.support;
 
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -41,7 +42,7 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory {
     @Override
     public <K, V> Map<K, V> createLRUCache(int maximumCacheSize) {
         LOG.trace("Creating LRUCache with maximumCacheSize: {}", maximumCacheSize);
-        return new SimpleLRUCache<>(maximumCacheSize);
+        return Collections.synchronizedMap(new SimpleLRUCache<>(maximumCacheSize));
     }
 
     /**
@@ -53,7 +54,7 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory {
     @Override
     public <K, V> Map<K, V> createLRUCache(int maximumCacheSize, Consumer<V> onEvict) {
         LOG.trace("Creating LRUCache with maximumCacheSize: {}", maximumCacheSize);
-        return new SimpleLRUCache<>(16, maximumCacheSize, onEvict);
+        return Collections.synchronizedMap(new SimpleLRUCache<>(16, maximumCacheSize, onEvict));
     }
 
     /**
@@ -67,7 +68,7 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory {
     @Override
     public <K, V> Map<K, V> createLRUCache(int initialCapacity, int maximumCacheSize) {
         LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}", initialCapacity, maximumCacheSize);
-        return new SimpleLRUCache<>(initialCapacity, maximumCacheSize);
+        return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize));
     }
 
     /**
@@ -83,7 +84,7 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory {
     public <K, V> Map<K, V> createLRUCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) {
         LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}, stopOnEviction: {}", initialCapacity,
                 maximumCacheSize, stopOnEviction);
-        return new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction);
+        return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction));
     }
 
     /**
@@ -96,20 +97,20 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory {
     @Override
     public <K, V> Map<K, V> createLRUSoftCache(int maximumCacheSize) {
         LOG.trace("Creating LRUSoftCache with maximumCacheSize: {}", maximumCacheSize);
-        return new SimpleLRUCache<>(maximumCacheSize);
+        return Collections.synchronizedMap(new SimpleLRUCache<>(maximumCacheSize));
     }
 
     @Override
     public <K, V> Map<K, V> createLRUSoftCache(int initialCapacity, int maximumCacheSize) {
         LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}", initialCapacity, maximumCacheSize);
-        return new SimpleLRUCache<>(initialCapacity, maximumCacheSize);
+        return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize));
     }
 
     @Override
     public <K, V> Map<K, V> createLRUSoftCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) {
         LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}, stopOnEviction: {}", initialCapacity,
                 maximumCacheSize, stopOnEviction);
-        return new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction);
+        return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction));
     }
 
     /**
@@ -122,20 +123,20 @@ public class DefaultLRUCacheFactory extends LRUCacheFactory {
     @Override
     public <K, V> Map<K, V> createLRUWeakCache(int maximumCacheSize) {
         LOG.trace("Creating LRUWeakCache with maximumCacheSize: {}", maximumCacheSize);
-        return new SimpleLRUCache<>(maximumCacheSize);
+        return Collections.synchronizedMap(new SimpleLRUCache<>(maximumCacheSize));
     }
 
     @Override
     public <K, V> Map<K, V> createLRUWeakCache(int initialCapacity, int maximumCacheSize) {
         LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}", initialCapacity, maximumCacheSize);
-        return new SimpleLRUCache<>(initialCapacity, maximumCacheSize);
+        return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize));
     }
 
     @Override
     public <K, V> Map<K, V> createLRUWeakCache(int initialCapacity, int maximumCacheSize, boolean stopOnEviction) {
         LOG.trace("Creating LRUCache with initialCapacity: {}, maximumCacheSize: {}, stopOnEviction: {}", initialCapacity,
                 maximumCacheSize, stopOnEviction);
-        return new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction);
+        return Collections.synchronizedMap(new SimpleLRUCache<>(initialCapacity, maximumCacheSize, stopOnEviction));
     }
 
     private class SimpleLRUCache<K, V> extends LinkedHashMap<K, V> {