You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/03/08 14:59:27 UTC

svn commit: r516044 - in /incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms: AbstractJMSFlow.java JMSFlow.java JMSFlowTibco.java

Author: gnodet
Date: Thu Mar  8 05:59:26 2007
New Revision: 516044

URL: http://svn.apache.org/viewvc?view=rev&rev=516044
Log:
SM-745: Allow using Tibco for the JMSFlow

Added:
    incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java   (with props)
    incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java   (with props)
Modified:
    incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java

Added: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java?view=auto&rev=516044
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java (added)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java Thu Mar  8 05:59:26 2007
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.nmr.flow.jms;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.executors.Executor;
+import org.apache.servicemix.jbi.event.ComponentAdapter;
+import org.apache.servicemix.jbi.event.ComponentEvent;
+import org.apache.servicemix.jbi.event.ComponentListener;
+import org.apache.servicemix.jbi.event.EndpointAdapter;
+import org.apache.servicemix.jbi.event.EndpointEvent;
+import org.apache.servicemix.jbi.event.EndpointListener;
+import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.apache.servicemix.jbi.nmr.Broker;
+import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
+import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base class for JMS based flows, used for message routing among a network
+ * of containers. All routing/registration happens automatically.
+ * 
+ */
+public abstract class AbstractJMSFlow extends AbstractFlow implements MessageListener {
+
+    private static final String INBOUND_PREFIX = "org.apache.servicemix.jms.";
+
+    private String userName;
+
+    private String password;
+
+    ConnectionFactory connectionFactory;
+
+    protected Connection connection;
+
+    private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
+
+    private MessageProducer queueProducer;
+
+    private MessageProducer topicProducer;
+
+    protected Topic broadcastTopic;
+
+    protected Session broadcastSession;
+
+    private MessageConsumer broadcastConsumer;
+
+    private Session inboundSession;
+
+    protected Set subscriberSet = new CopyOnWriteArraySet();
+
+    private Map consumerMap = new ConcurrentHashMap();
+
+    AtomicBoolean started = new AtomicBoolean(false);
+
+    private EndpointListener endpointListener;
+
+    private ComponentListener componentListener;
+
+    private Executor executor;
+
+    protected MessageConsumer monitorMessageConsumer = null;
+
+    private String jmsURL = "peer://org.apache.servicemix?persistent=false";
+
+    /**
+     * The type of Flow
+     * 
+     * @return the type
+     */
+    public String getDescription() {
+        return "jms";
+    }
+
+    /**
+     * @return Returns the password.
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * @param password
+     *            The password to set.
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * @return Returns the userName.
+     */
+    public String getUserName() {
+        return userName;
+    }
+
+    /**
+     * @param userName
+     *            The userName to set.
+     */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    /**
+     * @return Returns the connectionFactory.
+     */
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    /**
+     * @param connectionFactory
+     *            The connectionFactory to set.
+     */
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    /**
+     * @return Returns the broadcastDestinationName.
+     */
+    public String getBroadcastDestinationName() {
+        return broadcastDestinationName;
+    }
+
+    /**
+     * @param broadcastDestinationName
+     *            The broadcastDestinationName to set.
+     */
+    public void setBroadcastDestinationName(String broadcastDestinationName) {
+        this.broadcastDestinationName = broadcastDestinationName;
+    }
+
+    /**
+     * Check if the flow can support the requested QoS for this exchange.
+     * Transacted exchanges are refused, any other exchange is accepted.
+     *
+     * @param me
+     *            the exchange to check
+     * @return true if this flow can handle the given exchange
+     */
+    public boolean canHandle(MessageExchange me) {
+        return !isTransacted(me);
+    }
+
+    /**
+     * Initialize the Region
+     * 
+     * @param broker
+     * @throws JBIException
+     */
+    public void init(Broker broker) throws JBIException {
+        log.debug(broker.getContainer().getName() + ": Initializing jms flow");
+        super.init(broker);
+        // Find executor
+        executor = broker.getContainer().getExecutorFactory().createExecutor("flow.jms");
+        // Create and register endpoint listener
+        endpointListener = new EndpointAdapter() {
+            public void internalEndpointRegistered(EndpointEvent event) {
+                onInternalEndpointRegistered(event, true);
+            }
+
+            public void internalEndpointUnregistered(EndpointEvent event) {
+                onInternalEndpointUnregistered(event, true);
+            }
+        };
+        broker.getContainer().addListener(endpointListener);
+        // Create and register component listener
+        componentListener = new ComponentAdapter() {
+            public void componentStarted(ComponentEvent event) {
+                onComponentStarted(event);
+            }
+
+            public void componentStopped(ComponentEvent event) {
+                onComponentStopped(event);
+            }
+        };
+        broker.getContainer().addListener(componentListener);
+        try {
+            if (connectionFactory == null) {
+                connectionFactory = createConnectionFactoryFromUrl(jmsURL);
+            }
+            if (userName != null) {
+                connection = connectionFactory.createConnection(userName, password);
+            } else {
+                connection = connectionFactory.createConnection();
+            }
+            connection.setClientID(broker.getContainer().getName());
+            connection.start();
+            inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainer().getName());
+            MessageConsumer inboundQueue = inboundSession.createConsumer(queue);
+            inboundQueue.setMessageListener(this);
+            queueProducer = inboundSession.createProducer(null);
+            broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
+            topicProducer = broadcastSession.createProducer(broadcastTopic);
+            topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        } catch (JMSException e) {
+            log.error("Failed to initialize JMSFlow", e);
+            throw new JBIException(e);
+        }
+    }
+
+    abstract protected ConnectionFactory createConnectionFactoryFromUrl(String jmsURL);
+
+    /*
+     * The following abstract methods have to be implemented by specialized JMS
+     * Flow providers to monitor consumers on the broadcast topic.
+     */
+
+    protected abstract void onConsumerMonitorMessage(Message message);
+
+    abstract public void startConsumerMonitor() throws JMSException;
+
+    /**
+     * Close the consumer used to monitor the other consumers of the broadcast
+     * topic, if there is one.
+     *
+     * @throws JMSException
+     *             if the monitor consumer cannot be closed
+     */
+    public void stopConsumerMonitor() throws JMSException {
+        // The field starts out as null (presumably it is assigned by
+        // startConsumerMonitor() in the subclasses), so it can still be null
+        // when stop() is called after a start() that failed early
+        if (monitorMessageConsumer != null) {
+            monitorMessageConsumer.close();
+            monitorMessageConsumer = null;
+        }
+    }
+
+    /**
+     * Start the flow: subscribe to the broadcast topic, create the queue
+     * consumers for the started components and the local internal endpoints
+     * found in the registry, then start the consumer monitor. Does nothing if
+     * the flow is already started.
+     *
+     * @throws JBIException
+     *             if a JMS resource cannot be created; the JMSException is
+     *             attached as the cause
+     */
+    public void start() throws JBIException {
+        if (started.compareAndSet(false, true)) {
+            log.debug(broker.getContainer().getName() + ": Starting jms flow");
+            super.start();
+            try {
+                // noLocal = true: events published through our own connection
+                // are not delivered back to this consumer
+                broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
+                broadcastConsumer.setMessageListener(new MessageListener() {
+                    public void onMessage(Message message) {
+                        try {
+                            Object obj = ((ObjectMessage) message).getObject();
+                            if (obj instanceof EndpointEvent) {
+                                EndpointEvent event = (EndpointEvent) obj;
+                                String container = ((InternalEndpoint) event.getEndpoint()).getComponentNameSpace()
+                                        .getContainerName();
+                                // Only events coming from another container
+                                // describe remote endpoints
+                                if (!getBroker().getContainer().getName().equals(container)) {
+                                    if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
+                                        onRemoteEndpointRegistered(event);
+                                    } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
+                                        onRemoteEndpointUnregistered(event);
+                                    }
+                                }
+                            }
+                        } catch (Exception e) {
+                            log.error("Error processing incoming broadcast message", e);
+                        }
+                    }
+                });
+
+                // Start queue consumers for all components
+                for (Iterator it = broker.getContainer().getRegistry().getComponents().iterator(); it.hasNext();) {
+                    ComponentMBeanImpl cmp = (ComponentMBeanImpl) it.next();
+                    if (cmp.isStarted()) {
+                        onComponentStarted(new ComponentEvent(cmp, ComponentEvent.COMPONENT_STARTED));
+                    }
+                }
+                // Start queue consumers for all endpoints, without broadcasting
+                // them
+                ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
+                for (int i = 0; i < endpoints.length; i++) {
+                    if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
+                        onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
+                                EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), false);
+                    }
+                }
+
+                startConsumerMonitor();
+            } catch (JMSException e) {
+                // Keep the JMSException as the cause so that its stack trace
+                // is not lost
+                throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Stop the flow: forget the known cluster nodes, stop the consumer monitor
+     * and close the broadcast topic consumer. Does nothing if the flow is not
+     * started. JMS errors are logged and do not abort the stop.
+     *
+     * @throws JBIException
+     *             if stopping the parent flow fails
+     */
+    public void stop() throws JBIException {
+        if (started.compareAndSet(true, false)) {
+            log.debug(broker.getContainer().getName() + ": Stopping jms flow");
+            super.stop();
+            for (Iterator it = subscriberSet.iterator(); it.hasNext();) {
+                String id = (String) it.next();
+                removeAllPackets(id);
+            }
+            subscriberSet.clear();
+            try {
+                stopConsumerMonitor();
+            } catch (JMSException e) {
+                log.debug("JMSException caught in stop", e);
+            }
+            // Closed in its own try block so that a failure while stopping the
+            // monitor does not leave the broadcast consumer open. The consumer
+            // is null when start() failed before creating it.
+            if (broadcastConsumer != null) {
+                try {
+                    broadcastConsumer.close();
+                } catch (JMSException e) {
+                    log.debug("JMSException caught in stop", e);
+                }
+                broadcastConsumer = null;
+            }
+        }
+    }
+
+    /**
+     * Shut down the flow: stop it, unregister the endpoint and component
+     * listeners from the container and close the JMS connection, if any.
+     *
+     * @throws JBIException
+     */
+    public void shutDown() throws JBIException {
+        super.shutDown();
+        stop();
+        // Unregister both listeners
+        broker.getContainer().removeListener(endpointListener);
+        broker.getContainer().removeListener(componentListener);
+        if (this.connection == null) {
+            return;
+        }
+        try {
+            this.connection.close();
+        } catch (JMSException e) {
+            log.warn("Error closing JMS Connection", e);
+        }
+    }
+
+    /**
+     * useful for testing
+     * 
+     * @return number of containers in the network
+     */
+    public int numberInNetwork() {
+        return subscriberSet.size();
+    }
+
+    public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
+        if (!started.get()) {
+            return;
+        }
+        try {
+            String key = EndpointSupport.getKey(event.getEndpoint());
+            if (!consumerMap.containsKey(key)) {
+                Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
+                MessageConsumer consumer = inboundSession.createConsumer(queue);
+                consumer.setMessageListener(this);
+                consumerMap.put(key, consumer);
+            }
+            if (broadcast) {
+                log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
+                ObjectMessage msg = broadcastSession.createObjectMessage(event);
+                topicProducer.send(msg);
+            }
+        } catch (Exception e) {
+            log.error("Cannot create consumer for " + event.getEndpoint(), e);
+        }
+    }
+
+    /**
+     * Called when an internal endpoint is unregistered: close the queue
+     * consumer of the endpoint, if there is one and, if requested, publish the
+     * event on the broadcast topic. Failures are logged.
+     *
+     * @param event
+     *            the unregistration event
+     * @param broadcast
+     *            true to publish the event on the broadcast topic
+     */
+    public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
+        try {
+            String endpointKey = EndpointSupport.getKey(event.getEndpoint());
+            MessageConsumer endpointConsumer = (MessageConsumer) consumerMap.remove(endpointKey);
+            if (endpointConsumer != null) {
+                endpointConsumer.close();
+            }
+            if (!broadcast) {
+                return;
+            }
+            ObjectMessage announcement = broadcastSession.createObjectMessage(event);
+            log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
+            topicProducer.send(announcement);
+        } catch (Exception e) {
+            log.error("Cannot destroy consumer for " + event, e);
+        }
+    }
+
+    public void onComponentStarted(ComponentEvent event) {
+        if (!started.get()) {
+            return;
+        }
+        try {
+            String key = event.getComponent().getName();
+            if (!consumerMap.containsKey(key)) {
+                Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
+                MessageConsumer consumer = inboundSession.createConsumer(queue);
+                consumer.setMessageListener(this);
+                consumerMap.put(key, consumer);
+            }
+        } catch (Exception e) {
+            log.error("Cannot create consumer for component " + event.getComponent().getName(), e);
+        }
+    }
+
+    /**
+     * Called when a component is stopped: close the queue consumer named after
+     * the component, if there is one. Failures are logged.
+     *
+     * @param event
+     *            the component event
+     */
+    public void onComponentStopped(ComponentEvent event) {
+        try {
+            String componentName = event.getComponent().getName();
+            MessageConsumer componentConsumer = (MessageConsumer) consumerMap.remove(componentName);
+            if (componentConsumer == null) {
+                return;
+            }
+            componentConsumer.close();
+        } catch (Exception e) {
+            log.error("Cannot destroy consumer for component " + event.getComponent().getName(), e);
+        }
+    }
+
+    public void onRemoteEndpointRegistered(EndpointEvent event) {
+        log.debug(broker.getContainer().getName() + ": adding remote endpoint: " + event.getEndpoint());
+        broker.getContainer().getRegistry().registerRemoteEndpoint(event.getEndpoint());
+    }
+
+    public void onRemoteEndpointUnregistered(EndpointEvent event) {
+        log.debug(broker.getContainer().getName() + ": removing remote endpoint: " + event.getEndpoint());
+        broker.getContainer().getRegistry().unregisterRemoteEndpoint(event.getEndpoint());
+    }
+
+    /**
+     * Distribute an ExchangePacket
+     * 
+     * @param me
+     * @throws MessagingException
+     */
+    protected void doSend(MessageExchangeImpl me) throws MessagingException {
+        doRouting(me);
+    }
+
+    /**
+     * Distribute an ExchangePacket: wrap the exchange in an ObjectMessage and
+     * send it to the inbound queue selected for it.
+     *
+     * @param me
+     *            the exchange to send
+     * @throws MessagingException
+     *             if the JMS message cannot be created or sent
+     * @throws IllegalStateException
+     *             if the exchange is sent back to the consumer but has no
+     *             sourceId
+     */
+    public void doRouting(MessageExchangeImpl me) throws MessagingException {
+        // let the JMS provider do the routing ...
+        try {
+            String destination = resolveDestinationName(me);
+            Queue queue = inboundSession.createQueue(destination);
+            ObjectMessage msg = inboundSession.createObjectMessage(me);
+            queueProducer.send(queue, msg);
+        } catch (JMSException e) {
+            log.error("Failed to send exchange: " + me + " internal JMS Network", e);
+            throw new MessagingException(e);
+        }
+    }
+
+    /**
+     * Compute the name of the inbound queue the given exchange has to be sent
+     * to. The name is always INBOUND_PREFIX followed by an endpoint key, the
+     * name of an id, or a container name.
+     *
+     * @param me
+     *            the exchange to route
+     * @return the name of the destination queue
+     */
+    private String resolveDestinationName(MessageExchangeImpl me) {
+        if (me.getRole() == Role.PROVIDER) {
+            if (me.getDestinationId() == null) {
+                // No destination chosen yet: use the queue of the endpoint
+                return INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint());
+            }
+            if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) {
+                return INBOUND_PREFIX + me.getDestinationId().getName();
+            }
+            return INBOUND_PREFIX + me.getDestinationId().getContainerName();
+        }
+        if (me.getSourceId() == null) {
+            throw new IllegalStateException("No sourceId set on the exchange");
+        }
+        if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
+            // If the consumer is stateless and has specified a sender
+            // endpoint, this exchange will be sent to the given endpoint
+            // queue, so that fail-over and load-balancing can be achieved.
+            // This property must have been created using
+            // EndpointSupport.getKey
+            Object senderEndpoint = me.getProperty(JbiConstants.SENDER_ENDPOINT);
+            if (senderEndpoint != null) {
+                return INBOUND_PREFIX + senderEndpoint;
+            }
+            return INBOUND_PREFIX + me.getSourceId().getName();
+        }
+        return INBOUND_PREFIX + me.getSourceId().getContainerName();
+    }
+
+    /**
+     * MessageListener implementation
+     * 
+     * @param message
+     */
+    public void onMessage(final Message message) {
+        try {
+            if (message != null && started.get()) {
+                ObjectMessage objMsg = (ObjectMessage) message;
+                final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
+                // Dispatch the message in another thread so as to free the jms
+                // session
+                // else if a component do a sendSync into the jms flow, the
+                // whole
+                // flow is deadlocked
+                executor.execute(new Runnable() {
+                    public void run() {
+                        try {
+                            if (me.getDestinationId() == null) {
+                                ServiceEndpoint se = me.getEndpoint();
+                                se = broker.getContainer().getRegistry().getInternalEndpoint(se.getServiceName(),
+                                        se.getEndpointName());
+                                me.setEndpoint(se);
+                                me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
+                            }
+                            AbstractJMSFlow.super.doRouting(me);
+                        } catch (Throwable e) {
+                            log.error("Caught an exception routing ExchangePacket: ", e);
+                        }
+                    }
+                });
+            }
+        } catch (JMSException jmsEx) {
+            log.error("Caught an exception unpacking JMS Message: ", jmsEx);
+        }
+    }
+
+    /**
+     * A new cluster node is announced. Add this node to the subscriber set and
+     * broadcast all our local internal endpoints so that the new node gets to
+     * know them.
+     *
+     * @param connectionId
+     *            the id of the node which joined
+     */
+    protected void addClusterNode(String connectionId) {
+        subscriberSet.add(connectionId);
+        ServiceEndpoint[] known = broker.getContainer().getRegistry().getEndpointsForInterface(null);
+        for (int idx = 0; idx < known.length; idx++) {
+            ServiceEndpoint candidate = known[idx];
+            boolean localInternal = candidate instanceof InternalEndpoint && ((InternalEndpoint) candidate).isLocal();
+            if (!localInternal) {
+                continue;
+            }
+            EndpointEvent registered = new EndpointEvent(candidate, EndpointEvent.INTERNAL_ENDPOINT_REGISTERED);
+            onInternalEndpointRegistered(registered, true);
+        }
+    }
+
+    /**
+     * A cluster node leaves the cluster. Remove this node from the subscriber
+     * set and remove all packets waiting to be delivered to this node
+     * 
+     * @param connectionId
+     */
+    protected void removeClusterNode(String connectionId) {
+        subscriberSet.remove(connectionId);
+        removeAllPackets(connectionId);
+    }
+
+    protected void removeAllPackets(String containerName) {
+        // TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
+    }
+
+    public String getJmsURL() {
+        return jmsURL;
+    }
+
+    public void setJmsURL(String jmsURL) {
+        this.jmsURL = jmsURL;
+    }
+
+}

Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?view=diff&rev=516044&r1=516043&r2=516044
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java Thu Mar  8 05:59:26 2007
@@ -16,27 +16,12 @@
  */
 package org.apache.servicemix.jbi.nmr.flow.jms;
 
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import javax.jbi.JBIException;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.MessageExchange.Role;
-import javax.jbi.servicedesc.ServiceEndpoint;
-import javax.jms.DeliveryMode;
+import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.Session;
 import javax.jms.Topic;
 
