You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/08/28 14:55:45 UTC

svn commit: r570401 - in /incubator/servicemix/branches/servicemix-4.0: ./ api/ api/src/main/java/org/apache/servicemix/api/ api/src/main/java/org/apache/servicemix/api/internal/ api/src/main/java/org/apache/servicemix/api/service/ core/ core/src/main/...

Author: gnodet
Date: Tue Aug 28 05:55:43 2007
New Revision: 570401

URL: http://svn.apache.org/viewvc?rev=570401&view=rev
Log:
Implement a simple channel and add an end-to-end test showing how to expose an endpoint and send an exchange to it

Added:
    incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/service/ServiceHelper.java
    incubator/servicemix/branches/servicemix-4.0/core/src/test/java/org/apache/servicemix/core/IntegrationTest.java
Modified:
    incubator/servicemix/branches/servicemix-4.0/   (props changed)
    incubator/servicemix/branches/servicemix-4.0/api/   (props changed)
    incubator/servicemix/branches/servicemix-4.0/api/pom.xml
    incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/Channel.java
    incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/NMR.java
    incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalEndpoint.java
    incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalExchange.java
    incubator/servicemix/branches/servicemix-4.0/core/   (props changed)
    incubator/servicemix/branches/servicemix-4.0/core/pom.xml
    incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ChannelImpl.java
    incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/EndpointRegistryImpl.java
    incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ExchangeImpl.java
    incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/FlowRegistryImpl.java
    incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ListenerRegistryImpl.java
    incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ReferenceImpl.java
    incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ServiceMix.java
    incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/StraightThroughFlow.java
    incubator/servicemix/branches/servicemix-4.0/pom.xml

Propchange: incubator/servicemix/branches/servicemix-4.0/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Aug 28 05:55:43 2007
@@ -0,0 +1,7 @@
+target
+.classpath
+.project
+*.iml
+*.ipr
+*.iws
+

Propchange: incubator/servicemix/branches/servicemix-4.0/api/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Aug 28 05:55:43 2007
@@ -0,0 +1,7 @@
+target
+.classpath
+.project
+*.iml
+*.ipr
+*.iws
+

Modified: incubator/servicemix/branches/servicemix-4.0/api/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/pom.xml?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/pom.xml (original)
+++ incubator/servicemix/branches/servicemix-4.0/api/pom.xml Tue Aug 28 05:55:43 2007
@@ -29,7 +29,7 @@
 
   <groupId>org.apache.servicemix</groupId>
   <artifactId>org.apache.servicemix.api</artifactId>
-  <packaging>jar</packaging>
+  <packaging>bundle</packaging>
   <version>4.0-SNAPSHOT</version>
   <name>org.apache.servicemix.api</name>
 
@@ -55,8 +55,9 @@
         <artifactId>maven-bundle-plugin</artifactId>
         <configuration>
           <instructions>
+            <Bundle-Name>${pom.name}</Bundle-Name>
             <Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
-            <Export-Package>${pom.artifactId}</Export-Package>
+            <Export-Package>${pom.artifactId},${pom.artifactId}.service,${pom.artifactId}.event,${pom.artifactId}.internal</Export-Package>
             <DynamicImport-Package>*</DynamicImport-Package>
           </instructions>
         </configuration>

