You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/16 12:58:31 UTC
camel git commit: CAMEL-8491: Camel POJO producer/consumer should
defer starting until CamelContext is starting
Repository: camel
Updated Branches:
refs/heads/master 9f893d83e -> 9600bc4fe
CAMEL-8491: Camel POJO producer/consumer should defer starting until CamelContext is starting
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9600bc4f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9600bc4f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9600bc4f
Branch: refs/heads/master
Commit: 9600bc4fec55a4ed02f2404dfd33b3584491ec80
Parents: 9f893d8
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 16 09:10:53 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 16 13:00:08 2015 +0100
----------------------------------------------------------------------
.../java/org/apache/camel/CamelContext.java | 16 ++-
.../org/apache/camel/DeferStartService.java | 28 ++++
.../camel/component/bean/ProxyHelper.java | 7 +-
.../camel/impl/CamelPostProcessorHelper.java | 35 +++--
.../apache/camel/impl/DefaultCamelContext.java | 57 ++++++--
.../camel/impl/DefaultProducerTemplate.java | 6 +
.../org/apache/camel/impl/DeferProducer.java | 140 +++++++++++++++++++
.../camel/impl/DeferServiceStartupListener.java | 45 ++++++
.../camel/processor/DeferServiceFactory.java | 46 ++++++
.../impl/PojoProduceInterceptEndpointTest.java | 107 ++++++++++++++
.../PojoProduceProxyInterceptEndpointTest.java | 106 ++++++++++++++
.../handler/CamelNamespaceHandler.java | 12 +-
12 files changed, 566 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index bfe116c..84505bc 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -219,10 +219,10 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
* If the option <tt>closeOnShutdown</tt> is <tt>false</tt> then this context will not stop the service when the context stops.
*
* @param object the service
- * @param closeOnShutdown whether to close the service when this CamelContext shutdown.
+ * @param stopOnShutdown whether to stop the service when this CamelContext shuts down.
* @throws Exception can be thrown when starting the service
*/
- void addService(Object object, boolean closeOnShutdown) throws Exception;
+ void addService(Object object, boolean stopOnShutdown) throws Exception;
/**
* Removes a service from this context.
@@ -253,6 +253,18 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
<T> T hasService(Class<T> type);
/**
+ * Defers starting the service until {@link CamelContext} is started and has initialized all its prior services and routes.
+ * <p/>
+ * If {@link CamelContext} is already started then the service is started immediately.
+ *
+ * @param object the service
+ * @param stopOnShutdown whether to stop the service when this CamelContext shuts down. Setting this to <tt>true</tt> will keep a reference to the service in
+ * this {@link CamelContext} until the context is stopped. So do not use it for short lived services.
+ * @throws Exception can be thrown when starting the service, which is only attempted if {@link CamelContext} has already been started when calling this method.
+ */
+ void deferStartService(Object object, boolean stopOnShutdown) throws Exception;
+
+ /**
* Adds the given listener to be invoked when {@link CamelContext} have just been started.
* <p/>
* This allows listeners to do any custom work after the routes and other services have been started and are running.
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/DeferStartService.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/DeferStartService.java b/camel-core/src/main/java/org/apache/camel/DeferStartService.java
new file mode 100644
index 0000000..90e45b3
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/DeferStartService.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+public interface DeferStartService<T, F> {
+
+ /**
+ * Creates the service and defers starting it.
+ *
+ * @param factory the factory which creates the service
+ * @return the service
+ */
+ T create(F factory);
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java b/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
index d287caf..d51cf18 100644
--- a/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
@@ -20,7 +20,7 @@ import java.lang.reflect.Proxy;
import org.apache.camel.Endpoint;
import org.apache.camel.Producer;
-import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.processor.DeferServiceFactory;
/**
* A helper class for creating proxies which delegate to Camel
@@ -54,9 +54,8 @@ public final class ProxyHelper {
* Creates a Proxy which sends the exchange to the endpoint.
*/
public static <T> T createProxy(Endpoint endpoint, ClassLoader cl, Class<T>[] interfaceClasses, MethodInfoCache methodCache) throws Exception {
- Producer producer = endpoint.createProducer();
- // ensure the producer is started
- ServiceHelper.startService(producer);
+ Producer producer = DeferServiceFactory.createProducer(endpoint);
+ endpoint.getCamelContext().deferStartService(producer, true);
return createProxyObject(endpoint, producer, cl, interfaceClasses, methodCache);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
index 817a2f9..af91ae7 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
@@ -22,7 +22,6 @@ import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
-import org.apache.camel.Component;
import org.apache.camel.Consume;
import org.apache.camel.Consumer;
import org.apache.camel.ConsumerTemplate;
@@ -39,6 +38,7 @@ import org.apache.camel.component.bean.BeanInfo;
import org.apache.camel.component.bean.BeanProcessor;
import org.apache.camel.component.bean.ProxyHelper;
import org.apache.camel.processor.CamelInternalProcessor;
+import org.apache.camel.processor.DeferServiceFactory;
import org.apache.camel.processor.UnitOfWorkProducer;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.IntrospectionSupport;
@@ -105,7 +105,7 @@ public class CamelPostProcessorHelper implements CamelContextAware {
Processor processor = createConsumerProcessor(bean, method, endpoint);
Consumer consumer = endpoint.createConsumer(processor);
LOG.debug("Created processor: {} for consumer: {}", processor, consumer);
- startService(consumer, bean, beanName);
+ startService(consumer, endpoint.getCamelContext(), bean, beanName);
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
@@ -115,13 +115,19 @@ public class CamelPostProcessorHelper implements CamelContextAware {
/**
* Stats the given service
*/
- protected void startService(Service service, Object bean, String beanName) throws Exception {
- if (isSingleton(bean, beanName)) {
- getCamelContext().addService(service);
+ protected void startService(Service service, CamelContext camelContext, Object bean, String beanName) throws Exception {
+ // defer starting the service until CamelContext has started all its initial services
+ if (camelContext != null) {
+ camelContext.deferStartService(service, true);
} else {
- LOG.debug("Service is not singleton so you must remember to stop it manually {}", service);
+ // no CamelContext then start service manually
ServiceHelper.startService(service);
}
+
+ boolean singleton = isSingleton(bean, beanName);
+ if (!singleton) {
+ LOG.debug("Service is not singleton so you must remember to stop it manually {}", service);
+ }
}
/**
@@ -281,10 +287,12 @@ public class CamelPostProcessorHelper implements CamelContextAware {
String injectionPointName, Object bean) {
// endpoint is optional for this injection point
Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, false);
- ProducerTemplate answer = new DefaultProducerTemplate(getCamelContext(), endpoint);
+ CamelContext context = endpoint != null ? endpoint.getCamelContext() : getCamelContext();
+ ProducerTemplate answer = new DefaultProducerTemplate(context, endpoint);
// start the template so its ready to use
try {
- answer.start();
+ // start via the helper, which defers the start to the CamelContext when one is available
+ startService(answer, context, bean, null);
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
@@ -299,7 +307,7 @@ public class CamelPostProcessorHelper implements CamelContextAware {
ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext());
// start the template so its ready to use
try {
- answer.start();
+ startService(answer, null, null, null);
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
@@ -311,9 +319,9 @@ public class CamelPostProcessorHelper implements CamelContextAware {
*/
protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint, Object bean, String beanName) {
try {
- PollingConsumer pollingConsumer = endpoint.createPollingConsumer();
- startService(pollingConsumer, bean, beanName);
- return pollingConsumer;
+ PollingConsumer consumer = endpoint.createPollingConsumer();
+ startService(consumer, endpoint.getCamelContext(), bean, beanName);
+ return consumer;
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
@@ -324,8 +332,7 @@ public class CamelPostProcessorHelper implements CamelContextAware {
*/
protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) {
try {
- Producer producer = endpoint.createProducer();
- startService(producer, bean, beanName);
+ Producer producer = DeferServiceFactory.createProducer(endpoint);
return new UnitOfWorkProducer(producer);
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 9ac71b6..f9cfceb 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -38,7 +38,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import javax.naming.Context;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
@@ -176,8 +175,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
private final List<EndpointStrategy> endpointStrategies = new ArrayList<EndpointStrategy>();
private final Map<String, Component> components = new HashMap<String, Component>();
private final Set<Route> routes = new LinkedHashSet<Route>();
- private final List<Service> servicesToClose = new CopyOnWriteArrayList<Service>();
+ private final List<Service> servicesToStop = new CopyOnWriteArrayList<Service>();
private final Set<StartupListener> startupListeners = new LinkedHashSet<StartupListener>();
+ private final DeferServiceStartupListener deferStartupListener = new DeferServiceStartupListener();
private TypeConverter typeConverter;
private TypeConverterRegistry typeConverterRegistry;
private Injector injector;
@@ -266,6 +266,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
// create endpoint registry at first since end users may access endpoints before CamelContext is started
this.endpoints = new DefaultEndpointRegistry(this);
+ // add the defer service startup listener
+ this.startupListeners.add(deferStartupListener);
+
// use WebSphere specific resolver if running on WebSphere
if (WebSpherePackageScanClassResolver.isWebSphereClassLoader(this.getClass().getClassLoader())) {
log.info("Using WebSphere specific PackageScanClassResolver");
@@ -1054,11 +1057,11 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
addService(object, true);
}
- public void addService(Object object, boolean closeOnShutdown) throws Exception {
- doAddService(object, closeOnShutdown);
+ public void addService(Object object, boolean stopOnShutdown) throws Exception {
+ doAddService(object, stopOnShutdown);
}
- private void doAddService(Object object, boolean closeOnShutdown) throws Exception {
+ private void doAddService(Object object, boolean stopOnShutdown) throws Exception {
// inject CamelContext
if (object instanceof CamelContextAware) {
CamelContextAware aware = (CamelContextAware) object;
@@ -1085,9 +1088,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
}
// do not add endpoints as they have their own list
if (singleton && !(service instanceof Endpoint)) {
- // only add to list of services to close if its not already there
- if (closeOnShutdown && !hasService(service)) {
- servicesToClose.add(service);
+ // only add to list of services to stop if its not already there
+ if (stopOnShutdown && !hasService(service)) {
+ servicesToStop.add(service);
}
}
}
@@ -1110,7 +1113,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
for (LifecycleStrategy strategy : lifecycleStrategies) {
strategy.onServiceRemove(this, service, null);
}
- return servicesToClose.remove(service);
+ return servicesToStop.remove(service);
}
return false;
}
@@ -1118,14 +1121,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
public boolean hasService(Object object) {
if (object instanceof Service) {
Service service = (Service) object;
- return servicesToClose.contains(service);
+ return servicesToStop.contains(service);
}
return false;
}
@Override
public <T> T hasService(Class<T> type) {
- for (Service service : servicesToClose) {
+ for (Service service : servicesToStop) {
if (type.isInstance(service)) {
return type.cast(service);
}
@@ -1133,6 +1136,32 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
return null;
}
+ public void deferStartService(Object object, boolean stopOnShutdown) throws Exception {
+ if (object instanceof Service) {
+ Service service = (Service) object;
+
+ // only add to services to stop if it's a singleton
+ // otherwise we could for example end up with a lot of prototype scope endpoints
+ boolean singleton = true; // assume singleton by default
+ if (object instanceof IsSingleton) {
+ singleton = ((IsSingleton) service).isSingleton();
+ }
+ // do not add endpoints as they have their own list
+ if (singleton && !(service instanceof Endpoint)) {
+ // only add to list of services to stop if its not already there
+ if (stopOnShutdown && !hasService(service)) {
+ servicesToStop.add(service);
+ }
+ }
+ // are we already started?
+ if (isStarted()) {
+ ServiceHelper.startService(service);
+ } else {
+ deferStartupListener.addService(service);
+ }
+ }
+ }
+
public void addStartupListener(StartupListener listener) throws Exception {
// either add to listener so we can invoke then later when CamelContext has been started
// or invoke the callback right now
@@ -2680,7 +2709,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
// stop consumers from the services to close first, such as POJO consumer (eg @Consumer)
// which we need to stop after the routes, as a POJO consumer is essentially a route also
- for (Service service : servicesToClose) {
+ for (Service service : servicesToStop) {
if (service instanceof Consumer) {
shutdownServices(service);
}
@@ -2716,8 +2745,8 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
}
// shutdown services as late as possible
- shutdownServices(servicesToClose);
- servicesToClose.clear();
+ shutdownServices(servicesToStop);
+ servicesToStop.clear();
// must notify that we are stopped before stopping the management strategy
EventHelper.notifyCamelContextStopped(this);
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
index 6a7cfd4..aa3fa10 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
@@ -736,6 +736,12 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT
}
producerCache.setEventNotifierEnabled(isEventNotifierEnabled());
}
+
+ // need to lookup default endpoint as it may have been intercepted
+ if (defaultEndpoint != null) {
+ defaultEndpoint = camelContext.getEndpoint(defaultEndpoint.getEndpointUri());
+ }
+
ServiceHelper.startService(producerCache);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java b/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
new file mode 100644
index 0000000..0ddf1bf
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link Producer} that defers being started until {@link org.apache.camel.CamelContext} has been started. This
+ * ensures that the producer is able to adapt to changes that may otherwise occur during starting
+ * CamelContext. If we do not defer starting the producer it may not adapt to those changes, and
+ * send messages to wrong endpoints.
+ */
+public class DeferProducer extends org.apache.camel.support.ServiceSupport implements Producer, AsyncProcessor {
+
+ private Producer delegate;
+ private final Endpoint endpoint;
+
+ public DeferProducer(Endpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public Exchange createExchange() {
+ if (delegate == null) {
+ throw new IllegalStateException("Not started");
+ }
+ return delegate.createExchange();
+ }
+
+ @Override
+ public Exchange createExchange(ExchangePattern pattern) {
+ if (delegate == null) {
+ throw new IllegalStateException("Not started");
+ }
+ return delegate.createExchange(pattern);
+ }
+
+ @Override
+ @Deprecated
+ public Exchange createExchange(Exchange exchange) {
+ if (delegate == null) {
+ throw new IllegalStateException("Not started");
+ }
+ return delegate.createExchange(exchange);
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ if (delegate == null) {
+ throw new IllegalStateException("Not started");
+ }
+ delegate.process(exchange);
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ if (delegate == null) {
+ exchange.setException(new IllegalStateException("Not started"));
+ callback.done(true);
+ return true;
+ }
+
+ if (delegate instanceof AsyncProcessor) {
+ return ((AsyncProcessor) delegate).process(exchange, callback);
+ }
+
+ // fallback to sync mode
+ try {
+ process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ callback.done(true);
+ return true;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ // need to lookup endpoint again as it may be intercepted
+ Endpoint lookup = endpoint.getCamelContext().getEndpoint(endpoint.getEndpointUri());
+
+ delegate = lookup.createProducer();
+ ServiceHelper.startService(delegate);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopService(delegate);
+ }
+
+ @Override
+ public boolean isSingleton() {
+ if (delegate != null) {
+ return delegate.isSingleton();
+ } else {
+ // assume singleton by default
+ return true;
+ }
+ }
+
+ @Override
+ public Endpoint getEndpoint() {
+ if (delegate != null) {
+ return delegate.getEndpoint();
+ } else {
+ return endpoint;
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (delegate != null) {
+ return delegate.toString();
+ } else {
+ return "DelegateProducer[" + endpoint + "]";
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java b/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java
new file mode 100644
index 0000000..a78bdd8
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Service;
+import org.apache.camel.StartupListener;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.StartupListener} that defers starting {@link Service}s.
+ */
+public class DeferServiceStartupListener implements StartupListener {
+
+ private final Set<Service> services = new CopyOnWriteArraySet<Service>();
+
+ public void addService(Service service) {
+ services.add(service);
+ }
+
+ @Override
+ public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
+ for (Service service : services) {
+ ServiceHelper.startService(service);
+ }
+ services.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java b/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
new file mode 100644
index 0000000..9dac0dd
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DeferProducer;
+
+/**
+ * Factory to create {@link org.apache.camel.DeferStartService} services such as {@link Producer}s
+ * and {@link org.apache.camel.PollingConsumer}s
+ */
+public class DeferServiceFactory {
+
+ /**
+ * Creates the {@link Producer} whose start is deferred until {@link org.apache.camel.CamelContext} is being started.
+ * <p/>
+ * When the producer is started, it looks up the endpoint again to capture any changes, such as the endpoint having been intercepted.
+ * This allows the producer to react and send messages to the updated endpoint.
+ *
+ * @param endpoint the endpoint
+ * @return the producer which will be deferred started until {@link org.apache.camel.CamelContext} has been started
+ * @throws Exception can be thrown if there is an error starting the producer
+ * @see org.apache.camel.impl.DeferProducer
+ */
+ public static Producer createProducer(Endpoint endpoint) throws Exception {
+ Producer producer = new DeferProducer(endpoint);
+ endpoint.getCamelContext().deferStartService(producer, true);
+ return producer;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java
new file mode 100644
index 0000000..553d28e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ *
+ */
+public class PojoProduceInterceptEndpointTest extends TestCase {
+
+ public void testPojoProduceInterceptAlreadyStarted() throws Exception {
+ CamelContext context = new DefaultCamelContext();
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ interceptSendToEndpoint("direct:start")
+ .to("language:simple:${body}${body}");
+
+ from("direct:start")
+ .to("mock:result");
+ }
+ });
+
+ // start Camel before POJO being injected
+ context.start();
+
+ // use the injector (will use the default)
+ // which should post process the bean to inject the @Produce
+ MyBean bean = context.getInjector().newInstance(MyBean.class);
+
+ MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class);
+ mock.expectedBodiesReceived("WorldWorld");
+
+ Object reply = bean.doSomething("World");
+ assertEquals("WorldWorld", reply);
+
+ mock.assertIsSatisfied();
+
+ context.stop();
+ }
+
+ public void testPojoProduceInterceptNotStarted() throws Exception {
+ CamelContext context = new DefaultCamelContext();
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ interceptSendToEndpoint("direct:start")
+ .to("language:simple:${body}${body}");
+
+ from("direct:start")
+ .to("mock:result");
+ }
+ });
+
+ // use the injector (will use the default)
+ // which should post process the bean to inject the @Produce
+ MyBean bean = context.getInjector().newInstance(MyBean.class);
+
+ // start Camel only after the POJO has been injected
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class);
+ mock.expectedBodiesReceived("WorldWorld");
+
+ Object reply = bean.doSomething("World");
+ assertEquals("WorldWorld", reply);
+
+ mock.assertIsSatisfied();
+
+ context.stop();
+ }
+
+ public static class MyBean {
+
+ @Produce(uri = "direct:start")
+ Producer producer;
+
+ public Object doSomething(String body) throws Exception {
+ Exchange exchange = producer.createExchange();
+ exchange.getIn().setBody(body);
+ producer.process(exchange);
+ return exchange.hasOut() ? exchange.getOut().getBody() : exchange.getIn().getBody();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java
new file mode 100644
index 0000000..f03ec9d
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Produce;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Verifies that a {@link Produce} proxy injected before or after CamelContext start is intercepted by interceptSendToEndpoint.
+ */
+public class PojoProduceProxyInterceptEndpointTest extends TestCase {
+
+ public void testPojoProduceInterceptAlreadyStarted() throws Exception {
+ CamelContext context = new DefaultCamelContext();
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ interceptSendToEndpoint("direct:start")
+ .to("language:simple:${body}${body}");
+
+ from("direct:start")
+ .to("mock:result");
+ }
+ });
+
+ // start Camel before the POJO is injected
+ context.start();
+
+ // use the injector (will use the default)
+ // which should post process the bean to inject the @Produce
+ MyBean bean = context.getInjector().newInstance(MyBean.class);
+
+ MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class);
+ mock.expectedBodiesReceived("WorldWorld");
+
+ Object reply = bean.doSomething("World");
+ assertEquals("WorldWorld", reply);
+
+ mock.assertIsSatisfied();
+
+ context.stop();
+ }
+
+ public void testPojoProduceInterceptNotStarted() throws Exception {
+ CamelContext context = new DefaultCamelContext();
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ interceptSendToEndpoint("direct:start")
+ .to("language:simple:${body}${body}");
+
+ from("direct:start")
+ .to("mock:result");
+ }
+ });
+
+ // use the injector (will use the default)
+ // which should post process the bean to inject the @Produce
+ MyBean bean = context.getInjector().newInstance(MyBean.class);
+
+ // Camel was NOT started before the POJO was injected; start it only now
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class);
+ mock.expectedBodiesReceived("WorldWorld");
+
+ Object reply = bean.doSomething("World");
+ assertEquals("WorldWorld", reply);
+
+ mock.assertIsSatisfied();
+
+ context.stop();
+ }
+
+ public static interface EchoService {
+ public String echo(String word);
+ }
+
+ public static class MyBean {
+
+ @Produce(uri = "direct:start")
+ private EchoService echo;
+
+ public Object doSomething(String body) throws Exception {
+ return echo.echo(body);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
index cd44d85..475c057 100644
--- a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
+++ b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java
@@ -928,11 +928,13 @@ public class CamelNamespaceHandler implements NamespaceHandler {
@Override
protected boolean isSingleton(Object bean, String beanName) {
- ComponentMetadata meta = blueprintContainer.getComponentMetadata(beanName);
- if (meta != null && meta instanceof BeanMetadata) {
- String scope = ((BeanMetadata) meta).getScope();
- if (scope != null) {
- return BeanMetadata.SCOPE_SINGLETON.equals(scope);
+ if (beanName != null) {
+ ComponentMetadata meta = blueprintContainer.getComponentMetadata(beanName);
+ if (meta != null && meta instanceof BeanMetadata) {
+ String scope = ((BeanMetadata) meta).getScope();
+ if (scope != null) {
+ return BeanMetadata.SCOPE_SINGLETON.equals(scope);
+ }
}
}
// fallback to super, which will assume singleton