You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/10 16:20:57 UTC

[1/2] camel git commit: camel-rx - Should shutdown observers when CamelContext is being shutdown.

Repository: camel
Updated Branches:
  refs/heads/master 83383c631 -> 241c98b66


camel-rx - Should shutdown observers when CamelContext is being shutdown.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d0d809d2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d0d809d2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d0d809d2

Branch: refs/heads/master
Commit: d0d809d25f15aa716ac21ee606009289c684cb2f
Parents: 83383c6
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 10 10:19:16 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 10 10:30:18 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/ReactiveCamel.java | 23 ++++----
 .../camel/rx/support/EndpointSubscription.java  | 19 +++++-
 .../apache/camel/rx/support/ObserverSender.java | 17 +++++-
 .../rx/support/ReactiveBeanPostProcessor.java   | 48 +++++++++++++++
 .../camel/rx/support/ReactiveInjector.java      | 61 ++++++++++++++++++++
 .../rx/support/ReactivePostProcessorHelper.java | 47 +++++++++++++++
 .../java/org/apache/camel/rx/LoginService.java  | 34 +++++++++++
 .../camel/rx/LoginServiceObservableTest.java    | 46 +++++++++++++++
 8 files changed, 280 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index beb3bf1..b0877f8 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -39,6 +39,14 @@ public class ReactiveCamel {
         this.camelContext = camelContext;
     }
 
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public Endpoint endpoint(String endpointUri) {
+        return CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri);
+    }
+
     /**
      * Returns an {@link rx.Observable < org.apache.camel.Message >} to allow the messages sent on the endpoint
      * to be processed using  <a href="https://rx.codeplex.com/">Reactive Extensions</a>
@@ -79,6 +87,7 @@ public class ReactiveCamel {
     public <T> void sendTo(Observable<T> observable, String endpointUri) {
         sendTo(observable, endpoint(endpointUri));
     }
+
     /**
      * Sends events on the given {@link Observable} to the given camel endpoint
      */
@@ -119,20 +128,12 @@ public class ReactiveCamel {
         return new CamelOperator(endpoint);
     }
 
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    public Endpoint endpoint(String endpointUri) {
-        return CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri);
-    }
-
     /**
      * Returns a newly created {@link Observable} given a function which converts
      * the {@link Exchange} from the Camel consumer to the required type
      */