Modified: incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/Channel.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/Channel.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/Channel.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/Channel.java Tue Aug 28 05:55:43 2007
@@ -31,6 +31,13 @@
 public interface Channel {
 
     /**
+     * Access to the bus
+     *
+     * @return the NMR
+     */
+    NMR getNMR();
+
+    /**
      * Creates a new exchange.
      *
      * @param pattern specify the InOnly / InOut / RobustInOnly / RobustInOut

Modified: incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/NMR.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/NMR.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/NMR.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/NMR.java Tue Aug 28 05:55:43 2007
@@ -20,7 +20,12 @@
 import org.apache.servicemix.api.event.ListenerRegistry;
 
 /**
- * 
+ * The NMR interface is the primary interface to communicate with the Bus.
+ * It contains methods to access the registries and to create a client channel
+ * to communicate with the bus.
+ *
+ * @version $Revision: $
+ * @since 4.0
  */
 public interface NMR {
 

Modified: incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalEndpoint.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalEndpoint.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalEndpoint.java Tue Aug 28 05:55:43 2007
@@ -24,6 +24,12 @@
  */
 public interface InternalEndpoint extends Endpoint {
 
+    /**
+     * Retrieve the channel associated with this endpoint.
+     * This method is usually used by {@link Flow}s to deliver
+     * exchanges to this endpoint.
+     * @return the channel associated with this endpoint
+     */
     InternalChannel getChannel();
 
 }

Modified: incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalExchange.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalExchange.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalExchange.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalExchange.java Tue Aug 28 05:55:43 2007
@@ -17,6 +17,9 @@
 package org.apache.servicemix.api.internal;
 
 import org.apache.servicemix.api.Exchange;
+import org.apache.servicemix.api.Role;
+
+import java.util.concurrent.Semaphore;
 
 /**
  * 
@@ -27,6 +30,15 @@
 public interface InternalExchange extends Exchange {
 
     /**
+     * Set the role of the exchange.
+     * This method is for internal use and will be called by the Channel
+     * when delivering the exchange.
+     *
+     * @param role the new role
+     */
+    public void setRole(Role role);
+
+    /**
      * Retrieve the source endpoint. I.e. the one that created the exchange.
      * This information will be set by the NMR when the exchange is sent
      * using one of {@link org.apache.servicemix.api.Channel#send(Exchange)},
@@ -61,4 +73,8 @@
      * @param destination the destination endpoint
      */
     void setDestination(InternalEndpoint destination);
+
+    Semaphore getConsumerLock(boolean create);
+
+    Semaphore getProviderLock(boolean create);
 }

Added: incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/service/ServiceHelper.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/service/ServiceHelper.java?rev=570401&view=auto
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/service/ServiceHelper.java (added)
+++ incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/service/ServiceHelper.java Tue Aug 28 05:55:43 2007
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.api.service;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Helper methods for building the property maps used to register and look up services.
+ */
+public final class ServiceHelper {
+
+    private ServiceHelper() {
+    }
+
+    public static Map<String, ?> createMap(String... data) {
+        Map<String, String> props = new HashMap<String, String>();
+        for (int i = 0; i < data.length / 2; i++) {
+            props.put(data[i*2], data[i*2+1]);
+        }
+        return props;
+    }
+
+}

Propchange: incubator/servicemix/branches/servicemix-4.0/core/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Aug 28 05:55:43 2007
@@ -0,0 +1,7 @@
+target
+.classpath
+.project
+*.iml
+*.ipr
+*.iws
+

Modified: incubator/servicemix/branches/servicemix-4.0/core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/pom.xml?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/pom.xml (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/pom.xml Tue Aug 28 05:55:43 2007
@@ -29,7 +29,7 @@
 
   <groupId>org.apache.servicemix</groupId>
   <artifactId>org.apache.servicemix.core</artifactId>
-  <packaging>jar</packaging>
+  <packaging>bundle</packaging>
   <version>4.0-SNAPSHOT</version>
   <name>org.apache.servicemix.core</name>
 

Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ChannelImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ChannelImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ChannelImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ChannelImpl.java Tue Aug 28 05:55:43 2007
@@ -16,17 +16,22 @@
  */
 package org.apache.servicemix.core;
 
-import org.apache.servicemix.api.Exchange;
-import org.apache.servicemix.api.NMR;
-import org.apache.servicemix.api.Pattern;
+import org.apache.servicemix.api.*;
 import org.apache.servicemix.api.event.ExchangeListener;
 import org.apache.servicemix.api.internal.InternalChannel;
 import org.apache.servicemix.api.internal.InternalEndpoint;
 import org.apache.servicemix.api.internal.InternalExchange;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
+ * The Channel implementation.
+ * The channel uses an Executor (usually a thread pool)
+ * to delegate the processing of delivered exchanges.
+ *
  * @version $Revision: $
  * @since 4.0
  */
@@ -35,7 +40,6 @@
     private final InternalEndpoint endpoint;
     private final Executor executor;
     private final NMR nmr;
-    
 
     public ChannelImpl(InternalEndpoint endpoint, Executor executor, NMR nmr) {
         this.endpoint = endpoint;
@@ -44,6 +48,15 @@
     }
 
     /**
+     * Access to the bus
+     *
+     * @return the NMR
+     */
+    public NMR getNMR() {
+        return nmr;
+    }
+
+    /**
      * Creates a new exchange.
      *
      * @param pattern specify the InOnly / InOut / RobustInOnly / RobustInOut
@@ -70,8 +83,7 @@
      * @return <code>true</code> if the exchange has been processed succesfully
      */
     public boolean sendSync(Exchange exchange) {
-        // TODO
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return sendSync(exchange, 0);
     }
 
     /**
@@ -82,8 +94,28 @@
      * @return <code>true</code> if the exchange has been processed succesfully
      */
     public boolean sendSync(Exchange exchange, long timeout) {
-        // TODO
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        InternalExchange e = (InternalExchange) exchange;
+        Semaphore lock = e.getRole() == Role.Consumer ? e.getConsumerLock(true)
+                                                      : e.getProviderLock(true);
+        try {
+            dispatch(e);
+            if (timeout > 0) {
+                if (!lock.tryAcquire(timeout, TimeUnit.MILLISECONDS)) {
+                    throw new TimeoutException();
+                }
+            } else {
+                lock.acquire();
+            }
+        } catch (InterruptedException ex) {
+            exchange.setError(ex);
+            exchange.setStatus(Status.Error);
+            return false;
+        } catch (TimeoutException ex) {
+            exchange.setError(ex);
+            exchange.setStatus(Status.Error);
+            return false;
+        }
+        return true;
     }
 
     /**
@@ -101,6 +133,13 @@
      * @param exchange the exchange to delivery
      */
     public void deliver(final InternalExchange exchange) {
+        // Handle case where the exchange has been sent synchronously
+        Semaphore lock = exchange.getRole() == Role.Provider ? exchange.getConsumerLock(false)
+                                                             : exchange.getProviderLock(false);
+        if (lock != null) {
+            lock.release();
+            return;
+        }
         // Delegate processing to the executor
         this.executor.execute(new Runnable() {
             public void run() {
@@ -116,11 +155,18 @@
      */
     protected void process(InternalExchange exchange) {
         // Set destination endpoint
-        exchange.setDestination(endpoint);
+        if (exchange.getDestination() == null) {
+            exchange.setDestination(endpoint);
+        }
         // Call listeners
         for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
             l.exchangeDelivered(exchange);
         }
+        // Change role
+        exchange.setRole(exchange.getRole() == Role.Provider ? Role.Consumer : Role.Provider);
+        // Check if sendSync was used, in which case we need to unblock the other side
+        // rather than delivering the exchange
+        // TODO:
         // Process exchange
         endpoint.process(exchange);
     }

Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/EndpointRegistryImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/EndpointRegistryImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/EndpointRegistryImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/EndpointRegistryImpl.java Tue Aug 28 05:55:43 2007
@@ -42,6 +42,25 @@
     private Map<Endpoint, InternalEndpoint> endpoints = new ConcurrentHashMap<Endpoint, InternalEndpoint>();
     private ServiceRegistry<InternalEndpoint> registry = new ServiceRegistryImpl<InternalEndpoint>();
 
+    public EndpointRegistryImpl() {
+    }
+
+    public EndpointRegistryImpl(NMR nmr) {
+        this.nmr = nmr;
+    }
+
+    public NMR getNMR() {
+        return this.nmr;
+    }
+
+    public void setNMR(NMR nmr) {
+        this.nmr = nmr;
+    }
+
+    public void init() {
+        // TODO: check nmr
+    }
+
     /**
      * Register the given endpoint in the registry.
      * In an OSGi world, this would be performed automatically by a ServiceTracker.

Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ExchangeImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ExchangeImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ExchangeImpl.java Tue Aug 28 05:55:43 2007
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Semaphore;
 
 /**
  * @version $Revision: $
@@ -50,14 +51,18 @@
     private Exception error;
     private InternalEndpoint source;
     private InternalEndpoint destination;
+    private Semaphore consumerLock;
+    private Semaphore providerLock;
 
     /**
      * Creates and exchange of the given pattern
      * @param pattern the pattern of this exchange
      */
     public ExchangeImpl(Pattern pattern) {
-        this.pattern = pattern;
         this.id = UUID.randomUUID().toString();
+        this.status = Status.Active;
+        this.role = Role.Consumer;
+        this.pattern = pattern;
     }
     
     private ExchangeImpl() {
@@ -373,6 +378,20 @@
 
     public void setDestination(InternalEndpoint destination) {
         this.destination = destination;
+    }
+
+    public Semaphore getConsumerLock(boolean create) {
+        if (create) {
+            consumerLock = new Semaphore(0);
+        }
+        return consumerLock;
+    }
+
+    public Semaphore getProviderLock(boolean create) {
+        if (create) {
+            providerLock = new Semaphore(0);
+        }
+        return providerLock;
     }
 
 }

Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/FlowRegistryImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/FlowRegistryImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/FlowRegistryImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/FlowRegistryImpl.java Tue Aug 28 05:55:43 2007
@@ -17,6 +17,7 @@
 package org.apache.servicemix.core;
 
 import org.apache.servicemix.api.ServiceMixException;
+import org.apache.servicemix.api.Role;
 import org.apache.servicemix.api.internal.*;
 
 /**
@@ -37,10 +38,31 @@
     }
 
     public void dispatch(InternalExchange exchange) {
-        InternalReference target = (InternalReference) exchange.getTarget();
-        for (InternalEndpoint endpoint : target.choose()) {
+        if (exchange.getRole() == Role.Consumer) {
+            if (exchange.getDestination() == null) {
+                InternalReference target = (InternalReference) exchange.getTarget();
+                for (InternalEndpoint endpoint : target.choose()) {
+                    for (Flow flow : getServices()) {
+                        if (flow.canDispatch(exchange, endpoint)) {
+                            exchange.setDestination(endpoint);
+                            flow.dispatch(exchange);
+                            return;
+                        }
+                    }
+                    throw new ServiceMixException("Could not dispatch exchange. No flow can handle it.");
+                }
+            } else {
+                for (Flow flow : getServices()) {
+                    if (flow.canDispatch(exchange, exchange.getDestination())) {
+                        flow.dispatch(exchange);
+                        return;
+                    }
+                }
+                throw new ServiceMixException("Could not dispatch exchange. No flow can handle it.");
+            }
+        } else {
             for (Flow flow : getServices()) {
-                if (flow.canDispatch(exchange, endpoint)) {
+                if (flow.canDispatch(exchange, exchange.getSource())) {
                     flow.dispatch(exchange);
                     return;
                 }

Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ListenerRegistryImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ListenerRegistryImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ListenerRegistryImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ListenerRegistryImpl.java Tue Aug 28 05:55:43 2007
@@ -25,7 +25,7 @@
 /**
  *
  */
-public class    ListenerRegistryImpl extends ServiceRegistryImpl<Listener> implements ListenerRegistry {
+public class ListenerRegistryImpl extends ServiceRegistryImpl<Listener> implements ListenerRegistry {
 
     /**
      * Retrieve an iterator of listeners of a certain type

Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ReferenceImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ReferenceImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ReferenceImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ReferenceImpl.java Tue Aug 28 05:55:43 2007
@@ -16,19 +16,17 @@
  */
 package org.apache.servicemix.core;
 
-import org.apache.servicemix.api.Reference;
-import org.apache.servicemix.api.Endpoint;
 import org.apache.servicemix.api.internal.InternalEndpoint;
+import org.apache.servicemix.api.internal.InternalReference;
 import org.w3c.dom.Document;
 
 import java.util.List;
-import java.util.Iterator;
 
 /**
  * @version $Revision: $
  * @since 4.0
  */
-public class ReferenceImpl implements Reference {
+public class ReferenceImpl implements InternalReference {
 
     private final List<InternalEndpoint> endpoints;
 
@@ -60,28 +58,8 @@
      *
      * @return an endpoint that will be used as the physical target
      */
-    public Iterator<Endpoint> choose() {
-        return new EndpointIterator(endpoints.iterator());
+    public Iterable<InternalEndpoint> choose() {
+        return endpoints;
     }
     
-    protected static class EndpointIterator implements Iterator<Endpoint> {
-
-        private final Iterator<InternalEndpoint> iterator;
-
-        public EndpointIterator(Iterator<InternalEndpoint> iterator) {
-            this.iterator = iterator;
-        }
-
-        public boolean hasNext() {
-            return iterator.hasNext();
-        }
-
-        public Endpoint next() {
-            return iterator.next();
-        }
-
-        public void remove() {
-            throw new IllegalStateException();
-        }
-    }
 }

Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ServiceMix.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ServiceMix.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ServiceMix.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ServiceMix.java Tue Aug 28 05:55:43 2007
@@ -23,6 +23,7 @@
 import org.apache.servicemix.api.internal.FlowRegistry;
 
 /**
+ * This class is the main class implementing the NMR
  *
  */
 public class ServiceMix implements NMR {
@@ -41,6 +42,15 @@
     }
 
     /**
+     * Set the endpoint registry
+     *
+     * @param endpoints the endpoint registry
+     */
+    public void setEndpointRegistry(EndpointRegistry endpoints) {
+        this.endpoints = endpoints;
+    }
+
+    /**
      * Access the listener registry.
      *
      * @return the listener registry
@@ -50,12 +60,30 @@
     }
 
     /**
+     * Set the listener registry
+     *
+     * @param listeners the listener registry
+     */
+    public void setListenerRegistry(ListenerRegistry listeners) {
+        this.listeners = listeners;
+    }
+
+    /**
      * Access the flow registry.
      *
      * @return the flow registry
      */
     public FlowRegistry getFlowRegistry() {
         return flows;
+    }
+
+    /**
+     * Set the flow registry
+     *
+     * @param flows the flow registry
+     */
+    public void setFlowRegistry(FlowRegistry flows) {
+        this.flows = flows;
     }
 
     /**

Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/StraightThroughFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/StraightThroughFlow.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/StraightThroughFlow.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/StraightThroughFlow.java Tue Aug 28 05:55:43 2007
@@ -19,6 +19,7 @@
 import org.apache.servicemix.api.internal.Flow;
 import org.apache.servicemix.api.internal.InternalEndpoint;
 import org.apache.servicemix.api.internal.InternalExchange;
+import org.apache.servicemix.api.Role;
 
 
 /**
@@ -49,6 +50,8 @@
      * @param exchange the exchange to dispatch
      */
     public void dispatch(InternalExchange exchange) {
-        exchange.getDestination().getChannel().deliver(exchange);
+        InternalEndpoint endpoint = exchange.getRole() == Role.Consumer ? exchange.getDestination()
+                                                                        : exchange.getSource();
+        endpoint.getChannel().deliver(exchange);
     }
 }

Added: incubator/servicemix/branches/servicemix-4.0/core/src/test/java/org/apache/servicemix/core/IntegrationTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/test/java/org/apache/servicemix/core/IntegrationTest.java?rev=570401&view=auto
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/test/java/org/apache/servicemix/core/IntegrationTest.java (added)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/test/java/org/apache/servicemix/core/IntegrationTest.java Tue Aug 28 05:55:43 2007
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core;
+
+import org.apache.servicemix.api.*;
+import org.apache.servicemix.api.service.ServiceHelper;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test
+ */
+public class IntegrationTest {
+
+    private NMR nmr;
+
+    @Before
+    public void setUp() {
+        ServiceMix smx = new ServiceMix();
+        smx.setListenerRegistry(new ListenerRegistryImpl());
+        smx.setEndpointRegistry(new EndpointRegistryImpl(smx));
+        smx.setFlowRegistry(new FlowRegistryImpl());
+        smx.getFlowRegistry().register(new StraightThroughFlow(), ServiceHelper.createMap());
+        nmr = smx;
+    }
+
+    @Test
+    public void testSendExchangeToEndpointUsingClient() throws Exception {
+        MyEndpoint endpoint = new MyEndpoint();
+        nmr.getEndpointRegistry().register(endpoint, ServiceHelper.createMap(Endpoint.ID, "id"));
+        Channel client = nmr.createChannel();
+        Exchange e = client.createExchange(Pattern.InOnly);
+        e.setTarget(nmr.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.ID, "id")));
+        e.getIn().setContent("Hello");
+        boolean res = client.sendSync(e);
+        assertTrue(res);
+        assertNotNull(endpoint.getExchange());
+        assertEquals(Status.Done, e.getStatus());
+    }
+
+
+    public static class MyEndpoint implements Endpoint {
+
+        private Channel channel;
+        private Exchange exchange;
+
+        public void setChannel(Channel channel) {
+            this.channel = channel;
+        }
+
+        public void process(Exchange exchange) {
+            this.exchange = exchange;
+            exchange.setStatus(Status.Done);
+            channel.send(exchange);
+        }
+
+        public Exchange getExchange() {
+            return exchange;
+        }
+    }
+}

Modified: incubator/servicemix/branches/servicemix-4.0/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/pom.xml?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/pom.xml (original)
+++ incubator/servicemix/branches/servicemix-4.0/pom.xml Tue Aug 28 05:55:43 2007
@@ -56,6 +56,7 @@
           <groupId>org.apache.felix</groupId>
           <artifactId>maven-bundle-plugin</artifactId>
   		  <version>1.0.0</version>
+          <extensions>true</extensions>
         </plugin>
         <plugin>
             <groupId>org.apache.maven.plugins</groupId>