You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/10/24 12:11:55 UTC

svn commit: r587850 - in /incubator/servicemix/branches/servicemix-4.0/jbi: ./ runtime/ runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ runtime/src/test/java/org/apache/servicemix/jbi/runtime/

Author: gnodet
Date: Wed Oct 24 03:11:54 2007
New Revision: 587850

URL: http://svn.apache.org/viewvc?rev=587850&view=rev
Log:
The test is now fully functional

Added:
    incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ServiceEndpointImpl.java
Modified:
    incubator/servicemix/branches/servicemix-4.0/jbi/pom.xml
    incubator/servicemix/branches/servicemix-4.0/jbi/runtime/pom.xml
    incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java
    incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
    incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/MessageExchangeImpl.java
    incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java

Modified: incubator/servicemix/branches/servicemix-4.0/jbi/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/pom.xml?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/pom.xml (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/pom.xml Wed Oct 24 03:11:54 2007
@@ -27,7 +27,7 @@
     <version>4.0-SNAPSHOT</version>
   </parent>
 
-  <groupId>org.apache.servicemix</groupId>
+  <groupId>org.apache.servicemix.jbi</groupId>
   <artifactId>jbi</artifactId>
   <packaging>pom</packaging>
   <version>4.0-SNAPSHOT</version>

Modified: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/pom.xml?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/pom.xml (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/pom.xml Wed Oct 24 03:11:54 2007
@@ -80,6 +80,7 @@
             <Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
             <Export-Package>${pom.artifactId}</Export-Package>
             <DynamicImport-Package>*</DynamicImport-Package>
+            <Spring-Context>*;publish-context:=false</Spring-Context>
           </instructions>
         </configuration>
       </plugin>

Modified: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java Wed Oct 24 03:11:54 2007
@@ -61,16 +61,25 @@
     private BlockingQueue<Exchange> queue;
     private DeliveryChannel dc;
     private List<EndpointImpl> endpoints;
-    private Channel channel;
+    private EndpointImpl componentEndpoint;
 
     public ComponentContextImpl(NMR nmr, Component component, Map<String,?> properties) {
+        if (properties == null) {
+            properties = new HashMap<String, Object>();
+        }
         this.nmr = nmr;
         this.component = component;
         this.properties = properties;
-        this.channel = nmr.createChannel();
-        this.queue = new ArrayBlockingQueue<Exchange>(DEFAULT_QUEUE_CAPACITY);
-        this.dc = new DeliveryChannelImpl(channel, queue);
         this.endpoints = new ArrayList<EndpointImpl>();
+        this.queue = new ArrayBlockingQueue<Exchange>(DEFAULT_QUEUE_CAPACITY);
+        this.componentEndpoint = new EndpointImpl();
+        this.componentEndpoint.setQueue(queue);
+        this.nmr.getEndpointRegistry().register(componentEndpoint, properties);
+        this.dc = new DeliveryChannelImpl(this, componentEndpoint.getChannel(), queue);
+    }
+
+    public NMR getNmr() {
+        return nmr;
     }
 
     public synchronized ServiceEndpoint activateEndpoint(QName serviceName, String endpointName) throws JBIException {

Modified: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java Wed Oct 24 03:11:54 2007
@@ -16,6 +16,8 @@
  */
 package org.apache.servicemix.jbi.runtime.impl;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,7 +30,11 @@
 import javax.xml.namespace.QName;
 
 import org.apache.servicemix.api.Channel;
+import org.apache.servicemix.api.Endpoint;
 import org.apache.servicemix.api.Exchange;
+import org.apache.servicemix.api.Pattern;
+import org.apache.servicemix.api.Reference;
+import org.apache.servicemix.api.internal.InternalExchange;
 
 /**
  * Implementation of the DeliveryChannel.
@@ -39,13 +45,20 @@
     /** Mutable boolean indicating if the channe has been closed */
     private final AtomicBoolean closed;
 
+    /** The Component Context **/
+    private final ComponentContextImpl context;
+
     /** Holds exchanges to be polled by the component */
     private final BlockingQueue<Exchange> queue;
 
     /** The underlying Channel */
     private final Channel channel;
 
-    public DeliveryChannelImpl(Channel channel, BlockingQueue<Exchange> queue) {
+    /** The default QName for endpoints not having this property */
+    private static final QName DEFAULT_SERVICE_NAME = new QName("urn:servicemix.apache.org", "jbi");
+
+    public DeliveryChannelImpl(ComponentContextImpl context, Channel channel, BlockingQueue<Exchange> queue) {
+        this.context = context;
         this.channel = channel;
         this.queue = queue;
         this.closed = new AtomicBoolean(false);
@@ -85,7 +98,7 @@
             if (exchange == null) {
                 return null;
             }
-            return new MessageExchangeImpl(exchange);
+            return getMessageExchange(exchange);
         } catch (InterruptedException e) {
             throw new MessagingException(e);
         }
@@ -97,24 +110,90 @@
             if (exchange == null) {
                 return null;
             }
-            return new MessageExchangeImpl(exchange);
+            return getMessageExchange(exchange);
         } catch (InterruptedException e) {
             throw new MessagingException(e);
         }
     }
 
+    protected MessageExchange getMessageExchange(Exchange exchange) {
+        MessageExchange me = exchange.getProperty(MessageExchange.class);
+        if (me == null) {
+            if (exchange.getPattern() == Pattern.InOnly) {
+                me = new InOnlyImpl(exchange);
+            } else if (exchange.getPattern() == Pattern.InOptionalOut) {
+                me = new InOptionalOutImpl(exchange);
+            } else if (exchange.getPattern() == Pattern.InOut) {
+                me = new InOutImpl(exchange);
+            } else if (exchange.getPattern() == Pattern.RobustInOnly) {
+                me = new RobustInOnlyImpl(exchange);
+            } else {
+                throw new IllegalStateException("Unkown pattern: " + exchange.getPattern());
+            }
+            exchange.setProperty(MessageExchange.class, me);
+        }
+        // Translate the destination endpoint
+        if (((InternalExchange) exchange).getDestination() != null && me.getEndpoint() == null) {
+            Endpoint ep = ((InternalExchange) exchange).getDestination();
+            Map<String, ?> props = context.getNmr().getEndpointRegistry().getProperties(ep);
+            QName serviceName = (QName) props.get(Endpoint.SERVICE_NAME);
+            if (serviceName == null) {
+                serviceName = DEFAULT_SERVICE_NAME;
+            }
+            String endpointName = (String) props.get(Endpoint.ENDPOINT_NAME);
+            if (endpointName == null) {
+                endpointName = (String) props.get(Endpoint.NAME);
+            }
+            me.setEndpoint(new ServiceEndpointImpl(serviceName, endpointName));
+        }
+        return me;
+    }
+
+    protected Exchange getExchange(MessageExchange messageExchange) {
+        // TODO
+        return null;
+    }
+
     public void send(MessageExchange exchange) throws MessagingException {
         assert exchange != null;
+        createTarget(exchange);
         channel.send(((MessageExchangeImpl) exchange).getInternalExchange());
     }
 
     public boolean sendSync(MessageExchange exchange) throws MessagingException {
         assert exchange != null;
+        createTarget(exchange);
         return channel.sendSync(((MessageExchangeImpl) exchange).getInternalExchange());
     }
 
     public boolean sendSync(MessageExchange exchange, long timeout) throws MessagingException {
         assert exchange != null;
+        createTarget(exchange);
         return channel.sendSync(((MessageExchangeImpl) exchange).getInternalExchange(), timeout);
+    }
+
+    protected void createTarget(MessageExchange messageExchange) throws MessagingException {
+        Exchange exchange = ((MessageExchangeImpl) messageExchange).getInternalExchange();
+        if (exchange.getTarget() == null) {
+            Map<String, Object> props = new HashMap<String, Object>();
+            if (messageExchange.getEndpoint() != null) {
+                // TODO: handle explicit addressing
+            } else {
+                QName serviceName = messageExchange.getService();
+                if (serviceName != null) {
+                    props.put(Endpoint.SERVICE_NAME, serviceName);
+                } else {
+                    QName interfaceName = messageExchange.getInterfaceName();
+                    if (interfaceName != null) {
+                        props.put(Endpoint.INTERFACE_NAME, interfaceName);
+                    }
+                }
+            }
+            if (props.isEmpty()) {
+                throw new MessagingException("No endpoint, service or interface name specified for routing");
+            }
+            Reference target = context.getNmr().getEndpointRegistry().lookup(props);
+            exchange.setTarget(target);
+        }
     }
 }

Modified: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/MessageExchangeImpl.java?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/MessageExchangeImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/MessageExchangeImpl.java Wed Oct 24 03:11:54 2007
@@ -16,27 +16,37 @@
  */
 package org.apache.servicemix.jbi.runtime.impl;
 
+import java.net.URI;
+import java.util.Set;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+
 import org.apache.servicemix.api.Exchange;
 import org.apache.servicemix.api.Message;
-import org.apache.servicemix.api.Pattern;
 import org.apache.servicemix.api.Status;
 import org.apache.servicemix.core.MessageImpl;
 
-import javax.jbi.messaging.*;
-import javax.jbi.servicedesc.ServiceEndpoint;
-import javax.xml.namespace.QName;
-import java.net.URI;
-import java.util.Set;
-
 /**
- * Created by IntelliJ IDEA.
- * User: gnodet
- * Date: Oct 5, 2007
- * Time: 2:21:25 PM
- * To change this template use File | Settings | File Templates.
+ * MessageExchange wrapper on top of an Exchange.
+ *
+ * @see Exchange
  */
 public class MessageExchangeImpl implements MessageExchange  {
 
+    public static final String INTERFACE_NAME_PROP = "javax.jbi.InterfaceName";
+    public static final String SERVICE_NAME_PROP = "javax.jbi.ServiceName";
+    public static final String SERVICE_ENDPOINT_PROP = "javax.jbi.ServiceEndpoint";
+
+    public static final String IN = "in";
+    public static final String OUT = "out";
+    public static final String FAULT = "fault";
+
     private final Exchange exchange;
 
     public MessageExchangeImpl(Exchange exchange) {
@@ -109,11 +119,11 @@
     }
 
     public NormalizedMessage getMessage(String name) {
-        if ("in".equalsIgnoreCase(name)) {
+        if (IN.equalsIgnoreCase(name)) {
             return getInMessage();
-        } else if ("out".equalsIgnoreCase(name)) {
+        } else if (OUT.equalsIgnoreCase(name)) {
             return getOutMessage();
-        } else if ("fault".equalsIgnoreCase(name)) {
+        } else if (FAULT.equalsIgnoreCase(name)) {
             return getFault();
         } else {
             throw new IllegalStateException();
@@ -121,11 +131,11 @@
     }
 
     public void setMessage(NormalizedMessage msg, String name) throws MessagingException {
-        if ("in".equalsIgnoreCase(name)) {
+        if (IN.equalsIgnoreCase(name)) {
             setInMessage(msg);
-        } else if ("out".equalsIgnoreCase(name)) {
+        } else if (OUT.equalsIgnoreCase(name)) {
             setOutMessage(msg);
-        } else if ("fault".equalsIgnoreCase(name)) {
+        } else if (FAULT.equalsIgnoreCase(name)) {
             setFault((Fault) msg);
         } else {
             throw new IllegalStateException();
@@ -187,27 +197,27 @@
     }
 
     public void setEndpoint(ServiceEndpoint endpoint) {
-        exchange.setProperty("javax.jbi.ServiceEndpoint", endpoint);
+        exchange.setProperty(SERVICE_ENDPOINT_PROP, endpoint);
     }
 
     public void setService(QName service) {
-        exchange.setProperty("javax.jbi.ServiceeName", service);
+        exchange.setProperty(SERVICE_NAME_PROP, service);
     }
 
     public void setInterfaceName(QName interfaceName) {
-        exchange.setProperty("javax.jbi.InterfaceName", interfaceName);
+        exchange.setProperty(INTERFACE_NAME_PROP, interfaceName);
     }
 
     public ServiceEndpoint getEndpoint() {
-        return exchange.getProperty("javax.jbi.ServiceEndpoint", ServiceEndpoint.class);
+        return exchange.getProperty(SERVICE_ENDPOINT_PROP, ServiceEndpoint.class);
     }
 
     public QName getInterfaceName() {
-        return exchange.getProperty("javax.jbi.InterfaceName", QName.class);
+        return exchange.getProperty(INTERFACE_NAME_PROP, QName.class);
     }
 
     public QName getService() {
-        return exchange.getProperty("javax.jbi.ServiceName", QName.class);
+        return exchange.getProperty(SERVICE_NAME_PROP, QName.class);
     }
 
     public boolean isTransacted() {

Added: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ServiceEndpointImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ServiceEndpointImpl.java?rev=587850&view=auto
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ServiceEndpointImpl.java (added)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ServiceEndpointImpl.java Wed Oct 24 03:11:54 2007
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.runtime.impl;
+
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.DocumentFragment;
+
+/**
+ * A basic implementation of ServiceEndpoint
+ */
+public class ServiceEndpointImpl implements ServiceEndpoint {
+
+    private final QName serviceName;
+    private final String endpointName;
+
+    public ServiceEndpointImpl(QName serviceName, String endpointName) {
+        this.serviceName = serviceName;
+        this.endpointName = endpointName;
+    }
+
+    public DocumentFragment getAsReference(QName operationName) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public String getEndpointName() {
+        return endpointName;
+    }
+
+    public QName[] getInterfaces() {
+        return new QName[0];  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public QName getServiceName() {
+        return serviceName;
+    }
+}

Modified: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java Wed Oct 24 03:11:54 2007
@@ -16,10 +16,16 @@
  */
 package org.apache.servicemix.jbi.runtime;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+
 import org.apache.servicemix.api.Channel;
+import org.apache.servicemix.api.Endpoint;
 import org.apache.servicemix.api.Exchange;
 import org.apache.servicemix.api.Pattern;
-import org.apache.servicemix.api.Endpoint;
+import org.apache.servicemix.api.Status;
 import org.apache.servicemix.api.service.ServiceHelper;
 import org.apache.servicemix.core.ServiceMix;
 import org.apache.servicemix.eip.EIPComponent;
@@ -29,8 +35,6 @@
 import org.apache.servicemix.jbi.runtime.impl.ComponentRegistryImpl;
 import org.junit.Test;
 
-import javax.xml.namespace.QName;
-
 /**
  * Created by IntelliJ IDEA.
  * User: gnodet
@@ -46,6 +50,23 @@
         smx.init();
         ComponentRegistryImpl reg = new ComponentRegistryImpl();
         reg.setNmr(smx);
+
+        Endpoint tep = new Endpoint() {
+            private Channel channel;
+            public void setChannel(Channel channel) {
+                this.channel = channel;
+            }
+            public void process(Exchange exchange) {
+                System.out.println("Received exchange: " + exchange);
+                if (exchange.getStatus() == Status.Active) {
+                    exchange.setStatus(Status.Done);
+                    channel.send(exchange);
+                }
+            }
+        };
+        Map<String, Object> props = new HashMap<String, Object>();
+        props.put(Endpoint.SERVICE_NAME, new QName("target"));
+        smx.getEndpointRegistry().register(tep, props);
 
         EIPComponent eip = new EIPComponent();
         WireTap ep = new WireTap();