-    protected <T> Observable<T> createEndpointObservable(final Endpoint endpoint,
-                                                         final Func1<Exchange, T> converter) {
+    private <T> Observable<T> createEndpointObservable(final Endpoint endpoint,
+                                                       final Func1<Exchange, T> converter) {
         Observable.OnSubscribe<T> func = new EndpointSubscribeFunc<T>(endpoint, converter);
         return new EndpointObservable<T>(endpoint, func);
     }
@@ -140,7 +141,7 @@ public class ReactiveCamel {
     /**
      * Return a newly created {@link Observable} without conversion
      */
-    protected Observable<Exchange> createEndpointObservable(final Endpoint endpoint) {
+    private Observable<Exchange> createEndpointObservable(final Endpoint endpoint) {
         return new EndpointObservable<Exchange>(endpoint, new EndpointSubscribeFunc<Exchange>(endpoint, exchange -> exchange));
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
index 593e1d4..0b2b6df 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
@@ -23,6 +23,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.CamelInternalProcessor;
+import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +34,7 @@ import rx.functions.Func1;
 /**
  * An RX {@link Subscription} on a Camel {@link Endpoint}
  */
-public class EndpointSubscription<T> implements Subscription {
+public class EndpointSubscription<T> extends ServiceSupport implements Subscription {
 
     private static final Logger LOG = LoggerFactory.getLogger(EndpointSubscription.class);
 
@@ -53,8 +54,11 @@ public class EndpointSubscription<T> implements Subscription {
         CamelInternalProcessor internal = new CamelInternalProcessor(processor);
         internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
         try {
+            // need to start endpoint before we create consumer
+            ServiceHelper.startService(endpoint);
             this.consumer = endpoint.createConsumer(internal);
-            ServiceHelper.startService(consumer);
+            // add as service so we ensure it gets stopped when CamelContext stops
+            endpoint.getCamelContext().addService(consumer, true, true);
         } catch (Exception e) {
             observer.onError(e);
         }
@@ -91,4 +95,15 @@ public class EndpointSubscription<T> implements Subscription {
         return observer;
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startService(consumer);
+        unsubscribed.set(false);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(consumer);
+        unsubscribed.set(true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
index d134aaa..187c2db 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
@@ -20,6 +20,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
 import org.apache.camel.processor.UnitOfWorkProducer;
+import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,14 +29,17 @@ import rx.Observer;
 /**
  * An {@link Observer} which sends events to a given {@link Endpoint}
  */
-public class ObserverSender<T> implements Observer<T> {
+public class ObserverSender<T> extends ServiceSupport implements Observer<T> {
     private static final Logger LOG = LoggerFactory.getLogger(ObserverSender.class);
 
     private Producer producer;
 
     public ObserverSender(Endpoint endpoint) throws Exception {
+        // need to start endpoint before we create producer
+        ServiceHelper.startService(endpoint);
         this.producer = new UnitOfWorkProducer(endpoint.createProducer());
-        ServiceHelper.startService(producer);
+        // add as service so we ensure it gets stopped when CamelContext stops
+        endpoint.getCamelContext().addService(producer, true, true);
     }
 
     @Override
@@ -73,4 +77,13 @@ public class ObserverSender<T> implements Observer<T> {
         }
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startService(producer);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(producer);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveBeanPostProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveBeanPostProcessor.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveBeanPostProcessor.java
new file mode 100644
index 0000000..a0d132e
--- /dev/null
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveBeanPostProcessor.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.CamelPostProcessorHelper;
+import org.apache.camel.impl.DefaultCamelBeanPostProcessor;
+
+public class ReactiveBeanPostProcessor extends DefaultCamelBeanPostProcessor {
+
+    private ReactivePostProcessorHelper helper;
+
+    public ReactiveBeanPostProcessor(CamelContext camelContext) {
+        super(camelContext);
+    }
+
+    @Override
+    public Object postProcessBeforeInitialization(Object bean, String beanName) throws Exception {
+        return super.postProcessBeforeInitialization(bean, beanName);
+    }
+
+    @Override
+    public Object postProcessAfterInitialization(Object bean, String beanName) throws Exception {
+        return super.postProcessAfterInitialization(bean, beanName);
+    }
+
+    @Override
+    public CamelPostProcessorHelper getPostProcessorHelper() {
+        if (helper == null) {
+            helper = new ReactivePostProcessorHelper(getOrLookupCamelContext());
+        }
+        return helper;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveInjector.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveInjector.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveInjector.java
new file mode 100644
index 0000000..adab01e
--- /dev/null
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactiveInjector.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.Injector;
+import org.apache.camel.util.ReflectionInjector;
+
+public class ReactiveInjector implements Injector {
+
+    // use the reflection injector
+    private final Injector delegate = new ReflectionInjector();
+    private final ReactiveBeanPostProcessor postProcessor;
+
+    public ReactiveInjector(CamelContext context) {
+        postProcessor = new ReactiveBeanPostProcessor(context);
+    }
+
+    @Override
+    public <T> T newInstance(Class<T> type) {
+        T answer = delegate.newInstance(type);
+        if (answer != null) {
+            try {
+                postProcessor.postProcessBeforeInitialization(answer, answer.getClass().getName());
+                postProcessor.postProcessAfterInitialization(answer, answer.getClass().getName());
+            } catch (Exception e) {
+                throw new RuntimeCamelException("Error during post processing of bean " + answer, e);
+            }
+        }
+        return answer;
+    }
+
+    @Override
+    public <T> T newInstance(Class<T> type, Object instance) {
+        T answer = delegate.newInstance(type, instance);
+        if (answer != null) {
+            try {
+                postProcessor.postProcessBeforeInitialization(answer, answer.getClass().getName());
+                postProcessor.postProcessAfterInitialization(answer, answer.getClass().getName());
+            } catch (Exception e) {
+                throw new RuntimeCamelException("Error during post processing of bean " + answer, e);
+            }
+        }
+        return answer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactivePostProcessorHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactivePostProcessorHelper.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactivePostProcessorHelper.java
new file mode 100644
index 0000000..88a94ae
--- /dev/null
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ReactivePostProcessorHelper.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.NoSuchBeanException;
+import org.apache.camel.impl.CamelPostProcessorHelper;
+
+public class ReactivePostProcessorHelper extends CamelPostProcessorHelper {
+
+    public ReactivePostProcessorHelper(CamelContext camelContext) {
+        super(camelContext);
+    }
+
+    @Override
+    public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String endpointProperty,
+                                    String injectionPointName, Object bean, String beanName) {
+        return super.getInjectionValue(type, endpointUri, endpointRef, endpointProperty, injectionPointName, bean, beanName);
+    }
+
+    @Override
+    public Object getInjectionBeanValue(Class<?> type, String name) {
+        try {
+            return super.getInjectionBeanValue(type, name);
+        } catch (NoSuchBeanException e) {
+            // ignore
+        }
+
+        // lets build a proxy
+        return "";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/test/java/org/apache/camel/rx/LoginService.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/LoginService.java b/components/camel-rx/src/test/java/org/apache/camel/rx/LoginService.java
new file mode 100644
index 0000000..0987377
--- /dev/null
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/LoginService.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.Consume;
+import rx.Observable;
+
+public class LoginService {
+
+    @Consume(uri = "seda:login")
+    public Observable<String> login() {
+        return null;
+    }
+
+    @Consume(uri = "seda:user")
+    public Observable<String> getUserState() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d0d809d2/components/camel-rx/src/test/java/org/apache/camel/rx/LoginServiceObservableTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/LoginServiceObservableTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/LoginServiceObservableTest.java
new file mode 100644
index 0000000..229a6c7
--- /dev/null
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/LoginServiceObservableTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import rx.Observable;
+
+public class LoginServiceObservableTest extends CamelTestSupport {
+
+    @Test
+    public void testBeanObservable() throws Exception {
+        ReactiveCamel reactiveCamel = new ReactiveCamel(context);
+
+        // consume from two endpoints and aggregate by appending the data
+        Observable<String> login = reactiveCamel.toObservable("seda:login", String.class);
+        Observable<String> user = reactiveCamel.toObservable("seda:user", String.class);
+        Observable<String> result = Observable.combineLatest(login, user, (a, b) -> a + "=" + b);
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("OK=Donald Duck");
+
+        // send in data
+        template.sendBody("seda:login", "OK");
+        template.sendBody("seda:user", "Donald Duck");
+
+        // and send the results to the mock endpoint
+        reactiveCamel.sendTo(result, "mock:result");
+
+        assertMockEndpointsSatisfied();
+    }
+
+}
\ No newline at end of file


[2/2] camel git commit: CAMEL-7174: CacheManager should only be shutdown when last endpoint is stopped. Thanks to metatech for the patch.

Posted by da...@apache.org.
CAMEL-7174: CacheManager should only be shutdown when last endpoint is stopped. Thanks to metatech for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/241c98b6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/241c98b6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/241c98b6

Branch: refs/heads/master
Commit: 241c98b66583de92a3c85b6bbe9290903fdf7dc6
Parents: d0d809d
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 10 16:20:39 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 10 16:20:39 2016 +0200

----------------------------------------------------------------------
 .../camel/component/cache/CacheManagerFactory.java   | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/241c98b6/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheManagerFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheManagerFactory.java b/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheManagerFactory.java
index 1a69a7c..5ab229b 100755
--- a/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheManagerFactory.java
+++ b/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheManagerFactory.java
@@ -63,11 +63,18 @@ public abstract class CacheManagerFactory extends ServiceSupport {
     }
 
     @Override
-    protected void doStop() throws Exception {
-        // shutdown cache manager when stopping
+    protected synchronized void doStop() throws Exception {
+        // only shutdown cache manager if no longer in use
+        // (it may be reused when running in app servers like Karaf)
         if (cacheManager != null) {
-            cacheManager.shutdown();
-            cacheManager = null;
+            int size = cacheManager.getCacheNames().length;
+            if (size <= 0) {
+                LOG.info("Shutting down CacheManager as its no longer in use");
+                cacheManager.shutdown();
+                cacheManager = null;
+            } else {
+                LOG.info("Cannot stop CacheManager as its still in use by {} clients", size);
+            }
         }
     }
 }