You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/08/28 14:55:45 UTC
svn commit: r570401 - in /incubator/servicemix/branches/servicemix-4.0: ./
api/ api/src/main/java/org/apache/servicemix/api/
api/src/main/java/org/apache/servicemix/api/internal/
api/src/main/java/org/apache/servicemix/api/service/ core/ core/src/main/...
Author: gnodet
Date: Tue Aug 28 05:55:43 2007
New Revision: 570401
URL: http://svn.apache.org/viewvc?rev=570401&view=rev
Log:
Implement a simple channel and add an test end to end showing how to expose an endpoint and send an exchange to it
Added:
incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/service/ServiceHelper.java
incubator/servicemix/branches/servicemix-4.0/core/src/test/java/org/apache/servicemix/core/IntegrationTest.java
Modified:
incubator/servicemix/branches/servicemix-4.0/ (props changed)
incubator/servicemix/branches/servicemix-4.0/api/ (props changed)
incubator/servicemix/branches/servicemix-4.0/api/pom.xml
incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/Channel.java
incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/NMR.java
incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalEndpoint.java
incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalExchange.java
incubator/servicemix/branches/servicemix-4.0/core/ (props changed)
incubator/servicemix/branches/servicemix-4.0/core/pom.xml
incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ChannelImpl.java
incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/EndpointRegistryImpl.java
incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ExchangeImpl.java
incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/FlowRegistryImpl.java
incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ListenerRegistryImpl.java
incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ReferenceImpl.java
incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ServiceMix.java
incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/StraightThroughFlow.java
incubator/servicemix/branches/servicemix-4.0/pom.xml
Propchange: incubator/servicemix/branches/servicemix-4.0/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Aug 28 05:55:43 2007
@@ -0,0 +1,7 @@
+target
+.classpath
+.project
+*.iml
+*.ipr
+*.iws
+
Propchange: incubator/servicemix/branches/servicemix-4.0/api/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Aug 28 05:55:43 2007
@@ -0,0 +1,7 @@
+target
+.classpath
+.project
+*.iml
+*.ipr
+*.iws
+
Modified: incubator/servicemix/branches/servicemix-4.0/api/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/pom.xml?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/pom.xml (original)
+++ incubator/servicemix/branches/servicemix-4.0/api/pom.xml Tue Aug 28 05:55:43 2007
@@ -29,7 +29,7 @@
<groupId>org.apache.servicemix</groupId>
<artifactId>org.apache.servicemix.api</artifactId>
- <packaging>jar</packaging>
+ <packaging>bundle</packaging>
<version>4.0-SNAPSHOT</version>
<name>org.apache.servicemix.api</name>
@@ -55,8 +55,9 @@
<artifactId>maven-bundle-plugin</artifactId>
<configuration>
<instructions>
+ <Bundle-Name>${pom.name}</Bundle-Name>
<Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
- <Export-Package>${pom.artifactId}</Export-Package>
+ <Export-Package>${pom.artifactId},${pom.artifactId}.service,${pom.artifactId}.event,${pom.artifactId}.internal</Export-Package>
<DynamicImport-Package>*</DynamicImport-Package>
</instructions>
</configuration>
Modified: incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/Channel.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/Channel.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/Channel.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/Channel.java Tue Aug 28 05:55:43 2007
@@ -31,6 +31,13 @@
public interface Channel {
/**
+ * Access to the bus
+ *
+ * @return the NMR
+ */
+ NMR getNMR();
+
+ /**
* Creates a new exchange.
*
* @param pattern specify the InOnly / InOut / RobustInOnly / RobustInOut
Modified: incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/NMR.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/NMR.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/NMR.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/NMR.java Tue Aug 28 05:55:43 2007
@@ -20,7 +20,12 @@
import org.apache.servicemix.api.event.ListenerRegistry;
/**
- *
+ * The NMR interface is the primary interface to communicate with the Bus.
+ * It contains methods to access the registries and to create a client channel
+ * to communicate with the bus.
+ *
+ * @version $Revision: $
+ * @since 4.0
*/
public interface NMR {
Modified: incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalEndpoint.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalEndpoint.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalEndpoint.java Tue Aug 28 05:55:43 2007
@@ -24,6 +24,12 @@
*/
public interface InternalEndpoint extends Endpoint {
+ /**
+ * Retrieve the channel associated with this endpoint.
+ * This method is usually used by {@link Flow}s to deliver
+ * exchanges to this endpoint.
+ * @return
+ */
InternalChannel getChannel();
}
Modified: incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalExchange.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalExchange.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalExchange.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/internal/InternalExchange.java Tue Aug 28 05:55:43 2007
@@ -17,6 +17,9 @@
package org.apache.servicemix.api.internal;
import org.apache.servicemix.api.Exchange;
+import org.apache.servicemix.api.Role;
+
+import java.util.concurrent.Semaphore;
/**
*
@@ -27,6 +30,15 @@
public interface InternalExchange extends Exchange {
/**
+ * Set the role of the exchange.
+ * This method is for internal use and will be called by the Channel
+ * when delivering the exchange.
+ *
+ * @param role the new role
+ */
+ public void setRole(Role role);
+
+ /**
* Retrieve the source endpoint. I.e. the one that created the exchange.
* This information will be set by the NMR when the exchange is sent
* using one of {@link org.apache.servicemix.api.Channel#send(Exchange)},
@@ -61,4 +73,8 @@
* @param destination the destination endpoint
*/
void setDestination(InternalEndpoint destination);
+
+ Semaphore getConsumerLock(boolean create);
+
+ Semaphore getProviderLock(boolean create);
}
Added: incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/service/ServiceHelper.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/service/ServiceHelper.java?rev=570401&view=auto
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/service/ServiceHelper.java (added)
+++ incubator/servicemix/branches/servicemix-4.0/api/src/main/java/org/apache/servicemix/api/service/ServiceHelper.java Tue Aug 28 05:55:43 2007
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.api.service;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ *
+ */
+public final class ServiceHelper {
+
+ private ServiceHelper() {
+ }
+
+ public static Map<String, ?> createMap(String... data) {
+ Map<String, String> props = new HashMap<String, String>();
+ for (int i = 0; i < data.length / 2; i++) {
+ props.put(data[i*2], data[i*2+1]);
+ }
+ return props;
+ }
+
+}
Propchange: incubator/servicemix/branches/servicemix-4.0/core/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Aug 28 05:55:43 2007
@@ -0,0 +1,7 @@
+target
+.classpath
+.project
+*.iml
+*.ipr
+*.iws
+
Modified: incubator/servicemix/branches/servicemix-4.0/core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/pom.xml?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/pom.xml (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/pom.xml Tue Aug 28 05:55:43 2007
@@ -29,7 +29,7 @@
<groupId>org.apache.servicemix</groupId>
<artifactId>org.apache.servicemix.core</artifactId>
- <packaging>jar</packaging>
+ <packaging>bundle</packaging>
<version>4.0-SNAPSHOT</version>
<name>org.apache.servicemix.core</name>
Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ChannelImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ChannelImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ChannelImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ChannelImpl.java Tue Aug 28 05:55:43 2007
@@ -16,17 +16,22 @@
*/
package org.apache.servicemix.core;
-import org.apache.servicemix.api.Exchange;
-import org.apache.servicemix.api.NMR;
-import org.apache.servicemix.api.Pattern;
+import org.apache.servicemix.api.*;
import org.apache.servicemix.api.event.ExchangeListener;
import org.apache.servicemix.api.internal.InternalChannel;
import org.apache.servicemix.api.internal.InternalEndpoint;
import org.apache.servicemix.api.internal.InternalExchange;
import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
+ * The Channel implementation.
+ * The channel uses an Executor (usually a thread pool)
+ * to delegate
+ *
* @version $Revision: $
* @since 4.0
*/
@@ -35,7 +40,6 @@
private final InternalEndpoint endpoint;
private final Executor executor;
private final NMR nmr;
-
public ChannelImpl(InternalEndpoint endpoint, Executor executor, NMR nmr) {
this.endpoint = endpoint;
@@ -44,6 +48,15 @@
}
/**
+ * Access to the bus
+ *
+ * @return the NMR
+ */
+ public NMR getNMR() {
+ return nmr;
+ }
+
+ /**
* Creates a new exchange.
*
* @param pattern specify the InOnly / InOut / RobustInOnly / RobustInOut
@@ -70,8 +83,7 @@
* @return <code>true</code> if the exchange has been processed succesfully
*/
public boolean sendSync(Exchange exchange) {
- // TODO
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return sendSync(exchange, 0);
}
/**
@@ -82,8 +94,28 @@
* @return <code>true</code> if the exchange has been processed succesfully
*/
public boolean sendSync(Exchange exchange, long timeout) {
- // TODO
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ InternalExchange e = (InternalExchange) exchange;
+ Semaphore lock = e.getRole() == Role.Consumer ? e.getConsumerLock(true)
+ : e.getProviderLock(true);
+ try {
+ dispatch(e);
+ if (timeout > 0) {
+ if (!lock.tryAcquire(timeout, TimeUnit.MILLISECONDS)) {
+ throw new TimeoutException();
+ }
+ } else {
+ lock.acquire();
+ }
+ } catch (InterruptedException ex) {
+ exchange.setError(ex);
+ exchange.setStatus(Status.Error);
+ return false;
+ } catch (TimeoutException ex) {
+ exchange.setError(ex);
+ exchange.setStatus(Status.Error);
+ return false;
+ }
+ return true;
}
/**
@@ -101,6 +133,13 @@
* @param exchange the exchange to delivery
*/
public void deliver(final InternalExchange exchange) {
+ // Handle case where the exchange has been sent synchronously
+ Semaphore lock = exchange.getRole() == Role.Provider ? exchange.getConsumerLock(false)
+ : exchange.getProviderLock(false);
+ if (lock != null) {
+ lock.release();
+ return;
+ }
// Delegate processing to the executor
this.executor.execute(new Runnable() {
public void run() {
@@ -116,11 +155,18 @@
*/
protected void process(InternalExchange exchange) {
// Set destination endpoint
- exchange.setDestination(endpoint);
+ if (exchange.getDestination() == null) {
+ exchange.setDestination(endpoint);
+ }
// Call listeners
for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
l.exchangeDelivered(exchange);
}
+ // Change role
+ exchange.setRole(exchange.getRole() == Role.Provider ? Role.Consumer : Role.Provider);
+ // Check if sendSync was used, in which case we need to unblock the other side
+ // rather than delivering the exchange
+ // TODO:
// Process exchange
endpoint.process(exchange);
}
Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/EndpointRegistryImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/EndpointRegistryImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/EndpointRegistryImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/EndpointRegistryImpl.java Tue Aug 28 05:55:43 2007
@@ -42,6 +42,25 @@
private Map<Endpoint, InternalEndpoint> endpoints = new ConcurrentHashMap<Endpoint, InternalEndpoint>();
private ServiceRegistry<InternalEndpoint> registry = new ServiceRegistryImpl<InternalEndpoint>();
+ public EndpointRegistryImpl() {
+ }
+
+ public EndpointRegistryImpl(NMR nmr) {
+ this.nmr = nmr;
+ }
+
+ public NMR getNMR() {
+ return this.nmr;
+ }
+
+ public void setNMR(NMR nmr) {
+ this.nmr = nmr;
+ }
+
+ public void init() {
+ // TODO: check nmr
+ }
+
/**
* Register the given endpoint in the registry.
* In an OSGi world, this would be performed automatically by a ServiceTracker.
Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ExchangeImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ExchangeImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ExchangeImpl.java Tue Aug 28 05:55:43 2007
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Semaphore;
/**
* @version $Revision: $
@@ -50,14 +51,18 @@
private Exception error;
private InternalEndpoint source;
private InternalEndpoint destination;
+ private Semaphore consumerLock;
+ private Semaphore providerLock;
/**
* Creates and exchange of the given pattern
* @param pattern the pattern of this exchange
*/
public ExchangeImpl(Pattern pattern) {
- this.pattern = pattern;
this.id = UUID.randomUUID().toString();
+ this.status = Status.Active;
+ this.role = Role.Consumer;
+ this.pattern = pattern;
}
private ExchangeImpl() {
@@ -373,6 +378,20 @@
public void setDestination(InternalEndpoint destination) {
this.destination = destination;
+ }
+
+ public Semaphore getConsumerLock(boolean create) {
+ if (create) {
+ consumerLock = new Semaphore(0);
+ }
+ return consumerLock;
+ }
+
+ public Semaphore getProviderLock(boolean create) {
+ if (create) {
+ providerLock = new Semaphore(0);
+ }
+ return providerLock;
}
}
Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/FlowRegistryImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/FlowRegistryImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/FlowRegistryImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/FlowRegistryImpl.java Tue Aug 28 05:55:43 2007
@@ -17,6 +17,7 @@
package org.apache.servicemix.core;
import org.apache.servicemix.api.ServiceMixException;
+import org.apache.servicemix.api.Role;
import org.apache.servicemix.api.internal.*;
/**
@@ -37,10 +38,31 @@
}
public void dispatch(InternalExchange exchange) {
- InternalReference target = (InternalReference) exchange.getTarget();
- for (InternalEndpoint endpoint : target.choose()) {
+ if (exchange.getRole() == Role.Consumer) {
+ if (exchange.getDestination() == null) {
+ InternalReference target = (InternalReference) exchange.getTarget();
+ for (InternalEndpoint endpoint : target.choose()) {
+ for (Flow flow : getServices()) {
+ if (flow.canDispatch(exchange, endpoint)) {
+ exchange.setDestination(endpoint);
+ flow.dispatch(exchange);
+ return;
+ }
+ }
+ throw new ServiceMixException("Could not dispatch exchange. No flow can handle it.");
+ }
+ } else {
+ for (Flow flow : getServices()) {
+ if (flow.canDispatch(exchange, exchange.getDestination())) {
+ flow.dispatch(exchange);
+ return;
+ }
+ }
+ throw new ServiceMixException("Could not dispatch exchange. No flow can handle it.");
+ }
+ } else {
for (Flow flow : getServices()) {
- if (flow.canDispatch(exchange, endpoint)) {
+ if (flow.canDispatch(exchange, exchange.getSource())) {
flow.dispatch(exchange);
return;
}
Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ListenerRegistryImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ListenerRegistryImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ListenerRegistryImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ListenerRegistryImpl.java Tue Aug 28 05:55:43 2007
@@ -25,7 +25,7 @@
/**
*
*/
-public class ListenerRegistryImpl extends ServiceRegistryImpl<Listener> implements ListenerRegistry {
+public class ListenerRegistryImpl extends ServiceRegistryImpl<Listener> implements ListenerRegistry {
/**
* Retrieve an iterator of listeners of a certain type
Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ReferenceImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ReferenceImpl.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ReferenceImpl.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ReferenceImpl.java Tue Aug 28 05:55:43 2007
@@ -16,19 +16,17 @@
*/
package org.apache.servicemix.core;
-import org.apache.servicemix.api.Reference;
-import org.apache.servicemix.api.Endpoint;
import org.apache.servicemix.api.internal.InternalEndpoint;
+import org.apache.servicemix.api.internal.InternalReference;
import org.w3c.dom.Document;
import java.util.List;
-import java.util.Iterator;
/**
* @version $Revision: $
* @since 4.0
*/
-public class ReferenceImpl implements Reference {
+public class ReferenceImpl implements InternalReference {
private final List<InternalEndpoint> endpoints;
@@ -60,28 +58,8 @@
*
* @return an endpoint that will be used as the physical target
*/
- public Iterator<Endpoint> choose() {
- return new EndpointIterator(endpoints.iterator());
+ public Iterable<InternalEndpoint> choose() {
+ return endpoints;
}
- protected static class EndpointIterator implements Iterator<Endpoint> {
-
- private final Iterator<InternalEndpoint> iterator;
-
- public EndpointIterator(Iterator<InternalEndpoint> iterator) {
- this.iterator = iterator;
- }
-
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- public Endpoint next() {
- return iterator.next();
- }
-
- public void remove() {
- throw new IllegalStateException();
- }
- }
}
Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ServiceMix.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ServiceMix.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ServiceMix.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/ServiceMix.java Tue Aug 28 05:55:43 2007
@@ -23,6 +23,7 @@
import org.apache.servicemix.api.internal.FlowRegistry;
/**
+ * This class is the main class implementing the NMR
*
*/
public class ServiceMix implements NMR {
@@ -41,6 +42,15 @@
}
/**
+ * Set the endpoint registry
+ *
+ * @param endpoints the endpoint registry
+ */
+ public void setEndpointRegistry(EndpointRegistry endpoints) {
+ this.endpoints = endpoints;
+ }
+
+ /**
* Access the listener registry.
*
* @return the listener registry
@@ -50,12 +60,30 @@
}
/**
+ * Set the listener registry
+ *
+ * @param listeners the listener registry
+ */
+ public void setListenerRegistry(ListenerRegistry listeners) {
+ this.listeners = listeners;
+ }
+
+ /**
* Access the flow registry.
*
* @return the flow registry
*/
public FlowRegistry getFlowRegistry() {
return flows;
+ }
+
+ /**
+ * Set the flow registry
+ *
+ * @param flows the flow registry
+ */
+ public void setFlowRegistry(FlowRegistry flows) {
+ this.flows = flows;
}
/**
Modified: incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/StraightThroughFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/StraightThroughFlow.java?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/StraightThroughFlow.java (original)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/main/java/org/apache/servicemix/core/StraightThroughFlow.java Tue Aug 28 05:55:43 2007
@@ -19,6 +19,7 @@
import org.apache.servicemix.api.internal.Flow;
import org.apache.servicemix.api.internal.InternalEndpoint;
import org.apache.servicemix.api.internal.InternalExchange;
+import org.apache.servicemix.api.Role;
/**
@@ -49,6 +50,8 @@
* @param exchange the exchange to dispatch
*/
public void dispatch(InternalExchange exchange) {
- exchange.getDestination().getChannel().deliver(exchange);
+ InternalEndpoint endpoint = exchange.getRole() == Role.Consumer ? exchange.getDestination()
+ : exchange.getSource();
+ endpoint.getChannel().deliver(exchange);
}
}
Added: incubator/servicemix/branches/servicemix-4.0/core/src/test/java/org/apache/servicemix/core/IntegrationTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/core/src/test/java/org/apache/servicemix/core/IntegrationTest.java?rev=570401&view=auto
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/core/src/test/java/org/apache/servicemix/core/IntegrationTest.java (added)
+++ incubator/servicemix/branches/servicemix-4.0/core/src/test/java/org/apache/servicemix/core/IntegrationTest.java Tue Aug 28 05:55:43 2007
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core;
+
+import org.apache.servicemix.api.*;
+import org.apache.servicemix.api.service.ServiceHelper;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test
+ */
+public class IntegrationTest {
+
+ private NMR nmr;
+
+ @Before
+ public void setUp() {
+ ServiceMix smx = new ServiceMix();
+ smx.setListenerRegistry(new ListenerRegistryImpl());
+ smx.setEndpointRegistry(new EndpointRegistryImpl(smx));
+ smx.setFlowRegistry(new FlowRegistryImpl());
+ smx.getFlowRegistry().register(new StraightThroughFlow(), ServiceHelper.createMap());
+ nmr = smx;
+ }
+
+ @Test
+ public void testSendExchangeToEndpointUsingClient() throws Exception {
+ MyEndpoint endpoint = new MyEndpoint();
+ nmr.getEndpointRegistry().register(endpoint, ServiceHelper.createMap(Endpoint.ID, "id"));
+ Channel client = nmr.createChannel();
+ Exchange e = client.createExchange(Pattern.InOnly);
+ e.setTarget(nmr.getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.ID, "id")));
+ e.getIn().setContent("Hello");
+ boolean res = client.sendSync(e);
+ assertTrue(res);
+ assertNotNull(endpoint.getExchange());
+ assertEquals(Status.Done, e.getStatus());
+ }
+
+
+ public static class MyEndpoint implements Endpoint {
+
+ private Channel channel;
+ private Exchange exchange;
+
+ public void setChannel(Channel channel) {
+ this.channel = channel;
+ }
+
+ public void process(Exchange exchange) {
+ this.exchange = exchange;
+ exchange.setStatus(Status.Done);
+ channel.send(exchange);
+ }
+
+ public Exchange getExchange() {
+ return exchange;
+ }
+ }
+}
Modified: incubator/servicemix/branches/servicemix-4.0/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-4.0/pom.xml?rev=570401&r1=570400&r2=570401&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-4.0/pom.xml (original)
+++ incubator/servicemix/branches/servicemix-4.0/pom.xml Tue Aug 28 05:55:43 2007
@@ -56,6 +56,7 @@
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>1.0.0</version>
+ <extensions>true</extensions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>