You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/08/10 16:38:14 UTC

svn commit: r564607 [6/12] - in /incubator/servicemix/trunk/core/servicemix-core/src: main/java/org/apache/servicemix/ main/java/org/apache/servicemix/jbi/ main/java/org/apache/servicemix/jbi/framework/ main/java/org/apache/servicemix/jbi/framework/sup...

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/PojoMarshaler.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/PojoMarshaler.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/PojoMarshaler.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/PojoMarshaler.java Fri Aug 10 07:37:46 2007
@@ -16,41 +16,47 @@
  */
 package org.apache.servicemix.jbi.messaging;
 
-import javax.jbi.messaging.InOptionalOut;
-import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 
 /**
  * A plugin strategy which marshals an Object into and out of a JBI message.
- * This interface is used by the ServiceMixClient to marshal POJOs into and out of JBI messages.
- *
+ * This interface is used by the ServiceMixClient to marshal POJOs into and out
+ * of JBI messages.
+ * 
  * @version $Revision$
  */
 public interface PojoMarshaler {
 
     /**
-     * The key on the message to store the message body which cannot be marshaled into or out of XML easily
-     * or to provide a cache of the object representation of the object.
+     * The key on the message to store the message body which cannot be
+     * marshaled into or out of XML easily or to provide a cache of the object
+     * representation of the object.
      */
     String BODY = "org.apache.servicemix.body";
 
     /**
-     * Marshals the payload into the normalized message, typically as the content
-     * property.
-     *
-     * @param exchange the message exchange in which to marshal
-     * @param message the message in which to marshal
-     * @param body the body of the message as a POJO
+     * Marshals the payload into the normalized message, typically as the
+     * content property.
+     * 
+     * @param exchange
+     *            the message exchange in which to marshal
+     * @param message
+     *            the message in which to marshal
+     * @param body
+     *            the body of the message as a POJO
      */
     void marshal(MessageExchange exchange, NormalizedMessage message, Object body) throws MessagingException;
 
     /**
      * Unmarshals the response out of the normalized message.
-     *
-     * @param exchange the message exchange, which is an {@link InOut} or {@link InOptionalOut}
-     * @param message the output message
+     * 
+     * @param exchange
+     *            the message exchange, which is an {@link InOut} or
+     *            {@link InOptionalOut}
+     * @param message
+     *            the output message
      * @return the unmarshaled body object, extracted from the message
      * @throws MessagingException
      */

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/RobustInOnlyImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/RobustInOnlyImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/RobustInOnlyImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/RobustInOnlyImpl.java Fri Aug 10 07:37:46 2007
@@ -16,55 +16,56 @@
  */
 package org.apache.servicemix.jbi.messaging;
 
-import javax.jbi.messaging.RobustInOnly;
-
 import java.io.IOException;
 import java.io.ObjectInput;
 
+import javax.jbi.messaging.RobustInOnly;
+
 /**
  * RobustInOnly message exchange.
- *
+ * 
  * @version $Revision$
  */
 public class RobustInOnlyImpl extends MessageExchangeImpl implements RobustInOnly {
 
     private static final long serialVersionUID = -1606399168587959356L;
 
-    private static int[][] STATES_CONSUMER = {
-        { CAN_CONSUMER + CAN_OWNER + CAN_SET_IN_MSG + CAN_SEND + CAN_STATUS_ACTIVE, 1, -1, -1, -1},
-        { CAN_CONSUMER, -1, 2, 3, 3 },
-        { CAN_CONSUMER + CAN_OWNER + CAN_SEND + CAN_STATUS_DONE + CAN_STATUS_ERROR, -1, -1, 4, 4 },
-        { CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 },
-        { CAN_CONSUMER, -1, -1, -1, -1 },
+    private static final int[][] STATES_CONSUMER = {
+        {CAN_CONSUMER + CAN_OWNER + CAN_SET_IN_MSG + CAN_SEND + CAN_STATUS_ACTIVE, 1, -1, -1, -1 },
+        {CAN_CONSUMER, -1, 2, 3, 3 },
+        {CAN_CONSUMER + CAN_OWNER + CAN_SEND + CAN_STATUS_DONE + CAN_STATUS_ERROR, -1, -1, 4, 4 },
+        {CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 }, 
+        {CAN_CONSUMER, -1, -1, -1, -1 }
     };
-    
-    private static int[][] STATES_PROVIDER = {
-        { CAN_PROVIDER, 1, -1, -1, -1 },
-        { CAN_PROVIDER + CAN_OWNER + CAN_SEND + CAN_SET_FAULT_MSG + CAN_STATUS_ACTIVE + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, 2, 4, 4 },
-        { CAN_PROVIDER, -1, -1, 3, 3 },
-        { CAN_PROVIDER + CAN_OWNER, -1, -1, -1, -1 },
-        { CAN_PROVIDER, -1, -1, -1, -1 },
+
+    private static final int[][] STATES_PROVIDER = { 
+        {CAN_PROVIDER, 1, -1, -1, -1 },
+        {CAN_PROVIDER + CAN_OWNER + CAN_SEND + CAN_SET_FAULT_MSG + CAN_STATUS_ACTIVE
+                                                    + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, 2, 4, 4 }, 
+        {CAN_PROVIDER, -1, -1, 3, 3 }, 
+        {CAN_PROVIDER + CAN_OWNER, -1, -1, -1, -1 },
+        {CAN_PROVIDER, -1, -1, -1, -1 }
     };
-    
+
     public RobustInOnlyImpl() {
     }
-    
+
     public RobustInOnlyImpl(String exchangeId) {
         super(exchangeId, MessageExchangeSupport.ROBUST_IN_ONLY, STATES_CONSUMER);
         this.mirror = new RobustInOnlyImpl(this);
     }
-    
+
     public RobustInOnlyImpl(ExchangePacket packet) {
         super(packet, STATES_CONSUMER);
         this.mirror = new RobustInOnlyImpl(this);
     }
-    
+
     protected RobustInOnlyImpl(RobustInOnlyImpl mep) {
         super(mep.packet, STATES_PROVIDER);
         this.mirror = mep;
     }
 
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {        
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         this.packet = new ExchangePacket();
         this.packet.readExternal(in);
         if (this.packet.in != null) {

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStats.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStats.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStats.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStats.java Fri Aug 10 07:37:46 2007
@@ -37,10 +37,10 @@
  */
 public class ComponentStats extends BaseLifeCycle implements ComponentStatsMBean {
 
-    private static final Log log = LogFactory.getLog(ComponentStats.class);
-    
     public static final String STATS_FILE = "stats.csv";
     
+    private static final Log LOG = LogFactory.getLog(ComponentStats.class);
+    
     private ComponentMBeanImpl component;
     private MessagingStats stats;
     private File statsFile;
@@ -80,9 +80,8 @@
                 long outbound = stats.getOutboundExchanges().getCount();
                 double outboundRate = stats.getOutboundExchangeRate().getAveragePerSecond();
                 statsWriter.println(inbound + "," + inboundRate + "," + outbound + "," + outboundRate);
-            }
-            catch (IOException e) {
-                log.warn("Failed to dump stats", e);
+            } catch (IOException e) {
+                LOG.warn("Failed to dump stats", e);
             }
         }
     }
@@ -115,7 +114,7 @@
         */
         stats.getInboundExchanges().increment();
         stats.getInboundExchangeRate().addTime();
-   }
+    }
     
     void incrementOutbound() {
         /*

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStatsMBean.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStatsMBean.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStatsMBean.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStatsMBean.java Fri Aug 10 07:37:46 2007
@@ -28,36 +28,36 @@
      * 
      * @return inbound count
      */
-    public long getInboundExchangeCount();
+    long getInboundExchangeCount();
 
     /**
      * Get the Inbound MessageExchange rate (number/sec)
      * 
      * @return the inbound exchange rate
      */
-    public double getInboundExchangeRate();
+    double getInboundExchangeRate();
 
     /**
      * Get the Outbound MessageExchange count
      * 
      * @return outbound count
      */
-    public long getOutboundExchangeCount();
+    long getOutboundExchangeCount();
 
     /**
      * Get the Outbound MessageExchange rate (number/sec)
      * 
      * @return the outbound exchange rate
      */
-    public double getOutboundExchangeRate();
+    double getOutboundExchangeRate();
     
     /**
      * @return size of the inbound Queue
      */
-    public int getInboundQueueSize();
+    int getInboundQueueSize();
 
     /**
      * reset all stats counters
      */
-    public void reset();
+    void reset();
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/EndpointStatsMBean.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/EndpointStatsMBean.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/EndpointStatsMBean.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/EndpointStatsMBean.java Fri Aug 10 07:37:46 2007
@@ -23,31 +23,31 @@
      * 
      * @return inbound count
      */
-    public long getInboundExchangeCount();
+    long getInboundExchangeCount();
 
     /**
      * Get the Inbound MessageExchange rate (number/sec)
      * 
      * @return the inbound exchange rate
      */
-    public double getInboundExchangeRate();
+    double getInboundExchangeRate();
 
     /**
      * Get the Outbound MessageExchange count
      * 
      * @return outbound count
      */
-    public long getOutboundExchangeCount();
+    long getOutboundExchangeCount();
 
     /**
      * Get the Outbound MessageExchange rate (number/sec)
      * 
      * @return the outbound exchange rate
      */
-    public double getOutboundExchangeRate();
+    double getOutboundExchangeRate();
     
     /**
      * reset all stats counters
      */
-    public void reset();
+    void reset();
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/MessagingStats.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/MessagingStats.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/MessagingStats.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/MessagingStats.java Fri Aug 10 07:37:46 2007
@@ -28,11 +28,12 @@
  * @version $Revision$
  */
 public class MessagingStats extends StatsImpl {
-    private String name;
+
     protected CountStatisticImpl inboundExchanges;
     protected CountStatisticImpl outboundExchanges;
     protected TimeStatisticImpl inboundExchangeRate;
     protected TimeStatisticImpl outboundExchangeRate;
+    private String name;
 
     /**
      * Default Constructor

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsService.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsService.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsService.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsService.java Fri Aug 10 07:37:46 2007
@@ -19,6 +19,7 @@
 import java.util.Iterator;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jbi.JBIException;
 import javax.jbi.messaging.ExchangeStatus;
@@ -50,8 +51,6 @@
 import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
 import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
 
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * 
  * @author gnodet
@@ -59,7 +58,7 @@
  */
 public class StatisticsService extends BaseSystemService implements StatisticsServiceMBean {
 
-    private static final Log log = LogFactory.getLog(StatisticsService.class);
+    private static final Log LOG = LogFactory.getLog(StatisticsService.class);
     
     private ConcurrentHashMap<String, ComponentStats> componentStats = new ConcurrentHashMap<String, ComponentStats>();
     private ConcurrentHashMap<String, EndpointStats> endpointStats = new ConcurrentHashMap<String, EndpointStats>();
@@ -101,9 +100,8 @@
             if (timerTask != null) {
                 timerTask.cancel();
             }
-        }
-        else if (!dumpStats && value) {
-            dumpStats = value;//scheduleStatsTimer relies on dumpStats value
+        } else if (!dumpStats && value) {
+            dumpStats = value; //scheduleStatsTimer relies on dumpStats value
             scheduleStatsTimer();
         }
         dumpStats = value;
@@ -199,15 +197,15 @@
         ComponentStats stats = new ComponentStats(component);
         componentStats.putIfAbsent(key, stats);
         // Register MBean
-        ManagementContext context= container.getManagementContext();
+        ManagementContext context = container.getManagementContext();
         try {
             context.registerMBean(context.createObjectName(context.createObjectNameProps(stats, true)), 
                     stats, 
                     ComponentStatsMBean.class);
         } catch (Exception e) {
-            log.info("Unable to register component statistics MBean: " + e.getMessage());
-            if (log.isDebugEnabled()) {
-                log.debug("Unable to register component statistics MBean", e);
+            LOG.info("Unable to register component statistics MBean: " + e.getMessage());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Unable to register component statistics MBean", e);
             }
         }
     }
@@ -220,13 +218,13 @@
             return;
         }
         // Register MBean
-        ManagementContext context= container.getManagementContext();
+        ManagementContext context = container.getManagementContext();
         try {
             context.unregisterMBean(context.createObjectName(context.createObjectNameProps(stats, true)));
         } catch (Exception e) {
-            log.info("Unable to unregister component statistics MBean: " + e);
-            if (log.isDebugEnabled()) {
-                log.debug("Unable to unregister component statistics MBean", e);
+            LOG.info("Unable to unregister component statistics MBean: " + e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Unable to unregister component statistics MBean", e);
             }
         }
     }
@@ -238,15 +236,15 @@
         EndpointStats stats = new EndpointStats(endpoint, compStats.getMessagingStats());
         endpointStats.putIfAbsent(key, stats);
         // Register MBean
-        ManagementContext context= container.getManagementContext();
+        ManagementContext context = container.getManagementContext();
         try {
             context.registerMBean(context.createObjectName(context.createObjectNameProps(stats, true)), 
                                   stats, 
                                   EndpointStatsMBean.class);
         } catch (Exception e) {
-            log.info("Unable to register endpoint statistics MBean: " + e.getMessage());
-            if (log.isDebugEnabled()) {
-                log.debug("Unable to register endpoint statistics MBean", e);
+            LOG.info("Unable to register endpoint statistics MBean: " + e.getMessage());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Unable to register endpoint statistics MBean", e);
             }
         }
     }
@@ -256,13 +254,13 @@
         String key = EndpointSupport.getUniqueKey(endpoint);
         EndpointStats stats = endpointStats.remove(key);
         // Register MBean
-        ManagementContext context= container.getManagementContext();
+        ManagementContext context = container.getManagementContext();
         try {
             context.unregisterMBean(context.createObjectName(context.createObjectNameProps(stats, true)));
         } catch (Exception e) {
-            log.info("Unable to unregister endpoint statistics MBean: " + e.getMessage());
-            if (log.isDebugEnabled()) {
-                log.debug("Unable to unregister endpoint statistics MBean", e);
+            LOG.info("Unable to unregister endpoint statistics MBean: " + e.getMessage());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Unable to unregister endpoint statistics MBean", e);
             }
         }
     }
