You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/10/03 15:05:14 UTC

[camel] 26/32: Use a single field + lock to manage the services state

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 672c81746520376ab350474af2b36aec958b6a0f
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Mon Oct 1 17:20:26 2018 +0200

    Use a single field + lock to manage the services state
---
 .../apache/camel/component/seda/SedaEndpoint.java  |   2 +-
 .../org/apache/camel/impl/DefaultCamelContext.java |   2 +-
 .../apache/camel/support/ChildServiceSupport.java  | 164 ++++++------
 .../org/apache/camel/support/ServiceSupport.java   | 278 ++++++++++-----------
 .../apache/camel/support/ServiceSupportTest.java   |   2 +-
 5 files changed, 215 insertions(+), 233 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index bfa1c98..ebc84c8 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -510,7 +510,7 @@ public class SedaEndpoint extends DefaultEndpoint implements AsyncEndpoint, Brow
 
     @Override
     public void shutdown() throws Exception {
-        if (shutdown.get()) {
+        if (isShutdown()) {
             log.trace("Service already shut down");
             return;
         }
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index daf04a1..6c4106e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -3968,7 +3968,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
             } else {
                 // and start the route service (no need to start children as they are already warmed up)
                 try {
-                    routeService.start(false);
+                    routeService.start();
                     route.getProperties().remove("route.start.exception");
                 } catch (Exception e) {
                     route.getProperties().put("route.start.exception", e);
diff --git a/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java b/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java
index 42cdbea..93b5778 100644
--- a/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java
@@ -16,120 +16,108 @@
  */
 package org.apache.camel.support;
 
-import java.util.LinkedHashSet;
-import java.util.Set;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.camel.Service;
 import org.apache.camel.util.ServiceHelper;
 
 /**
  * Base class to control lifecycle for a set of child {@link org.apache.camel.Service}s.
  */
 public abstract class ChildServiceSupport extends ServiceSupport {
-    private Set<Object> childServices;
-    
-    public void start() throws Exception {
-        start(true);
-    }
 
-    public void start(boolean startChildren) throws Exception {
-        if (!started.get()) {
-            if (starting.compareAndSet(false, true)) {
-                boolean childrenStarted = false;
-                Exception ex = null;
-                try {
-                    if (childServices != null && startChildren) {
-                        ServiceHelper.startService(childServices);
-                    }
-                    childrenStarted = true;
-                    doStart();
-                } catch (Exception e) {
-                    ex = e;
-                } finally {
-                    if (ex != null) {
-                        try {
-                            stop(childrenStarted);
-                        } catch (Exception e) {
-                            // Ignore exceptions as we want to show the original exception
-                        }
-                        throw ex;
-                    } else {
-                        started.set(true);
-                        starting.set(false);
-                        stopping.set(false);
-                        stopped.set(false);
-                        suspending.set(false);
-                        suspended.set(false);
-                        shutdown.set(false);
-                        shuttingdown.set(false);
-                    }
-                }
+    protected volatile List<Service> childServices;
+
+    public void start() throws Exception {
+        synchronized (lock) {
+            if (status == STARTED) {
+                log.trace("Service already started");
+                return;
             }
-        }
-    }
-    
-    private void stop(boolean childrenStarted) throws Exception {
-        if (stopping.compareAndSet(false, true)) {
+            if (status == STARTING) {
+                log.trace("Service already starting");
+                return;
+            }
+            status = STARTING;
+            log.trace("Starting service");
             try {
-                try {
-                    starting.set(false);
-                    suspending.set(false);
-                    if (childrenStarted) {
-                        doStop();
-                    }
-                } finally {
-                    started.set(false);
-                    suspended.set(false);
-                    if (childServices != null) {
-                        ServiceHelper.stopService(childServices);
-                    }
-                }
-            } finally {
-                stopped.set(true);
-                stopping.set(false);
-                starting.set(false);
-                started.set(false);
-                suspending.set(false);
-                suspended.set(false);
-                shutdown.set(false);
-                shuttingdown.set(false);
+                ServiceHelper.startService(childServices);
+                doStart();
+                status = STARTED;
+                log.trace("Service started");
+            } catch (Exception e) {
+                status = FAILED;
+                log.trace("Error while starting service", e);
+                ServiceHelper.stopService(childServices);
+                throw e;
             }
         }
     }
 
     public void stop() throws Exception {
-        if (!stopped.get()) {
-            stop(true);
+        synchronized (lock) {
+            if (status == STOPPED || status == SHUTTINGDOWN || status == SHUTDOWN) {
+                log.trace("Service already stopped");
+                return;
+            }
+            if (status == STOPPING) {
+                log.trace("Service already stopping");
+                return;
+            }
+            status = STOPPING;
+            log.trace("Stopping service");
+            try {
+                doStop();
+                ServiceHelper.stopService(childServices);
+                status = STOPPED;
+                log.trace("Service stopped service");
+            } catch (Exception e) {
+                status = FAILED;
+                log.trace("Error while stopping service", e);
+                throw e;
+            }
         }
     }
-    
-    public void shutdown() throws Exception {
-        // ensure we are stopped first
-        stop();
 
-        if (shuttingdown.compareAndSet(false, true)) {
+    @Override
+    public void shutdown() throws Exception {
+        synchronized (lock) {
+            if (status == SHUTDOWN) {
+                log.trace("Service already shut down");
+                return;
+            }
+            if (status == SHUTTINGDOWN) {
+                log.trace("Service already shutting down");
+                return;
+            }
+            stop();
+            status = SHUTDOWN;
+            log.trace("Shutting down service");
             try {
-                try {
-                    doShutdown();
-                } finally {
-                    if (childServices != null) {
-                        ServiceHelper.stopAndShutdownServices(childServices);
-                    }
-                }
-            } finally {
-                // shutdown is also stopped so only set shutdown flags
-                shutdown.set(true);
-                shuttingdown.set(false);
+                doShutdown();
+                ServiceHelper.stopAndShutdownServices(childServices);
+                log.trace("Service shut down");
+                status = SHUTDOWN;
+            } catch (Exception e) {
+                status = FAILED;
+                log.trace("Error shutting down service", e);
+                throw e;
             }
         }
     }
-    
+
     protected void addChildService(Object childService) {
-        synchronized (this) {
+        if (childService instanceof Service) {
             if (childServices == null) {
-                childServices = new LinkedHashSet<>();
+                synchronized (lock) {
+                    if (childServices == null) {
+                        childServices = new CopyOnWriteArrayList<>();
+                    }
+                }
             }
+            childServices.add((Service) childService);
         }
-        childServices.add(childService);
     }
 
     protected boolean removeChildService(Object childService) {
diff --git a/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java b/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
index c1ee206..cffa616 100644
--- a/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
@@ -16,13 +16,13 @@
  */
 package org.apache.camel.support;
 
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.camel.Service;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.StatefulService;
-import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,16 +42,20 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class ServiceSupport implements StatefulService {
 
-    protected final AtomicBoolean started = new AtomicBoolean(false);
-    protected final AtomicBoolean starting = new AtomicBoolean(false);
-    protected final AtomicBoolean stopping = new AtomicBoolean(false);
-    protected final AtomicBoolean stopped = new AtomicBoolean(false);
-    protected final AtomicBoolean suspending = new AtomicBoolean(false);
-    protected final AtomicBoolean suspended = new AtomicBoolean(false);
-    protected final AtomicBoolean shuttingdown = new AtomicBoolean(false);
-    protected final AtomicBoolean shutdown = new AtomicBoolean(false);
+    protected static final int NEW = 0;
+    protected static final int STARTING = 1;
+    protected static final int STARTED = 2;
+    protected static final int SUSPENDING = 3;
+    protected static final int SUSPENDED = 4;
+    protected static final int STOPPING = 5;
+    protected static final int STOPPED = 6;
+    protected static final int SHUTTINGDOWN = 7;
+    protected static final int SHUTDOWN = 8;
+    protected static final int FAILED = 9;
 
     protected final Logger log = LoggerFactory.getLogger(getClass());
+    protected final Object lock = new Object();
+    protected volatile int status = NEW;
 
     /**
      * <b>Important: </b> You should override the lifecycle methods that start with <tt>do</tt>, eg {@link #doStart()},
@@ -60,41 +64,26 @@ public abstract class ServiceSupport implements StatefulService {
      * invoke the operation in a safe manner.
      */
     public void start() throws Exception {
-        if (isStarting() || isStarted()) {
-            // only start service if not already started
-            log.trace("Service already started");
-            return;
-        }
-        if (starting.compareAndSet(false, true)) {
+        synchronized (lock) {
+            if (status == STARTED) {
+                log.trace("Service already started");
+                return;
+            }
+            if (status == STARTING) {
+                log.trace("Service already starting");
+                return;
+            }
+            status = STARTING;
             log.trace("Starting service");
             try {
                 doStart();
-                started.set(true);
-                starting.set(false);
-                stopping.set(false);
-                stopped.set(false);
-                suspending.set(false);
-                suspended.set(false);
-                shutdown.set(false);
-                shuttingdown.set(false);
+                status = STARTED;
+                log.trace("Service started");
             } catch (Exception e) {
-                try {
-                    stop();
-                } catch (Exception e2) {
-                    // Ignore exceptions as we want to show the original exception
-                } finally {
-                    // ensure flags get reset to stopped as we failed during starting
-                    stopping.set(false);
-                    stopped.set(true);
-                    starting.set(false);
-                    started.set(false);
-                    suspending.set(false);
-                    suspended.set(false);
-                    shutdown.set(false);
-                    shuttingdown.set(false);
-                }
+                status = FAILED;
+                log.trace("Error while starting service", e);
                 throw e;
-            } 
+            }
         }
     }
 
@@ -105,26 +94,26 @@ public abstract class ServiceSupport implements StatefulService {
      * invoke the operation in a safe manner.
      */
     public void stop() throws Exception {
-        if (isStopped()) {
-            log.trace("Service already stopped");
-            return;
-        }
-        if (isStopping()) {
-            log.trace("Service already stopping");
-            return;
-        }
-        stopping.set(true);
-        try {
-            doStop();
-        } finally {
-            stopping.set(false);
-            stopped.set(true);
-            starting.set(false);
-            started.set(false);
-            suspending.set(false);
-            suspended.set(false);
-            shutdown.set(false);
-            shuttingdown.set(false);            
+        synchronized (lock) {
+            if (status == STOPPED || status == SHUTTINGDOWN || status == SHUTDOWN) {
+                log.trace("Service already stopped");
+                return;
+            }
+            if (status == STOPPING) {
+                log.trace("Service already stopping");
+                return;
+            }
+            status = STOPPING;
+            log.trace("Stopping service");
+            try {
+                doStop();
+                status = STOPPED;
+                log.trace("Service stopped service");
+            } catch (Exception e) {
+                status = FAILED;
+                log.trace("Error while stopping service", e);
+                throw e;
+            }
         }
     }
 
@@ -136,22 +125,25 @@ public abstract class ServiceSupport implements StatefulService {
      */
     @Override
     public void suspend() throws Exception {
-        if (!suspended.get()) {
-            if (suspending.compareAndSet(false, true)) {
-                try {
-                    starting.set(false);
-                    stopping.set(false);
-                    doSuspend();
-                } finally {
-                    stopped.set(false);
-                    stopping.set(false);
-                    starting.set(false);
-                    started.set(false);
-                    suspending.set(false);
-                    suspended.set(true);
-                    shutdown.set(false);
-                    shuttingdown.set(false);
-                }
+        synchronized (lock) {
+            if (status == SUSPENDED) {
+                log.trace("Service already suspended");
+                return;
+            }
+            if (status == SUSPENDING) {
+                log.trace("Service already suspending");
+                return;
+            }
+            status = SUSPENDING;
+            log.trace("Suspending service");
+            try {
+                doSuspend();
+                status = SUSPENDED;
+                log.trace("Service suspended");
+            } catch (Exception e) {
+                status = FAILED;
+                log.trace("Error while suspending service", e);
+                throw e;
             }
         }
     }
@@ -164,20 +156,21 @@ public abstract class ServiceSupport implements StatefulService {
      */
     @Override
     public void resume() throws Exception {
-        if (suspended.get()) {
-            if (starting.compareAndSet(false, true)) {
-                try {
-                    doResume();
-                } finally {
-                    started.set(true);
-                    starting.set(false);
-                    stopping.set(false);
-                    stopped.set(false);
-                    suspending.set(false);
-                    suspended.set(false);
-                    shutdown.set(false);
-                    shuttingdown.set(false);
-                }
+        synchronized (lock) {
+            if (status != SUSPENDED) {
+                log.trace("Service is not suspended");
+                return;
+            }
+            status = STARTING;
+            log.trace("Resuming service");
+            try {
+                doResume();
+                status = STARTED;
+                log.trace("Service resumed");
+            } catch (Exception e) {
+                status = FAILED;
+                log.trace("Error while resuming service", e);
+                throw e;
             }
         }
     }
@@ -190,105 +183,106 @@ public abstract class ServiceSupport implements StatefulService {
      */
     @Override
     public void shutdown() throws Exception {
-        if (shutdown.get()) {
-            log.trace("Service already shut down");
-            return;
-        }
-        // ensure we are stopped first
-        stop();
-
-        if (shuttingdown.compareAndSet(false, true)) {
+        synchronized (lock) {
+            if (status == SHUTDOWN) {
+                log.trace("Service already shut down");
+                return;
+            }
+            if (status == SHUTTINGDOWN) {
+                log.trace("Service already shutting down");
+                return;
+            }
+            stop();
+            status = SHUTDOWN;
+            log.trace("Shutting down service");
             try {
                 doShutdown();
-            } finally {
-                // shutdown is also stopped so only set shutdown flags
-                shutdown.set(true);
-                shuttingdown.set(false);
+                log.trace("Service shut down");
+                status = SHUTDOWN;
+            } catch (Exception e) {
+                status = FAILED;
+                log.trace("Error shutting down service", e);
+                throw e;
             }
         }
     }
 
     @Override
     public ServiceStatus getStatus() {
-        // we should check the ---ing states first, as this indicate the state is in the middle of doing that
-        if (isStarting()) {
-            return ServiceStatus.Starting;
-        }
-        if (isStopping()) {
-            return ServiceStatus.Stopping;
-        }
-        if (isSuspending()) {
-            return ServiceStatus.Suspending;
-        }
-
-        // then check for the regular states
-        if (isStarted()) {
-            return ServiceStatus.Started;
-        }
-        if (isStopped()) {
-            return ServiceStatus.Stopped;
+        switch (status) {
+            case STARTING:
+                return ServiceStatus.Starting;
+            case STARTED:
+                return ServiceStatus.Started;
+            case SUSPENDING:
+                return ServiceStatus.Suspending;
+            case SUSPENDED:
+                return ServiceStatus.Suspended;
+            case STOPPING:
+                return ServiceStatus.Stopping;
+            default:
+                return ServiceStatus.Stopped;
         }
-        if (isSuspended()) {
-            return ServiceStatus.Suspended;
-        }
-
-        // use stopped as fallback
-        return ServiceStatus.Stopped;
     }
     
     @Override
     public boolean isStarted() {
-        return started.get();
+        return status == STARTED;
     }
 
     @Override
     public boolean isStarting() {
-        return starting.get();
+        return status == STARTING;
     }
 
     @Override
     public boolean isStopping() {
-        return stopping.get();
+        return status == STOPPING;
     }
 
     @Override
     public boolean isStopped() {
-        return stopped.get();
+        return status == STOPPED || status == SHUTTINGDOWN || status == SHUTDOWN || status == FAILED;
     }
 
     @Override
     public boolean isSuspending() {
-        return suspending.get();
+        return status == SUSPENDING;
     }
 
     @Override
     public boolean isSuspended() {
-        return suspended.get();
+        return status == SUSPENDED;
     }
 
     @Override
     public boolean isRunAllowed() {
-        // if we have not yet initialized, then all options is false
-        boolean unused1 = !started.get() && !starting.get() && !stopping.get() && !stopped.get();
-        boolean unused2 = !suspending.get() && !suspended.get() && !shutdown.get() && !shuttingdown.get();
-        if (unused1 && unused2) {
-            return false;
-        }
-        return !isStoppingOrStopped();
+        return isStartingOrStarted() || isSuspendingOrSuspended();
+    }
+
+    public boolean isShutdown() {
+        return status == SHUTDOWN;
     }
 
     /**
      * Is the service in progress of being stopped or already stopped
      */
     public boolean isStoppingOrStopped() {
-        return stopping.get() || stopped.get();
+        return isStopping() || isStopped();
     }
 
     /**
      * Is the service in progress of being suspended or already suspended
      */
     public boolean isSuspendingOrSuspended() {
-        return suspending.get() || suspended.get();
+        return isSuspending() || isSuspended();
+    }
+
+    /**
+     * Is the service in progress of being suspended or already suspended
+     */
+    public boolean isStartingOrStarted() {
+        return isStarting() || isStarted();
     }
 
     /**
diff --git a/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java b/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java
index e2c848a..5b2b232 100644
--- a/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java
+++ b/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java
@@ -135,7 +135,7 @@ public class ServiceSupportTest extends TestSupport {
 
         public ServiceSupportTestExOnStart() {
             // just for testing force it to not be stopped
-            stopped.set(false);
+            status = -1;
         }
 
         @Override