You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/03/08 14:59:27 UTC
svn commit: r516044 - in
/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms:
AbstractJMSFlow.java JMSFlow.java JMSFlowTibco.java
Author: gnodet
Date: Thu Mar 8 05:59:26 2007
New Revision: 516044
URL: http://svn.apache.org/viewvc?view=rev&rev=516044
Log:
SM-745: Allow using Tibco for the JMSFlow
Added:
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java (with props)
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java (with props)
Modified:
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
Added: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java?view=auto&rev=516044
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java (added)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java Thu Mar 8 05:59:26 2007
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.nmr.flow.jms;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.executors.Executor;
+import org.apache.servicemix.jbi.event.ComponentAdapter;
+import org.apache.servicemix.jbi.event.ComponentEvent;
+import org.apache.servicemix.jbi.event.ComponentListener;
+import org.apache.servicemix.jbi.event.EndpointAdapter;
+import org.apache.servicemix.jbi.event.EndpointEvent;
+import org.apache.servicemix.jbi.event.EndpointListener;
+import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.apache.servicemix.jbi.nmr.Broker;
+import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
+import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Use for message routing among a network of containers. All
+ * routing/registration happens automatically.
+ *
+ */
+public abstract class AbstractJMSFlow extends AbstractFlow implements MessageListener {
+
+ private static final String INBOUND_PREFIX = "org.apache.servicemix.jms.";
+
+ private String userName;
+
+ private String password;
+
+ ConnectionFactory connectionFactory;
+
+ protected Connection connection;
+
+ private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
+
+ private MessageProducer queueProducer;
+
+ private MessageProducer topicProducer;
+
+ protected Topic broadcastTopic;
+
+ protected Session broadcastSession;
+
+ private MessageConsumer broadcastConsumer;
+
+ private Session inboundSession;
+
+ protected Set subscriberSet = new CopyOnWriteArraySet();
+
+ private Map consumerMap = new ConcurrentHashMap();
+
+ AtomicBoolean started = new AtomicBoolean(false);
+
+ private EndpointListener endpointListener;
+
+ private ComponentListener componentListener;
+
+ private Executor executor;
+
+ protected MessageConsumer monitorMessageConsumer = null;
+
+ private String jmsURL = "peer://org.apache.servicemix?persistent=false";
+
+ /**
+ * The type of Flow
+ *
+ * @return the type
+ */
+ public String getDescription() {
+ return "jms";
+ }
+
+ /**
+ * @return Returns the password.
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * @param password
+ * The password to set.
+ */
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /**
+ * @return Returns the userName.
+ */
+ public String getUserName() {
+ return userName;
+ }
+
+ /**
+ * @param userName
+ * The userName to set.
+ */
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ /**
+ * @return Returns the connectionFactory.
+ */
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ /**
+ * @param connectionFactory
+ * The connectionFactory to set.
+ */
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ /**
+ * @return Returns the broadcastDestinationName.
+ */
+ public String getBroadcastDestinationName() {
+ return broadcastDestinationName;
+ }
+
+ /**
+ * @param broadcastDestinationName
+ * The broadcastDestinationName to set.
+ */
+ public void setBroadcastDestinationName(String broadcastDestinationName) {
+ this.broadcastDestinationName = broadcastDestinationName;
+ }
+
+ /**
+ * Check if the flow can support the requested QoS for this exchange
+ *
+ * @param me
+ * the exchange to check
+ * @return true if this flow can handle the given exchange
+ */
+ public boolean canHandle(MessageExchange me) {
+ if (isTransacted(me)) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Initialize the Region
+ *
+ * @param broker
+ * @throws JBIException
+ */
+ public void init(Broker broker) throws JBIException {
+ log.debug(broker.getContainer().getName() + ": Initializing jms flow");
+ super.init(broker);
+ // Find executor
+ executor = broker.getContainer().getExecutorFactory().createExecutor("flow.jms");
+ // Create and register endpoint listener
+ endpointListener = new EndpointAdapter() {
+ public void internalEndpointRegistered(EndpointEvent event) {
+ onInternalEndpointRegistered(event, true);
+ }
+
+ public void internalEndpointUnregistered(EndpointEvent event) {
+ onInternalEndpointUnregistered(event, true);
+ }
+ };
+ broker.getContainer().addListener(endpointListener);
+ // Create and register component listener
+ componentListener = new ComponentAdapter() {
+ public void componentStarted(ComponentEvent event) {
+ onComponentStarted(event);
+ }
+
+ public void componentStopped(ComponentEvent event) {
+ onComponentStopped(event);
+ }
+ };
+ broker.getContainer().addListener(componentListener);
+ try {
+ if (connectionFactory == null) {
+ connectionFactory = createConnectionFactoryFromUrl(jmsURL);
+ }
+ if (userName != null) {
+ connection = connectionFactory.createConnection(userName, password);
+ } else {
+ connection = connectionFactory.createConnection();
+ }
+ connection.setClientID(broker.getContainer().getName());
+ connection.start();
+ inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainer().getName());
+ MessageConsumer inboundQueue = inboundSession.createConsumer(queue);
+ inboundQueue.setMessageListener(this);
+ queueProducer = inboundSession.createProducer(null);
+ broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
+ topicProducer = broadcastSession.createProducer(broadcastTopic);
+ topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ } catch (JMSException e) {
+ log.error("Failed to initialize JMSFlow", e);
+ throw new JBIException(e);
+ }
+ }
+
+ abstract protected ConnectionFactory createConnectionFactoryFromUrl(String jmsURL);
+
+ /*
+ * The following abstract methods have to be implemented by specialized JMS
+ * Flow providers to monitor consumers on the broadcast topic.
+ */
+
+ protected abstract void onConsumerMonitorMessage(Message message);
+
+ abstract public void startConsumerMonitor() throws JMSException;
+
+ public void stopConsumerMonitor() throws JMSException {
+ monitorMessageConsumer.close();
+ }
+
+ /**
+ * start the flow
+ *
+ * @throws JBIException
+ */
+ public void start() throws JBIException {
+ if (started.compareAndSet(false, true)) {
+ log.debug(broker.getContainer().getName() + ": Starting jms flow");
+ super.start();
+ try {
+ broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
+ broadcastConsumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ try {
+ Object obj = ((ObjectMessage) message).getObject();
+ if (obj instanceof EndpointEvent) {
+ EndpointEvent event = (EndpointEvent) obj;
+ String container = ((InternalEndpoint) event.getEndpoint()).getComponentNameSpace()
+ .getContainerName();
+ if (!getBroker().getContainer().getName().equals(container)) {
+ if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
+ onRemoteEndpointRegistered(event);
+ } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
+ onRemoteEndpointUnregistered(event);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error processing incoming broadcast message", e);
+ }
+ }
+ });
+
+ // Start queue consumers for all components
+ for (Iterator it = broker.getContainer().getRegistry().getComponents().iterator(); it.hasNext();) {
+ ComponentMBeanImpl cmp = (ComponentMBeanImpl) it.next();
+ if (cmp.isStarted()) {
+ onComponentStarted(new ComponentEvent(cmp, ComponentEvent.COMPONENT_STARTED));
+ }
+ }
+ // Start queue consumers for all endpoints
+ ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
+ for (int i = 0; i < endpoints.length; i++) {
+ if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
+ onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
+ EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), false);
+ }
+ }
+
+ startConsumerMonitor();
+ } catch (JMSException e) {
+ JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
+ throw jbiEx;
+ }
+ }
+ }
+
+ /**
+ * stop the flow
+ *
+ * @throws JBIException
+ */
+ public void stop() throws JBIException {
+ if (started.compareAndSet(true, false)) {
+ log.debug(broker.getContainer().getName() + ": Stopping jms flow");
+ super.stop();
+ for (Iterator it = subscriberSet.iterator(); it.hasNext();) {
+ String id = (String) it.next();
+ removeAllPackets(id);
+ }
+ subscriberSet.clear();
+ try {
+ stopConsumerMonitor();
+ broadcastConsumer.close();
+ } catch (JMSException e) {
+ log.debug("JMSException caught in stop", e);
+ }
+ }
+ }
+
+ public void shutDown() throws JBIException {
+ super.shutDown();
+ stop();
+ // Remove endpoint listener
+ broker.getContainer().removeListener(endpointListener);
+ // Remove component listener
+ broker.getContainer().removeListener(componentListener);
+ if (this.connection != null) {
+ try {
+ this.connection.close();
+ } catch (JMSException e) {
+ log.warn("Error closing JMS Connection", e);
+ }
+ }
+ }
+
+ /**
+ * useful for testing
+ *
+ * @return number of containers in the network
+ */
+ public int numberInNetwork() {
+ return subscriberSet.size();
+ }
+
+ public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
+ if (!started.get()) {
+ return;
+ }
+ try {
+ String key = EndpointSupport.getKey(event.getEndpoint());
+ if (!consumerMap.containsKey(key)) {
+ Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
+ MessageConsumer consumer = inboundSession.createConsumer(queue);
+ consumer.setMessageListener(this);
+ consumerMap.put(key, consumer);
+ }
+ if (broadcast) {
+ log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
+ ObjectMessage msg = broadcastSession.createObjectMessage(event);
+ topicProducer.send(msg);
+ }
+ } catch (Exception e) {
+ log.error("Cannot create consumer for " + event.getEndpoint(), e);
+ }
+ }
+
+ public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
+ try {
+ String key = EndpointSupport.getKey(event.getEndpoint());
+ MessageConsumer consumer = (MessageConsumer) consumerMap.remove(key);
+ if (consumer != null) {
+ consumer.close();
+ }
+ if (broadcast) {
+ ObjectMessage msg = broadcastSession.createObjectMessage(event);
+ log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
+ topicProducer.send(msg);
+ }
+ } catch (Exception e) {
+ log.error("Cannot destroy consumer for " + event, e);
+ }
+ }
+
+ public void onComponentStarted(ComponentEvent event) {
+ if (!started.get()) {
+ return;
+ }
+ try {
+ String key = event.getComponent().getName();
+ if (!consumerMap.containsKey(key)) {
+ Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
+ MessageConsumer consumer = inboundSession.createConsumer(queue);
+ consumer.setMessageListener(this);
+ consumerMap.put(key, consumer);
+ }
+ } catch (Exception e) {
+ log.error("Cannot create consumer for component " + event.getComponent().getName(), e);
+ }
+ }
+
+ public void onComponentStopped(ComponentEvent event) {
+ try {
+ String key = event.getComponent().getName();
+ MessageConsumer consumer = (MessageConsumer) consumerMap.remove(key);
+ if (consumer != null) {
+ consumer.close();
+ }
+ } catch (Exception e) {
+ log.error("Cannot destroy consumer for component " + event.getComponent().getName(), e);
+ }
+ }
+
+ public void onRemoteEndpointRegistered(EndpointEvent event) {
+ log.debug(broker.getContainer().getName() + ": adding remote endpoint: " + event.getEndpoint());
+ broker.getContainer().getRegistry().registerRemoteEndpoint(event.getEndpoint());
+ }
+
+ public void onRemoteEndpointUnregistered(EndpointEvent event) {
+ log.debug(broker.getContainer().getName() + ": removing remote endpoint: " + event.getEndpoint());
+ broker.getContainer().getRegistry().unregisterRemoteEndpoint(event.getEndpoint());
+ }
+
+ /**
+ * Distribute an ExchangePacket
+ *
+ * @param me
+ * @throws MessagingException
+ */
+ protected void doSend(MessageExchangeImpl me) throws MessagingException {
+ doRouting(me);
+ }
+
+ /**
+ * Distribute an ExchangePacket
+ *
+ * @param me
+ * @throws MessagingException
+ */
+ public void doRouting(MessageExchangeImpl me) throws MessagingException {
+ // let ActiveMQ do the routing ...
+ try {
+ String destination;
+ if (me.getRole() == Role.PROVIDER) {
+ if (me.getDestinationId() == null) {
+ destination = INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint());
+ } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) {
+ destination = INBOUND_PREFIX + me.getDestinationId().getName();
+ } else {
+ destination = INBOUND_PREFIX + me.getDestinationId().getContainerName();
+ }
+ } else {
+ if (me.getSourceId() == null) {
+ throw new IllegalStateException("No sourceId set on the exchange");
+ } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
+ // If the consumer is stateless and has specified a sender
+ // endpoint,
+ // this exchange will be sent to the given endpoint queue,
+ // so that
+ // fail-over and load-balancing can be achieved
+ // This property must have been created using
+ // EndpointSupport.getKey
+ if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
+ destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT);
+ } else {
+ destination = INBOUND_PREFIX + me.getSourceId().getName();
+ }
+ } else {
+ destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
+ }
+ }
+
+ Queue queue = inboundSession.createQueue(destination);
+ ObjectMessage msg = inboundSession.createObjectMessage(me);
+ queueProducer.send(queue, msg);
+ } catch (JMSException e) {
+ log.error("Failed to send exchange: " + me + " internal JMS Network", e);
+ throw new MessagingException(e);
+ }
+ }
+
+ /**
+ * MessageListener implementation
+ *
+ * @param message
+ */
+ public void onMessage(final Message message) {
+ try {
+ if (message != null && started.get()) {
+ ObjectMessage objMsg = (ObjectMessage) message;
+ final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
+ // Dispatch the message in another thread so as to free the jms
+ // session
+ // else if a component do a sendSync into the jms flow, the
+ // whole
+ // flow is deadlocked
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ if (me.getDestinationId() == null) {
+ ServiceEndpoint se = me.getEndpoint();
+ se = broker.getContainer().getRegistry().getInternalEndpoint(se.getServiceName(),
+ se.getEndpointName());
+ me.setEndpoint(se);
+ me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
+ }
+ AbstractJMSFlow.super.doRouting(me);
+ } catch (Throwable e) {
+ log.error("Caught an exception routing ExchangePacket: ", e);
+ }
+ }
+ });
+ }
+ } catch (JMSException jmsEx) {
+ log.error("Caught an exception unpacking JMS Message: ", jmsEx);
+ }
+ }
+
+ /**
+ * A new cluster node is announced. Add this node to the subscriber set and
+ * send all our local internal endpoints to this node.
+ *
+ * @param connectionId
+ */
+ protected void addClusterNode(String connectionId) {
+ subscriberSet.add(connectionId);
+ ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
+ for (int i = 0; i < endpoints.length; i++) {
+ if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
+ onInternalEndpointRegistered(
+ new EndpointEvent(endpoints[i], EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
+ }
+ }
+ }
+
+ /**
+ * A cluster node leaves the cluster. Remove this node from the subscriber
+ * set and remove all packets waiting to be delivered to this node
+ *
+ * @param connectionId
+ */
+ protected void removeClusterNode(String connectionId) {
+ subscriberSet.remove(connectionId);
+ removeAllPackets(connectionId);
+ }
+
+ protected void removeAllPackets(String containerName) {
+ // TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
+ }
+
+ public String getJmsURL() {
+ return jmsURL;
+ }
+
+ public void setJmsURL(String jmsURL) {
+ this.jmsURL = jmsURL;
+ }
+
+}
Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?view=diff&rev=516044&r1=516043&r2=516044
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java Thu Mar 8 05:59:26 2007
@@ -16,27 +16,12 @@
*/
package org.apache.servicemix.jbi.nmr.flow.jms;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import javax.jbi.JBIException;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.MessageExchange.Role;
-import javax.jbi.servicedesc.ServiceEndpoint;
-import javax.jms.DeliveryMode;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.Session;
import javax.jms.Topic;
-import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
@@ -44,527 +29,45 @@
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveInfo;
-import org.apache.servicemix.JbiConstants;
-import org.apache.servicemix.executors.Executor;
-import org.apache.servicemix.jbi.event.ComponentAdapter;
-import org.apache.servicemix.jbi.event.ComponentEvent;
-import org.apache.servicemix.jbi.event.ComponentListener;
-import org.apache.servicemix.jbi.event.EndpointAdapter;
-import org.apache.servicemix.jbi.event.EndpointEvent;
-import org.apache.servicemix.jbi.event.EndpointListener;
-import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
-import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
-import org.apache.servicemix.jbi.nmr.Broker;
-import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
-import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
-import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
-
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
- * Use for message routing among a network of containers. All routing/registration happens automatically.
+ * Use for message routing among a network of containers. All
+ * routing/registration happens automatically.
*
* @version $Revision$
* @org.apache.xbean.XBean element="jmsFlow"
*/
-public class JMSFlow extends AbstractFlow implements MessageListener {
-
- private static final String INBOUND_PREFIX = "org.apache.servicemix.jms.";
-
- private String jmsURL = "peer://org.apache.servicemix?persistent=false";
-
- private String userName;
-
- private String password;
-
- private ActiveMQConnectionFactory connectionFactory;
-
- private ActiveMQConnection connection;
-
- private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
-
- private MessageProducer queueProducer;
-
- private MessageProducer topicProducer;
-
- private Topic broadcastTopic;
-
- private Session broadcastSession;
-
- private MessageConsumer broadcastConsumer;
-
- private Session inboundSession;
-
- private MessageConsumer advisoryConsumer;
-
- private Set subscriberSet = new CopyOnWriteArraySet();
-
- private Map consumerMap = new ConcurrentHashMap();
+public class JMSFlow extends AbstractJMSFlow {
- private AtomicBoolean started = new AtomicBoolean(false);
-
- private EndpointListener endpointListener;
-
- private ComponentListener componentListener;
-
- private Executor executor;
-
- /**
- * The type of Flow
- *
- * @return the type
- */
- public String getDescription() {
- return "jms";
- }
-
- /**
- * @return Returns the jmsURL.
- */
- public String getJmsURL() {
- return jmsURL;
- }
-
- /**
- * @param jmsURL The jmsURL to set.
- */
- public void setJmsURL(String jmsURL) {
- this.jmsURL = jmsURL;
+ protected ConnectionFactory createConnectionFactoryFromUrl(String jmsURL) {
+ return (jmsURL != null) ? new ActiveMQConnectionFactory(jmsURL) : new ActiveMQConnectionFactory();
}
/**
- * @return Returns the password.
+ * Listener on the ActiveMQ advisory topic so we get messages when a cluster
+ * node is added or removed
*/
- public String getPassword() {
- return password;
- }
-
- /**
- * @param password The password to set.
- */
- public void setPassword(String password) {
- this.password = password;
- }
-
- /**
- * @return Returns the userName.
- */
- public String getUserName() {
- return userName;
- }
-
- /**
- * @param userName The userName to set.
- */
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- /**
- * @return Returns the connectionFactory.
- */
- public ActiveMQConnectionFactory getConnectionFactory() {
- return connectionFactory;
- }
-
- /**
- * @param connectionFactory The connectionFactory to set.
- */
- public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
- this.connectionFactory = connectionFactory;
- }
-
- /**
- * @return Returns the broadcastDestinationName.
- */
- public String getBroadcastDestinationName() {
- return broadcastDestinationName;
- }
-
- /**
- * @param broadcastDestinationName The broadcastDestinationName to set.
- */
- public void setBroadcastDestinationName(String broadcastDestinationName) {
- this.broadcastDestinationName = broadcastDestinationName;
- }
-
- /**
- * Check if the flow can support the requested QoS for this exchange
- * @param me the exchange to check
- * @return true if this flow can handle the given exchange
- */
- public boolean canHandle(MessageExchange me) {
- if (isTransacted(me)) {
- return false;
- }
- return true;
- }
-
- /**
- * Initialize the Region
- *
- * @param broker
- * @throws JBIException
- */
- public void init(Broker broker) throws JBIException {
- log.debug(broker.getContainer().getName() + ": Initializing jms flow");
- super.init(broker);
- // Find executor
- executor = broker.getContainer().getExecutorFactory().createExecutor("flow.jms");
- // Create and register endpoint listener
- endpointListener = new EndpointAdapter() {
- public void internalEndpointRegistered(EndpointEvent event) {
- onInternalEndpointRegistered(event, true);
- }
-
- public void internalEndpointUnregistered(EndpointEvent event) {
- onInternalEndpointUnregistered(event, true);
- }
- };
- broker.getContainer().addListener(endpointListener);
- // Create and register component listener
- componentListener = new ComponentAdapter() {
- public void componentStarted(ComponentEvent event) {
- onComponentStarted(event);
- }
- public void componentStopped(ComponentEvent event) {
- onComponentStopped(event);
- }
- };
- broker.getContainer().addListener(componentListener);
- try {
- if (connectionFactory == null) {
- if (jmsURL != null) {
- connectionFactory = new ActiveMQConnectionFactory(jmsURL);
- } else {
- connectionFactory = new ActiveMQConnectionFactory();
- }
- }
- if (userName != null) {
- connection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
- } else {
- connection = (ActiveMQConnection) connectionFactory.createConnection();
- }
- connection.setClientID(broker.getContainer().getName());
- connection.start();
- inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainer().getName());
- MessageConsumer inboundQueue = inboundSession.createConsumer(queue);
- inboundQueue.setMessageListener(this);
- queueProducer = inboundSession.createProducer(null);
- broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
- topicProducer = broadcastSession.createProducer(broadcastTopic);
- topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- } catch (JMSException e) {
- log.error("Failed to initialize JMSFlow", e);
- throw new JBIException(e);
- }
- }
-
- /**
- * start the flow
- *
- * @throws JBIException
- */
- public void start() throws JBIException {
- if (started.compareAndSet(false, true)) {
- log.debug(broker.getContainer().getName() + ": Starting jms flow");
- super.start();
- try {
- broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
- broadcastConsumer.setMessageListener(new MessageListener() {
- public void onMessage(Message message) {
- try {
- Object obj = ((ObjectMessage) message).getObject();
- if (obj instanceof EndpointEvent) {
- EndpointEvent event = (EndpointEvent) obj;
- String container = ((InternalEndpoint) event.getEndpoint()).getComponentNameSpace().getContainerName();
- if (!getBroker().getContainer().getName().equals(container)) {
- if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
- onRemoteEndpointRegistered(event);
- } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
- onRemoteEndpointUnregistered(event);
- }
- }
- }
- } catch (Exception e) {
- log.error("Error processing incoming broadcast message", e);
- }
- }
- });
- Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
- advisoryConsumer = broadcastSession.createConsumer(advisoryTopic);
- advisoryConsumer.setMessageListener(new MessageListener() {
- public void onMessage(Message message) {
- if (started.get()) {
- onAdvisoryMessage(((ActiveMQMessage) message).getDataStructure());
- }
- }
- });
-
- // Start queue consumers for all components
- for (Iterator it = broker.getContainer().getRegistry().getComponents().iterator(); it.hasNext();) {
- ComponentMBeanImpl cmp = (ComponentMBeanImpl) it.next();
- if (cmp.isStarted()) {
- onComponentStarted(new ComponentEvent(cmp, ComponentEvent.COMPONENT_STARTED));
- }
- }
- // Start queue consumers for all endpoints
- ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
- for (int i = 0; i < endpoints.length; i++) {
- if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
- onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
- EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), false);
- }
- }
- } catch (JMSException e) {
- JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
- throw jbiEx;
- }
- }
- }
-
- /**
- * stop the flow
- *
- * @throws JBIException
- */
- public void stop() throws JBIException {
- if (started.compareAndSet(true, false)) {
- log.debug(broker.getContainer().getName() + ": Stopping jms flow");
- super.stop();
- for (Iterator it = subscriberSet.iterator(); it.hasNext();) {
- String id = (String) it.next();
- removeAllPackets(id);
- }
- subscriberSet.clear();
- try {
- advisoryConsumer.close();
- broadcastConsumer.close();
- } catch (JMSException e) {
- log.debug("JMSException caught in stop", e);
- }
- }
- }
-
- public void shutDown() throws JBIException {
- super.shutDown();
- stop();
- // Remove endpoint listener
- broker.getContainer().removeListener(endpointListener);
- // Remove component listener
- broker.getContainer().removeListener(componentListener);
- // Shutdown executor
- executor.shutdown();
- // Close connection
- if (this.connection != null) {
- try {
- this.connection.close();
- } catch (JMSException e) {
- log.warn("Error closing JMS Connection", e);
- }
- }
- }
-
- /**
- * useful for testing
- *
- * @return number of containers in the network
- */
- public int numberInNetwork() {
- return subscriberSet.size();
- }
-
- public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
- if (!started.get()) {
- return;
- }
- try {
- String key = EndpointSupport.getKey(event.getEndpoint());
- if (!consumerMap.containsKey(key)) {
- Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
- MessageConsumer consumer = inboundSession.createConsumer(queue);
- consumer.setMessageListener(this);
- consumerMap.put(key, consumer);
- }
- if (broadcast) {
- log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
- ObjectMessage msg = broadcastSession.createObjectMessage(event);
- topicProducer.send(msg);
- }
- } catch (Exception e) {
- log.error("Cannot create consumer for " + event.getEndpoint(), e);
- }
- }
-
- public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
- try {
- String key = EndpointSupport.getKey(event.getEndpoint());
- MessageConsumer consumer = (MessageConsumer) consumerMap.remove(key);
- if (consumer != null) {
- consumer.close();
- }
- if (broadcast) {
- ObjectMessage msg = broadcastSession.createObjectMessage(event);
- log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
- topicProducer.send(msg);
- }
- } catch (Exception e) {
- log.error("Cannot destroy consumer for " + event, e);
- }
- }
-
- public void onComponentStarted(ComponentEvent event) {
- if (!started.get()) {
+ protected void onConsumerMonitorMessage(Message advisoryMessage) {
+ if (!started.get())
return;
- }
- try {
- String key = event.getComponent().getName();
- if (!consumerMap.containsKey(key)) {
- Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
- MessageConsumer consumer = inboundSession.createConsumer(queue);
- consumer.setMessageListener(this);
- consumerMap.put(key, consumer);
- }
- } catch (Exception e) {
- log.error("Cannot create consumer for component " + event.getComponent().getName(), e);
- }
- }
-
- public void onComponentStopped(ComponentEvent event) {
- try {
- String key = event.getComponent().getName();
- MessageConsumer consumer = (MessageConsumer) consumerMap.remove(key);
- if (consumer != null) {
- consumer.close();
- }
- } catch (Exception e) {
- log.error("Cannot destroy consumer for component " + event.getComponent().getName(), e);
- }
- }
-
- public void onRemoteEndpointRegistered(EndpointEvent event) {
- log.debug(broker.getContainer().getName() + ": adding remote endpoint: " + event.getEndpoint());
- broker.getContainer().getRegistry().registerRemoteEndpoint(event.getEndpoint());
- }
-
- public void onRemoteEndpointUnregistered(EndpointEvent event) {
- log.debug(broker.getContainer().getName() + ": removing remote endpoint: " + event.getEndpoint());
- broker.getContainer().getRegistry().unregisterRemoteEndpoint(event.getEndpoint());
- }
-
- /**
- * Distribute an ExchangePacket
- *
- * @param me
- * @throws MessagingException
- */
- protected void doSend(MessageExchangeImpl me) throws MessagingException {
- doRouting(me);
- }
-
- /**
- * Distribute an ExchangePacket
- *
- * @param me
- * @throws MessagingException
- */
- public void doRouting(MessageExchangeImpl me) throws MessagingException {
- // let ActiveMQ do the routing ...
- try {
- String destination;
- if (me.getRole() == Role.PROVIDER) {
- if (me.getDestinationId() == null) {
- destination = INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint());
- } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) {
- destination = INBOUND_PREFIX + me.getDestinationId().getName();
- } else {
- destination = INBOUND_PREFIX + me.getDestinationId().getContainerName();
- }
- } else {
- if (me.getSourceId() == null) {
- throw new IllegalStateException("No sourceId set on the exchange");
- } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
- // If the consumer is stateless and has specified a sender endpoint,
- // this exchange will be sent to the given endpoint queue, so that
- // fail-over and load-balancing can be achieved
- // This property must have been created using EndpointSupport.getKey
- if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
- destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT);
- } else {
- destination = INBOUND_PREFIX + me.getSourceId().getName();
- }
- } else {
- destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
- }
- }
-
- Queue queue = inboundSession.createQueue(destination);
- ObjectMessage msg = inboundSession.createObjectMessage(me);
- queueProducer.send(queue, msg);
- } catch (JMSException e) {
- log.error("Failed to send exchange: " + me + " internal JMS Network", e);
- throw new MessagingException(e);
- }
- }
-
- /**
- * MessageListener implementation
- *
- * @param message
- */
- public void onMessage(final Message message) {
- try {
- if (message != null && started.get()) {
- ObjectMessage objMsg = (ObjectMessage) message;
- final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
- // Dispatch the message in another thread so as to free the jms session
- // else if a component do a sendSync into the jms flow, the whole
- // flow is deadlocked
- executor.execute(new Runnable() {
- public void run() {
- try {
- if (me.getDestinationId() == null) {
- ServiceEndpoint se = me.getEndpoint();
- se = broker.getContainer().getRegistry()
- .getInternalEndpoint(se.getServiceName(), se.getEndpointName());
- me.setEndpoint(se);
- me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
- }
- JMSFlow.super.doRouting(me);
- } catch (Throwable e) {
- log.error("Caught an exception routing ExchangePacket: ", e);
- }
- }
- });
- }
- } catch (JMSException jmsEx) {
- log.error("Caught an exception unpacking JMS Message: ", jmsEx);
- }
- }
-
- protected void onAdvisoryMessage(Object obj) {
+ Object obj = ((ActiveMQMessage) advisoryMessage).getDataStructure();
if (obj instanceof ConsumerInfo) {
ConsumerInfo info = (ConsumerInfo) obj;
- subscriberSet.add(info.getConsumerId().getConnectionId());
- ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
- for (int i = 0; i < endpoints.length; i++) {
- if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
- onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
- EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
- }
- }
+ addClusterNode(info.getConsumerId().getConnectionId());
} else if (obj instanceof RemoveInfo) {
- ConsumerId id = (ConsumerId) ((RemoveInfo) obj).getObjectId();
- subscriberSet.remove(id.getConnectionId());
- removeAllPackets(id.getConnectionId());
+ ConsumerId consumerId = (ConsumerId) ((RemoveInfo) obj).getObjectId();
+ removeClusterNode(consumerId.getConnectionId());
}
}
- private void removeAllPackets(String containerName) {
- //TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
+ public void startConsumerMonitor() throws JMSException {
+ Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
+ monitorMessageConsumer = broadcastSession.createConsumer(advisoryTopic);
+ monitorMessageConsumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ onConsumerMonitorMessage(message);
+ }
+ });
}
+
}
Added: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java?view=auto&rev=516044
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java (added)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java Thu Mar 8 05:59:26 2007
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.nmr.flow.jms;
+
+import java.lang.reflect.Constructor;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Topic;
+
+/**
+ * Use for message routing among a network of containers. All
+ * routing/registration happens automatically.
+ *
+ * @version $Revision$
+ * @org.apache.xbean.XBean element="jmsFlowTibco"
+ */
+public class JMSFlowTibco extends AbstractJMSFlow {
+
+ private static final String TOPIC_NAME_MONITOR_CONSUMER = "$sys.monitor.consumer.*";
+
+ private static final String PROPERTY_NAME_EVENT_CLASS = "event_class";
+
+ private static final String PROPERTY_NAME_TARGET_DEST_NAME = "target_dest_name";
+
+ private static final String PROPERTY_NAME_CONN_CONNID = "conn_connid";
+
+ private static final String EVENT_CLASS_CONSUMER_CREATE = "consumer.create";
+
+ protected ConnectionFactory createConnectionFactoryFromUrl(String jmsURL) {
+ try {
+ Class connFactoryClass = Class.forName("com.tibco.tibjms.TibjmsConnectionFactory");
+ if (jmsURL != null) {
+ Constructor cns = connFactoryClass.getConstructor(new Class[] { String.class });
+ ConnectionFactory connFactory = (ConnectionFactory) cns.newInstance(new Object[] { jmsURL });
+ return connFactory;
+ } else {
+ ConnectionFactory connFactory = (ConnectionFactory) connFactoryClass.newInstance();
+ return connFactory;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to create Tibco connection factory", e);
+ }
+ /*
+ return (jmsURL != null) ? new com.tibco.tibjms.TibjmsConnectionFactory.TibjmsConnectionFactory(jmsURL) :
+ new com.tibco.tibjms.TibjmsConnectionFactory.TibjmsConnectionFactory();
+ */
+ }
+
+ public void onConsumerMonitorMessage(Message message) {
+ if (!started.get())
+ return;
+ try {
+ String connectionId = "" + message.getLongProperty(PROPERTY_NAME_CONN_CONNID);
+ String targetDestName = message.getStringProperty(PROPERTY_NAME_TARGET_DEST_NAME);
+ String eventClass = message.getStringProperty(PROPERTY_NAME_EVENT_CLASS);
+ if (broadcastTopic.getTopicName().equals(targetDestName)) {
+ if (EVENT_CLASS_CONSUMER_CREATE.equals(eventClass)) {
+ addClusterNode(connectionId);
+ } else {
+ removeClusterNode(connectionId);
+ }
+ }
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void startConsumerMonitor() throws JMSException {
+ Topic createTopic = broadcastSession.createTopic(TOPIC_NAME_MONITOR_CONSUMER);
+ monitorMessageConsumer = broadcastSession.createConsumer(createTopic);
+ monitorMessageConsumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ onConsumerMonitorMessage(message);
+ }
+ });
+ }
+
+}
Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
------------------------------------------------------------------------------
svn:mime-type = text/plain