You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2010/04/21 09:36:22 UTC

svn commit: r936181 - in /servicemix/smx4/nmr/trunk/nmr: api/src/main/java/org/apache/servicemix/nmr/api/ core/src/main/java/org/apache/servicemix/nmr/core/ core/src/test/java/org/apache/servicemix/nmr/core/

Author: gertv
Date: Wed Apr 21 07:36:21 2010
New Revision: 936181

URL: http://svn.apache.org/viewvc?rev=936181&view=rev
Log:
SMX4NMR-189: Allow configuring injected Channel to bypass thread pool

Modified:
    servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/Endpoint.java
    servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java
    servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ServiceMix.java
    servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/EndpointRegistryImplTest.java

Modified: servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/Endpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/Endpoint.java?rev=936181&r1=936180&r2=936181&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/Endpoint.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/Endpoint.java Wed Apr 21 07:36:21 2010
@@ -65,6 +65,13 @@ public interface Endpoint {
     String UNTARGETABLE = "UNTARGETABLE";
 
     /**
+     * If this property is set to <code>true</code>, the injected
+     * Channel will be configured to bypass the thread pool
+     * for Exchange delivery
+     */
+    String CHANNEL_SYNC_DELIVERY = "CHANNEL_SYNC_DELIVERY";
+
+    /**
      * Set the channel so that the endpoint can send exchanges back
      * when they are processed or act as a consumer itself.
      * This method will be called by the NMR while the endpoint is registered.

Modified: servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java?rev=936181&r1=936180&r2=936181&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java Wed Apr 21 07:36:21 2010
@@ -121,9 +121,12 @@ public class EndpointRegistryImpl implem
             }
             name = EXECUTOR_PREFIX + name;
             Executor executor = executorFactory.createExecutor(name);
+
             // Create channel
             ChannelImpl channel = new ChannelImpl(wrapper, executor, nmr);
+            channel.setShouldRunSynchronously(isChannelSyncDelivery(properties));
             wrapper.setChannel(channel);
+            
             wrappers.put(wrapper, endpoint);
             registry.register(wrapper, properties);
             for (EndpointListener listener : nmr.getListenerRegistry().getListeners(EndpointListener.class)) {
@@ -137,6 +140,13 @@ public class EndpointRegistryImpl implem
         }
     }
 
+    /*
+     * Should the Channel use sync delivery?
+     */
+    private boolean isChannelSyncDelivery(Map<String, ?> properties) {
+        return Boolean.TRUE.equals(properties.get(Endpoint.CHANNEL_SYNC_DELIVERY));
+    }
+
     /**
      * Unregister a previously register enpoint.
      * In an OSGi world, this would be performed automatically by a ServiceTracker.

Modified: servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ServiceMix.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ServiceMix.java?rev=936181&r1=936180&r2=936181&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ServiceMix.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ServiceMix.java Wed Apr 21 07:36:21 2010
@@ -52,7 +52,7 @@ public class ServiceMix implements NMR {
      */
     public void init() {
         if (executorFactory == null) {
-            executorFactory = new ExecutorFactoryImpl();
+            createExecutorFactory();
         }
         if (endpoints == null) {
             EndpointRegistryImpl reg = new EndpointRegistryImpl(this);
@@ -173,7 +173,7 @@ public class ServiceMix implements NMR {
 
     /**
      * Set the wire registry
-     * 
+     *
      * @param wires the wire registry
      */
     public void setWireRegistry(WireRegistry wires) {
@@ -188,10 +188,10 @@ public class ServiceMix implements NMR {
     public ManagementStrategy getManagementStrategy() {
         return managementStrategy;
     }
- 
+
     /**
      * Set the management strategy
-     * 
+     *
      * @param managementStrategy the management strategy
      */
     public void setManagementStrategy(ManagementStrategy managementStrategy) {
@@ -206,21 +206,30 @@ public class ServiceMix implements NMR {
     public String getId() {
         return id;
     }
- 
+
     /**
      * Set the NMR id
-     * 
+     *
      * @param id the NMR id
      */
     public void setId(String id) {
         this.id = id;
     }
 
+    /*
+     * Create the default ExecutorFactory
+     */
+    private void createExecutorFactory() {
+        ExecutorFactoryImpl impl = new ExecutorFactoryImpl();
+        impl.getDefaultConfig().setBypassIfSynchronous(true);
+        executorFactory = impl;
+    }
+
     private void fireEvent(EventObject event) {
-        try { 
+        try {
             getManagementStrategy().notify(event);
         } catch (Exception e) {
             // ignore
-        }  
+        }
     }
 }

Modified: servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/EndpointRegistryImplTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/EndpointRegistryImplTest.java?rev=936181&r1=936180&r2=936181&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/EndpointRegistryImplTest.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/EndpointRegistryImplTest.java Wed Apr 21 07:36:21 2010
@@ -75,6 +75,17 @@ public class EndpointRegistryImplTest ex
         assertNotNull(r.choose(registry));
         assertFalse(r.choose(registry).iterator().hasNext());
     }
+    
+    public void testRegisterWithSyncChannel() throws Exception {
+        DummyEndpoint endpoint = new DummyEndpoint();
+
+        Map<String, Object> properties = ServiceHelper.createMap(Endpoint.NAME, "endpoint-id");
+        properties.put(Endpoint.CHANNEL_SYNC_DELIVERY, true);
+
+        registry.register(endpoint, properties);
+        assertTrue("Injected channel should have shouldRunSynchronously enabled",
+                   endpoint.channel.isShouldRunSynchronously());
+    }
 
     public void testLdapFilter() throws Exception {
         System.setProperty("org.osgi.vendor.framework", "org.apache.servicemix.nmr.core");
@@ -174,7 +185,10 @@ public class EndpointRegistryImplTest ex
     }
 
     protected static class DummyEndpoint implements Endpoint {
+        private ChannelImpl channel;
+
         public void setChannel(Channel channel) {
+            this.channel = (ChannelImpl) channel;
         }
         public void process(Exchange exchange) {
         }