You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/10/24 12:11:55 UTC
svn commit: r587850 - in /incubator/servicemix/branches/servicemix-4.0/jbi:
./ runtime/ runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/
runtime/src/test/java/org/apache/servicemix/jbi/runtime/
Author: gnodet
Date: Wed Oct 24 03:11:54 2007
New Revision: 587850
URL: http://svn.apache.org/viewvc?rev=587850&view=rev
Log:
The test is now fully functional.
Added:
incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ServiceEndpointImpl.java
Modified:
incubator/servicemix/branches/servicemix-4.0/jbi/pom.xml
incubator/servicemix/branches/servicemix-4.0/jbi/runtime/pom.xml
incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java
incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/MessageExchangeImpl.java
incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java
Modified: incubator/servicemix/branches/servicemix-4.0/jbi/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/pom.xml?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/pom.xml (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/pom.xml Wed Oct 24 03:11:54 2007
@@ -27,7 +27,7 @@
<version>4.0-SNAPSHOT</version>
</parent>
- <groupId>org.apache.servicemix</groupId>
+ <groupId>org.apache.servicemix.jbi</groupId>
<artifactId>jbi</artifactId>
<packaging>pom</packaging>
<version>4.0-SNAPSHOT</version>
Modified: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/pom.xml?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/pom.xml (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/pom.xml Wed Oct 24 03:11:54 2007
@@ -80,6 +80,7 @@
<Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
<Export-Package>${pom.artifactId}</Export-Package>
<DynamicImport-Package>*</DynamicImport-Package>
+ <Spring-Context>*;publish-context:=false</Spring-Context>
</instructions>
</configuration>
</plugin>
Modified: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java Wed Oct 24 03:11:54 2007
@@ -61,16 +61,25 @@
private BlockingQueue<Exchange> queue;
private DeliveryChannel dc;
private List<EndpointImpl> endpoints;
- private Channel channel;
+ private EndpointImpl componentEndpoint;
public ComponentContextImpl(NMR nmr, Component component, Map<String,?> properties) {
+ if (properties == null) {
+ properties = new HashMap<String, Object>();
+ }
this.nmr = nmr;
this.component = component;
this.properties = properties;
- this.channel = nmr.createChannel();
- this.queue = new ArrayBlockingQueue<Exchange>(DEFAULT_QUEUE_CAPACITY);
- this.dc = new DeliveryChannelImpl(channel, queue);
this.endpoints = new ArrayList<EndpointImpl>();
+ this.queue = new ArrayBlockingQueue<Exchange>(DEFAULT_QUEUE_CAPACITY);
+ this.componentEndpoint = new EndpointImpl();
+ this.componentEndpoint.setQueue(queue);
+ this.nmr.getEndpointRegistry().register(componentEndpoint, properties);
+ this.dc = new DeliveryChannelImpl(this, componentEndpoint.getChannel(), queue);
+ }
+
+ public NMR getNmr() {
+ return nmr;
}
public synchronized ServiceEndpoint activateEndpoint(QName serviceName, String endpointName) throws JBIException {
Modified: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java Wed Oct 24 03:11:54 2007
@@ -16,6 +16,8 @@
*/
package org.apache.servicemix.jbi.runtime.impl;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,7 +30,11 @@
import javax.xml.namespace.QName;
import org.apache.servicemix.api.Channel;
+import org.apache.servicemix.api.Endpoint;
import org.apache.servicemix.api.Exchange;
+import org.apache.servicemix.api.Pattern;
+import org.apache.servicemix.api.Reference;
+import org.apache.servicemix.api.internal.InternalExchange;
/**
* Implementation of the DeliveryChannel.
@@ -39,13 +45,20 @@
/** Mutable boolean indicating if the channe has been closed */
private final AtomicBoolean closed;
+ /** The Component Context **/
+ private final ComponentContextImpl context;
+
/** Holds exchanges to be polled by the component */
private final BlockingQueue<Exchange> queue;
/** The underlying Channel */
private final Channel channel;
- public DeliveryChannelImpl(Channel channel, BlockingQueue<Exchange> queue) {
+ /** The default QName for endpoints not having this property */
+ private static final QName DEFAULT_SERVICE_NAME = new QName("urn:servicemix.apache.org", "jbi");
+
+ public DeliveryChannelImpl(ComponentContextImpl context, Channel channel, BlockingQueue<Exchange> queue) {
+ this.context = context;
this.channel = channel;
this.queue = queue;
this.closed = new AtomicBoolean(false);
@@ -85,7 +98,7 @@
if (exchange == null) {
return null;
}
- return new MessageExchangeImpl(exchange);
+ return getMessageExchange(exchange);
} catch (InterruptedException e) {
throw new MessagingException(e);
}
@@ -97,24 +110,90 @@
if (exchange == null) {
return null;
}
- return new MessageExchangeImpl(exchange);
+ return getMessageExchange(exchange);
} catch (InterruptedException e) {
throw new MessagingException(e);
}
}
+ protected MessageExchange getMessageExchange(Exchange exchange) {
+ MessageExchange me = exchange.getProperty(MessageExchange.class);
+ if (me == null) {
+ if (exchange.getPattern() == Pattern.InOnly) {
+ me = new InOnlyImpl(exchange);
+ } else if (exchange.getPattern() == Pattern.InOptionalOut) {
+ me = new InOptionalOutImpl(exchange);
+ } else if (exchange.getPattern() == Pattern.InOut) {
+ me = new InOutImpl(exchange);
+ } else if (exchange.getPattern() == Pattern.RobustInOnly) {
+ me = new RobustInOnlyImpl(exchange);
+ } else {
+ throw new IllegalStateException("Unkown pattern: " + exchange.getPattern());
+ }
+ exchange.setProperty(MessageExchange.class, me);
+ }
+ // Translate the destination endpoint
+ if (((InternalExchange) exchange).getDestination() != null && me.getEndpoint() == null) {
+ Endpoint ep = ((InternalExchange) exchange).getDestination();
+ Map<String, ?> props = context.getNmr().getEndpointRegistry().getProperties(ep);
+ QName serviceName = (QName) props.get(Endpoint.SERVICE_NAME);
+ if (serviceName == null) {
+ serviceName = DEFAULT_SERVICE_NAME;
+ }
+ String endpointName = (String) props.get(Endpoint.ENDPOINT_NAME);
+ if (endpointName == null) {
+ endpointName = (String) props.get(Endpoint.NAME);
+ }
+ me.setEndpoint(new ServiceEndpointImpl(serviceName, endpointName));
+ }
+ return me;
+ }
+
+ protected Exchange getExchange(MessageExchange messageExchange) {
+ // TODO
+ return null;
+ }
+
public void send(MessageExchange exchange) throws MessagingException {
assert exchange != null;
+ createTarget(exchange);
channel.send(((MessageExchangeImpl) exchange).getInternalExchange());
}
public boolean sendSync(MessageExchange exchange) throws MessagingException {
assert exchange != null;
+ createTarget(exchange);
return channel.sendSync(((MessageExchangeImpl) exchange).getInternalExchange());
}
public boolean sendSync(MessageExchange exchange, long timeout) throws MessagingException {
assert exchange != null;
+ createTarget(exchange);
return channel.sendSync(((MessageExchangeImpl) exchange).getInternalExchange(), timeout);
+ }
+
+ protected void createTarget(MessageExchange messageExchange) throws MessagingException {
+ Exchange exchange = ((MessageExchangeImpl) messageExchange).getInternalExchange();
+ if (exchange.getTarget() == null) {
+ Map<String, Object> props = new HashMap<String, Object>();
+ if (messageExchange.getEndpoint() != null) {
+ // TODO: handle explicit addressing
+ } else {
+ QName serviceName = messageExchange.getService();
+ if (serviceName != null) {
+ props.put(Endpoint.SERVICE_NAME, serviceName);
+ } else {
+ QName interfaceName = messageExchange.getInterfaceName();
+ if (interfaceName != null) {
+ props.put(Endpoint.INTERFACE_NAME, interfaceName);
+ }
+ }
+ }
+ if (props.isEmpty()) {
+ throw new MessagingException("No endpoint, service or interface name specified for routing");
+ }
+ Reference target = context.getNmr().getEndpointRegistry().lookup(props);
+ exchange.setTarget(target);
+ }
}
}
Modified: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/MessageExchangeImpl.java?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/MessageExchangeImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/MessageExchangeImpl.java Wed Oct 24 03:11:54 2007
@@ -16,27 +16,37 @@
*/
package org.apache.servicemix.jbi.runtime.impl;
+import java.net.URI;
+import java.util.Set;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+
import org.apache.servicemix.api.Exchange;
import org.apache.servicemix.api.Message;
-import org.apache.servicemix.api.Pattern;
import org.apache.servicemix.api.Status;
import org.apache.servicemix.core.MessageImpl;
-import javax.jbi.messaging.*;
-import javax.jbi.servicedesc.ServiceEndpoint;
-import javax.xml.namespace.QName;
-import java.net.URI;
-import java.util.Set;
-
/**
- * Created by IntelliJ IDEA.
- * User: gnodet
- * Date: Oct 5, 2007
- * Time: 2:21:25 PM
- * To change this template use File | Settings | File Templates.
+ * MessageExchange wrapper on top of an Exchange.
+ *
+ * @see Exchange
*/
public class MessageExchangeImpl implements MessageExchange {
+ public static final String INTERFACE_NAME_PROP = "javax.jbi.InterfaceName";
+ public static final String SERVICE_NAME_PROP = "javax.jbi.ServiceName";
+ public static final String SERVICE_ENDPOINT_PROP = "javax.jbi.ServiceEndpoint";
+
+ public static final String IN = "in";
+ public static final String OUT = "out";
+ public static final String FAULT = "fault";
+
private final Exchange exchange;
public MessageExchangeImpl(Exchange exchange) {
@@ -109,11 +119,11 @@
}
public NormalizedMessage getMessage(String name) {
- if ("in".equalsIgnoreCase(name)) {
+ if (IN.equalsIgnoreCase(name)) {
return getInMessage();
- } else if ("out".equalsIgnoreCase(name)) {
+ } else if (OUT.equalsIgnoreCase(name)) {
return getOutMessage();
- } else if ("fault".equalsIgnoreCase(name)) {
+ } else if (FAULT.equalsIgnoreCase(name)) {
return getFault();
} else {
throw new IllegalStateException();
@@ -121,11 +131,11 @@
}
public void setMessage(NormalizedMessage msg, String name) throws MessagingException {
- if ("in".equalsIgnoreCase(name)) {
+ if (IN.equalsIgnoreCase(name)) {
setInMessage(msg);
- } else if ("out".equalsIgnoreCase(name)) {
+ } else if (OUT.equalsIgnoreCase(name)) {
setOutMessage(msg);
- } else if ("fault".equalsIgnoreCase(name)) {
+ } else if (FAULT.equalsIgnoreCase(name)) {
setFault((Fault) msg);
} else {
throw new IllegalStateException();
@@ -187,27 +197,27 @@
}
public void setEndpoint(ServiceEndpoint endpoint) {
- exchange.setProperty("javax.jbi.ServiceEndpoint", endpoint);
+ exchange.setProperty(SERVICE_ENDPOINT_PROP, endpoint);
}
public void setService(QName service) {
- exchange.setProperty("javax.jbi.ServiceeName", service);
+ exchange.setProperty(SERVICE_NAME_PROP, service);
}
public void setInterfaceName(QName interfaceName) {
- exchange.setProperty("javax.jbi.InterfaceName", interfaceName);
+ exchange.setProperty(INTERFACE_NAME_PROP, interfaceName);
}
public ServiceEndpoint getEndpoint() {
- return exchange.getProperty("javax.jbi.ServiceEndpoint", ServiceEndpoint.class);
+ return exchange.getProperty(SERVICE_ENDPOINT_PROP, ServiceEndpoint.class);
}
public QName getInterfaceName() {
- return exchange.getProperty("javax.jbi.InterfaceName", QName.class);
+ return exchange.getProperty(INTERFACE_NAME_PROP, QName.class);
}
public QName getService() {
- return exchange.getProperty("javax.jbi.ServiceName", QName.class);
+ return exchange.getProperty(SERVICE_NAME_PROP, QName.class);
}
public boolean isTransacted() {
Added: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ServiceEndpointImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ServiceEndpointImpl.java?rev=587850&view=auto
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ServiceEndpointImpl.java (added)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ServiceEndpointImpl.java Wed Oct 24 03:11:54 2007
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.runtime.impl;
+
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.DocumentFragment;
+
+/**
+ * A basic implementation of ServiceEndpoint
+ */
+public class ServiceEndpointImpl implements ServiceEndpoint {
+
+ private final QName serviceName;
+ private final String endpointName;
+
+ public ServiceEndpointImpl(QName serviceName, String endpointName) {
+ this.serviceName = serviceName;
+ this.endpointName = endpointName;
+ }
+
+ public DocumentFragment getAsReference(QName operationName) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public String getEndpointName() {
+ return endpointName;
+ }
+
+ public QName[] getInterfaces() {
+ return new QName[0]; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public QName getServiceName() {
+ return serviceName;
+ }
+}
Modified: incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java?rev=587850&r1=587849&r2=587850&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java Wed Oct 24 03:11:54 2007
@@ -16,10 +16,16 @@
*/
package org.apache.servicemix.jbi.runtime;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+
import org.apache.servicemix.api.Channel;
+import org.apache.servicemix.api.Endpoint;
import org.apache.servicemix.api.Exchange;
import org.apache.servicemix.api.Pattern;
-import org.apache.servicemix.api.Endpoint;
+import org.apache.servicemix.api.Status;
import org.apache.servicemix.api.service.ServiceHelper;
import org.apache.servicemix.core.ServiceMix;
import org.apache.servicemix.eip.EIPComponent;
@@ -29,8 +35,6 @@
import org.apache.servicemix.jbi.runtime.impl.ComponentRegistryImpl;
import org.junit.Test;
-import javax.xml.namespace.QName;
-
/**
* Created by IntelliJ IDEA.
* User: gnodet
@@ -46,6 +50,23 @@
smx.init();
ComponentRegistryImpl reg = new ComponentRegistryImpl();
reg.setNmr(smx);
+
+ Endpoint tep = new Endpoint() {
+ private Channel channel;
+ public void setChannel(Channel channel) {
+ this.channel = channel;
+ }
+ public void process(Exchange exchange) {
+ System.out.println("Received exchange: " + exchange);
+ if (exchange.getStatus() == Status.Active) {
+ exchange.setStatus(Status.Done);
+ channel.send(exchange);
+ }
+ }
+ };
+ Map<String, Object> props = new HashMap<String, Object>();
+ props.put(Endpoint.SERVICE_NAME, new QName("target"));
+ smx.getEndpointRegistry().register(tep, props);
EIPComponent eip = new EIPComponent();
WireTap ep = new WireTap();