-import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -44,527 +29,45 @@
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.RemoveInfo;
-import org.apache.servicemix.JbiConstants;
-import org.apache.servicemix.executors.Executor;
-import org.apache.servicemix.jbi.event.ComponentAdapter;
-import org.apache.servicemix.jbi.event.ComponentEvent;
-import org.apache.servicemix.jbi.event.ComponentListener;
-import org.apache.servicemix.jbi.event.EndpointAdapter;
-import org.apache.servicemix.jbi.event.EndpointEvent;
-import org.apache.servicemix.jbi.event.EndpointListener;
-import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
-import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
-import org.apache.servicemix.jbi.nmr.Broker;
-import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
-import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
-import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
-
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Use for message routing among a network of containers. All routing/registration happens automatically.
+ * Use for message routing among a network of containers. All
+ * routing/registration happens automatically.
  * 
  * @version $Revision$
  * @org.apache.xbean.XBean element="jmsFlow"
  */
-public class JMSFlow extends AbstractFlow implements MessageListener {
-
-    private static final String INBOUND_PREFIX = "org.apache.servicemix.jms.";
-
-    private String jmsURL = "peer://org.apache.servicemix?persistent=false";
-
-    private String userName;
-
-    private String password;
-
-    private ActiveMQConnectionFactory connectionFactory;
-
-    private ActiveMQConnection connection;
-
-    private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
-
-    private MessageProducer queueProducer;
-
-    private MessageProducer topicProducer;
-
-    private Topic broadcastTopic;
-
-    private Session broadcastSession;
-
-    private MessageConsumer broadcastConsumer;
-
-    private Session inboundSession;
-
-    private MessageConsumer advisoryConsumer;
-
-    private Set subscriberSet = new CopyOnWriteArraySet();
-
-    private Map consumerMap = new ConcurrentHashMap();
+public class JMSFlow extends AbstractJMSFlow {
 
-    private AtomicBoolean started = new AtomicBoolean(false);
-
-    private EndpointListener endpointListener;
-    
-    private ComponentListener componentListener;
-    
-    private Executor executor;
-
-    /**
-     * The type of Flow
-     * 
-     * @return the type
-     */
-    public String getDescription() {
-        return "jms";
-    }
-
-    /**
-     * @return Returns the jmsURL.
-     */
-    public String getJmsURL() {
-        return jmsURL;
-    }
-
-    /**
-     * @param jmsURL The jmsURL to set.
-     */
-    public void setJmsURL(String jmsURL) {
-        this.jmsURL = jmsURL;
+    protected ConnectionFactory createConnectionFactoryFromUrl(String jmsURL) {
+        return (jmsURL != null) ? new ActiveMQConnectionFactory(jmsURL) : new ActiveMQConnectionFactory();
     }
 
     /**
-     * @return Returns the password.
+     * Listener on the ActiveMQ advisory topic so we get messages when a cluster
+     * node is added or removed
      */
-    public String getPassword() {
-        return password;
-    }
-
-    /**
-     * @param password The password to set.
-     */
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    /**
-     * @return Returns the userName.
-     */
-    public String getUserName() {
-        return userName;
-    }
-
-    /**
-     * @param userName The userName to set.
-     */
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-
-    /**
-     * @return Returns the connectionFactory.
-     */
-    public ActiveMQConnectionFactory getConnectionFactory() {
-        return connectionFactory;
-    }
-
-    /**
-     * @param connectionFactory The connectionFactory to set.
-     */
-    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
-        this.connectionFactory = connectionFactory;
-    }
-
-    /**
-     * @return Returns the broadcastDestinationName.
-     */
-    public String getBroadcastDestinationName() {
-        return broadcastDestinationName;
-    }
-
-    /**
-     * @param broadcastDestinationName The broadcastDestinationName to set.
-     */
-    public void setBroadcastDestinationName(String broadcastDestinationName) {
-        this.broadcastDestinationName = broadcastDestinationName;
-    }
-
-    /**
-     * Check if the flow can support the requested QoS for this exchange
-     * @param me the exchange to check
-     * @return true if this flow can handle the given exchange
-     */
-    public boolean canHandle(MessageExchange me) {
-        if (isTransacted(me)) {
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * Initialize the Region
-     * 
-     * @param broker
-     * @throws JBIException
-     */
-    public void init(Broker broker) throws JBIException {
-        log.debug(broker.getContainer().getName() + ": Initializing jms flow");
-        super.init(broker);
-        // Find executor
-        executor = broker.getContainer().getExecutorFactory().createExecutor("flow.jms");
-        // Create and register endpoint listener
-        endpointListener = new EndpointAdapter() {
-            public void internalEndpointRegistered(EndpointEvent event) {
-                onInternalEndpointRegistered(event, true);
-            }
-
-            public void internalEndpointUnregistered(EndpointEvent event) {
-                onInternalEndpointUnregistered(event, true);
-            }
-        };
-        broker.getContainer().addListener(endpointListener);
-        // Create and register component listener
-        componentListener = new ComponentAdapter() {
-            public void componentStarted(ComponentEvent event) {
-                onComponentStarted(event);
-            }
-            public void componentStopped(ComponentEvent event) {
-                onComponentStopped(event);
-            }
-        };
-        broker.getContainer().addListener(componentListener);
-        try {
-            if (connectionFactory == null) {
-                if (jmsURL != null) {
-                    connectionFactory = new ActiveMQConnectionFactory(jmsURL);
-                } else {
-                    connectionFactory = new ActiveMQConnectionFactory();
-                }
-            }
-            if (userName != null) {
-                connection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
-            } else {
-                connection = (ActiveMQConnection) connectionFactory.createConnection();
-            }
-            connection.setClientID(broker.getContainer().getName());
-            connection.start();
-            inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainer().getName());
-            MessageConsumer inboundQueue = inboundSession.createConsumer(queue);
-            inboundQueue.setMessageListener(this);
-            queueProducer = inboundSession.createProducer(null);
-            broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
-            topicProducer = broadcastSession.createProducer(broadcastTopic);
-            topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        } catch (JMSException e) {
-            log.error("Failed to initialize JMSFlow", e);
-            throw new JBIException(e);
-        }
-    }
-
-    /**
-     * start the flow
-     * 
-     * @throws JBIException
-     */
-    public void start() throws JBIException {
-        if (started.compareAndSet(false, true)) {
-            log.debug(broker.getContainer().getName() + ": Starting jms flow");
-            super.start();
-            try {
-                broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
-                broadcastConsumer.setMessageListener(new MessageListener() {
-                    public void onMessage(Message message) {
-                        try {
-                            Object obj = ((ObjectMessage) message).getObject();
-                            if (obj instanceof EndpointEvent) {
-                                EndpointEvent event = (EndpointEvent) obj;
-                                String container = ((InternalEndpoint) event.getEndpoint()).getComponentNameSpace().getContainerName();
-                                if (!getBroker().getContainer().getName().equals(container)) {
-                                    if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
-                                        onRemoteEndpointRegistered(event);
-                                    } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
-                                        onRemoteEndpointUnregistered(event);
-                                    }
-                                }
-                            }
-                        } catch (Exception e) {
-                            log.error("Error processing incoming broadcast message", e);
-                        }
-                    }
-                });
-                Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
-                advisoryConsumer = broadcastSession.createConsumer(advisoryTopic);
-                advisoryConsumer.setMessageListener(new MessageListener() {
-                    public void onMessage(Message message) {
-                        if (started.get()) {
-                            onAdvisoryMessage(((ActiveMQMessage) message).getDataStructure());
-                        }
-                    }
-                });
-
-                // Start queue consumers for all components
-                for (Iterator it = broker.getContainer().getRegistry().getComponents().iterator(); it.hasNext();) {
-                    ComponentMBeanImpl cmp = (ComponentMBeanImpl) it.next();
-                    if (cmp.isStarted()) {
-                        onComponentStarted(new ComponentEvent(cmp, ComponentEvent.COMPONENT_STARTED));
-                    }
-                }
-                // Start queue consumers for all endpoints
-                ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
-                for (int i = 0; i < endpoints.length; i++) {
-                    if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
-                        onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
-                                EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), false);
-                    }
-                }
-            } catch (JMSException e) {
-                JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
-                throw jbiEx;
-            }
-        }
-    }
-
-    /**
-     * stop the flow
-     * 
-     * @throws JBIException
-     */
-    public void stop() throws JBIException {
-        if (started.compareAndSet(true, false)) {
-            log.debug(broker.getContainer().getName() + ": Stopping jms flow");
-            super.stop();
-            for (Iterator it = subscriberSet.iterator(); it.hasNext();) {
-                String id = (String) it.next();
-                removeAllPackets(id);
-            }
-            subscriberSet.clear();
-            try {
-                advisoryConsumer.close();
-                broadcastConsumer.close();
-            } catch (JMSException e) {
-                log.debug("JMSException caught in stop", e);
-            }
-        }
-    }
-
-    public void shutDown() throws JBIException {
-        super.shutDown();
-        stop();
-        // Remove endpoint listener
-        broker.getContainer().removeListener(endpointListener);
-        // Remove component listener
-        broker.getContainer().removeListener(componentListener);
-        // Shutdown executor
-        executor.shutdown();
-        // Close connection
-        if (this.connection != null) {
-            try {
-                this.connection.close();
-            } catch (JMSException e) {
-                log.warn("Error closing JMS Connection", e);
-            }
-        }
-    }
-
-    /**
-     * useful for testing
-     * 
-     * @return number of containers in the network
-     */
-    public int numberInNetwork() {
-        return subscriberSet.size();
-    }
-
-    public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
-        if (!started.get()) {
-            return;
-        }
-        try {
-            String key = EndpointSupport.getKey(event.getEndpoint());
-            if (!consumerMap.containsKey(key)) {
-                Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
-                MessageConsumer consumer = inboundSession.createConsumer(queue);
-                consumer.setMessageListener(this);
-                consumerMap.put(key, consumer);
-            }
-            if (broadcast) {
-                log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
-                ObjectMessage msg = broadcastSession.createObjectMessage(event);
-                topicProducer.send(msg);
-            }
-        } catch (Exception e) {
-            log.error("Cannot create consumer for " + event.getEndpoint(), e);
-        }
-    }
-
-    public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
-        try {
-            String key = EndpointSupport.getKey(event.getEndpoint());
-            MessageConsumer consumer = (MessageConsumer) consumerMap.remove(key);
-            if (consumer != null) {
-                consumer.close();
-            }
-            if (broadcast) {
-                ObjectMessage msg = broadcastSession.createObjectMessage(event);
-                log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
-                topicProducer.send(msg);
-            }
-        } catch (Exception e) {
-            log.error("Cannot destroy consumer for " + event, e);
-        }
-    }
-    
-    public void onComponentStarted(ComponentEvent event) {
-        if (!started.get()) {
+    protected void onConsumerMonitorMessage(Message advisoryMessage) {
+        if (!started.get())
             return;
-        }
-        try {
-            String key = event.getComponent().getName();
-            if (!consumerMap.containsKey(key)) {
-                Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
-                MessageConsumer consumer = inboundSession.createConsumer(queue);
-                consumer.setMessageListener(this);
-                consumerMap.put(key, consumer);
-            }
-        } catch (Exception e) {
-            log.error("Cannot create consumer for component " + event.getComponent().getName(), e);
-        }
-    }
-    
-    public void onComponentStopped(ComponentEvent event) {
-        try {
-            String key = event.getComponent().getName();
-            MessageConsumer consumer = (MessageConsumer) consumerMap.remove(key);
-            if (consumer != null) {
-                consumer.close();
-            }
-        } catch (Exception e) {
-            log.error("Cannot destroy consumer for component " + event.getComponent().getName(), e);
-        }
-    }
-
-    public void onRemoteEndpointRegistered(EndpointEvent event) {
-        log.debug(broker.getContainer().getName() + ": adding remote endpoint: " + event.getEndpoint());
-        broker.getContainer().getRegistry().registerRemoteEndpoint(event.getEndpoint());
-    }
-
-    public void onRemoteEndpointUnregistered(EndpointEvent event) {
-        log.debug(broker.getContainer().getName() + ": removing remote endpoint: " + event.getEndpoint());
-        broker.getContainer().getRegistry().unregisterRemoteEndpoint(event.getEndpoint());
-    }
-
-    /**
-     * Distribute an ExchangePacket
-     * 
-     * @param me
-     * @throws MessagingException
-     */
-    protected void doSend(MessageExchangeImpl me) throws MessagingException {
-        doRouting(me);
-    }
-
-    /**
-     * Distribute an ExchangePacket
-     * 
-     * @param me
-     * @throws MessagingException
-     */
-    public void doRouting(MessageExchangeImpl me) throws MessagingException {
-        // let ActiveMQ do the routing ...
-        try {
-            String destination;
-            if (me.getRole() == Role.PROVIDER) {
-                if (me.getDestinationId() == null) {
-                    destination = INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint());
-                } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) {
-                    destination = INBOUND_PREFIX + me.getDestinationId().getName();
-                } else {
-                    destination = INBOUND_PREFIX + me.getDestinationId().getContainerName();
-                }
-            } else {
-                if (me.getSourceId() == null) {
-                    throw new IllegalStateException("No sourceId set on the exchange");
-                } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
-                    // If the consumer is stateless and has specified a sender endpoint,
-                    // this exchange will be sent to the given endpoint queue, so that
-                    // fail-over and load-balancing can be achieved
-                    // This property must have been created using EndpointSupport.getKey
-                    if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
-                        destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT);
-                    } else {
-                        destination = INBOUND_PREFIX + me.getSourceId().getName();
-                    }
-                } else {
-                    destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
-                }
-            }
-                
-            Queue queue = inboundSession.createQueue(destination);
-            ObjectMessage msg = inboundSession.createObjectMessage(me);
-            queueProducer.send(queue, msg);
-        } catch (JMSException e) {
-            log.error("Failed to send exchange: " + me + " internal JMS Network", e);
-            throw new MessagingException(e);
-        }
-    }
-
-    /**
-     * MessageListener implementation
-     * 
-     * @param message
-     */
-    public void onMessage(final Message message) {
-        try {
-            if (message != null && started.get()) {
-                ObjectMessage objMsg = (ObjectMessage) message;
-                final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
-                // Dispatch the message in another thread so as to free the jms session
-                // else if a component do a sendSync into the jms flow, the whole
-                // flow is deadlocked 
-                executor.execute(new Runnable() {
-                    public void run() {
-                        try {
-                            if (me.getDestinationId() == null) {
-                                ServiceEndpoint se = me.getEndpoint();
-                                se = broker.getContainer().getRegistry()
-                                        .getInternalEndpoint(se.getServiceName(), se.getEndpointName());
-                                me.setEndpoint(se);
-                                me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
-                            }
-                            JMSFlow.super.doRouting(me);
-                        } catch (Throwable e) {
-                            log.error("Caught an exception routing ExchangePacket: ", e);
-                        }
-                    }
-                });
-            }
-        } catch (JMSException jmsEx) {
-            log.error("Caught an exception unpacking JMS Message: ", jmsEx);
-        }
-    }
-
-    protected void onAdvisoryMessage(Object obj) {
+        Object obj = ((ActiveMQMessage) advisoryMessage).getDataStructure();
         if (obj instanceof ConsumerInfo) {
             ConsumerInfo info = (ConsumerInfo) obj;
-            subscriberSet.add(info.getConsumerId().getConnectionId());
-            ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
-            for (int i = 0; i < endpoints.length; i++) {
-                if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
-                    onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
-                            EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
-                }
-            }
+            addClusterNode(info.getConsumerId().getConnectionId());
         } else if (obj instanceof RemoveInfo) {
-            ConsumerId id = (ConsumerId) ((RemoveInfo) obj).getObjectId();
-            subscriberSet.remove(id.getConnectionId());
-            removeAllPackets(id.getConnectionId());
+            ConsumerId consumerId = (ConsumerId) ((RemoveInfo) obj).getObjectId();
+            removeClusterNode(consumerId.getConnectionId());
         }
     }
 
