You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/04/26 12:57:04 UTC

camel git commit: CAMEL-11201: refactored service creation in spring-boot

Repository: camel
Updated Branches:
  refs/heads/master 57916bbe2 -> 367d8c68f


CAMEL-11201: refactored service creation in spring-boot


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/367d8c68
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/367d8c68
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/367d8c68

Branch: refs/heads/master
Commit: 367d8c68fa1f9fca2c28ecdef7bad5fe19bb748a
Parents: 57916bb
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Wed Apr 26 14:54:18 2017 +0200
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Wed Apr 26 14:54:31 2017 +0200

----------------------------------------------------------------------
 .../streams/api/CamelReactiveStreams.java       |  27 +----
 .../ReactiveStreamsServiceCreationHelper.java   |  58 ++++++++++
 ...ReactiveStreamsServiceAutoConfiguration.java |   5 +-
 .../ReactiveStreamsAutoConfigurationTest.java   | 106 +++++++++++++++++++
 4 files changed, 169 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/367d8c68/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
index 506ab10..8a2cd93 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
@@ -21,7 +21,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.spi.FactoryFinder;
+import org.apache.camel.component.reactive.streams.util.ReactiveStreamsServiceCreationHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,7 +81,7 @@ public final class CamelReactiveStreams {
             service = context.getRegistry().lookupByNameAndType(serviceName, CamelReactiveStreamsService.class);
 
             if (service == null) {
-                service = resolveServiceUsingFactory(context, serviceName);
+                service = ReactiveStreamsServiceCreationHelper.createNewReactiveStreamsService(context, serviceName);
             }
         } else {
             Set<CamelReactiveStreamsService> set = context.getRegistry().findByType(CamelReactiveStreamsService.class);
@@ -91,32 +91,11 @@ public final class CamelReactiveStreams {
 
             if (service == null) {
                 LOG.info("Using default reactive stream service");
-                service = resolveServiceUsingFactory(context, null);
+                service = ReactiveStreamsServiceCreationHelper.createNewReactiveStreamsService(context, null);
             }
         }
 
         return service;
     }
 
-    @SuppressWarnings("unchecked")
-    private static CamelReactiveStreamsService resolveServiceUsingFactory(CamelContext context, String name) {
-        if (name == null) {
-            name = "default-service";
-        }
-
-        String path = "META-INF/services/org/apache/camel/reactive-streams/";
-        Class<? extends CamelReactiveStreamsService> serviceClass = null;
-        try {
-            FactoryFinder finder = context.getFactoryFinder(path);
-            LOG.trace("Using FactoryFinder: {}", finder);
-            serviceClass = (Class<? extends CamelReactiveStreamsService>) finder.findClass(name);
-            return serviceClass.newInstance();
-        } catch (ClassNotFoundException e) {
-            throw new IllegalStateException("Class referenced in '" + path + name + "' not found", e);
-        } catch (Exception e) {
-            throw new IllegalStateException("Unable to create the reactive stream service defined in '" + path + name + "'", e);
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/367d8c68/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ReactiveStreamsServiceCreationHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ReactiveStreamsServiceCreationHelper.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ReactiveStreamsServiceCreationHelper.java
new file mode 100644
index 0000000..accd965
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ReactiveStreamsServiceCreationHelper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.util;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.spi.FactoryFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A utility class to create the reactive-streams service from factory finders.
+ * Users should not use this class directly, as it may be removed in future versions.
+ */
+public final class ReactiveStreamsServiceCreationHelper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelReactiveStreams.class);
+
+    private ReactiveStreamsServiceCreationHelper() {
+    }
+
+    @SuppressWarnings("unchecked")
+    public static CamelReactiveStreamsService createNewReactiveStreamsService(CamelContext context, String name) {
+        if (name == null) {
+            name = "default-service";
+        }
+
+        String path = "META-INF/services/org/apache/camel/reactive-streams/";
+        Class<? extends CamelReactiveStreamsService> serviceClass;
+        try {
+            FactoryFinder finder = context.getFactoryFinder(path);
+            LOG.trace("Using FactoryFinder: {}", finder);
+            serviceClass = (Class<? extends CamelReactiveStreamsService>) finder.findClass(name);
+            return serviceClass.newInstance();
+        } catch (ClassNotFoundException e) {
+            throw new IllegalStateException("Class referenced in '" + path + name + "' not found", e);
+        } catch (Exception e) {
+            throw new IllegalStateException("Unable to create the reactive stream service defined in '" + path + name + "'", e);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/367d8c68/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsServiceAutoConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsServiceAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsServiceAutoConfiguration.java
index f9430cc..ec0cc7a 100644
--- a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsServiceAutoConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsServiceAutoConfiguration.java
@@ -17,9 +17,8 @@
 package org.apache.camel.component.reactive.streams.springboot;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.util.ReactiveStreamsServiceCreationHelper;
 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@@ -37,7 +36,7 @@ public class ReactiveStreamsServiceAutoConfiguration {
     @ConditionalOnMissingBean
     @ConditionalOnClass(CamelContext.class)
     public CamelReactiveStreamsService camelReactiveStreamsService(CamelContext context) {
-        return CamelReactiveStreams.get(context);
+        return ReactiveStreamsServiceCreationHelper.createNewReactiveStreamsService(context, null);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/367d8c68/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java
new file mode 100644
index 0000000..aadcc37
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.springboot.test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.springboot.ReactiveStreamsComponentAutoConfiguration;
+import org.apache.camel.component.reactive.streams.springboot.ReactiveStreamsServiceAutoConfiguration;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.stereotype.Component;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that the reactive-streams service is auto-configured and that a message flows from a Camel route to a stream subscriber
+ */
+@RunWith(SpringRunner.class)
+@SpringBootApplication
+@DirtiesContext
+@ContextConfiguration(classes = {ReactiveStreamsServiceAutoConfiguration.class, ReactiveStreamsComponentAutoConfiguration.class, CamelAutoConfiguration.class})
+@SpringBootTest
+public class ReactiveStreamsAutoConfigurationTest {
+
+    @Autowired
+    private CamelContext context;
+
+    @Test
+    public void testStreamFlow() throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        String[] res = new String[1];
+        Throwable[] error = new Throwable[1];
+        Publisher<String> string = CamelReactiveStreams.get(context).fromStream("stream", String.class);
+        string.subscribe(new Subscriber<String>() {
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                subscription.request(100);
+            }
+
+            @Override
+            public void onNext(String s) {
+                res[0] = s;
+                latch.countDown();
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                error[0] = throwable;
+            }
+
+            @Override
+            public void onComplete() {
+            }
+        });
+
+        context.createFluentProducerTemplate().to("direct:endpoint").withBody("Hello").send();
+
+        assertTrue(latch.await(5, TimeUnit.SECONDS));
+        assertEquals("Hello", res[0]);
+        Thread.sleep(100);
+        assertNull(error[0]);
+    }
+
+    @Component
+    static class Routes extends RouteBuilder {
+        @Override
+        public void configure() throws Exception {
+
+            from("direct:endpoint")
+                    .to("reactive-streams:stream");
+
+        }
+    }
+
+}
+