You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/04/05 09:53:15 UTC

[camel] branch camel-3.14.x updated: Fix PubSub concurrent access error on shutdown (#7365)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.14.x by this push:
     new 052d1ccb9a9 Fix PubSub concurrent access error on shutdown (#7365)
052d1ccb9a9 is described below

commit 052d1ccb9a92a7b20dd1ac200b4c39f258b888fb
Author: vpaturet <46...@users.noreply.github.com>
AuthorDate: Tue Apr 5 11:53:04 2022 +0200

    Fix PubSub concurrent access error on shutdown (#7365)
---
 .../camel/component/google/pubsub/GooglePubsubConsumer.java   | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index b4aa82bb6de..83304d01ac7 100644
--- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.google.pubsub;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -54,7 +55,7 @@ public class GooglePubsubConsumer extends DefaultConsumer {
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.processor = processor;
-        this.subscribers = new LinkedList<>();
+        this.subscribers = Collections.synchronizedList(new LinkedList<>());
 
         String loggerId = endpoint.getLoggerId();
 
@@ -80,9 +81,11 @@ public class GooglePubsubConsumer extends DefaultConsumer {
         super.doStop();
         localLog.info("Stopping Google PubSub consumer for {}/{}", endpoint.getProjectId(), endpoint.getDestinationName());
 
-        if (subscribers != null && !subscribers.isEmpty()) {
-            localLog.info("Stopping subscribers for {}/{}", endpoint.getProjectId(), endpoint.getDestinationName());
-            subscribers.forEach(AbstractApiService::stopAsync);
+        synchronized (subscribers) {
+            if (!subscribers.isEmpty()) {
+                localLog.info("Stopping subscribers for {}/{}", endpoint.getProjectId(), endpoint.getDestinationName());
+                subscribers.forEach(AbstractApiService::stopAsync);
+            }
         }
 
         if (executor != null) {