@@ -270,12 +268,11 @@
     protected void onExchangeSent(ExchangeEvent event) {
         MessageExchange me = event.getExchange();
         // This is a new exchange sent by a consumer
-        if (me.getStatus() == ExchangeStatus.ACTIVE &&
-            me.getRole() == Role.CONSUMER && 
-            me.getMessage("out") == null && 
-            me.getFault() == null &&
-            me instanceof MessageExchangeImpl) 
-        {
+        if (me.getStatus() == ExchangeStatus.ACTIVE
+                && me.getRole() == Role.CONSUMER 
+                && me.getMessage("out") == null 
+                && me.getFault() == null
+                && me instanceof MessageExchangeImpl) {
             MessageExchangeImpl mei = (MessageExchangeImpl) me;
             String source = (String) me.getProperty(JbiConstants.SENDER_ENDPOINT);
             if (source == null) {
@@ -283,7 +280,8 @@
                 ComponentStats stats = componentStats.get(source);
                 stats.incrementOutbound();
             } else {
-                ServiceEndpoint[] ses = getContainer().getRegistry().getEndpointRegistry().getAllEndpointsForComponent(mei.getSourceId());
+                ServiceEndpoint[] ses = getContainer().getRegistry().getEndpointRegistry()
+                                                .getAllEndpointsForComponent(mei.getSourceId());
                 for (int i = 0; i < ses.length; i++) {
                     if (EndpointSupport.getKey(ses[i]).equals(source)) {
                         source = EndpointSupport.getUniqueKey(ses[i]);
@@ -292,19 +290,18 @@
                         break;
                     }
                 }
-            }                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             
+            }
         }
     }
     
     protected void onExchangeAccepted(ExchangeEvent event) {
         MessageExchange me = event.getExchange();
         // This is a new exchange sent by a consumer
-        if (me.getStatus() == ExchangeStatus.ACTIVE &&
-            me.getRole() == Role.PROVIDER && 
-            me.getMessage("out") == null && 
-            me.getFault() == null &&
-            me instanceof MessageExchangeImpl) 
-        {
+        if (me.getStatus() == ExchangeStatus.ACTIVE
+                && me.getRole() == Role.PROVIDER 
+                && me.getMessage("out") == null 
+                && me.getFault() == null
+                && me instanceof MessageExchangeImpl) {
             String source = EndpointSupport.getUniqueKey(me.getEndpoint());
             EndpointStats stats = endpointStats.get(source);
             stats.incrementInbound();

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceMBean.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceMBean.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceMBean.java Fri Aug 10 07:37:46 2007
@@ -21,25 +21,25 @@
     /**
      * @return the statsInterval
      */
-    public long getStatsInterval();
+    long getStatsInterval();
 
     /**
      * @param statsInterval the statsInterval to set
      */
-    public void setStatsInterval(long statsInterval);
+    void setStatsInterval(long statsInterval);
 
     /**
      * @return the dumpStats
      */
-    public boolean isDumpStats();
+    boolean isDumpStats();
 
     /**
      * @param dumpStats the dumpStats to set
      */
-    public void setDumpStats(boolean value);
+    void setDumpStats(boolean value);
     
     /**
      * Reset all statistics
      */
-    public void resetAllStats();
+    void resetAllStats();
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/CountStatisticImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/CountStatisticImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/CountStatisticImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/CountStatisticImpl.java Fri Aug 10 07:37:46 2007
@@ -16,10 +16,10 @@
  */
 package org.apache.servicemix.jbi.monitoring.stats;
 
-import javax.management.j2ee.statistics.CountStatistic;
-
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.management.j2ee.statistics.CountStatistic;
+
 /**
  * A count statistic implementation
  *
@@ -107,10 +107,11 @@
      */
     public double getPeriod() {
         double count = counter.get();
-        if( count == 0 )
+        if (count == 0) {
             return 0;
-        double time = (System.currentTimeMillis() - getStartTime());
-        return (time/(count*1000.0));
+        }
+        double time = System.currentTimeMillis() - getStartTime();
+        return time / (count * 1000.0);
     }
     
     /**
@@ -118,8 +119,8 @@
      */
     public double getFrequency() {
         double count = counter.get();
-        double time = (System.currentTimeMillis() - getStartTime());
-        return (count*1000.0/time);
+        double time = System.currentTimeMillis() - getStartTime();
+        return count * 1000.0 / time;
     }
 
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/Resettable.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/Resettable.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/Resettable.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/Resettable.java Fri Aug 10 07:37:46 2007
@@ -26,5 +26,5 @@
     /**
      * Reset the statistic
      */
-    public void reset();
+    void reset();
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/StatsImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/StatsImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/StatsImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/StatsImpl.java Fri Aug 10 07:37:46 2007
@@ -43,7 +43,7 @@
 
     public void reset() {
         Statistic[] stats = getStatistics();
-        for (int i = 0, size = stats.length; i < size; i++) {
+        for (int i = 0; i < stats.length; i++) {
             Statistic stat = stats[i];
             if (stat instanceof Resettable) {
                 Resettable r = (Resettable) stat;

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java Fri Aug 10 07:37:46 2007
@@ -22,13 +22,13 @@
 import org.apache.servicemix.jbi.container.JBIContainer;
 
 /**
- * The Broker handles Nomalised Message Routing within ServiceMix
+ * The Broker handles Normalized Message Routing within ServiceMix
  * 
  * @version $Revision$
  */
 public interface Broker extends BrokerMBean {
 
-    public JBIContainer getContainer();
+    JBIContainer getContainer();
     
     /**
      * initialize the broker
@@ -36,17 +36,17 @@
      * @param container
      * @throws JBIException
      */
-    public void init(JBIContainer container) throws JBIException;
+    void init(JBIContainer container) throws JBIException;
     
     /**
      * suspend the flow to prevent any message exchanges
      */
-    public void suspend();
+    void suspend();
 
     /**
      * resume message exchange processing
      */
-    public void resume();
+    void resume();
 
     /**
      * Route an ExchangePacket to a destination
@@ -54,6 +54,6 @@
      * @param exchange
      * @throws JBIException
      */
-    public void sendExchangePacket(MessageExchange exchange) throws JBIException;
+    void sendExchangePacket(MessageExchange exchange) throws JBIException;
     
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java Fri Aug 10 07:37:46 2007
@@ -22,8 +22,8 @@
 import javax.jbi.JBIException;
 import javax.jbi.component.Component;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessagingException;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.management.JMException;
 import javax.management.MBeanOperationInfo;
@@ -63,11 +63,12 @@
  */
 public class DefaultBroker extends BaseSystemService implements Broker {
 
+    private static final Log LOG = LogFactory.getLog(DefaultBroker.class);
+
     private Registry registry;
     private String flowNames = "seda";
-    private String subscriptionFlowName = null;
+    private String subscriptionFlowName;
     private Flow[] flows;
-    private final static Log log = LogFactory.getLog(DefaultBroker.class);
     private EndpointChooser defaultServiceChooser = new FirstChoicePolicy();
     private EndpointChooser defaultInterfaceChooser = new FirstChoicePolicy();
     private SubscriptionManager subscriptionManager = new SubscriptionManager();
@@ -121,18 +122,18 @@
                 flows[i].init(this);
             }
         }
-    	subscriptionManager.init(this, registry);
+        subscriptionManager.init(this, registry);
     }
-    
+
     protected Class<BrokerMBean> getServiceMBean() {
         return BrokerMBean.class;
     }
 
     /**
-	 * Get the name of the Container
-	 * 
-	 * @return containerName
-	 */
+     * Get the name of the Container
+     * 
+     * @return containerName
+     */
     public String getContainerName() {
         return container.getName();
     }
@@ -213,18 +214,19 @@
      * @return the subscriptionFlowName
      */
     public String getSubscriptionFlowName() {
-		return subscriptionFlowName;
-	}
+        return subscriptionFlowName;
+    }
 
     /**
      * Set the subscription flow name
+     * 
      * @param subscriptionFlowName
      */
-	public void setSubscriptionFlowName(String subscriptionFlowName) {
-		this.subscriptionFlowName = subscriptionFlowName;
-	}
+    public void setSubscriptionFlowName(String subscriptionFlowName) {
+        this.subscriptionFlowName = subscriptionFlowName;
+    }
 
-	/**
+    /**
      * Set the flow
      * 
      * @param flow
@@ -282,9 +284,9 @@
         }
 
         if (exchange.getRole() == Role.PROVIDER) {
-        	getSubscriptionManager().dispatchToSubscribers(exchange);
+            getSubscriptionManager().dispatchToSubscribers(exchange);
         }
-        
+
         if (!foundRoute) {
             boolean throwException = true;
             ActivationSpec activationSpec = exchange.getActivationSpec();
@@ -292,8 +294,8 @@
                 throwException = activationSpec.isFailIfNoDestinationEndpoint();
             }
             if (throwException) {
-                throw new MessagingException("Could not find route for exchange: " + exchange + " for service: " + exchange.getService() + " and interface: "
-                        + exchange.getInterfaceName());
+                throw new MessagingException("Could not find route for exchange: " + exchange + " for service: " + exchange.getService()
+                                + " and interface: " + exchange.getInterfaceName());
             } else if (exchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
                 exchange.handleAccept();
                 ComponentContextImpl ctx = (ComponentContextImpl) getSubscriptionManager().getContext();
@@ -303,15 +305,16 @@
             }
         }
     }
-    
+
     protected void resolveAddress(MessageExchangeImpl exchange) throws JBIException {
         ServiceEndpoint theEndpoint = exchange.getEndpoint();
         if (theEndpoint != null) {
             if (theEndpoint instanceof ExternalEndpoint) {
                 throw new JBIException("External endpoints can not be used for routing: should be an internal or dynamic endpoint.");
             }
-            if (theEndpoint instanceof AbstractServiceEndpoint == false) {
-                throw new JBIException("Component-specific endpoints can not be used for routing: should be an internal or dynamic endpoint.");
+            if (!(theEndpoint instanceof AbstractServiceEndpoint)) {
+                throw new JBIException(
+                                "Component-specific endpoints can not be used for routing: should be an internal or dynamic endpoint.");
             }
         }
         // Resolve linked endpoints
@@ -330,7 +333,7 @@
         if (theEndpoint == null) {
             QName serviceName = exchange.getService();
             QName interfaceName = exchange.getInterfaceName();
-            
+
             // check in order, ServiceName then InterfaceName
             // check to see if there is a match on the serviceName
             if (serviceName != null) {
@@ -338,7 +341,7 @@
                 endpoints = getMatchingEndpoints(endpoints, exchange);
                 theEndpoint = getServiceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
                 if (theEndpoint == null) {
-                    log.warn("ServiceName (" + serviceName + ") specified for routing, but can't find it registered");
+                    LOG.warn("ServiceName (" + serviceName + ") specified for routing, but can't find it registered");
                 }
             }
             if (theEndpoint == null && interfaceName != null) {
@@ -346,7 +349,7 @@
                 endpoints = getMatchingEndpoints(endpoints, exchange);
                 theEndpoint = (InternalEndpoint) getInterfaceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
                 if (theEndpoint == null) {
-                    log.warn("InterfaceName (" + interfaceName + ") specified for routing, but can't find any matching components");
+                    LOG.warn("InterfaceName (" + interfaceName + ") specified for routing, but can't find any matching components");
                 }
             }
             if (theEndpoint == null) {
@@ -359,8 +362,7 @@
                         try {
                             EndpointFilter filter = createEndpointFilter(context, exchange);
                             theEndpoint = (InternalEndpoint) destinationResolver.resolveEndpoint(context, exchange, filter);
-                        }
-                        catch (JBIException e) {
+                        } catch (JBIException e) {
                             throw new MessagingException("Failed to resolve endpoint: " + e, e);
                         }
                     }
@@ -370,40 +372,42 @@
         if (theEndpoint != null) {
             exchange.setEndpoint(theEndpoint);
         }
-        if (log.isTraceEnabled()) {
-            log.trace("Routing exchange " + exchange + " to: " + theEndpoint);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Routing exchange " + exchange + " to: " + theEndpoint);
         }
     }
 
     /**
-     * Filter the given endpoints by asking to the provider and consumer
-     * if they are both ok to process the exchange.
+     * Filter the given endpoints by asking to the provider and consumer if they
+     * are both ok to process the exchange.
      * 
-     * @param endpoints an array of internal endpoints to check
-     * @param exchange the exchange that will be serviced
+     * @param endpoints
+     *            an array of internal endpoints to check
+     * @param exchange
+     *            the exchange that will be serviced
      * @return an array of endpoints on which both consumer and provider agrees
      */
     protected ServiceEndpoint[] getMatchingEndpoints(ServiceEndpoint[] endpoints, MessageExchangeImpl exchange) {
-    	List<ServiceEndpoint> filtered = new ArrayList<ServiceEndpoint>();
+        List<ServiceEndpoint> filtered = new ArrayList<ServiceEndpoint>();
         ComponentMBeanImpl consumer = getRegistry().getComponent(exchange.getSourceId());
-        
-    	for (int i = 0; i < endpoints.length; i++) {
-			ComponentNameSpace id = ((InternalEndpoint) endpoints[i]).getComponentNameSpace();
+
+        for (int i = 0; i < endpoints.length; i++) {
+            ComponentNameSpace id = ((InternalEndpoint) endpoints[i]).getComponentNameSpace();
             if (id != null) {
                 ComponentMBeanImpl provider = getRegistry().getComponent(id);
                 if (provider != null) {
-                    if (!consumer.getComponent().isExchangeWithProviderOkay(endpoints[i], exchange) ||
-                        !provider.getComponent().isExchangeWithConsumerOkay(endpoints[i], exchange)) {
-           			    continue;
-                	}
+                    if (!consumer.getComponent().isExchangeWithProviderOkay(endpoints[i], exchange)
+                                    || !provider.getComponent().isExchangeWithConsumerOkay(endpoints[i], exchange)) {
+                        continue;
+                    }
                 }
             }
             filtered.add(endpoints[i]);
-		}
-		return filtered.toArray(new ServiceEndpoint[filtered.size()]);
-	}
+        }
+        return filtered.toArray(new ServiceEndpoint[filtered.size()]);
+    }
 
-	/**
+    /**
      * @return the default EndpointChooser
      */
     public EndpointChooser getDefaultInterfaceChooser() {
@@ -443,7 +447,8 @@
     }
 
     /**
-     * @param defaultFlowChooser the defaultFlowChooser to set
+     * @param defaultFlowChooser
+     *            the defaultFlowChooser to set
      */
     public void setDefaultFlowChooser(FlowChooser defaultFlowChooser) {
         this.defaultFlowChooser = defaultFlowChooser;
@@ -501,8 +506,7 @@
         Component component = context.getComponent();
         if (exchange.getRole() == Role.PROVIDER) {
             return new ConsumerComponentEndpointFilter(component);
-        }
-        else {
+        } else {
             return new ProducerComponentEndpointFilter(component);
         }
     }
@@ -521,8 +525,8 @@
         return OperationInfoHelper.join(super.getOperationInfos(), helper.getOperationInfos());
     }
 
-	public JBIContainer getContainer() {
-		return container;
-	}
+    public JBIContainer getContainer() {
+        return container;
+    }
 
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/SubscriptionManager.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/SubscriptionManager.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/SubscriptionManager.java Fri Aug 10 07:37:46 2007
@@ -16,14 +16,9 @@
  */
 package org.apache.servicemix.jbi.nmr;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.JbiConstants;
-import org.apache.servicemix.MessageExchangeListener;
-import org.apache.servicemix.components.util.ComponentSupport;
-import org.apache.servicemix.jbi.framework.Registry;
-import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
-import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
 import javax.jbi.JBIException;
 import javax.jbi.messaging.DeliveryChannel;
@@ -32,9 +27,14 @@
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.components.util.ComponentSupport;
+import org.apache.servicemix.jbi.framework.Registry;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
 
 /**
  * Handles publish/subscribe style messaging in the NMR.
@@ -43,94 +43,99 @@
  * @version $Revision$
  */
 public class SubscriptionManager extends ComponentSupport implements MessageExchangeListener {
-    
+
     public static final String COMPONENT_NAME = "#SubscriptionManager#";
-    
+
+    private static final Log LOG = LogFactory.getLog(SubscriptionManager.class);
+
+    // SM-229: Avoid StackOverflowException
+    private static final String FROM_SUBSCRIPTION_MANAGER = "org.apache.servicemix.jbi.nmr.from_subman";
+
     private Registry registry;
+
     private String flowName;
-    private static Log log = LogFactory.getLog(SubscriptionManager.class);
-    
-    //  SM-229: Avoid StackOverflowException
-    private static final String FROM_SUBSCRIPTION_MANAGER = "org.apache.servicemix.jbi.nmr.from_subman";
-    
+
     /**
      * Initialize the SubscriptionManager
+     * 
      * @param broker
      * @throws JBIException
      */
-    public void init(Broker broker, Registry registry) throws JBIException {
-        this.registry = registry; 
+    public void init(Broker broker, Registry reg) throws JBIException {
+        this.registry = reg;
         broker.getContainer().activateComponent(this, COMPONENT_NAME);
     }
 
     /**
      * Dispatches the given exchange to all matching subscribers
-     * @param exchange 
+     * 
+     * @param exchange
      * @return true if dispatched to a matching subscriber(s)
      * 
-     * @throws JBIException 
+     * @throws JBIException
      */
     protected boolean dispatchToSubscribers(MessageExchangeImpl exchange) throws JBIException {
-    	Boolean source = (Boolean) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
-    	if (source == null || !source.booleanValue()) {
-	        List<InternalEndpoint> list = registry.getMatchingSubscriptionEndpoints(exchange);
-	        if (list != null) {
-	            for (int i = 0; i < list.size(); i++) {
-	                InternalEndpoint endpoint = list.get(i);
-	                dispatchToSubscriber(exchange, endpoint);
-	            }
-	        }
-	        return list != null && !list.isEmpty();
-    	} else {
-    		return false;
-    	}
+        Boolean source = (Boolean) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
+        if (source == null || !source.booleanValue()) {
+            List<InternalEndpoint> list = registry.getMatchingSubscriptionEndpoints(exchange);
+            if (list != null) {
+                for (int i = 0; i < list.size(); i++) {
+                    InternalEndpoint endpoint = list.get(i);
+                    dispatchToSubscriber(exchange, endpoint);
+                }
+            }
+            return list != null && !list.isEmpty();
+        } else {
+            return false;
+        }
     }
 
     /**
      * Dispatches the given message exchange to the given endpoint
-     * @param exchange 
-     * @param endpoint 
-     * @throws JBIException 
+     * 
+     * @param exchange
+     * @param endpoint
+     * @throws JBIException
      */
     protected void dispatchToSubscriber(MessageExchangeImpl exchange, InternalEndpoint endpoint) throws JBIException {
-    	if (log.isDebugEnabled() && endpoint != null) {
-    		log.debug("Subscription Endpoint: "+endpoint.getEndpointName());
+        if (LOG.isDebugEnabled() && endpoint != null) {
+            LOG.debug("Subscription Endpoint: " + endpoint.getEndpointName());
         }
-    	// SM-229: Avoid StackOverflowException
-    	Boolean source = (Boolean) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
-    	if (source == null || !source.booleanValue()) {
-	        DeliveryChannel channel = getDeliveryChannel();
-	        InOnly me = channel.createExchangeFactory().createInOnlyExchange();
-	        // SM-229: Avoid StackOverflowException 
-	        me.setProperty(FROM_SUBSCRIPTION_MANAGER,Boolean.TRUE);
-	        NormalizedMessage in = me.createMessage();
-	        getMessageTransformer().transform(me, exchange.getInMessage(), in);
-	        me.setInMessage(in);
-	        me.setEndpoint(endpoint);
-	        Set names = exchange.getPropertyNames();
-	        for (Iterator iter = names.iterator(); iter.hasNext();) {
-	            String name = (String) iter.next();
-	            me.setProperty(name, exchange.getProperty(name));
-	        }
-	        if (Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC))) {
-	            channel.sendSync(me);
-	        } else {
-	            channel.send(me);
-	        }
-    	}
-    }
-
-	public String getFlowName() {
-		return flowName;
-	}
-
-	public void setFlowName(String flowName) {
-		this.flowName = flowName;
-	}
+        // SM-229: Avoid StackOverflowException
+        Boolean source = (Boolean) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
+        if (source == null || !source.booleanValue()) {
+            DeliveryChannel channel = getDeliveryChannel();
+            InOnly me = channel.createExchangeFactory().createInOnlyExchange();
+            // SM-229: Avoid StackOverflowException
+            me.setProperty(FROM_SUBSCRIPTION_MANAGER, Boolean.TRUE);
+            NormalizedMessage in = me.createMessage();
+            getMessageTransformer().transform(me, exchange.getInMessage(), in);
+            me.setInMessage(in);
+            me.setEndpoint(endpoint);
+            Set names = exchange.getPropertyNames();
+            for (Iterator iter = names.iterator(); iter.hasNext();) {
+                String name = (String) iter.next();
+                me.setProperty(name, exchange.getProperty(name));
+            }
+            if (Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC))) {
+                channel.sendSync(me);
+            } else {
+                channel.send(me);
+            }
+        }
+    }
+
+    public String getFlowName() {
+        return flowName;
+    }
+
+    public void setFlowName(String flowName) {
+        this.flowName = flowName;
+    }
 
     public void onMessageExchange(MessageExchange exchange) throws MessagingException {
         // We should only receive done exchanges from subscribers
         // but we need that so that they can be dequeued
     }
-   
+
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java Fri Aug 10 07:37:46 2007
@@ -22,8 +22,8 @@
 import javax.jbi.JBIException;
 import javax.jbi.management.LifeCycleMBean;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessagingException;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.management.JMException;
 import javax.management.MBeanAttributeInfo;
@@ -48,110 +48,106 @@
  * @version $Revision$
  */
 public abstract class AbstractFlow extends BaseLifeCycle implements Flow {
-    
+
     protected final Log log = LogFactory.getLog(getClass());
-    
     protected Broker broker;
     protected ExecutorFactory executorFactory;
     private ReadWriteLock lock = new ReentrantReadWriteLock();
-    private Thread suspendThread = null;
+    private Thread suspendThread;
     private String name;
 
     /**
      * Initialize the Region
      * 
-     * @param broker
+     * @param br
      * @throws JBIException
      */
-    public void init(Broker broker) throws JBIException {
-        this.broker = broker;
-        this.executorFactory = broker.getContainer().getExecutorFactory();
+    public void init(Broker br) throws JBIException {
+        this.broker = br;
+        this.executorFactory = br.getContainer().getExecutorFactory();
         // register self with the management context
-        ObjectName objectName = broker.getContainer().getManagementContext().createObjectName(this);
+        ObjectName objectName = br.getContainer().getManagementContext().createObjectName(this);
         try {
-            broker.getContainer().getManagementContext().registerMBean(objectName, this, LifeCycleMBean.class);
-        }
-        catch (JMException e) {
+            br.getContainer().getManagementContext().registerMBean(objectName, this, LifeCycleMBean.class);
+        } catch (JMException e) {
             throw new JBIException("Failed to register MBean with the ManagementContext", e);
         }
     }
-    
+
     /**
      * start the flow
      * @throws JBIException
      */
-    public void start() throws JBIException{
+    public void start() throws JBIException {
         super.start();
     }
-    
-    
+
     /**
      * stop the flow
      * @throws JBIException
      */
-    public void stop() throws JBIException{
+    public void stop() throws JBIException {
         if (log.isDebugEnabled()) {
             log.debug("Called Flow stop");
         }
-        if (suspendThread != null){
+        if (suspendThread != null) {
             suspendThread.interrupt();
         }
         super.stop();
     }
-    
+
     /**
      * shutDown the flow
      * @throws JBIException
      */
-    public void shutDown() throws JBIException{
+    public void shutDown() throws JBIException {
         if (log.isDebugEnabled()) {
             log.debug("Called Flow shutdown");
         }
         broker.getContainer().getManagementContext().unregisterMBean(this);
         super.shutDown();
     }
-    
+
     /**
      * Distribute an ExchangePacket
      * @param packet
      * @throws JBIException
      */
-    public void send(MessageExchange me) throws JBIException{
+    public void send(MessageExchange me) throws JBIException {
         if (log.isDebugEnabled()) {
             log.debug("Called Flow send");
         }
-    	// do send
+        // do send
         try {
             lock.readLock().lock();
             doSend((MessageExchangeImpl) me);
-        } finally{
+        } finally {
             lock.readLock().unlock();
         }
     }
-    
+
     /**
      * suspend the flow to prevent any message exchanges
      */
-    public synchronized void suspend(){
+    public synchronized void suspend() {
         if (log.isDebugEnabled()) {
             log.debug("Called Flow suspend");
         }
         lock.writeLock().lock();
         suspendThread = Thread.currentThread();
     }
-    
-    
+
     /**
      * resume message exchange processing
      */
-    public synchronized void resume(){
+    public synchronized void resume() {
         if (log.isDebugEnabled()) {
             log.debug("Called Flow resume");
         }
         lock.writeLock().unlock();
         suspendThread = null;
     }
-    
+
     /**
      * Do the Flow specific routing
      * @param packet
@@ -175,12 +171,11 @@
             } else {
                 throw new MessagingException("Component " + id.getName() + " is shut down");
             }
-        }
-        else {
+        } else {
             throw new MessagingException("No component named " + id.getName() + " - Couldn't route MessageExchange " + me);
         }
     }
-    
+
     /**
      * Get an array of MBeanAttributeInfo
      * 
@@ -200,17 +195,17 @@
      */
     protected boolean isPersistent(MessageExchange me) {
         ExchangePacket packet = ((MessageExchangeImpl) me).getPacket();
-    	if (packet.getPersistent() != null) {
-    		return packet.getPersistent().booleanValue();
-    	} else {
-    		return broker.getContainer().isPersistent();
-    	}
+        if (packet.getPersistent() != null) {
+            return packet.getPersistent().booleanValue();
+        } else {
+            return broker.getContainer().isPersistent();
+        }
     }
 
     protected boolean isTransacted(MessageExchange me) {
         return me.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME) != null;
     }
-    
+
     protected boolean isSynchronous(MessageExchange me) {
         Boolean sync = (Boolean) me.getProperty(JbiConstants.SEND_SYNC);
         return sync != null && sync.booleanValue();
@@ -222,7 +217,7 @@
             ServiceEndpoint se = me.getEndpoint();
             if (se instanceof InternalEndpoint) {
                 return ((InternalEndpoint) se).isClustered();
-            // Unknown: assume this is not clustered
+                // Unknown: assume this is not clustered
             } else {
                 return false;
             }
@@ -232,7 +227,7 @@
             return !source.equals(destination);
         }
     }
-    
+
     public Broker getBroker() {
         return broker;
     }
@@ -244,23 +239,23 @@
     public String getType() {
         return "Flow";
     }
-    
+
     /**
      * Get the name of the item
      * @return the name
      */
     public String getName() {
         if (this.name == null) {
-            String name = super.getName();
-            if (name.endsWith("Flow")) {
-                name = name.substring(0, name.length() - 4);
+            String n = super.getName();
+            if (n.endsWith("Flow")) {
+                n = n.substring(0, n.length() - 4);
             }
-            return name;
+            return n;
         } else {
             return this.name;
         }
     }
-    
+
     public void setName(String name) {
         this.name = name;
     }
@@ -271,5 +266,5 @@
     public ExecutorFactory getExecutorFactory() {
         return executorFactory;
     }
-    
+
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java Fri Aug 10 07:37:46 2007
@@ -40,7 +40,8 @@
             }
             if (foundFlow == null) {
                 throw new MessagingException("Flow '" + flow + "' was specified but not found");
-            } if (foundFlow.canHandle(exchange)) {
+            }
+            if (foundFlow.canHandle(exchange)) {
                 return foundFlow;
             } else {
                 throw new MessagingException("Flow '" + flow + "' was specified but not able to handle exchange");
@@ -55,5 +56,5 @@
         }
         return null;
     }
-    
+
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java Fri Aug 10 07:37:46 2007
@@ -16,12 +16,12 @@
  */
 package org.apache.servicemix.jbi.nmr.flow;
 
-import org.apache.servicemix.jbi.nmr.Broker;
-
 import javax.jbi.JBIException;
 import javax.jbi.management.LifeCycleMBean;
 import javax.jbi.messaging.MessageExchange;
 
+import org.apache.servicemix.jbi.nmr.Broker;
+
 /**
  * A Flow provides different dispatch policies within the NMR
  *
@@ -34,49 +34,49 @@
      * @param broker
      * @throws JBIException
      */
-    public void init(Broker broker) throws JBIException;
+    void init(Broker broker) throws JBIException;
     
     /**
      * The description of Flow
      * @return the description
      */
-    public String getDescription();
+    String getDescription();
     
     /**
      * The unique name of Flow
      * @return the name
      */
-    public String getName();
+    String getName();
     
     /**
      * Distribute an ExchangePacket
      * @param packet
      * @throws JBIException
      */
-    public void send(MessageExchange me) throws JBIException;
+    void send(MessageExchange me) throws JBIException;
     
     /**
      * suspend the flow to prevent any message exchanges
      */
-    public void suspend();
+    void suspend();
     
     
     /**
      * resume message exchange processing
      */
-    public void resume();
+    void resume();
     
     /**
      * Get the broker associated with this flow
      *
      */
-    public Broker getBroker();
+    Broker getBroker();
     
     /**
      * Check if the flow can support the requested QoS for this exchange
      * @param me the exchange to check
      * @return true if this flow can handle the given exchange
      */
-    public boolean canHandle(MessageExchange me);
+    boolean canHandle(MessageExchange me);
         
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java Fri Aug 10 07:37:46 2007
@@ -19,22 +19,28 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Map;
+
 import javax.jbi.JBIException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.finder.FactoryFinder;
 import org.apache.servicemix.jbi.util.IntrospectionSupport;
 import org.apache.servicemix.jbi.util.URISupport;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * Find a Flow by Name
  * 
  * @version $Revision$
  */
-public class FlowProvider{
-    private static final Log log=LogFactory.getLog(FlowProvider.class);
-    private static FactoryFinder finder=new FactoryFinder("META-INF/services/org/apache/servicemix/jbi/nmr/flow/");
+public final class FlowProvider {
+
+    private static final Log LOG = LogFactory.getLog(FlowProvider.class);
+
+    private static final FactoryFinder FINDER = new FactoryFinder("META-INF/services/org/apache/servicemix/jbi/nmr/flow/");
+
+    private FlowProvider() {
+    }
 
     /**
      * Locate a Flow
@@ -43,54 +49,54 @@
      * @return the Flow
      * @throws JBIException
      */
-    public static Flow getFlow(String flow) throws JBIException{
+    public static Flow getFlow(String flow) throws JBIException {
         Object value;
-        String flowName=getFlowName(flow);
-        try{
-            value=finder.newInstance(flowName);
-            if(value!=null&&value instanceof Flow){
-                String query=getQuery(flow);
-                if(query!=null){
-                    Map map=URISupport.parseQuery(query);
-                    if(map!=null&&!map.isEmpty()){
-                        IntrospectionSupport.setProperties(value,map);
+        String flowName = getFlowName(flow);
+        try {
+            value = FINDER.newInstance(flowName);
+            if (value != null && value instanceof Flow) {
+                String query = getQuery(flow);
+                if (query != null) {
+                    Map map = URISupport.parseQuery(query);
+                    if (map != null && !map.isEmpty()) {
+                        IntrospectionSupport.setProperties(value, map);
                     }
                 }
                 return (Flow) value;
             }
-            throw new JBIException("No implementation found for: "+flow);
-        }catch(IllegalAccessException e){
-            log.error("getFlow("+flow+" failed: "+e,e);
+            throw new JBIException("No implementation found for: " + flow);
+        } catch (IllegalAccessException e) {
+            LOG.error("getFlow(" + flow + " failed: " + e, e);
             throw new JBIException(e);
-        }catch(InstantiationException e){
-            log.error("getFlow("+flow+" failed: "+e,e);
+        } catch (InstantiationException e) {
+            LOG.error("getFlow(" + flow + " failed: " + e, e);
             throw new JBIException(e);
-        }catch(IOException e){
-            log.error("getFlow("+flow+" failed: "+e,e);
+        } catch (IOException e) {
+            LOG.error("getFlow(" + flow + " failed: " + e, e);
             throw new JBIException(e);
-        }catch(ClassNotFoundException e){
-            log.error("getFlow("+flow+" failed: "+e,e);
+        } catch (ClassNotFoundException e) {
+            LOG.error("getFlow(" + flow + " failed: " + e, e);
             throw new JBIException(e);
-        }catch(URISyntaxException e){
-            log.error("getFlow("+flow+" failed: "+e,e);
+        } catch (URISyntaxException e) {
+            LOG.error("getFlow(" + flow + " failed: " + e, e);
             throw new JBIException(e);
         }
     }
 
-    public static String getFlowName(String str){
-        String result=str;
-        int index=str.indexOf('?');
-        if(index>=0){
-            result=str.substring(0,index);
+    public static String getFlowName(String str) {
+        String result = str;
+        int index = str.indexOf('?');
+        if (index >= 0) {
+            result = str.substring(0, index);
         }
         return result;
     }
 
-    protected static String getQuery(String str){
-        String result=null;
-        int index=str.indexOf('?');
-        if(index>=0&&(index+1)<str.length()){
-            result=str.substring(index+1);
+    protected static String getQuery(String str) {
+        String result = null;
+        int index = str.indexOf('?');
+        if (index >= 0 && (index + 1) < str.length()) {
+            result = str.substring(index + 1);
         }
         return result;
     }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java Fri Aug 10 07:37:46 2007
@@ -26,8 +26,8 @@
 
 import javax.jbi.JBIException;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessagingException;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -81,14 +81,16 @@
 import org.jencks.factory.ConnectionManagerFactoryBean;
 
 /**
- * Use for message routing among a network of containers. All routing/registration happens automatically.
+ * Use for message routing among a network of containers. All
+ * routing/registration happens automatically.
  * 
  * @version $Revision$
  * @org.apache.xbean.XBean element="jcaFlow"
  */
 public class JCAFlow extends AbstractFlow implements MessageListener {
-    
+
     private static final String INBOUND_PREFIX = "org.apache.servicemix.jca.";
+
     private String jmsURL = "tcp://localhost:61616";
     private ActiveMQConnectionFactory connectionFactory;
     private ConnectionFactory managedConnectionFactory;
@@ -107,11 +109,11 @@
 
     public JCAFlow() {
     }
-    
+
     public JCAFlow(String jmsURL) {
-    	this.jmsURL = jmsURL;
+        this.jmsURL = jmsURL;
     }
-    
+
     /**
      * The type of Flow
      * 
@@ -133,7 +135,8 @@
     /**
      * Sets the JMS URL for this flow
      * 
-     * @param jmsURL The jmsURL to set.
+     * @param jmsURL
+     *            The jmsURL to set.
      */
     public void setJmsURL(String jmsURL) {
         this.jmsURL = jmsURL;
@@ -151,7 +154,8 @@
     /**
      * Sets the ConnectionFactory for this flow
      * 
-     * @param connectionFactory The connectionFactory to set.
+     * @param connectionFactory
+     *            The connectionFactory to set.
      */
     public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
         this.connectionFactory = connectionFactory;
@@ -169,16 +173,17 @@
     /**
      * Sets the Broadcast Destination Name for this flow
      * 
-     * @param broadcastDestinationName The broadcastDestinationName to set.
+     * @param broadcastDestinationName
+     *            The broadcastDestinationName to set.
      */
     public void setBroadcastDestinationName(String broadcastDestinationName) {
         this.broadcastDestinationName = broadcastDestinationName;
     }
 
     public TransactionManager getTransactionManager() {
-    	return (TransactionManager) broker.getContainer().getTransactionManager();
+        return (TransactionManager) broker.getContainer().getTransactionManager();
     }
-    
+
     /**
      * Initialize the Region
      * 
@@ -204,6 +209,7 @@
             public void componentStarted(ComponentEvent event) {
                 onComponentStarted(event);
             }
+
             public void componentStopped(ComponentEvent event) {
                 onComponentStopped(event);
             }
@@ -213,13 +219,13 @@
             if (connectionFactory == null) {
                 connectionFactory = new ActiveMQConnectionFactory(jmsURL);
             }
-            
-        	// Inbound connector
+
+            // Inbound connector
             ActiveMQDestination dest = new ActiveMQQueue(INBOUND_PREFIX + broker.getContainer().getName());
             containerConnector = new Connector(dest, this, true);
             containerConnector.start();
-        	
-        	// Outbound connector
+
+            // Outbound connector
             ActiveMQResourceAdapter outboundRa = new ActiveMQResourceAdapter();
             outboundRa.setConnectionFactory(connectionFactory);
             ActiveMQManagedConnectionFactory mcf = new ActiveMQManagedConnectionFactory();
@@ -229,8 +235,7 @@
             // Inbound broadcast
             broadcastTopic = new ActiveMQTopic(broadcastDestinationName);
             advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.error("Failed to initialize JCAFlow", e);
             throw new JBIException(e);
         }
@@ -266,9 +271,9 @@
                         }
                     }
                 };
-                broadcastConnector = new Connector(broadcastTopic, listener, false); 
+                broadcastConnector = new Connector(broadcastTopic, listener, false);
                 broadcastConnector.start();
-                
+
                 listener = new MessageListener() {
                     public void onMessage(Message message) {
                         if (started.get()) {
@@ -278,8 +283,7 @@
                 };
                 advisoryConnector = new Connector(advisoryTopic, listener, false);
                 advisoryConnector.start();
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
             }
         }
@@ -315,17 +319,17 @@
         broker.getContainer().removeListener(componentListener);
         // Destroy connectors
         while (!connectorMap.isEmpty()) {
-        	Connector connector = connectorMap.remove(connectorMap.keySet().iterator().next());
-        	try {
-        		connector.stop();
-        	} catch (Exception e) {
-        		log.debug("Error closing jca connector", e);
-        	}
+            Connector connector = connectorMap.remove(connectorMap.keySet().iterator().next());
+            try {
+                connector.stop();
+            } catch (Exception e) {
+                log.debug("Error closing jca connector", e);
+            }
         }
         try {
-        	containerConnector.stop();
-    	} catch (Exception e) {
-    		log.debug("Error closing jca connector", e);
+            containerConnector.stop();
+        } catch (Exception e) {
+            log.debug("Error closing jca connector", e);
         }
     }
 
@@ -340,7 +344,9 @@
 
     /**
      * Check if the flow can support the requested QoS for this exchange
-     * @param me the exchange to check
+     * 
+     * @param me
+     *            the exchange to check
      * @return true if this flow can handle the given exchange
      */
     public boolean canHandle(MessageExchange me) {
@@ -349,14 +355,14 @@
         }
         return true;
     }
-    
+
     public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
         if (!started.get()) {
             return;
         }
         try {
             String key = EndpointSupport.getKey(event.getEndpoint());
-            if (!connectorMap.containsKey(key)){
+            if (!connectorMap.containsKey(key)) {
                 ActiveMQDestination dest = new ActiveMQQueue(INBOUND_PREFIX + key);
                 Connector connector = new Connector(dest, this, true);
                 connector.start();
@@ -371,9 +377,9 @@
             log.error("Cannot create consumer for " + event.getEndpoint(), e);
         }
     }
-    
+
     public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
-        try{
+        try {
             String key = EndpointSupport.getKey(event.getEndpoint());
             Connector connector = connectorMap.remove(key);
             if (connector != null) {
@@ -388,7 +394,7 @@
             log.error("Cannot destroy consumer for " + event, e);
         }
     }
-    
+
     public void onComponentStarted(ComponentEvent event) {
         if (!started.get()) {
             return;
@@ -405,12 +411,12 @@
             log.error("Cannot create consumer for component " + event.getComponent().getName(), e);
         }
     }
-    
+
     public void onComponentStopped(ComponentEvent event) {
         try {
             String key = event.getComponent().getName();
             Connector connector = connectorMap.remove(key);
-            if (connector != null){
+            if (connector != null) {
                 connector.stop();
             }
         } catch (Exception e) {
@@ -437,7 +443,7 @@
     protected void doSend(MessageExchangeImpl me) throws MessagingException {
         doRouting(me);
     }
-    
+
     /**
      * Distribute an ExchangePacket
      * 
@@ -460,9 +466,12 @@
                 if (me.getSourceId() == null) {
                     throw new IllegalStateException("No sourceId set on the exchange");
                 } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
-                    // If the consumer is stateless and has specified a sender endpoint,
-                    // this exchange will be sent to the given endpoint queue, so that
-                    // This property must have been created using EndpointSupport.getKey
+                    // If the consumer is stateless and has specified a sender
+                    // endpoint (the SENDER_ENDPOINT property, which must have
+                    // been created using EndpointSupport.getKey), this
+                    // exchange will be sent to that endpoint's queue
+                    // (INBOUND_PREFIX + the property value),
+                    // so that
                     // fail-over and load-balancing can be achieved
                     if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
                         destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT);
@@ -496,33 +505,32 @@
             if (message != null && started.get()) {
                 ObjectMessage objMsg = (ObjectMessage) message;
                 final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
-                // Hack for redelivery: AMQ is too optimized and the object is the same upon redelivery
-                // so that there are side effect (the exchange state may have been modified)
+                // Hack for redelivery: AMQ is too optimized and the object is
+                // the same upon redelivery,
+                // so there are side effects (the exchange state may have
+                // been modified).
                 // See http://jira.activemq.org/jira/browse/AMQ-519
-                //me = (MessageExchangeImpl) ((ActiveMQObjectMessage) ((ActiveMQObjectMessage) message).copy()).getObject();
+                // me = (MessageExchangeImpl) ((ActiveMQObjectMessage)
+                // ((ActiveMQObjectMessage) message).copy()).getObject();
                 TransactionManager tm = (TransactionManager) getTransactionManager();
                 if (tm != null) {
                     me.setTransactionContext(tm.getTransaction());
                 }
                 if (me.getDestinationId() == null) {
                     ServiceEndpoint se = me.getEndpoint();
-                    se = broker.getContainer().getRegistry()
-                            .getInternalEndpoint(se.getServiceName(), se.getEndpointName());
+                    se = broker.getContainer().getRegistry().getInternalEndpoint(se.getServiceName(), se.getEndpointName());
                     me.setEndpoint(se);
                     me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
                 }
                 super.doRouting(me);
             }
-        }
-        catch (JMSException jmsEx) {
+        } catch (JMSException jmsEx) {
             log.error("Caught an exception unpacking JMS Message: ", jmsEx);
-        }
-        catch (MessagingException e) {
+        } catch (MessagingException e) {
             log.error("Caught an exception routing ExchangePacket: ", e);
-        } 
-        catch (SystemException e) {
+        } catch (SystemException e) {
             log.error("Caught an exception acessing transaction context: ", e);
-		}
+        }
     }
 
     protected void onAdvisoryMessage(Object obj) {
@@ -532,8 +540,7 @@
             ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
             for (int i = 0; i < endpoints.length; i++) {
                 if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
-                    onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
-                            EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
+                    onInternalEndpointRegistered(new EndpointEvent(endpoints[i], EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
                 }
             }
         } else if (obj instanceof RemoveInfo) {
@@ -544,52 +551,57 @@
     }
 
     private void removeAllPackets(String containerName) {
-        //TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
+        // TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
     }
 
-	public ConnectionManager getConnectionManager() throws Exception {
-		if (connectionManager == null) {
-        	ConnectionManagerFactoryBean cmfb = new ConnectionManagerFactoryBean();
-        	cmfb.setTransactionManager((TransactionManager) broker.getContainer().getTransactionManager());
-        	cmfb.setTransaction("xa");
-        	cmfb.afterPropertiesSet();
-			connectionManager = (ConnectionManager) cmfb.getObject();
-		}
-		return connectionManager;
-	}
-
-	public void setConnectionManager(ConnectionManager connectionManager) {
-		this.connectionManager = connectionManager;
-	}
+    public ConnectionManager getConnectionManager() throws Exception {
+        if (connectionManager == null) {
+            ConnectionManagerFactoryBean cmfb = new ConnectionManagerFactoryBean();
+            cmfb.setTransactionManager((TransactionManager) broker.getContainer().getTransactionManager());
+            cmfb.setTransaction("xa");
+            cmfb.afterPropertiesSet();
+            connectionManager = (ConnectionManager) cmfb.getObject();
+        }
+        return connectionManager;
+    }
 
-    public String toString(){
+    public void setConnectionManager(ConnectionManager connectionManager) {
+        this.connectionManager = connectionManager;
+    }
+
+    public String toString() {
         return broker.getContainer().getName() + " JCAFlow";
     }
-    
-    private void sendJmsMessage(Destination dest, Serializable object, boolean persistent, boolean transacted) throws JMSException, SystemException {
+
+    private void sendJmsMessage(Destination dest, Serializable object, boolean persistent, boolean transacted) throws JMSException,
+                    SystemException {
         if (transacted) {
             TransactionManager tm = (TransactionManager) getBroker().getContainer().getTransactionManager();
             if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
                 return;
             }
         }
-    	Connection connection = managedConnectionFactory.createConnection();
-    	try {
-    		Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-    		ObjectMessage msg = session.createObjectMessage(object);
-    		MessageProducer producer = session.createProducer(dest);
-    		producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-    		producer.send(msg);
-    	} finally {
-    		connection.close();
-    	}
+        Connection connection = managedConnectionFactory.createConnection();
+        try {
+            Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+            ObjectMessage msg = session.createObjectMessage(object);
+            MessageProducer producer = session.createProducer(dest);
+            producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+            producer.send(msg);
+        } finally {
+            connection.close();
+        }
     }
-    
+
     class Connector {
         private ActiveMQResourceAdapter ra;
+
         private MessageEndpointFactory endpointFactory;
+
         private ActiveMQActivationSpec spec;
+
         private Executor executor;
+
         public Connector(ActiveMQDestination destination, MessageListener listener, boolean transacted) {
             ra = new ActiveMQResourceAdapter();
             ra.setConnectionFactory(connectionFactory);
@@ -599,6 +611,7 @@
             spec = new ActiveMQActivationSpec();
             spec.setActiveMQDestination(destination);
         }
+
         public void start() throws ResourceException {
             ExecutorFactory factory = broker.getContainer().getExecutorFactory();
             executor = factory.createExecutor("flow.jca." + spec.getDestination());
@@ -607,28 +620,33 @@
             spec.setResourceAdapter(ra);
             ra.endpointActivation(endpointFactory, spec);
         }
+
         public void stop() {
             ra.endpointDeactivation(endpointFactory, spec);
             ra.stop();
             executor.shutdown();
         }
     }
-    
+
     class SimpleBootstrapContext implements BootstrapContext {
         private final WorkManager workManager;
+
         public SimpleBootstrapContext(WorkManager workManager) {
             this.workManager = workManager;
         }
+
         public Timer createTimer() throws UnavailableException {
             throw new UnsupportedOperationException();
         }
+
         public WorkManager getWorkManager() {
             return workManager;
         }
+
         public XATerminator getXATerminator() {
             throw new UnsupportedOperationException();
         }
-        
+
     }
 
 }

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java Fri Aug 10 07:37:46 2007
@@ -24,8 +24,8 @@
 
 import javax.jbi.JBIException;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessagingException;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -64,32 +64,20 @@
 
     private static final String INBOUND_PREFIX = "org.apache.servicemix.jms.";
 
-    private String userName;
-
-    private String password;
-
-    ConnectionFactory connectionFactory;
-
+    protected ConnectionFactory connectionFactory;
     protected Connection connection;
+    protected AtomicBoolean started = new AtomicBoolean(false);
+    protected MessageConsumer monitorMessageConsumer;
+    protected Set<String> subscriberSet = new CopyOnWriteArraySet<String>();
 
+    private String userName;
+    private String password;
     private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
-
     private MessageConsumer broadcastConsumer;
-
-    protected Set<String> subscriberSet = new CopyOnWriteArraySet<String>();
-
     private Map<String, MessageConsumer> consumerMap = new ConcurrentHashMap<String, MessageConsumer>();
-
-    AtomicBoolean started = new AtomicBoolean(false);
-
     private EndpointListener endpointListener;
-
     private ComponentListener componentListener;
-
     private Executor executor;
-
-    protected MessageConsumer monitorMessageConsumer = null;
-
     private String jmsURL = "peer://org.apache.servicemix?persistent=false";
 
     /**
@@ -229,7 +217,7 @@
         }
     }
 
-    abstract protected ConnectionFactory createConnectionFactoryFromUrl(String jmsURL);
+    protected abstract ConnectionFactory createConnectionFactoryFromUrl(String url);
 
     /*
      * The following abstract methods have to be implemented by specialized JMS
@@ -238,7 +226,7 @@
 
     protected abstract void onConsumerMonitorMessage(Message message);
 
-    abstract public void startConsumerMonitor() throws JMSException;
+    public abstract void startConsumerMonitor() throws JMSException;
 
     public void stopConsumerMonitor() throws JMSException {
         monitorMessageConsumer.close();

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java Fri Aug 10 07:37:46 2007
@@ -49,8 +49,9 @@
      * node is added or removed
      */
     protected void onConsumerMonitorMessage(Message advisoryMessage) {
-        if (!started.get())
+        if (!started.get()) {
             return;
+        }
         Object obj = ((ActiveMQMessage) advisoryMessage).getDataStructure();
         if (obj instanceof ConsumerInfo) {
             ConsumerInfo info = (ConsumerInfo) obj;

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java Fri Aug 10 07:37:46 2007
@@ -48,8 +48,8 @@
         try {
             Class connFactoryClass = Class.forName("com.tibco.tibjms.TibjmsConnectionFactory");
             if (jmsURL != null) {
-                Constructor cns = connFactoryClass.getConstructor(new Class[] { String.class });
-                ConnectionFactory connFactory = (ConnectionFactory) cns.newInstance(new Object[] { jmsURL });
+                Constructor cns = connFactoryClass.getConstructor(new Class[] {String.class });
+                ConnectionFactory connFactory = (ConnectionFactory) cns.newInstance(new Object[] {jmsURL });
                 return connFactory;
             } else {
                 ConnectionFactory connFactory = (ConnectionFactory) connFactoryClass.newInstance();
@@ -65,8 +65,9 @@
     }
 
     public void onConsumerMonitorMessage(Message message) {
-        if (!started.get())
+        if (!started.get()) {
             return;
+        }
         try {
             String connectionId = "" + message.getLongProperty(PROPERTY_NAME_CONN_CONNID);
             String targetDestName = message.getStringProperty(PROPERTY_NAME_TARGET_DEST_NAME);