You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/03/07 00:19:38 UTC
svn commit: r383702 [2/3] - in /incubator/servicemix/trunk:
servicemix-components/src/test/resources/org/apache/servicemix/components/email/
servicemix-core/src/main/java/org/apache/servicemix/jbi/audit/
servicemix-core/src/main/java/org/apache/service...
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/EndpointRegistry.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/EndpointRegistry.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/EndpointRegistry.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/EndpointRegistry.java Mon Mar 6 15:19:33 2006
@@ -15,19 +15,14 @@
*/
package org.apache.servicemix.jbi.framework;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
-import org.apache.servicemix.jbi.servicedesc.DynamicEndpoint;
-import org.apache.servicemix.jbi.servicedesc.ExternalEndpoint;
-import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
-import org.w3c.dom.Document;
-import org.w3c.dom.DocumentFragment;
-
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
import javax.jbi.JBIException;
-import javax.jbi.component.Component;
import javax.jbi.component.ComponentContext;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.management.JMException;
@@ -38,11 +33,17 @@
import javax.wsdl.factory.WSDLFactory;
import javax.xml.namespace.QName;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.jbi.event.EndpointEvent;
+import org.apache.servicemix.jbi.event.EndpointListener;
+import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
+import org.apache.servicemix.jbi.servicedesc.ExternalEndpoint;
+import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
+import org.apache.servicemix.jbi.servicedesc.LinkedEndpoint;
+import org.w3c.dom.Document;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* Registry for Components
@@ -55,8 +56,16 @@
private Registry registry;
- private Map endpoints;
-
+ private Map endpointMBeans;
+
+ private Map internalEndpoints;
+
+ private Map externalEndpoints;
+
+ private Map linkedEndpoints;
+
+ private Map interfaceConnections;
+
/**
* Constructor
*
@@ -64,7 +73,22 @@
*/
public EndpointRegistry(Registry registry) {
this.registry = registry;
- this.endpoints = new ConcurrentHashMap();
+ this.endpointMBeans = new ConcurrentHashMap();
+ this.internalEndpoints = new ConcurrentHashMap();
+ this.externalEndpoints = new ConcurrentHashMap();
+ this.linkedEndpoints = new ConcurrentHashMap();
+ this.interfaceConnections = new ConcurrentHashMap();
+ }
+
+ public ServiceEndpoint[] getEndpointsForComponent(ComponentNameSpace cns) {
+ Collection endpoints = new ArrayList();
+ for (Iterator iter = getInternalEndpoints().iterator(); iter.hasNext();) {
+ InternalEndpoint endpoint = (InternalEndpoint) iter.next();
+ if (cns.equals(endpoint.getComponentNameSpace())) {
+ endpoints.add(endpoint);
+ }
+ }
+ return asEndpointArray(endpoints);
}
/**
@@ -74,7 +98,7 @@
* @return array of endpoints
*/
public ServiceEndpoint[] getEndpointsForService(QName serviceName) {
- Collection collection = getEndpointsByName(serviceName, getInternalEndpoints());
+ Collection collection = getEndpointsByService(serviceName, getInternalEndpoints());
return asEndpointArray(collection);
}
@@ -86,8 +110,22 @@
* <code>null</code> then all activated endpoints in the JBI environment must be returned.
* @return an array of available endpoints for the specified interface name; must be non-null; may be empty.
*/
- public ServiceEndpoint[] getEndpoints(QName interfaceName) {
- Set result = getEndpointsByInterface(interfaceName, getInternalEndpoints());
+ public ServiceEndpoint[] getEndpointsForInterface(QName interfaceName) {
+ if (interfaceName == null) {
+ return asEndpointArray(internalEndpoints.values());
+ }
+ InterfaceConnection conn = (InterfaceConnection) interfaceConnections.get(interfaceName);
+ if (conn != null) {
+ String key = getKey(conn.service, conn.endpoint);
+ ServiceEndpoint ep = (ServiceEndpoint) internalEndpoints.get(key);
+ if (ep == null) {
+ logger.warn("Connection for interface " + interfaceName + " could not find target for service " + conn.service + " and endpoint " + conn.endpoint);
+ return new ServiceEndpoint[0];
+ } else {
+ return new ServiceEndpoint[] { ep };
+ }
+ }
+ Collection result = getEndpointsByInterface(interfaceName, getInternalEndpoints());
return asEndpointArray(result);
}
@@ -98,15 +136,53 @@
* @param serviceName
* @param endpointName
* @return the endpoint
+ * @throws JBIException
*/
- public InternalEndpoint activateEndpoint(ComponentContextImpl provider, QName serviceName, String endpointName) {
- InternalEndpoint answer = new InternalEndpoint(provider.getComponentNameSpace(), endpointName, serviceName);
+ public InternalEndpoint registerInternalEndpoint(ComponentContextImpl provider, QName serviceName, String endpointName) throws JBIException {
+ // Look up any endpoint already registered under the same service/endpoint key
+ String key = getKey(serviceName, endpointName);
+ InternalEndpoint registered = (InternalEndpoint) internalEndpoints.get(key);
+ // Check if the endpoint has already been activated by another component
+ if (registered != null && registered.isLocal()) {
+ throw new JBIException("An internal endpoint for service " + serviceName + " and endpoint " + endpointName + " is already registered");
+ }
+ // Create a new endpoint
+ InternalEndpoint serviceEndpoint = new InternalEndpoint(provider.getComponentNameSpace(), endpointName, serviceName);
+ // Get implemented interfaces
if (provider.getActivationSpec().getInterfaceName() != null) {
- answer.addInterface(provider.getActivationSpec().getInterfaceName());
+ serviceEndpoint.addInterface(provider.getActivationSpec().getInterfaceName());
}
- retrieveInterfacesFromDescription(provider, answer);
- activateEndpoint(provider, answer);
- return answer;
+ retrieveInterfacesFromDescription(provider, serviceEndpoint);
+ // Carry over the remote endpoints known to the previous registration
+ if (registered != null) {
+ InternalEndpoint[] remote = registered.getRemoteEndpoints();
+ for (int i = 0; i < remote.length; i++) {
+ serviceEndpoint.addRemoteEndpoint(remote[i]);
+ }
+ }
+ // Register endpoint
+ internalEndpoints.put(key, serviceEndpoint);
+ registerEndpoint(serviceEndpoint);
+ fireEvent(serviceEndpoint, EndpointEvent.INTERNAL_ENDPOINT_REGISTERED);
+ return serviceEndpoint;
+ }
+
+ /**
+ * Called by component context when endpoints are being deactivated.
+ *
+ * @param provider
+ * @param serviceEndpoint
+ */
+ public void unregisterInternalEndpoint(ComponentContext provider, InternalEndpoint serviceEndpoint) {
+ if (serviceEndpoint.isClustered()) {
+ // mark the endpoint as no longer local
+ serviceEndpoint.setComponentName(null);
+ } else {
+ String key = getKey(serviceEndpoint);
+ internalEndpoints.remove(key);
+ unregisterEndpoint(key);
+ }
+ fireEvent(serviceEndpoint, EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED);
}
protected void retrieveInterfacesFromDescription(ComponentContextImpl provider, InternalEndpoint answer) {
@@ -141,46 +217,34 @@
}
}
}
-
+
/**
- * Activate an Endpoint
+ * Registers a remote endpoint
*
- * @param provider
- * @param serviceEndpoint
+ * @param remote
*/
- public synchronized void activateEndpoint(ComponentContext provider, InternalEndpoint serviceEndpoint) {
- ComponentConnector cc = registry.getLocalComponentConnector(serviceEndpoint.getComponentNameSpace());
- if (cc != null) {
- cc.addActiveEndpoint(serviceEndpoint);
- }
- try {
- Endpoint endpoint = new Endpoint(serviceEndpoint, this);
- ObjectName objectName = registry.getContainer().getManagementContext().createObjectName(endpoint);
- registry.getContainer().getManagementContext().registerMBean(objectName, endpoint, EndpointMBean.class);
- endpoints.put(serviceEndpoint, endpoint);
- } catch (JMException e) {
- logger.error("Could not register MBean for endpoint", e);
- }
+ public void registerRemoteEndpoint(InternalEndpoint remote) {
+ InternalEndpoint endpoint = (InternalEndpoint) internalEndpoints.get(getKey(remote));
+ // Create endpoint if not already existing
+ if (endpoint == null) {
+ endpoint = new InternalEndpoint(null, remote.getEndpointName(), remote.getServiceName());
+ internalEndpoints.put(getKey(endpoint), endpoint);
+ }
+ // Add remote endpoint
+ endpoint.addRemoteEndpoint(remote);
+ fireEvent(remote, EndpointEvent.REMOTE_ENDPOINT_REGISTERED);
}
-
+
/**
- * Called by component context when endpoints are being deactivated.
+ * Unregisters a remote endpoint
*
- * @param provider
- * @param serviceEndpoint
+ * @param remote
*/
- public void deactivateEndpoint(ComponentContext provider, InternalEndpoint serviceEndpoint) {
- ComponentConnector cc = registry.getLocalComponentConnector(serviceEndpoint.getComponentNameSpace());
- if (cc != null) {
- cc.removeActiveEndpoint(serviceEndpoint);
- }
- Endpoint ep = (Endpoint) endpoints.remove(serviceEndpoint);
- if (ep != null) {
- try {
- registry.getContainer().getManagementContext().unregisterMBean(ep);
- } catch (JBIException e) {
- logger.error("Could not unregister MBean for endpoint", e);
- }
+ public void unregisterRemoteEndpoint(InternalEndpoint remote) {
+ InternalEndpoint endpoint = (InternalEndpoint) internalEndpoints.get(getKey(remote));
+ if (endpoint != null) {
+ endpoint.removeRemoteEndpoint(remote);
+ fireEvent(remote, EndpointEvent.REMOTE_ENDPOINT_UNREGISTERED);
}
}
@@ -192,33 +256,16 @@
* @return the activated ServiceEndpoint or null
*/
public ServiceEndpoint getEndpoint(QName service, String name) {
- ServiceEndpoint result = null;
- for (Iterator i = getInternalEndpoints().iterator();i.hasNext();) {
- ServiceEndpoint endpoint = (ServiceEndpoint) i.next();
- if (endpoint.getServiceName().equals(service) && endpoint.getEndpointName().equals(name)) {
- result = endpoint;
- break;
- }
+ String key = getKey(service, name);
+ ServiceEndpoint ep = (ServiceEndpoint) linkedEndpoints.get(key);
+ if (ep == null) {
+ ep = (ServiceEndpoint) internalEndpoints.get(key);
}
- return result;
+ return ep;
}
-
- /**
- * Retrieve the service description metadata for the specified endpoint.
- * <p>
- * Note that the result can use either the WSDL 1.1 or WSDL 2.0 description language.
- *
- * @param endpoint endpoint reference; must be non-null.
- * @return metadata describing endpoint, or <code>null</code> if metadata is unavailable.
- * @exception JBIException invalid endpoint reference.
- */
- public Document getEndpointDescriptor(ServiceEndpoint endpoint) throws JBIException {
- if (endpoint instanceof AbstractServiceEndpoint == false) {
- throw new JBIException("Descriptors can not be queried for external endpoints");
- }
- AbstractServiceEndpoint se = (AbstractServiceEndpoint) endpoint;
- Component component = registry.getComponent(se.getComponentNameSpace());
- return component.getServiceDescription(endpoint);
+
+ public ServiceEndpoint getInternalEndpoint(QName service, String name) {
+ return (ServiceEndpoint) internalEndpoints.get(getKey(service, name));
}
/**
@@ -228,12 +275,16 @@
*
* @param provider
* @param externalEndpoint the external endpoint to be registered, must be non-null.
+ * @throws JBIException
*/
- public void registerExternalEndpoint(ComponentContextImpl provider, ServiceEndpoint externalEndpoint) {
- ComponentConnector cc = registry.getLocalComponentConnector(provider.getComponentNameSpace());
- if (cc != null) {
- cc.addExternalActiveEndpoint(new ExternalEndpoint(cc.getComponentNameSpace(), externalEndpoint));
- }
+ public void registerExternalEndpoint(ComponentNameSpace cns, ServiceEndpoint externalEndpoint) throws JBIException {
+ ExternalEndpoint serviceEndpoint = new ExternalEndpoint(cns, externalEndpoint);
+ if (externalEndpoints.get(getKey(serviceEndpoint)) != null) {
+ throw new JBIException("An external endpoint for service " + externalEndpoint.getServiceName() + " and endpoint " + externalEndpoint.getEndpointName() + " is already registered");
+ }
+ registerEndpoint(serviceEndpoint);
+ externalEndpoints.put(getKey(serviceEndpoint), serviceEndpoint);
+ fireEvent(serviceEndpoint, EndpointEvent.EXTERNAL_ENDPOINT_REGISTERED);
}
/**
@@ -244,11 +295,10 @@
* @param provider
* @param externalEndpoint the external endpoint to be deregistered; must be non-null.
*/
- public void deregisterExternalEndpoint(ComponentContextImpl provider, ServiceEndpoint externalEndpoint) {
- ComponentConnector cc = registry.getLocalComponentConnector(provider.getComponentNameSpace());
- if (cc != null) {
- cc.removeExternalActiveEndpoint(externalEndpoint);
- }
+ public void unregisterExternalEndpoint(ComponentNameSpace cns, ServiceEndpoint externalEndpoint) {
+ externalEndpoints.remove(getKey(externalEndpoint));
+ unregisterEndpoint(getKey(externalEndpoint));
+ fireEvent(externalEndpoint, EndpointEvent.EXTERNAL_ENDPOINT_UNREGISTERED);
}
/**
@@ -258,8 +308,8 @@
* @return an array of available external endpoints for the specified interface name; must be non-null; may be
* empty.
*/
- public ServiceEndpoint[] getExternalEndpoints(QName interfaceName) {
- Set endpoints = getEndpointsByInterface(interfaceName, getExternalEndpoints());
+ public ServiceEndpoint[] getExternalEndpointsForInterface(QName interfaceName) {
+ Collection endpoints = getEndpointsByInterface(interfaceName, getExternalEndpoints());
return asEndpointArray(endpoints);
}
@@ -270,35 +320,11 @@
* @return an array of available external endpoints for the specified service name; must be non-null; may be empty.
*/
public ServiceEndpoint[] getExternalEndpointsForService(QName serviceName) {
- Set endpoints = getEndpointsByName(serviceName, getExternalEndpoints());
+ Collection endpoints = getEndpointsByService(serviceName, getExternalEndpoints());
return asEndpointArray(endpoints);
}
/**
- * Resolve the given endpoint reference into a service endpoint. This is called by the component when it has an EPR
- * that it wants to resolve into a service endpoint.
- * <p>
- * Note that the service endpoint returned refers to a dynamic endpoint; the endpoint will exist only as long as
- * this component retains a strong reference to the object returned by this method. The endpoint may not be included
- * in the list of "activated" endpoints.
- *
- * @param epr endpoint reference as an XML fragment; must be non-null.
- * @return the service endpoint corresponding to the given endpoint reference; <code>null</code> if the reference
- * cannot be resolved.
- */
- public ServiceEndpoint resolveEndpointReference(DocumentFragment epr) {
- Collection connectors = registry.getLocalComponentConnectors();
- for (Iterator iter = connectors.iterator(); iter.hasNext();) {
- LocalComponentConnector connector = (LocalComponentConnector) iter.next();
- ServiceEndpoint se = connector.getComponent().resolveEndpointReference(epr);
- if (se != null) {
- return new DynamicEndpoint(connector.getComponentNameSpace(), se, epr);
- }
- }
- return null;
- }
-
- /**
* Helper method to convert the given collection into an array of endpoints
*
* @param collection
@@ -308,12 +334,8 @@
if (collection == null) {
return new ServiceEndpoint[0];
}
- int size = collection.size();
- ServiceEndpoint[] answer = new ServiceEndpoint[size];
- Iterator it = collection.iterator();
- for (int i = 0; i < size; i++) {
- answer[i] = (ServiceEndpoint) it.next();
- }
+ ServiceEndpoint[] answer = new ServiceEndpoint[collection.size()];
+ answer = (ServiceEndpoint[]) collection.toArray(answer);
return answer;
}
@@ -324,8 +346,8 @@
* @param endpoints
* @return collection of endpoints
*/
- protected Set getEndpointsByName(QName serviceName, Set endpoints) {
- Set answer = new HashSet();
+ protected Collection getEndpointsByService(QName serviceName, Collection endpoints) {
+ Collection answer = new ArrayList();
for (Iterator i = endpoints.iterator(); i.hasNext();) {
ServiceEndpoint endpoint = (ServiceEndpoint) i.next();
if (endpoint.getServiceName().equals(serviceName)) {
@@ -341,7 +363,7 @@
* is applied.
*
*/
- protected Set getEndpointsByInterface(QName interfaceName, Set endpoints) {
+ protected Collection getEndpointsByInterface(QName interfaceName, Collection endpoints) {
if (interfaceName == null) {
return endpoints;
}
@@ -350,7 +372,7 @@
ServiceEndpoint endpoint = (ServiceEndpoint) i.next();
QName[] interfaces = endpoint.getInterfaces();
if (interfaces != null) {
- for (int k = 0;k < interfaces.length;k++) {
+ for (int k = 0; k < interfaces.length;k ++) {
QName qn = interfaces[k];
if (qn != null && qn.equals(interfaceName)) {
answer.add(endpoint);
@@ -363,71 +385,148 @@
}
/**
- * Utility method to get a ComponentConnector from a serviceName
+ * @return all internal endpoints
+ */
+ protected Collection getInternalEndpoints() {
+ return internalEndpoints.values();
+ }
+
+ /**
+ * @return all external endpoints
+ */
+ protected Collection getExternalEndpoints() {
+ return externalEndpoints.values();
+ }
+
+ /**
+ * Registers an endpoint connection.
*
- * @param serviceName
- * @return the ComponentConnector
+ * @param fromSvc
+ * @param fromEp
+ * @param toSvc
+ * @param toEp
+ * @param link
+ * @throws JBIException
*/
- public ComponentConnector getComponentConnectorByServiceName(QName serviceName) {
- ComponentConnector result = null;
- Set set = getEndpointsByName(serviceName, getInternalEndpoints());
- if (!set.isEmpty()) {
- InternalEndpoint endpoint = (InternalEndpoint) set.iterator().next();
- result = registry.getComponentConnector(endpoint.getComponentNameSpace());
- }
- return result;
- }
-
- protected ComponentConnector getComponentConnectorByEndpointName(String endpointName) {
- ComponentConnector result = null;
- if (endpointName != null) {
- for (Iterator i = getInternalEndpoints().iterator();i.hasNext();) {
- InternalEndpoint endpoint = (InternalEndpoint) i.next();
- if (endpoint.getEndpointName().equals(endpointName)) {
- result = registry.getComponentConnector(endpoint.getComponentNameSpace());
- break;
- }
- }
+ public void registerEndpointConnection(QName fromSvc, String fromEp, QName toSvc, String toEp, String link) throws JBIException {
+ LinkedEndpoint ep = new LinkedEndpoint(fromSvc, fromEp, toSvc, toEp, link);
+ if (linkedEndpoints.get(getKey(ep)) != null) {
+ throw new JBIException("An endpoint connection for service " + ep.getServiceName() + " and name " + ep.getEndpointName() + " is already registered");
}
- return result;
+ linkedEndpoints.put(getKey(ep), ep);
+ registerEndpoint(ep);
+ fireEvent(ep, EndpointEvent.LINKED_ENDPOINT_REGISTERED);
+ }
+
+ /**
+ * Unregister an endpoint connection.
+ *
+ * @param fromSvc
+ * @param fromEp
+ */
+ public void unregisterEndpointConnection(QName fromSvc, String fromEp) {
+ unregisterEndpoint(getKey(fromSvc, fromEp));
+ LinkedEndpoint ep = (LinkedEndpoint) linkedEndpoints.remove(getKey(fromSvc, fromEp));
+ fireEvent(ep, EndpointEvent.LINKED_ENDPOINT_UNREGISTERED);
}
- protected InternalEndpoint getEndpointByName(String endpointName) {
- InternalEndpoint result = null;
- if (endpointName != null) {
- for (Iterator i = getInternalEndpoints().iterator();i.hasNext();) {
- InternalEndpoint endpoint = (InternalEndpoint) i.next();
- if (endpoint.getEndpointName().equals(endpointName)) {
- result = endpoint;
- break;
- }
- }
+ /**
+ * Registers an interface connection.
+ *
+ * @param fromItf
+ * @param toSvc
+ * @param toEp
+ * @throws JBIException
+ */
+ public void registerInterfaceConnection(QName fromItf, QName toSvc, String toEp) throws JBIException {
+ if (interfaceConnections.get(fromItf) != null) {
+ throw new JBIException("An interface connection for " + fromItf + " is already registered");
}
- return result;
+ interfaceConnections.put(fromItf, new InterfaceConnection(toSvc, toEp));
}
/**
- * @return all default endpoints
+ * Unregisters an interface connection.
+ *
+ * @param fromItf
*/
- protected Set getInternalEndpoints() {
- Set answer = new HashSet();
- for (Iterator iter = this.registry.getComponentConnectors().iterator();iter.hasNext();) {
- ComponentConnector cc = (ComponentConnector) iter.next();
- answer.addAll(cc.getActiveEndpoints());
+ public void unregisterInterfaceConnection(QName fromItf) {
+ interfaceConnections.remove(fromItf);
+
+ }
+
+ private void registerEndpoint(AbstractServiceEndpoint serviceEndpoint) {
+ String key = getKey(serviceEndpoint);
+ try {
+ Endpoint endpoint = new Endpoint(serviceEndpoint, registry);
+ ObjectName objectName = registry.getContainer().getManagementContext().createObjectName(endpoint);
+ registry.getContainer().getManagementContext().registerMBean(objectName, endpoint, EndpointMBean.class);
+ endpointMBeans.put(key, endpoint);
+ } catch (JMException e) {
+ logger.error("Could not register MBean for endpoint", e);
+ }
+ }
+
+ private void unregisterEndpoint(String key) {
+ Endpoint ep = (Endpoint) endpointMBeans.remove(key);
+ if (ep != null) {
+ try {
+ registry.getContainer().getManagementContext().unregisterMBean(ep);
+ } catch (JBIException e) {
+ logger.error("Could not unregister MBean for endpoint", e);
+ }
}
- return answer;
}
- /**
- * @return all external endpoints
- */
- protected Set getExternalEndpoints() {
- Set answer = new HashSet();
- for (Iterator iter = this.registry.getComponentConnectors().iterator(); iter.hasNext();) {
- ComponentConnector cc = (ComponentConnector) iter.next();
- answer.addAll(cc.getExternalActiveEndpoints());
+ private String getKey(ServiceEndpoint ep) {
+ return getKey(ep.getServiceName(), ep.getEndpointName());
+ }
+
+ private String getKey(QName svcName, String epName) {
+ return svcName + epName;
+ }
+
+ private static class InterfaceConnection {
+ QName service;
+ String endpoint;
+ InterfaceConnection(QName service, String endpoint) {
+ this.service = service;
+ this.endpoint = endpoint;
}
- return answer;
+ }
+
+ protected void fireEvent(ServiceEndpoint ep, int type) {
+ EndpointEvent event = new EndpointEvent(ep, type);
+ EndpointListener[] listeners = (EndpointListener[]) registry.getContainer().getListeners(EndpointListener.class);
+ for (int i = 0; i < listeners.length; i++) {
+ switch (type) {
+ case EndpointEvent.INTERNAL_ENDPOINT_REGISTERED:
+ listeners[i].internalEndpointRegistered(event);
+ break;
+ case EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED:
+ listeners[i].internalEndpointUnregistered(event);
+ break;
+ case EndpointEvent.EXTERNAL_ENDPOINT_REGISTERED:
+ listeners[i].externalEndpointRegistered(event);
+ break;
+ case EndpointEvent.EXTERNAL_ENDPOINT_UNREGISTERED:
+ listeners[i].externalEndpointUnregistered(event);
+ break;
+ case EndpointEvent.LINKED_ENDPOINT_REGISTERED:
+ listeners[i].linkedEndpointRegistered(event);
+ break;
+ case EndpointEvent.LINKED_ENDPOINT_UNREGISTERED:
+ listeners[i].linkedEndpointUnregistered(event);
+ break;
+ case EndpointEvent.REMOTE_ENDPOINT_REGISTERED:
+ listeners[i].remoteEndpointRegistered(event);
+ break;
+ case EndpointEvent.REMOTE_ENDPOINT_UNREGISTERED:
+ listeners[i].remoteEndpointUnregistered(event);
+ break;
+ }
+ }
+
}
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/LocalComponentConnector.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/LocalComponentConnector.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/LocalComponentConnector.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/LocalComponentConnector.java Mon Mar 6 15:19:33 2006
@@ -18,14 +18,12 @@
import java.io.File;
import java.io.IOException;
import java.util.Properties;
-import java.util.Set;
import javax.jbi.JBIException;
import javax.jbi.component.Component;
import javax.jbi.component.ComponentLifeCycle;
import javax.jbi.component.ServiceUnitManager;
import javax.jbi.management.LifeCycleMBean;
-import javax.jbi.servicedesc.ServiceEndpoint;
import javax.management.ObjectName;
import org.apache.commons.logging.Log;
@@ -139,51 +137,6 @@
*/
public void setContext(ComponentContextImpl context) {
this.context = context;
- }
-
- /**
- * Add an activated endpoint
- *
- * @param endpoint
- */
- public void addActiveEndpoint(ServiceEndpoint endpoint) {
- packet.addActiveEndpoint(endpoint);
- }
-
- /**
- * remove an activated endpoint
- *
- * @param endpoint
- */
- public void removeActiveEndpoint(ServiceEndpoint endpoint) {
- packet.removeActiveEndpoint(endpoint);
- }
-
- /**
- * Add an external activated endpoint
- *
- * @param endpoint
- */
- public void addExternalActiveEndpoint(ServiceEndpoint endpoint) {
- packet.addExternalActiveEndpoint(endpoint);
- }
-
- /**
- * remove an external activated endpoint
- *
- * @param endpoint
- */
- public void removeExternalActiveEndpoint(ServiceEndpoint endpoint) {
- packet.removeExternalActiveEndpoint(endpoint);
- }
-
- /**
- * Get the Set of external activated endpoints
- *
- * @return the activated endpoint Set
- */
- public Set getExternalActiveEndpoints() {
- return packet.getExternalActiveEndpoints();
}
/**
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/Registry.java Mon Mar 6 15:19:33 2006
@@ -41,12 +41,13 @@
import org.apache.servicemix.jbi.deployment.ServiceUnit;
import org.apache.servicemix.jbi.management.BaseSystemService;
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
+import org.apache.servicemix.jbi.servicedesc.DynamicEndpoint;
import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
import org.w3c.dom.Document;
import org.w3c.dom.DocumentFragment;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
/**
* Registry - state infomation including running state, SA's deployed etc.
@@ -60,7 +61,6 @@
private EndpointRegistry endpointRegistry;
private SubscriptionRegistry subscriptionRegistry;
private ServiceAssemblyRegistry serviceAssemblyRegistry;
- private List componentPacketListeners;
private Map serviceUnits;
/**
@@ -71,7 +71,6 @@
this.endpointRegistry = new EndpointRegistry(this);
this.subscriptionRegistry = new SubscriptionRegistry();
this.serviceAssemblyRegistry = new ServiceAssemblyRegistry(this);
- this.componentPacketListeners = new CopyOnWriteArrayList();
this.serviceUnits = new ConcurrentHashMap();
}
@@ -170,22 +169,20 @@
*/
public synchronized ServiceEndpoint activateEndpoint(ComponentContextImpl context, QName serviceName,
String endpointName) throws JBIException {
- InternalEndpoint result = endpointRegistry.activateEndpoint(context, serviceName, endpointName);
- if (result != null) {
- ComponentConnector cc = componentRegistry.getComponentConnector(result.getComponentNameSpace());
- if (cc != null) {
- fireComponentPacketEvent(cc, ComponentPacketEvent.STATE_CHANGE);
- }
- }
+ InternalEndpoint result = endpointRegistry.registerInternalEndpoint(context, serviceName, endpointName);
return result;
}
+ public ServiceEndpoint[] getEndpointsForComponent(ComponentNameSpace cns) {
+ return endpointRegistry.getEndpointsForComponent(cns);
+ }
+
/**
* @param interfaceName qualified name
* @return an array of available endpoints for the specified interface name;
*/
- public ServiceEndpoint[] getEndpoints(QName interfaceName) {
- return endpointRegistry.getEndpoints(interfaceName);
+ public ServiceEndpoint[] getEndpointsForInterface(QName interfaceName) {
+ return endpointRegistry.getEndpointsForInterface(interfaceName);
}
/**
@@ -193,43 +190,60 @@
* @param serviceEndpoint
*/
public void deactivateEndpoint(ComponentContext provider, InternalEndpoint serviceEndpoint) {
- endpointRegistry.deactivateEndpoint(provider, serviceEndpoint);
- if (serviceEndpoint != null) {
- ComponentConnector cc = componentRegistry.getComponentConnector(serviceEndpoint.getComponentNameSpace());
- if (cc != null) {
- fireComponentPacketEvent(cc, ComponentPacketEvent.STATE_CHANGE);
- }
- }
+ endpointRegistry.unregisterInternalEndpoint(provider, serviceEndpoint);
}
/**
+ * Retrieve the service description metadata for the specified endpoint.
+ * <p>
+ * Note that the result can use either the WSDL 1.1 or WSDL 2.0 description language.
+ *
* @param endpoint endpoint reference; must be non-null.
* @return metadata describing endpoint, or <code>null</code> if metadata is unavailable.
* @throws JBIException invalid endpoint reference.
*/
public Document getEndpointDescriptor(ServiceEndpoint endpoint) throws JBIException {
- return endpointRegistry.getEndpointDescriptor(endpoint);
+ if (endpoint instanceof AbstractServiceEndpoint == false) {
+ throw new JBIException("Descriptors can not be queried for external endpoints");
+ }
+ AbstractServiceEndpoint se = (AbstractServiceEndpoint) endpoint;
+ // TODO: what if the endpoint is linked or dynamic
+ Component component = getComponent(se.getComponentNameSpace());
+ return component.getServiceDescription(endpoint);
}
/**
- * @param epr
- * @return endpoint
+ * Resolve the given endpoint reference into a service endpoint. This is called by the component when it has an EPR
+ * that it wants to resolve into a service endpoint.
+ * <p>
+ * Note that the service endpoint returned refers to a dynamic endpoint; the endpoint will exist only as long as
+ * this component retains a strong reference to the object returned by this method. The endpoint may not be included
+ * in the list of "activated" endpoints.
+ *
+ * @param epr endpoint reference as an XML fragment; must be non-null.
+ * @return the service endpoint corresponding to the given endpoint reference; <code>null</code> if the reference
+ * cannot be resolved.
*/
public ServiceEndpoint resolveEndpointReference(DocumentFragment epr) {
- return endpointRegistry.resolveEndpointReference(epr);
+ Collection connectors = getLocalComponentConnectors();
+ for (Iterator iter = connectors.iterator(); iter.hasNext();) {
+ LocalComponentConnector connector = (LocalComponentConnector) iter.next();
+ ServiceEndpoint se = connector.getComponent().resolveEndpointReference(epr);
+ if (se != null) {
+ return new DynamicEndpoint(connector.getComponentNameSpace(), se, epr);
+ }
+ }
+ return null;
}
/**
* @param provider
* @param externalEndpoint the external endpoint to be registered, must be non-null.
+ * @throws JBIException
*/
- public void registerExternalEndpoint(ComponentContextImpl provider, ServiceEndpoint externalEndpoint) {
+ public void registerExternalEndpoint(ComponentNameSpace cns, ServiceEndpoint externalEndpoint) throws JBIException {
if (externalEndpoint != null) {
- endpointRegistry.registerExternalEndpoint(provider, externalEndpoint);
- ComponentConnector cc = componentRegistry.getComponentConnector(provider.getComponentNameSpace());
- if (cc != null) {
- fireComponentPacketEvent(cc, ComponentPacketEvent.STATE_CHANGE);
- }
+ endpointRegistry.registerExternalEndpoint(cns, externalEndpoint);
}
}
@@ -237,14 +251,8 @@
* @param cns the namespace of the component that provides the external endpoint
* @param externalEndpoint the external endpoint to be deregistered; must be non-null.
*/
- public void deregisterExternalEndpoint(ComponentContextImpl provider, ServiceEndpoint externalEndpoint) {
- endpointRegistry.deregisterExternalEndpoint(provider, externalEndpoint);
- if (externalEndpoint != null) {
- ComponentConnector cc = componentRegistry.getComponentConnector(provider.getComponentNameSpace());
- if (cc != null) {
- fireComponentPacketEvent(cc, ComponentPacketEvent.STATE_CHANGE);
- }
- }
+ public void deregisterExternalEndpoint(ComponentNameSpace cns, ServiceEndpoint externalEndpoint) {
+ endpointRegistry.unregisterExternalEndpoint(cns, externalEndpoint);
}
/**
@@ -255,6 +263,10 @@
public ServiceEndpoint getEndpoint(QName service, String name) {
return endpointRegistry.getEndpoint(service, name);
}
+
+ public ServiceEndpoint getInternalEndpoint(QName service, String name) {
+ return endpointRegistry.getInternalEndpoint(service, name);
+ }
/**
* @param serviceName
@@ -269,7 +281,7 @@
* @return endpoints
*/
public ServiceEndpoint[] getExternalEndpoints(QName interfaceName) {
- return endpointRegistry.getExternalEndpoints(interfaceName);
+ return endpointRegistry.getExternalEndpointsForInterface(interfaceName);
}
/**
@@ -295,9 +307,6 @@
public LocalComponentConnector registerComponent(ComponentNameSpace name, String description,Component component,
boolean binding, boolean service) throws JBIException {
LocalComponentConnector result = componentRegistry.registerComponent(name,description, component, binding, service);
- if (result != null) {
- fireComponentPacketEvent(result, ComponentPacketEvent.ACTIVATED);
- }
return result;
}
@@ -307,9 +316,6 @@
*/
public ComponentConnector deregisterComponent(Component component) {
ComponentConnector result = componentRegistry.deregisterComponent(component);
- if (result != null) {
- fireComponentPacketEvent(result, ComponentPacketEvent.DEACTIVATED);
- }
return result;
}
@@ -321,24 +327,6 @@
}
/**
- * Add a listener
- *
- * @param l
- */
- public void addComponentPacketListener(ComponentPacketEventListener l) {
- this.componentPacketListeners.add(l);
- }
-
- /**
- * remove a listener
- *
- * @param l
- */
- public void removeComponentPacketListener(ComponentPacketEventListener l) {
- this.componentPacketListeners.remove(l);
- }
-
- /**
* Get a registered ComponentConnector from its id
*
* @param id
@@ -350,15 +338,6 @@
/**
- * For distributed containers, get a ComponentConnector by round-robin
- * @param id
- * @return the ComponentConnector or null
- */
- public ComponentConnector getLoadBalancedComponentConnector(ComponentNameSpace id){
- return componentRegistry.getLoadBalancedComponentConnector(id);
- }
-
- /**
* Add a ComponentConnector to ComponentRegistry Should be called for adding remote ComponentConnectors from other
* Containers
*
@@ -605,12 +584,6 @@
InternalEndpoint sei = (InternalEndpoint)endpoint;
subscription.setName(context.getComponentNameSpace());
subscriptionRegistry.registerSubscription(subscription,sei);
- if (sei != null) {
- ComponentConnector cc = componentRegistry.getComponentConnector(sei.getComponentNameSpace());
- if (cc != null) {
- fireComponentPacketEvent(cc, ComponentPacketEvent.STATE_CHANGE);
- }
- }
}
/**
@@ -621,12 +594,6 @@
public InternalEndpoint deregisterSubscription(ComponentContextImpl context,SubscriptionSpec subscription) {
subscription.setName(context.getComponentNameSpace());
InternalEndpoint result = subscriptionRegistry.deregisterSubscription(subscription);
- if (result != null) {
- ComponentConnector cc = componentRegistry.getComponentConnector(result.getComponentNameSpace());
- if (cc != null) {
- fireComponentPacketEvent(cc, ComponentPacketEvent.STATE_CHANGE);
- }
- }
return result;
}
@@ -646,7 +613,7 @@
* @throws DeploymentException
* @deprecated
*/
- public boolean registerServiceAssembly(ServiceAssembly sa) throws DeploymentException{
+ public ServiceAssemblyLifeCycle registerServiceAssembly(ServiceAssembly sa) throws DeploymentException{
return serviceAssemblyRegistry.register(sa);
}
@@ -657,7 +624,7 @@
* @return the life cycle of the newly registered service assembly, or <code>null</code> if it was already registered
* @throws DeploymentException
*/
- public boolean registerServiceAssembly(ServiceAssembly sa, String[] deployedSUs) throws DeploymentException{
+ public ServiceAssemblyLifeCycle registerServiceAssembly(ServiceAssembly sa, String[] deployedSUs) throws DeploymentException{
return serviceAssemblyRegistry.register(sa, deployedSUs);
}
@@ -729,18 +696,6 @@
return serviceAssemblyRegistry.isDeployedServiceUnit(componentName, suName);
}
-
-
- protected void fireComponentPacketEvent(ComponentConnector cc, int status) {
- if (!componentPacketListeners.isEmpty()) {
- ComponentPacketEvent event = new ComponentPacketEvent(cc.getComponentPacket(), status);
- for (Iterator i = componentPacketListeners.iterator();i.hasNext();) {
- ComponentPacketEventListener l = (ComponentPacketEventListener) i.next();
- l.onEvent(event);
- }
- }
- }
-
/**
* Get a ServiceUnit by its key.
*
@@ -785,5 +740,29 @@
}
}
}
+
+ public void registerEndpointConnection(QName fromSvc, String fromEp, QName toSvc, String toEp, String link) throws JBIException {
+ endpointRegistry.registerEndpointConnection(fromSvc, fromEp, toSvc, toEp, link);
+ }
+
+ public void unregisterEndpointConnection(QName fromSvc, String fromEp) {
+ endpointRegistry.unregisterEndpointConnection(fromSvc, fromEp);
+ }
+ public void registerInterfaceConnection(QName fromItf, QName toSvc, String toEp) throws JBIException {
+ endpointRegistry.registerInterfaceConnection(fromItf, toSvc, toEp);
+ }
+
+ public void unregisterInterfaceConnection(QName fromItf) {
+ endpointRegistry.unregisterInterfaceConnection(fromItf);
+ }
+
+ public void registerRemoteEndpoint(ServiceEndpoint endpoint) {
+ endpointRegistry.registerRemoteEndpoint((InternalEndpoint) endpoint);
+ }
+
+ public void unregisterRemoteEndpoint(ServiceEndpoint endpoint) {
+ endpointRegistry.unregisterRemoteEndpoint((InternalEndpoint) endpoint);
+ }
+
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceAssemblyLifeCycle.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceAssemblyLifeCycle.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceAssemblyLifeCycle.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceAssemblyLifeCycle.java Mon Mar 6 15:19:33 2006
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Properties;
+import javax.jbi.JBIException;
import javax.jbi.management.DeploymentException;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
@@ -40,10 +41,11 @@
import org.apache.servicemix.jbi.deployment.Consumes;
import org.apache.servicemix.jbi.deployment.Descriptor;
import org.apache.servicemix.jbi.deployment.ServiceAssembly;
+import org.apache.servicemix.jbi.event.ServiceAssemblyEvent;
+import org.apache.servicemix.jbi.event.ServiceAssemblyListener;
import org.apache.servicemix.jbi.management.AttributeInfoHelper;
import org.apache.servicemix.jbi.management.MBeanInfoProvider;
import org.apache.servicemix.jbi.management.OperationInfoHelper;
-import org.apache.servicemix.jbi.nmr.Broker;
import org.apache.servicemix.jbi.util.XmlPersistenceSupport;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@@ -105,7 +107,11 @@
public String start(boolean writeState) throws Exception {
log.info("Starting service assembly: " + getName());
// Start connections
- startConnections();
+ try {
+ startConnections();
+ } catch (JBIException e) {
+ throw ManagementSupport.failure("start", e.getMessage());
+ }
// Start service units
List componentFailures = new ArrayList();
for (int i = 0; i < sus.length; i++) {
@@ -132,6 +138,7 @@
if (writeState) {
writeRunningState();
}
+ fireEvent(ServiceAssemblyEvent.ASSEMBLY_STARTED);
return ManagementSupport.createSuccessMessage("start");
} else {
throw ManagementSupport.failure("start", componentFailures);
@@ -145,15 +152,24 @@
* @throws Exception
*/
public String stop() throws Exception {
- return stop(true);
+ return stop(true, false);
}
- public String stop(boolean writeState) throws Exception {
+ public String stop(boolean writeState, boolean forceInit) throws Exception {
log.info("Stopping service assembly: " + getName());
// Stop connections
stopConnections();
// Stop service units
List componentFailures = new ArrayList();
+ if (forceInit) {
+ for (int i = 0; i < sus.length; i++) {
+ try {
+ sus[i].init();
+ } catch (DeploymentException e) {
+ componentFailures.add(getComponentFailure(e, "stop", sus[i].getComponentName()));
+ }
+ }
+ }
for (int i = 0; i < sus.length; i++) {
if (sus[i].isStarted()) {
try {
@@ -169,6 +185,7 @@
if (writeState) {
writeRunningState();
}
+ fireEvent(ServiceAssemblyEvent.ASSEMBLY_STOPPED);
return ManagementSupport.createSuccessMessage("stop");
} else {
throw ManagementSupport.failure("stop", componentFailures);
@@ -212,6 +229,7 @@
if (writeState) {
writeRunningState();
}
+ fireEvent(ServiceAssemblyEvent.ASSEMBLY_SHUTDOWN);
return ManagementSupport.createSuccessMessage("shutDown");
} else {
throw ManagementSupport.failure("shutDown", componentFailures);
@@ -307,11 +325,11 @@
void restore() throws Exception {
String state = getRunningStateFromStore();
if (STARTED.equals(state)) {
- start();
+ start(false);
} else {
- stop();
+ stop(false, true);
if (SHUTDOWN.equals(state)) {
- shutDown();
+ shutDown(false);
}
}
}
@@ -320,26 +338,25 @@
return sus;
}
- protected void startConnections() {
+ protected void startConnections() throws JBIException {
if (serviceAssembly.getConnections() == null ||
serviceAssembly.getConnections().getConnections() == null) {
return;
}
Connection[] connections = serviceAssembly.getConnections().getConnections();
- Broker broker = registry.getContainer().getBroker();
for (int i = 0; i < connections.length; i++) {
if (connections[i].getConsumer().getInterfaceName() != null) {
QName fromItf = connections[i].getConsumer().getInterfaceName();
QName toSvc = connections[i].getProvider().getServiceName();
String toEp = connections[i].getProvider().getEndpointName();
- broker.registerInterfaceConnection(fromItf, toSvc, toEp);
+ registry.registerInterfaceConnection(fromItf, toSvc, toEp);
} else {
QName fromSvc = connections[i].getConsumer().getServiceName();
String fromEp = connections[i].getConsumer().getEndpointName();
QName toSvc = connections[i].getProvider().getServiceName();
String toEp = connections[i].getProvider().getEndpointName();
String link = getLinkType(fromSvc, fromEp);
- broker.registerEndpointConnection(fromSvc, fromEp, toSvc, toEp, link);
+ registry.registerEndpointConnection(fromSvc, fromEp, toSvc, toEp, link);
}
}
}
@@ -362,25 +379,20 @@
protected void stopConnections() {
if (serviceAssembly.getConnections() == null ||
- serviceAssembly.getConnections().getConnections() == null) {
- return;
- }
- Connection[] connections = serviceAssembly.getConnections().getConnections();
- Broker broker = registry.getContainer().getBroker();
- for (int i = 0; i < connections.length; i++) {
- if (connections[i].getConsumer().getInterfaceName() != null) {
- QName fromItf = connections[i].getConsumer().getInterfaceName();
- QName toSvc = connections[i].getProvider().getServiceName();
- String toEp = connections[i].getProvider().getEndpointName();
- broker.unregisterInterfaceConnection(fromItf, toSvc, toEp);
- } else {
- QName fromSvc = connections[i].getConsumer().getServiceName();
- String fromEp = connections[i].getConsumer().getEndpointName();
- QName toSvc = connections[i].getProvider().getServiceName();
- String toEp = connections[i].getProvider().getEndpointName();
- broker.unregisterEndpointConnection(fromSvc, fromEp, toSvc, toEp);
- }
+ serviceAssembly.getConnections().getConnections() == null) {
+ return;
+ }
+ Connection[] connections = serviceAssembly.getConnections().getConnections();
+ for (int i = 0; i < connections.length; i++) {
+ if (connections[i].getConsumer().getInterfaceName() != null) {
+ QName fromItf = connections[i].getConsumer().getInterfaceName();
+ registry.unregisterInterfaceConnection(fromItf);
+ } else {
+ QName fromSvc = connections[i].getConsumer().getServiceName();
+ String fromEp = connections[i].getConsumer().getEndpointName();
+ registry.unregisterEndpointConnection(fromSvc, fromEp);
}
+ }
}
protected Element getComponentFailure(Exception exception, String task, String component) {
@@ -461,4 +473,24 @@
// TODO Auto-generated method stub
return null;
}
+
+ protected void fireEvent(int type) {
+ ServiceAssemblyEvent event = new ServiceAssemblyEvent(this, type);
+ ServiceAssemblyListener[] listeners = (ServiceAssemblyListener[]) registry.getContainer().getListeners(ServiceAssemblyListener.class);
+ for (int i = 0; i < listeners.length; i++) {
+ switch (type) {
+ case ServiceAssemblyEvent.ASSEMBLY_STARTED:
+ listeners[i].assemblyStarted(event);
+ break;
+ case ServiceAssemblyEvent.ASSEMBLY_STOPPED:
+ listeners[i].assemblyStopped(event);
+ break;
+ case ServiceAssemblyEvent.ASSEMBLY_SHUTDOWN:
+ listeners[i].assemblyShutDown(event);
+ break;
+ }
+ }
+
+ }
+
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceAssemblyRegistry.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceAssemblyRegistry.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceAssemblyRegistry.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceAssemblyRegistry.java Mon Mar 6 15:19:33 2006
@@ -79,7 +79,7 @@
* @throws DeploymentException
* @deprecated
*/
- public boolean register(ServiceAssembly sa) throws DeploymentException {
+ public ServiceAssemblyLifeCycle register(ServiceAssembly sa) throws DeploymentException {
List sus = new ArrayList();
for (int i = 0; i < sa.getServiceUnits().length; i++) {
String suKey = registry.registerServiceUnit(
@@ -96,12 +96,11 @@
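* @return the life cycle of the newly registered service assembly, or <code>null</code> if an assembly with the same name is already registered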
* @throws DeploymentException
*/
- public boolean register(ServiceAssembly sa, String[] sus) throws DeploymentException {
- boolean result = false;
+ public ServiceAssemblyLifeCycle register(ServiceAssembly sa, String[] sus) throws DeploymentException {
String saName = sa.getIdentification().getName();
File stateFile = registry.getEnvironmentContext().getServiceAssemblyStateFile(saName);
- ServiceAssemblyLifeCycle salc = new ServiceAssemblyLifeCycle(sa, sus, stateFile, registry);
if (!serviceAssemblies.containsKey(saName)) {
+ ServiceAssemblyLifeCycle salc = new ServiceAssemblyLifeCycle(sa, sus, stateFile, registry);
serviceAssemblies.put(saName, salc);
try {
ObjectName objectName = registry.getContainer().getManagementContext().createObjectName(salc);
@@ -109,9 +108,9 @@
} catch (JMException e) {
log.error("Could not register MBean for service assembly", e);
}
- result = true;
+ return salc;
}
- return result;
+ return null;
}
/**
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceUnitLifeCycle.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceUnitLifeCycle.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceUnitLifeCycle.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ServiceUnitLifeCycle.java Mon Mar 6 15:19:33 2006
@@ -29,6 +29,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.jbi.deployment.ServiceUnit;
+import org.apache.servicemix.jbi.event.ServiceUnitEvent;
+import org.apache.servicemix.jbi.event.ServiceUnitListener;
import org.apache.servicemix.jbi.management.AttributeInfoHelper;
import org.apache.servicemix.jbi.management.MBeanInfoProvider;
import org.apache.servicemix.jbi.management.OperationInfoHelper;
@@ -236,6 +238,25 @@
public String getKey() {
return getComponentName() + "/" + getName();
+ }
+
+ protected void fireEvent(int type) {
+ ServiceUnitEvent event = new ServiceUnitEvent(this, type);
+ ServiceUnitListener[] listeners = (ServiceUnitListener[]) registry.getContainer().getListeners(ServiceUnitListener.class);
+ for (int i = 0; i < listeners.length; i++) {
+ switch (type) {
+ case ServiceUnitEvent.UNIT_STARTED:
+ listeners[i].unitStarted(event);
+ break;
+ case ServiceUnitEvent.UNIT_STOPPED:
+ listeners[i].unitStopped(event);
+ break;
+ case ServiceUnitEvent.UNIT_SHUTDOWN:
+ listeners[i].unitShutDown(event);
+ break;
+ }
+ }
+
}
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java Mon Mar 6 15:19:33 2006
@@ -132,7 +132,7 @@
((Thread) threads[i]).interrupt();
}
// deactivate all endpoints from this component
- ServiceEndpoint[] endpoints = (ServiceEndpoint[]) componentConnector.getActiveEndpoints().toArray(new ServiceEndpoint[0]);
+ ServiceEndpoint[] endpoints = container.getRegistry().getEndpointsForComponent(componentConnector.getComponentNameSpace());
for (int i = 0; i < endpoints.length; i++) {
try {
componentConnector.getContext().deactivateEndpoint(endpoints[i]);
@@ -185,17 +185,11 @@
log.debug("default destination endpointName for " + componentName + " = " + endpointName);
if (serviceName != null && endpointName != null) {
endpointName = endpointName.trim();
- ServiceEndpoint[] endpoints = container.getRegistry().getEndpointsForService(serviceName);
- if (endpoints != null) {
- for (int i = 0;i < endpoints.length;i++) {
- if (endpoints[i].getEndpointName().equals(endpointName)) {
- result.setEndpoint(endpoints[i]);
- log.info("Set default destination endpoint for " + componentName + " to "
- + endpoints[i]);
- endpointSet = true;
- break;
- }
- }
+ ServiceEndpoint endpoint = container.getRegistry().getEndpoint(serviceName, endpointName);
+ if (endpoint != null) {
+ result.setEndpoint(endpoint);
+ log.info("Set default destination endpoint for " + componentName + " to " + endpoint);
+ endpointSet = true;
}
}
if (!endpointSet) {
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java Mon Mar 6 15:19:33 2006
@@ -328,6 +328,16 @@
throw new JBIException("Component-specific endpoints can not be used for routing: should be an internal or dynamic endpoint.");
}
}
+ // Resolve linked endpoints
+ if (theEndpoint instanceof LinkedEndpoint) {
+ QName svcName = ((LinkedEndpoint) theEndpoint).getToService();
+ String epName = ((LinkedEndpoint) theEndpoint).getToEndpoint();
+ ServiceEndpoint ep = registry.getInternalEndpoint(svcName, epName);
+ if (ep == null) {
+ throw new JBIException("Could not resolve linked endpoint: " + theEndpoint);
+ }
+ theEndpoint = ep;
+ }
// get the context which created the exchange
ComponentContextImpl context = exchange.getSourceContext();
@@ -346,7 +356,7 @@
}
}
if (theEndpoint == null && interfaceName != null) {
- ServiceEndpoint[] endpoints = registry.getEndpoints(interfaceName);
+ ServiceEndpoint[] endpoints = registry.getEndpointsForInterface(interfaceName);
endpoints = getMatchingEndpoints(endpoints, exchange);
theEndpoint = (InternalEndpoint) getInterfaceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
if (theEndpoint == null) {
@@ -393,16 +403,17 @@
LocalComponentConnector consumer = getRegistry().getLocalComponentConnector(exchange.getSourceId());
for (int i = 0; i < endpoints.length; i++) {
- ComponentNameSpace id = ((AbstractServiceEndpoint) endpoints[i]).getComponentNameSpace();
- LocalComponentConnector provider = getRegistry().getLocalComponentConnector(id);
- if (provider != null) {
- if (consumer.getComponent().isExchangeWithProviderOkay(endpoints[i], exchange) &&
- provider.getComponent().isExchangeWithConsumerOkay(endpoints[i], exchange)) {
- filtered.add(endpoints[i]);
- }
- } else {
- filtered.add(endpoints[i]);
+ ComponentNameSpace id = ((InternalEndpoint) endpoints[i]).getComponentNameSpace();
+ if (id != null) {
+ LocalComponentConnector provider = getRegistry().getLocalComponentConnector(id);
+ if (provider != null) {
+ if (!consumer.getComponent().isExchangeWithProviderOkay(endpoints[i], exchange) ||
+ !provider.getComponent().isExchangeWithConsumerOkay(endpoints[i], exchange)) {
+ continue;
+ }
+ }
}
+ filtered.add(endpoints[i]);
}
return (ServiceEndpoint[]) filtered.toArray(new ServiceEndpoint[filtered.size()]);
}
@@ -514,25 +525,5 @@
public JBIContainer getContainer() {
return container;
}
-
- public void registerInterfaceConnection(QName fromItf, QName toSvc, String toEp) {
- // TODO Auto-generated method stub
-
- }
-
- public void unregisterInterfaceConnection(QName fromItf, QName toSvc, String toEp) {
- // TODO Auto-generated method stub
-
- }
-
- public void registerEndpointConnection(QName fromSvc, String fromEp, QName toSvc, String toEp, String link) {
- LinkedEndpoint ep = new LinkedEndpoint(fromSvc, fromEp, toSvc, toEp, link);
- // TODO register endpoint
- }
-
- public void unregisterEndpointConnection(QName fromSvc, String fromEp, QName toSvc, String toEp) {
- // TODO Auto-generated method stub
-
- }
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java Mon Mar 6 15:19:33 2006
@@ -47,7 +47,9 @@
* @version $Revision$
*/
public abstract class AbstractFlow extends BaseLifeCycle implements Flow {
+
private static final Log log = LogFactory.getLog(AbstractFlow.class);
+
protected Broker broker;
private ReadWriteLock lock = new ReentrantReadWriteLock();
private Thread suspendThread = null;
@@ -215,6 +217,7 @@
if (se == null) {
// Routing by service name
QName serviceName = me.getService();
+ QName interfaceName = me.getInterfaceName();
if (serviceName != null) {
ServiceEndpoint[] eps = broker.getContainer().getRegistry().getEndpointsForService(serviceName);
for (int i = 0; i < eps.length; i++) {
@@ -226,10 +229,9 @@
}
}
return false;
- } else {
- // Routing by interface name
- QName interfaceName = me.getInterfaceName();
- ServiceEndpoint[] eps = broker.getContainer().getRegistry().getEndpoints(interfaceName);
+ // Routing by interface name
+ } else if (interfaceName != null) {
+ ServiceEndpoint[] eps = broker.getContainer().getRegistry().getEndpointsForInterface(interfaceName);
for (int i = 0; i < eps.length; i++) {
if (eps[i] instanceof InternalEndpoint) {
String name = ((InternalEndpoint) eps[i]).getComponentNameSpace().getContainerName();
@@ -238,6 +240,9 @@
}
}
}
+ return false;
+ } else {
+ // Should not happen
return false;
}
// Routing by endpoint