You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/10/03 15:05:14 UTC
[camel] 26/32: Use a single field + lock to manage the services state
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 672c81746520376ab350474af2b36aec958b6a0f
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Mon Oct 1 17:20:26 2018 +0200
Use a single field + lock to manage the services state
---
.../apache/camel/component/seda/SedaEndpoint.java | 2 +-
.../org/apache/camel/impl/DefaultCamelContext.java | 2 +-
.../apache/camel/support/ChildServiceSupport.java | 164 ++++++------
.../org/apache/camel/support/ServiceSupport.java | 278 ++++++++++-----------
.../apache/camel/support/ServiceSupportTest.java | 2 +-
5 files changed, 215 insertions(+), 233 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index bfa1c98..ebc84c8 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -510,7 +510,7 @@ public class SedaEndpoint extends DefaultEndpoint implements AsyncEndpoint, Brow
@Override
public void shutdown() throws Exception {
- if (shutdown.get()) {
+ if (isShutdown()) {
log.trace("Service already shut down");
return;
}
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index daf04a1..6c4106e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -3968,7 +3968,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
} else {
// and start the route service (no need to start children as they are already warmed up)
try {
- routeService.start(false);
+ routeService.start();
route.getProperties().remove("route.start.exception");
} catch (Exception e) {
route.getProperties().put("route.start.exception", e);
diff --git a/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java b/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java
index 42cdbea..93b5778 100644
--- a/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java
@@ -16,120 +16,108 @@
*/
package org.apache.camel.support;
-import java.util.LinkedHashSet;
-import java.util.Set;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.camel.Service;
import org.apache.camel.util.ServiceHelper;
/**
* Base class to control lifecycle for a set of child {@link org.apache.camel.Service}s.
*/
public abstract class ChildServiceSupport extends ServiceSupport {
- private Set<Object> childServices;
-
- public void start() throws Exception {
- start(true);
- }
- public void start(boolean startChildren) throws Exception {
- if (!started.get()) {
- if (starting.compareAndSet(false, true)) {
- boolean childrenStarted = false;
- Exception ex = null;
- try {
- if (childServices != null && startChildren) {
- ServiceHelper.startService(childServices);
- }
- childrenStarted = true;
- doStart();
- } catch (Exception e) {
- ex = e;
- } finally {
- if (ex != null) {
- try {
- stop(childrenStarted);
- } catch (Exception e) {
- // Ignore exceptions as we want to show the original exception
- }
- throw ex;
- } else {
- started.set(true);
- starting.set(false);
- stopping.set(false);
- stopped.set(false);
- suspending.set(false);
- suspended.set(false);
- shutdown.set(false);
- shuttingdown.set(false);
- }
- }
+ protected volatile List<Service> childServices;
+
+ public void start() throws Exception {
+ synchronized (lock) {
+ if (status == STARTED) {
+ log.trace("Service already started");
+ return;
}
- }
- }
-
- private void stop(boolean childrenStarted) throws Exception {
- if (stopping.compareAndSet(false, true)) {
+ if (status == STARTING) {
+ log.trace("Service already starting");
+ return;
+ }
+ status = STARTING;
+ log.trace("Starting service");
try {
- try {
- starting.set(false);
- suspending.set(false);
- if (childrenStarted) {
- doStop();
- }
- } finally {
- started.set(false);
- suspended.set(false);
- if (childServices != null) {
- ServiceHelper.stopService(childServices);
- }
- }
- } finally {
- stopped.set(true);
- stopping.set(false);
- starting.set(false);
- started.set(false);
- suspending.set(false);
- suspended.set(false);
- shutdown.set(false);
- shuttingdown.set(false);
+ ServiceHelper.startService(childServices);
+ doStart();
+ status = STARTED;
+ log.trace("Service started");
+ } catch (Exception e) {
+ status = FAILED;
+ log.trace("Error while starting service", e);
+ ServiceHelper.stopService(childServices);
+ throw e;
}
}
}
public void stop() throws Exception {
- if (!stopped.get()) {
- stop(true);
+ synchronized (lock) {
+ if (status == STOPPED || status == SHUTTINGDOWN || status == SHUTDOWN) {
+ log.trace("Service already stopped");
+ return;
+ }
+ if (status == STOPPING) {
+ log.trace("Service already stopping");
+ return;
+ }
+ status = STOPPING;
+ log.trace("Stopping service");
+ try {
+ doStop();
+ ServiceHelper.stopService(childServices);
+ status = STOPPED;
+ log.trace("Service stopped service");
+ } catch (Exception e) {
+ status = FAILED;
+ log.trace("Error while stopping service", e);
+ throw e;
+ }
}
}
-
- public void shutdown() throws Exception {
- // ensure we are stopped first
- stop();
- if (shuttingdown.compareAndSet(false, true)) {
+ @Override
+ public void shutdown() throws Exception {
+ synchronized (lock) {
+ if (status == SHUTDOWN) {
+ log.trace("Service already shut down");
+ return;
+ }
+ if (status == SHUTTINGDOWN) {
+ log.trace("Service already shutting down");
+ return;
+ }
+ stop();
+ status = SHUTDOWN;
+ log.trace("Shutting down service");
try {
- try {
- doShutdown();
- } finally {
- if (childServices != null) {
- ServiceHelper.stopAndShutdownServices(childServices);
- }
- }
- } finally {
- // shutdown is also stopped so only set shutdown flags
- shutdown.set(true);
- shuttingdown.set(false);
+ doShutdown();
+ ServiceHelper.stopAndShutdownServices(childServices);
+ log.trace("Service shut down");
+ status = SHUTDOWN;
+ } catch (Exception e) {
+ status = FAILED;
+ log.trace("Error shutting down service", e);
+ throw e;
}
}
}
-
+
protected void addChildService(Object childService) {
- synchronized (this) {
+ if (childService instanceof Service) {
if (childServices == null) {
- childServices = new LinkedHashSet<>();
+ synchronized (lock) {
+ if (childServices == null) {
+ childServices = new CopyOnWriteArrayList<>();
+ }
+ }
}
+ childServices.add((Service) childService);
}
- childServices.add(childService);
}
protected boolean removeChildService(Object childService) {
diff --git a/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java b/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
index c1ee206..cffa616 100644
--- a/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
@@ -16,13 +16,13 @@
*/
package org.apache.camel.support;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.camel.Service;
import org.apache.camel.ServiceStatus;
import org.apache.camel.StatefulService;
-import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,16 +42,20 @@ import org.slf4j.LoggerFactory;
*/
public abstract class ServiceSupport implements StatefulService {
- protected final AtomicBoolean started = new AtomicBoolean(false);
- protected final AtomicBoolean starting = new AtomicBoolean(false);
- protected final AtomicBoolean stopping = new AtomicBoolean(false);
- protected final AtomicBoolean stopped = new AtomicBoolean(false);
- protected final AtomicBoolean suspending = new AtomicBoolean(false);
- protected final AtomicBoolean suspended = new AtomicBoolean(false);
- protected final AtomicBoolean shuttingdown = new AtomicBoolean(false);
- protected final AtomicBoolean shutdown = new AtomicBoolean(false);
+ protected static final int NEW = 0;
+ protected static final int STARTING = 1;
+ protected static final int STARTED = 2;
+ protected static final int SUSPENDING = 3;
+ protected static final int SUSPENDED = 4;
+ protected static final int STOPPING = 5;
+ protected static final int STOPPED = 6;
+ protected static final int SHUTTINGDOWN = 7;
+ protected static final int SHUTDOWN = 8;
+ protected static final int FAILED = 9;
protected final Logger log = LoggerFactory.getLogger(getClass());
+ protected final Object lock = new Object();
+ protected volatile int status = NEW;
/**
* <b>Important: </b> You should override the lifecycle methods that start with <tt>do</tt>, eg {@link #doStart()},
@@ -60,41 +64,26 @@ public abstract class ServiceSupport implements StatefulService {
* invoke the operation in a safe manner.
*/
public void start() throws Exception {
- if (isStarting() || isStarted()) {
- // only start service if not already started
- log.trace("Service already started");
- return;
- }
- if (starting.compareAndSet(false, true)) {
+ synchronized (lock) {
+ if (status == STARTED) {
+ log.trace("Service already started");
+ return;
+ }
+ if (status == STARTING) {
+ log.trace("Service already starting");
+ return;
+ }
+ status = STARTING;
log.trace("Starting service");
try {
doStart();
- started.set(true);
- starting.set(false);
- stopping.set(false);
- stopped.set(false);
- suspending.set(false);
- suspended.set(false);
- shutdown.set(false);
- shuttingdown.set(false);
+ status = STARTED;
+ log.trace("Service started");
} catch (Exception e) {
- try {
- stop();
- } catch (Exception e2) {
- // Ignore exceptions as we want to show the original exception
- } finally {
- // ensure flags get reset to stopped as we failed during starting
- stopping.set(false);
- stopped.set(true);
- starting.set(false);
- started.set(false);
- suspending.set(false);
- suspended.set(false);
- shutdown.set(false);
- shuttingdown.set(false);
- }
+ status = FAILED;
+ log.trace("Error while starting service", e);
throw e;
- }
+ }
}
}
@@ -105,26 +94,26 @@ public abstract class ServiceSupport implements StatefulService {
* invoke the operation in a safe manner.
*/
public void stop() throws Exception {
- if (isStopped()) {
- log.trace("Service already stopped");
- return;
- }
- if (isStopping()) {
- log.trace("Service already stopping");
- return;
- }
- stopping.set(true);
- try {
- doStop();
- } finally {
- stopping.set(false);
- stopped.set(true);
- starting.set(false);
- started.set(false);
- suspending.set(false);
- suspended.set(false);
- shutdown.set(false);
- shuttingdown.set(false);
+ synchronized (lock) {
+ if (status == STOPPED || status == SHUTTINGDOWN || status == SHUTDOWN) {
+ log.trace("Service already stopped");
+ return;
+ }
+ if (status == STOPPING) {
+ log.trace("Service already stopping");
+ return;
+ }
+ status = STOPPING;
+ log.trace("Stopping service");
+ try {
+ doStop();
+ status = STOPPED;
+ log.trace("Service stopped service");
+ } catch (Exception e) {
+ status = FAILED;
+ log.trace("Error while stopping service", e);
+ throw e;
+ }
}
}
@@ -136,22 +125,25 @@ public abstract class ServiceSupport implements StatefulService {
*/
@Override
public void suspend() throws Exception {
- if (!suspended.get()) {
- if (suspending.compareAndSet(false, true)) {
- try {
- starting.set(false);
- stopping.set(false);
- doSuspend();
- } finally {
- stopped.set(false);
- stopping.set(false);
- starting.set(false);
- started.set(false);
- suspending.set(false);
- suspended.set(true);
- shutdown.set(false);
- shuttingdown.set(false);
- }
+ synchronized (lock) {
+ if (status == SUSPENDED) {
+ log.trace("Service already suspended");
+ return;
+ }
+ if (status == SUSPENDING) {
+ log.trace("Service already suspending");
+ return;
+ }
+ status = SUSPENDING;
+ log.trace("Suspending service");
+ try {
+ doSuspend();
+ status = SUSPENDED;
+ log.trace("Service suspended");
+ } catch (Exception e) {
+ status = FAILED;
+ log.trace("Error while suspending service", e);
+ throw e;
}
}
}
@@ -164,20 +156,21 @@ public abstract class ServiceSupport implements StatefulService {
*/
@Override
public void resume() throws Exception {
- if (suspended.get()) {
- if (starting.compareAndSet(false, true)) {
- try {
- doResume();
- } finally {
- started.set(true);
- starting.set(false);
- stopping.set(false);
- stopped.set(false);
- suspending.set(false);
- suspended.set(false);
- shutdown.set(false);
- shuttingdown.set(false);
- }
+ synchronized (lock) {
+ if (status != SUSPENDED) {
+ log.trace("Service is not suspended");
+ return;
+ }
+ status = STARTING;
+ log.trace("Resuming service");
+ try {
+ doResume();
+ status = STARTED;
+ log.trace("Service resumed");
+ } catch (Exception e) {
+ status = FAILED;
+ log.trace("Error while resuming service", e);
+ throw e;
}
}
}
@@ -190,105 +183,106 @@ public abstract class ServiceSupport implements StatefulService {
*/
@Override
public void shutdown() throws Exception {
- if (shutdown.get()) {
- log.trace("Service already shut down");
- return;
- }
- // ensure we are stopped first
- stop();
-
- if (shuttingdown.compareAndSet(false, true)) {
+ synchronized (lock) {
+ if (status == SHUTDOWN) {
+ log.trace("Service already shut down");
+ return;
+ }
+ if (status == SHUTTINGDOWN) {
+ log.trace("Service already shutting down");
+ return;
+ }
+ stop();
+ status = SHUTDOWN;
+ log.trace("Shutting down service");
try {
doShutdown();
- } finally {
- // shutdown is also stopped so only set shutdown flags
- shutdown.set(true);
- shuttingdown.set(false);
+ log.trace("Service shut down");
+ status = SHUTDOWN;
+ } catch (Exception e) {
+ status = FAILED;
+ log.trace("Error shutting down service", e);
+ throw e;
}
}
}
@Override
public ServiceStatus getStatus() {
- // we should check the ---ing states first, as this indicate the state is in the middle of doing that
- if (isStarting()) {
- return ServiceStatus.Starting;
- }
- if (isStopping()) {
- return ServiceStatus.Stopping;
- }
- if (isSuspending()) {
- return ServiceStatus.Suspending;
- }
-
- // then check for the regular states
- if (isStarted()) {
- return ServiceStatus.Started;
- }
- if (isStopped()) {
- return ServiceStatus.Stopped;
+ switch (status) {
+ case STARTING:
+ return ServiceStatus.Starting;
+ case STARTED:
+ return ServiceStatus.Started;
+ case SUSPENDING:
+ return ServiceStatus.Suspending;
+ case SUSPENDED:
+ return ServiceStatus.Suspended;
+ case STOPPING:
+ return ServiceStatus.Stopping;
+ default:
+ return ServiceStatus.Stopped;
}
- if (isSuspended()) {
- return ServiceStatus.Suspended;
- }
-
- // use stopped as fallback
- return ServiceStatus.Stopped;
}
@Override
public boolean isStarted() {
- return started.get();
+ return status == STARTED;
}
@Override
public boolean isStarting() {
- return starting.get();
+ return status == STARTING;
}
@Override
public boolean isStopping() {
- return stopping.get();
+ return status == STOPPING;
}
@Override
public boolean isStopped() {
- return stopped.get();
+ return status == STOPPED || status == SHUTTINGDOWN || status == SHUTDOWN || status == FAILED;
}
@Override
public boolean isSuspending() {
- return suspending.get();
+ return status == SUSPENDING;
}
@Override
public boolean isSuspended() {
- return suspended.get();
+ return status == SUSPENDED;
}
@Override
public boolean isRunAllowed() {
- // if we have not yet initialized, then all options is false
- boolean unused1 = !started.get() && !starting.get() && !stopping.get() && !stopped.get();
- boolean unused2 = !suspending.get() && !suspended.get() && !shutdown.get() && !shuttingdown.get();
- if (unused1 && unused2) {
- return false;
- }
- return !isStoppingOrStopped();
+ return isStartingOrStarted() || isSuspendingOrSuspended();
+ }
+
+ public boolean isShutdown() {
+ return status == SHUTDOWN;
}
/**
* Is the service in progress of being stopped or already stopped
*/
public boolean isStoppingOrStopped() {
- return stopping.get() || stopped.get();
+ return isStopping() || isStopped();
}
/**
* Is the service in progress of being suspended or already suspended
*/
public boolean isSuspendingOrSuspended() {
- return suspending.get() || suspended.get();
+ return isSuspending() || isSuspended();
+ }
+
+ /**
+ * Is the service in progress of being started or already started
+ */
+ public boolean isStartingOrStarted() {
+ return isStarting() || isStarted();
}
/**
diff --git a/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java b/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java
index e2c848a..5b2b232 100644
--- a/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java
+++ b/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java
@@ -135,7 +135,7 @@ public class ServiceSupportTest extends TestSupport {
public ServiceSupportTestExOnStart() {
// just for testing force it to not be stopped
- stopped.set(false);
+ status = -1;
}
@Override