-    private void removeAllPackets(String containerName) {
-        //TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
+    public void startConsumerMonitor() throws JMSException {
+        Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
+        monitorMessageConsumer = broadcastSession.createConsumer(advisoryTopic);
+        monitorMessageConsumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                onConsumerMonitorMessage(message);
+            }
+        });
     }
+
 }

Added: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java?view=auto&rev=516044
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java (added)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java Thu Mar  8 05:59:26 2007
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.nmr.flow.jms;
+
+import java.lang.reflect.Constructor;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Topic;
+
+/**
+ * Use for message routing among a network of containers. All
+ * routing/registration happens automatically.
+ * 
+ * @version $Revision$
+ * @org.apache.xbean.XBean element="jmsFlowTibco"
+ */
+public class JMSFlowTibco extends AbstractJMSFlow {
+
+    private static final String TOPIC_NAME_MONITOR_CONSUMER = "$sys.monitor.consumer.*";
+
+    private static final String PROPERTY_NAME_EVENT_CLASS = "event_class";
+
+    private static final String PROPERTY_NAME_TARGET_DEST_NAME = "target_dest_name";
+
+    private static final String PROPERTY_NAME_CONN_CONNID = "conn_connid";
+
+    private static final String EVENT_CLASS_CONSUMER_CREATE = "consumer.create";
+
+    protected ConnectionFactory createConnectionFactoryFromUrl(String jmsURL) {
+        try {
+            Class connFactoryClass = Class.forName("com.tibco.tibjms.TibjmsConnectionFactory");
+            if (jmsURL != null) {
+                Constructor cns = connFactoryClass.getConstructor(new Class[] { String.class });
+                ConnectionFactory connFactory = (ConnectionFactory) cns.newInstance(new Object[] { jmsURL });
+                return connFactory;
+            } else {
+                ConnectionFactory connFactory = (ConnectionFactory) connFactoryClass.newInstance();
+                return connFactory;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to create Tibco connection factory", e);
+        }
+        /*
+        return (jmsURL != null) ? new com.tibco.tibjms.TibjmsConnectionFactory.TibjmsConnectionFactory(jmsURL) : 
+                                  new com.tibco.tibjms.TibjmsConnectionFactory.TibjmsConnectionFactory();
+        */
+    }
+
+    public void onConsumerMonitorMessage(Message message) {
+        if (!started.get())
+            return;
+        try {
+            String connectionId = "" + message.getLongProperty(PROPERTY_NAME_CONN_CONNID);
+            String targetDestName = message.getStringProperty(PROPERTY_NAME_TARGET_DEST_NAME);
+            String eventClass = message.getStringProperty(PROPERTY_NAME_EVENT_CLASS);
+            if (broadcastTopic.getTopicName().equals(targetDestName)) {
+                if (EVENT_CLASS_CONSUMER_CREATE.equals(eventClass)) {
+                    addClusterNode(connectionId);
+                } else {
+                    removeClusterNode(connectionId);
+                }
+            }
+        } catch (JMSException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void startConsumerMonitor() throws JMSException {
+        Topic createTopic = broadcastSession.createTopic(TOPIC_NAME_MONITOR_CONSUMER);
+        monitorMessageConsumer = broadcastSession.createConsumer(createTopic);
+        monitorMessageConsumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                onConsumerMonitorMessage(message);
+            }
+        });
+    }
+
+}

Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain