You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/10 16:20:57 UTC
[1/2] camel git commit: camel-rx - Should shutdown observcers when
CamelContext is being shutdown.
Repository: camel
Updated Branches:
refs/heads/master 83383c631 -> 241c98b66
camel-rx - Should shutdown observcers when CamelContext is being shutdown.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d0d809d2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d0d809d2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d0d809d2
Branch: refs/heads/master
Commit: d0d809d25f15aa716ac21ee606009289c684cb2f
Parents: 83383c6
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 10 10:19:16 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 10 10:30:18 2016 +0200
----------------------------------------------------------------------
.../java/org/apache/camel/rx/ReactiveCamel.java | 23 ++++----
.../camel/rx/support/EndpointSubscription.java | 19 +++++-
.../apache/camel/rx/support/ObserverSender.java | 17 +++++-
.../rx/support/ReactiveBeanPostProcessor.java | 48 +++++++++++++++
.../camel/rx/support/ReactiveInjector.java | 61 ++++++++++++++++++++
.../rx/support/ReactivePostProcessorHelper.java | 47 +++++++++++++++
.../java/org/apache/camel/rx/LoginService.java | 34 +++++++++++
.../camel/rx/LoginServiceObservableTest.java | 46 +++++++++++++++
8 files changed, 280 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index beb3bf1..b0877f8 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -39,6 +39,14 @@ public class ReactiveCamel {
this.camelContext = camelContext;
}
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public Endpoint endpoint(String endpointUri) {
+ return CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri);
+ }
+
/**
* Returns an {@link rx.Observable < org.apache.camel.Message >} to allow the messages sent on the endpoint
* to be processed using <a href="https://rx.codeplex.com/">Reactive Extensions</a>
@@ -79,6 +87,7 @@ public class ReactiveCamel {
public <T> void sendTo(Observable<T> observable, String endpointUri) {
sendTo(observable, endpoint(endpointUri));
}
+
/**
* Sends events on the given {@link Observable} to the given camel endpoint
*/
@@ -119,20 +128,12 @@ public class ReactiveCamel {
return new CamelOperator(endpoint);
}
- public CamelContext getCamelContext() {
- return camelContext;
- }
-
- public Endpoint endpoint(String endpointUri) {
- return CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri);
- }
-
/**
* Returns a newly created {@link Observable} given a function which converts
* the {@link Exchange} from the Camel consumer to the required type
*/
- protected <T> Observable<T> createEndpointObservable(final Endpoint endpoint,
- final Func1<Exchange, T> converter) {
+ private <T> Observable<T> createEndpointObservable(final Endpoint endpoint,
+ final Func1<Exchange, T> converter) {
Observable.OnSubscribe<T> func = new EndpointSubscribeFunc<T>(endpoint, converter);
return new EndpointObservable<T>(endpoint, func);
}
@@ -140,7 +141,7 @@ public class ReactiveCamel {
/**
* Return a newly created {@link Observable} without conversion
*/
- protected Observable<Exchange> createEndpointObservable(final Endpoint endpoint) {
+ private Observable<Exchange> createEndpointObservable(final Endpoint endpoint) {
return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<Exchange>(endpoint, exchange -> exchange));
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
index 593e1d4..0b2b6df 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
@@ -23,6 +23,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.processor.CamelInternalProcessor;
+import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +34,7 @@ import rx.functions.Func1;
/**
* An RX {@link Subscription} on a Camel {@link Endpoint}
*/
-public class EndpointSubscription<T> implements Subscription {
+public class EndpointSubscription<T> extends ServiceSupport implements Subscription {
private static final Logger LOG = LoggerFactory.getLogger(EndpointSubscription.class);
@@ -53,8 +54,11 @@ public class EndpointSubscription<T> implements Subscription {
CamelInternalProcessor internal = new CamelInternalProcessor(processor);
internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
try {
+ // need to start endpoint before we create producer
+ ServiceHelper.startService(endpoint);
this.consumer = endpoint.createConsumer(internal);
- ServiceHelper.startService(consumer);
+ // add as service so we ensure it gets stopped when CamelContext stops
+ endpoint.getCamelContext().addService(consumer, true, true);
} catch (Exception e) {
observer.onError(e);
}
@@ -91,4 +95,15 @@ public class EndpointSubscription<T> implements Subscription {
return observer;
}
+ @Override
+ protected void doStart() throws Exception {
+ ServiceHelper.startService(consumer);
+ unsubscribed.set(false);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopService(consumer);
+ unsubscribed.set(true);
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
index d134aaa..187c2db 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
@@ -20,6 +20,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.processor.UnitOfWorkProducer;
+import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,14 +29,17 @@ import rx.Observer;
/**
* An {@link Observer} which sends events to a given {@link Endpoint}
*/
-public class ObserverSender<T> implements Observer<T> {
+public class ObserverSender<T> extends ServiceSupport implements Observer<T> {
private static final Logger LOG = LoggerFactory.getLogger(ObserverSender.class);
private Producer producer;
public ObserverSender(Endpoint endpoint) throws Exception {
+ // need to start endpoint before we create producer
+ ServiceHelper.startService(endpoint);
this.producer = new UnitOfWorkProducer(endpoint.createProducer());
- ServiceHelper.startService(producer);
+ // add as service so we ensure it gets stopped when CamelContext stops
+ endpoint.getCamelContext().addService(producer, true, true);
}
@Override
@@ -73,4 +77,13 @@ public class ObserverSender<T> implements Observer<T> {
}
}
+ @Override
+ protected void doStart() throws Exception {
+ ServiceHelper.startService(producer);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopService(producer);
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveBeanPostProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveBeanPostProcessor.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveBeanPostProcessor.java
new file mode 100644
index 0000000..a0d132e
--- /dev/null
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveBeanPostProcessor.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.CamelPostProcessorHelper;
+import org.apache.camel.impl.DefaultCamelBeanPostProcessor;
+
+public class ReactiveBeanPostProcessor extends DefaultCamelBeanPostProcessor {
+
+ private ReactivePostProcessorHelper helper;
+
+ public ReactiveBeanPostProcessor(CamelContext camelContext) {
+ super(camelContext);
+ }
+
+ @Override
+ public Object postProcessBeforeInitialization(Object bean, String beanName) throws Exception {
+ return super.postProcessBeforeInitialization(bean, beanName);
+ }
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws Exception {
+ return super.postProcessAfterInitialization(bean, beanName);
+ }
+
+ @Override
+ public CamelPostProcessorHelper getPostProcessorHelper() {
+ if (helper == null) {
+ helper = new ReactivePostProcessorHelper(getOrLookupCamelContext());
+ }
+ return helper;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveInjector.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveInjector.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveInjector.java
new file mode 100644
index 0000000..adab01e
--- /dev/null
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveInjector.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.Injector;
+import org.apache.camel.util.ReflectionInjector;
+
+public class ReactiveInjector implements Injector {
+
+ // use the reflection injector
+ private final Injector delegate = new ReflectionInjector();
+ private final ReactiveBeanPostProcessor postProcessor;
+
+ public ReactiveInjector(CamelContext context) {
+ postProcessor = new ReactiveBeanPostProcessor(context);
+ }
+
+ @Override
+ public <T> T newInstance(Class<T> type) {
+ T answer = delegate.newInstance(type);
+ if (answer != null) {
+ try {
+ postProcessor.postProcessBeforeInitialization(answer, answer.getClass().getName());
+ postProcessor.postProcessAfterInitialization(answer, answer.getClass().getName());
+ } catch (Exception e) {
+ throw new RuntimeCamelException("Error during post processing of bean " + answer, e);
+ }
+ }
+ return answer;
+ }
+
+ @Override
+ public <T> T newInstance(Class<T> type, Object instance) {
+ T answer = delegate.newInstance(type, instance);
+ if (answer != null) {
+ try {
+ postProcessor.postProcessBeforeInitialization(answer, answer.getClass().getName());
+ postProcessor.postProcessAfterInitialization(answer, answer.getClass().getName());
+ } catch (Exception e) {
+ throw new RuntimeCamelException("Error during post processing of bean " + answer, e);
+ }
+ }
+ return answer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactivePostProcessorHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactivePostProcessorHelper.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactivePostProcessorHelper.java
new file mode 100644
index 0000000..88a94ae
--- /dev/null
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactivePostProcessorHelper.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.NoSuchBeanException;
+import org.apache.camel.impl.CamelPostProcessorHelper;
+
+public class ReactivePostProcessorHelper extends CamelPostProcessorHelper {
+
+ public ReactivePostProcessorHelper(CamelContext camelContext) {
+ super(camelContext);
+ }
+
+ @Override
+ public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String endpointProperty,
+ String injectionPointName, Object bean, String beanName) {
+ return super.getInjectionValue(type, endpointUri, endpointRef, endpointProperty, injectionPointName, bean, beanName);
+ }
+
+ @Override
+ public Object getInjectionBeanValue(Class<?> type, String name) {
+ try {
+ return super.getInjectionBeanValue(type, name);
+ } catch (NoSuchBeanException e) {
+ // ignore
+ }
+
+ // lets build a proxy
+ return "";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/test/java/org/apache/camel/rx/LoginService.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/LoginService.java b/components/camel-rx/src/test/java/org/apache/camel/rx/LoginService.java
new file mode 100644
index 0000000..0987377
--- /dev/null
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/LoginService.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.Consume;
+import rx.Observable;
+
+public class LoginService {
+
+ @Consume(uri = "seda:login")
+ public Observable<String> login() {
+ return null;
+ }
+
+ @Consume(uri = "seda:user")
+ public Observable<String> getUserState() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/test/java/org/apache/camel/rx/LoginServiceObservableTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/LoginServiceObservableTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/LoginServiceObservableTest.java
new file mode 100644
index 0000000..229a6c7
--- /dev/null
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/LoginServiceObservableTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import rx.Observable;
+
+public class LoginServiceObservableTest extends CamelTestSupport {
+
+ @Test
+ public void testBeanObservable() throws Exception {
+ ReactiveCamel reactiveCamel = new ReactiveCamel(context);
+
+ // consume from two endpoints and aggregate by appending the data
+ Observable<String> login = reactiveCamel.toObservable("seda:login", String.class);
+ Observable<String> user = reactiveCamel.toObservable("seda:user", String.class);
+ Observable<String> result = Observable.combineLatest(login, user, (a, b) -> a + "=" + b);
+
+ getMockEndpoint("mock:result").expectedBodiesReceived("OK=Donald Duck");
+
+ // send in data
+ template.sendBody("seda:login", "OK");
+ template.sendBody("seda:user", "Donald Duck");
+
+ // and send the results to the mock endpoint
+ reactiveCamel.sendTo(result, "mock:result");
+
+ assertMockEndpointsSatisfied();
+ }
+
+}
\ No newline at end of file
[2/2] camel git commit: CAMEL-7174: CacheManager should only be
shutdown when last endpoint is stopped. Thanks to metatech for the patch.
Posted by da...@apache.org.
CAMEL-7174: CacheManager should only be shutdown when last endpoint is stopped. Thanks to metatech for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/241c98b6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/241c98b6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/241c98b6
Branch: refs/heads/master
Commit: 241c98b66583de92a3c85b6bbe9290903fdf7dc6
Parents: d0d809d
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 10 16:20:39 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 10 16:20:39 2016 +0200
----------------------------------------------------------------------
.../camel/component/cache/CacheManagerFactory.java | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/241c98b6/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheManagerFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheManagerFactory.java b/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheManagerFactory.java
index 1a69a7c..5ab229b 100755
--- a/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheManagerFactory.java
+++ b/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheManagerFactory.java
@@ -63,11 +63,18 @@ public abstract class CacheManagerFactory extends ServiceSupport {
}
@Override
- protected void doStop() throws Exception {
- // shutdown cache manager when stopping
+ protected synchronized void doStop() throws Exception {
+ // only shutdown cache manager if no longer in use
+ // (it may be reused when running in app servers like Karaf)
if (cacheManager != null) {
- cacheManager.shutdown();
- cacheManager = null;
+ int size = cacheManager.getCacheNames().length;
+ if (size <= 0) {
+ LOG.info("Shutting down CacheManager as its no longer in use");
+ cacheManager.shutdown();
+ cacheManager = null;
+ } else {
+ LOG.info("Cannot stop CacheManager as its still in use by {} clients", size);
+ }
}
}
}