You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/08/10 16:38:14 UTC
svn commit: r564607 [6/12] - in
/incubator/servicemix/trunk/core/servicemix-core/src:
main/java/org/apache/servicemix/ main/java/org/apache/servicemix/jbi/
main/java/org/apache/servicemix/jbi/framework/
main/java/org/apache/servicemix/jbi/framework/sup...
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/PojoMarshaler.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/PojoMarshaler.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/PojoMarshaler.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/PojoMarshaler.java Fri Aug 10 07:37:46 2007
@@ -16,41 +16,47 @@
*/
package org.apache.servicemix.jbi.messaging;
-import javax.jbi.messaging.InOptionalOut;
-import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
/**
* A plugin strategy which marshals an Object into and out of a JBI message.
- * This interface is used by the ServiceMixClient to marshal POJOs into and out of JBI messages.
- *
+ * This interface is used by the ServiceMixClient to marshal POJOs into and out
+ * of JBI messages.
+ *
* @version $Revision$
*/
public interface PojoMarshaler {
/**
- * The key on the message to store the message body which cannot be marshaled into or out of XML easily
- * or to provide a cache of the object representation of the object.
+ * The key on the message to store the message body which cannot be
+ * marshaled into or out of XML easily or to provide a cache of the object
+ * representation of the object.
*/
String BODY = "org.apache.servicemix.body";
/**
- * Marshals the payload into the normalized message, typically as the content
- * property.
- *
- * @param exchange the message exchange in which to marshal
- * @param message the message in which to marshal
- * @param body the body of the message as a POJO
+ * Marshals the payload into the normalized message, typically as the
+ * content property.
+ *
+ * @param exchange
+ * the message exchange in which to marshal
+ * @param message
+ * the message in which to marshal
+ * @param body
+ * the body of the message as a POJO
*/
void marshal(MessageExchange exchange, NormalizedMessage message, Object body) throws MessagingException;
/**
* Unmarshals the response out of the normalized message.
- *
- * @param exchange the message exchange, which is an {@link InOut} or {@link InOptionalOut}
- * @param message the output message
+ *
+ * @param exchange
+ * the message exchange, which is an {@link InOut} or
+ * {@link InOptionalOut}
+ * @param message
+ * the output message
* @return the unmarshaled body object, extracted from the message
* @throws MessagingException
*/
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/RobustInOnlyImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/RobustInOnlyImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/RobustInOnlyImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/RobustInOnlyImpl.java Fri Aug 10 07:37:46 2007
@@ -16,55 +16,56 @@
*/
package org.apache.servicemix.jbi.messaging;
-import javax.jbi.messaging.RobustInOnly;
-
import java.io.IOException;
import java.io.ObjectInput;
+import javax.jbi.messaging.RobustInOnly;
+
/**
* RobustInOnly message exchange.
- *
+ *
* @version $Revision$
*/
public class RobustInOnlyImpl extends MessageExchangeImpl implements RobustInOnly {
private static final long serialVersionUID = -1606399168587959356L;
- private static int[][] STATES_CONSUMER = {
- { CAN_CONSUMER + CAN_OWNER + CAN_SET_IN_MSG + CAN_SEND + CAN_STATUS_ACTIVE, 1, -1, -1, -1},
- { CAN_CONSUMER, -1, 2, 3, 3 },
- { CAN_CONSUMER + CAN_OWNER + CAN_SEND + CAN_STATUS_DONE + CAN_STATUS_ERROR, -1, -1, 4, 4 },
- { CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 },
- { CAN_CONSUMER, -1, -1, -1, -1 },
+ private static final int[][] STATES_CONSUMER = {
+ {CAN_CONSUMER + CAN_OWNER + CAN_SET_IN_MSG + CAN_SEND + CAN_STATUS_ACTIVE, 1, -1, -1, -1 },
+ {CAN_CONSUMER, -1, 2, 3, 3 },
+ {CAN_CONSUMER + CAN_OWNER + CAN_SEND + CAN_STATUS_DONE + CAN_STATUS_ERROR, -1, -1, 4, 4 },
+ {CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 },
+ {CAN_CONSUMER, -1, -1, -1, -1 }
};
-
- private static int[][] STATES_PROVIDER = {
- { CAN_PROVIDER, 1, -1, -1, -1 },
- { CAN_PROVIDER + CAN_OWNER + CAN_SEND + CAN_SET_FAULT_MSG + CAN_STATUS_ACTIVE + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, 2, 4, 4 },
- { CAN_PROVIDER, -1, -1, 3, 3 },
- { CAN_PROVIDER + CAN_OWNER, -1, -1, -1, -1 },
- { CAN_PROVIDER, -1, -1, -1, -1 },
+
+ private static final int[][] STATES_PROVIDER = {
+ {CAN_PROVIDER, 1, -1, -1, -1 },
+ {CAN_PROVIDER + CAN_OWNER + CAN_SEND + CAN_SET_FAULT_MSG + CAN_STATUS_ACTIVE
+ + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, 2, 4, 4 },
+ {CAN_PROVIDER, -1, -1, 3, 3 },
+ {CAN_PROVIDER + CAN_OWNER, -1, -1, -1, -1 },
+ {CAN_PROVIDER, -1, -1, -1, -1 }
};
-
+
public RobustInOnlyImpl() {
}
-
+
public RobustInOnlyImpl(String exchangeId) {
super(exchangeId, MessageExchangeSupport.ROBUST_IN_ONLY, STATES_CONSUMER);
this.mirror = new RobustInOnlyImpl(this);
}
-
+
public RobustInOnlyImpl(ExchangePacket packet) {
super(packet, STATES_CONSUMER);
this.mirror = new RobustInOnlyImpl(this);
}
-
+
protected RobustInOnlyImpl(RobustInOnlyImpl mep) {
super(mep.packet, STATES_PROVIDER);
this.mirror = mep;
}
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.packet = new ExchangePacket();
this.packet.readExternal(in);
if (this.packet.in != null) {
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStats.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStats.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStats.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStats.java Fri Aug 10 07:37:46 2007
@@ -37,10 +37,10 @@
*/
public class ComponentStats extends BaseLifeCycle implements ComponentStatsMBean {
- private static final Log log = LogFactory.getLog(ComponentStats.class);
-
public static final String STATS_FILE = "stats.csv";
+ private static final Log LOG = LogFactory.getLog(ComponentStats.class);
+
private ComponentMBeanImpl component;
private MessagingStats stats;
private File statsFile;
@@ -80,9 +80,8 @@
long outbound = stats.getOutboundExchanges().getCount();
double outboundRate = stats.getOutboundExchangeRate().getAveragePerSecond();
statsWriter.println(inbound + "," + inboundRate + "," + outbound + "," + outboundRate);
- }
- catch (IOException e) {
- log.warn("Failed to dump stats", e);
+ } catch (IOException e) {
+ LOG.warn("Failed to dump stats", e);
}
}
}
@@ -115,7 +114,7 @@
*/
stats.getInboundExchanges().increment();
stats.getInboundExchangeRate().addTime();
- }
+ }
void incrementOutbound() {
/*
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStatsMBean.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStatsMBean.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStatsMBean.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/ComponentStatsMBean.java Fri Aug 10 07:37:46 2007
@@ -28,36 +28,36 @@
*
* @return inbound count
*/
- public long getInboundExchangeCount();
+ long getInboundExchangeCount();
/**
* Get the Inbound MessageExchange rate (number/sec)
*
* @return the inbound exchange rate
*/
- public double getInboundExchangeRate();
+ double getInboundExchangeRate();
/**
* Get the Outbound MessageExchange count
*
* @return outbound count
*/
- public long getOutboundExchangeCount();
+ long getOutboundExchangeCount();
/**
* Get the Outbound MessageExchange rate (number/sec)
*
* @return the outbound exchange rate
*/
- public double getOutboundExchangeRate();
+ double getOutboundExchangeRate();
/**
* @return size of the inbound Queue
*/
- public int getInboundQueueSize();
+ int getInboundQueueSize();
/**
* reset all stats counters
*/
- public void reset();
+ void reset();
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/EndpointStatsMBean.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/EndpointStatsMBean.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/EndpointStatsMBean.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/EndpointStatsMBean.java Fri Aug 10 07:37:46 2007
@@ -23,31 +23,31 @@
*
* @return inbound count
*/
- public long getInboundExchangeCount();
+ long getInboundExchangeCount();
/**
* Get the Inbound MessageExchange rate (number/sec)
*
* @return the inbound exchange rate
*/
- public double getInboundExchangeRate();
+ double getInboundExchangeRate();
/**
* Get the Outbound MessageExchange count
*
* @return outbound count
*/
- public long getOutboundExchangeCount();
+ long getOutboundExchangeCount();
/**
* Get the Outbound MessageExchange rate (number/sec)
*
* @return the outbound exchange rate
*/
- public double getOutboundExchangeRate();
+ double getOutboundExchangeRate();
/**
* reset all stats counters
*/
- public void reset();
+ void reset();
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/MessagingStats.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/MessagingStats.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/MessagingStats.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/MessagingStats.java Fri Aug 10 07:37:46 2007
@@ -28,11 +28,12 @@
* @version $Revision$
*/
public class MessagingStats extends StatsImpl {
- private String name;
+
protected CountStatisticImpl inboundExchanges;
protected CountStatisticImpl outboundExchanges;
protected TimeStatisticImpl inboundExchangeRate;
protected TimeStatisticImpl outboundExchangeRate;
+ private String name;
/**
* Default Constructor
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsService.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsService.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsService.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsService.java Fri Aug 10 07:37:46 2007
@@ -19,6 +19,7 @@
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
import javax.jbi.JBIException;
import javax.jbi.messaging.ExchangeStatus;
@@ -50,8 +51,6 @@
import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
*
* @author gnodet
@@ -59,7 +58,7 @@
*/
public class StatisticsService extends BaseSystemService implements StatisticsServiceMBean {
- private static final Log log = LogFactory.getLog(StatisticsService.class);
+ private static final Log LOG = LogFactory.getLog(StatisticsService.class);
private ConcurrentHashMap<String, ComponentStats> componentStats = new ConcurrentHashMap<String, ComponentStats>();
private ConcurrentHashMap<String, EndpointStats> endpointStats = new ConcurrentHashMap<String, EndpointStats>();
@@ -101,9 +100,8 @@
if (timerTask != null) {
timerTask.cancel();
}
- }
- else if (!dumpStats && value) {
- dumpStats = value;//scheduleStatsTimer relies on dumpStats value
+ } else if (!dumpStats && value) {
+ dumpStats = value; //scheduleStatsTimer relies on dumpStats value
scheduleStatsTimer();
}
dumpStats = value;
@@ -199,15 +197,15 @@
ComponentStats stats = new ComponentStats(component);
componentStats.putIfAbsent(key, stats);
// Register MBean
- ManagementContext context= container.getManagementContext();
+ ManagementContext context = container.getManagementContext();
try {
context.registerMBean(context.createObjectName(context.createObjectNameProps(stats, true)),
stats,
ComponentStatsMBean.class);
} catch (Exception e) {
- log.info("Unable to register component statistics MBean: " + e.getMessage());
- if (log.isDebugEnabled()) {
- log.debug("Unable to register component statistics MBean", e);
+ LOG.info("Unable to register component statistics MBean: " + e.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to register component statistics MBean", e);
}
}
}
@@ -220,13 +218,13 @@
return;
}
// Register MBean
- ManagementContext context= container.getManagementContext();
+ ManagementContext context = container.getManagementContext();
try {
context.unregisterMBean(context.createObjectName(context.createObjectNameProps(stats, true)));
} catch (Exception e) {
- log.info("Unable to unregister component statistics MBean: " + e);
- if (log.isDebugEnabled()) {
- log.debug("Unable to unregister component statistics MBean", e);
+ LOG.info("Unable to unregister component statistics MBean: " + e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to unregister component statistics MBean", e);
}
}
}
@@ -238,15 +236,15 @@
EndpointStats stats = new EndpointStats(endpoint, compStats.getMessagingStats());
endpointStats.putIfAbsent(key, stats);
// Register MBean
- ManagementContext context= container.getManagementContext();
+ ManagementContext context = container.getManagementContext();
try {
context.registerMBean(context.createObjectName(context.createObjectNameProps(stats, true)),
stats,
EndpointStatsMBean.class);
} catch (Exception e) {
- log.info("Unable to register endpoint statistics MBean: " + e.getMessage());
- if (log.isDebugEnabled()) {
- log.debug("Unable to register endpoint statistics MBean", e);
+ LOG.info("Unable to register endpoint statistics MBean: " + e.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to register endpoint statistics MBean", e);
}
}
}
@@ -256,13 +254,13 @@
String key = EndpointSupport.getUniqueKey(endpoint);
EndpointStats stats = endpointStats.remove(key);
// Register MBean
- ManagementContext context= container.getManagementContext();
+ ManagementContext context = container.getManagementContext();
try {
context.unregisterMBean(context.createObjectName(context.createObjectNameProps(stats, true)));
} catch (Exception e) {
- log.info("Unable to unregister endpoint statistics MBean: " + e.getMessage());
- if (log.isDebugEnabled()) {
- log.debug("Unable to unregister endpoint statistics MBean", e);
+ LOG.info("Unable to unregister endpoint statistics MBean: " + e.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to unregister endpoint statistics MBean", e);
}
}
}
@@ -270,12 +268,11 @@
protected void onExchangeSent(ExchangeEvent event) {
MessageExchange me = event.getExchange();
// This is a new exchange sent by a consumer
- if (me.getStatus() == ExchangeStatus.ACTIVE &&
- me.getRole() == Role.CONSUMER &&
- me.getMessage("out") == null &&
- me.getFault() == null &&
- me instanceof MessageExchangeImpl)
- {
+ if (me.getStatus() == ExchangeStatus.ACTIVE
+ && me.getRole() == Role.CONSUMER
+ && me.getMessage("out") == null
+ && me.getFault() == null
+ && me instanceof MessageExchangeImpl) {
MessageExchangeImpl mei = (MessageExchangeImpl) me;
String source = (String) me.getProperty(JbiConstants.SENDER_ENDPOINT);
if (source == null) {
@@ -283,7 +280,8 @@
ComponentStats stats = componentStats.get(source);
stats.incrementOutbound();
} else {
- ServiceEndpoint[] ses = getContainer().getRegistry().getEndpointRegistry().getAllEndpointsForComponent(mei.getSourceId());
+ ServiceEndpoint[] ses = getContainer().getRegistry().getEndpointRegistry()
+ .getAllEndpointsForComponent(mei.getSourceId());
for (int i = 0; i < ses.length; i++) {
if (EndpointSupport.getKey(ses[i]).equals(source)) {
source = EndpointSupport.getUniqueKey(ses[i]);
@@ -292,19 +290,18 @@
break;
}
}
- }
+ }
}
}
protected void onExchangeAccepted(ExchangeEvent event) {
MessageExchange me = event.getExchange();
// This is a new exchange sent by a consumer
- if (me.getStatus() == ExchangeStatus.ACTIVE &&
- me.getRole() == Role.PROVIDER &&
- me.getMessage("out") == null &&
- me.getFault() == null &&
- me instanceof MessageExchangeImpl)
- {
+ if (me.getStatus() == ExchangeStatus.ACTIVE
+ && me.getRole() == Role.PROVIDER
+ && me.getMessage("out") == null
+ && me.getFault() == null
+ && me instanceof MessageExchangeImpl) {
String source = EndpointSupport.getUniqueKey(me.getEndpoint());
EndpointStats stats = endpointStats.get(source);
stats.incrementInbound();
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceMBean.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceMBean.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceMBean.java Fri Aug 10 07:37:46 2007
@@ -21,25 +21,25 @@
/**
* @return the statsInterval
*/
- public long getStatsInterval();
+ long getStatsInterval();
/**
* @param statsInterval the statsInterval to set
*/
- public void setStatsInterval(long statsInterval);
+ void setStatsInterval(long statsInterval);
/**
* @return the dumpStats
*/
- public boolean isDumpStats();
+ boolean isDumpStats();
/**
* @param dumpStats the dumpStats to set
*/
- public void setDumpStats(boolean value);
+ void setDumpStats(boolean value);
/**
* Reset all statistics
*/
- public void resetAllStats();
+ void resetAllStats();
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/CountStatisticImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/CountStatisticImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/CountStatisticImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/CountStatisticImpl.java Fri Aug 10 07:37:46 2007
@@ -16,10 +16,10 @@
*/
package org.apache.servicemix.jbi.monitoring.stats;
-import javax.management.j2ee.statistics.CountStatistic;
-
import java.util.concurrent.atomic.AtomicLong;
+import javax.management.j2ee.statistics.CountStatistic;
+
/**
* A count statistic implementation
*
@@ -107,10 +107,11 @@
*/
public double getPeriod() {
double count = counter.get();
- if( count == 0 )
+ if (count == 0) {
return 0;
- double time = (System.currentTimeMillis() - getStartTime());
- return (time/(count*1000.0));
+ }
+ double time = System.currentTimeMillis() - getStartTime();
+ return time / (count * 1000.0);
}
/**
@@ -118,8 +119,8 @@
*/
public double getFrequency() {
double count = counter.get();
- double time = (System.currentTimeMillis() - getStartTime());
- return (count*1000.0/time);
+ double time = System.currentTimeMillis() - getStartTime();
+ return count * 1000.0 / time;
}
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/Resettable.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/Resettable.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/Resettable.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/Resettable.java Fri Aug 10 07:37:46 2007
@@ -26,5 +26,5 @@
/**
* Reset the statistic
*/
- public void reset();
+ void reset();
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/StatsImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/StatsImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/StatsImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/monitoring/stats/StatsImpl.java Fri Aug 10 07:37:46 2007
@@ -43,7 +43,7 @@
public void reset() {
Statistic[] stats = getStatistics();
- for (int i = 0, size = stats.length; i < size; i++) {
+ for (int i = 0; i < stats.length; i++) {
Statistic stat = stats[i];
if (stat instanceof Resettable) {
Resettable r = (Resettable) stat;
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java Fri Aug 10 07:37:46 2007
@@ -22,13 +22,13 @@
import org.apache.servicemix.jbi.container.JBIContainer;
/**
- * The Broker handles Nomalised Message Routing within ServiceMix
+ * The Broker handles Normalized Message Routing within ServiceMix
*
* @version $Revision$
*/
public interface Broker extends BrokerMBean {
- public JBIContainer getContainer();
+ JBIContainer getContainer();
/**
* initialize the broker
@@ -36,17 +36,17 @@
* @param container
* @throws JBIException
*/
- public void init(JBIContainer container) throws JBIException;
+ void init(JBIContainer container) throws JBIException;
/**
* suspend the flow to prevent any message exchanges
*/
- public void suspend();
+ void suspend();
/**
* resume message exchange processing
*/
- public void resume();
+ void resume();
/**
* Route an ExchangePacket to a destination
@@ -54,6 +54,6 @@
* @param exchange
* @throws JBIException
*/
- public void sendExchangePacket(MessageExchange exchange) throws JBIException;
+ void sendExchangePacket(MessageExchange exchange) throws JBIException;
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java Fri Aug 10 07:37:46 2007
@@ -22,8 +22,8 @@
import javax.jbi.JBIException;
import javax.jbi.component.Component;
import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.management.JMException;
import javax.management.MBeanOperationInfo;
@@ -63,11 +63,12 @@
*/
public class DefaultBroker extends BaseSystemService implements Broker {
+ private static final Log LOG = LogFactory.getLog(DefaultBroker.class);
+
private Registry registry;
private String flowNames = "seda";
- private String subscriptionFlowName = null;
+ private String subscriptionFlowName;
private Flow[] flows;
- private final static Log log = LogFactory.getLog(DefaultBroker.class);
private EndpointChooser defaultServiceChooser = new FirstChoicePolicy();
private EndpointChooser defaultInterfaceChooser = new FirstChoicePolicy();
private SubscriptionManager subscriptionManager = new SubscriptionManager();
@@ -121,18 +122,18 @@
flows[i].init(this);
}
}
- subscriptionManager.init(this, registry);
+ subscriptionManager.init(this, registry);
}
-
+
protected Class<BrokerMBean> getServiceMBean() {
return BrokerMBean.class;
}
/**
- * Get the name of the Container
- *
- * @return containerName
- */
+ * Get the name of the Container
+ *
+ * @return containerName
+ */
public String getContainerName() {
return container.getName();
}
@@ -213,18 +214,19 @@
* @return the subscriptionFlowName
*/
public String getSubscriptionFlowName() {
- return subscriptionFlowName;
- }
+ return subscriptionFlowName;
+ }
/**
* Set the subscription flow name
+ *
* @param subscriptionFlowName
*/
- public void setSubscriptionFlowName(String subscriptionFlowName) {
- this.subscriptionFlowName = subscriptionFlowName;
- }
+ public void setSubscriptionFlowName(String subscriptionFlowName) {
+ this.subscriptionFlowName = subscriptionFlowName;
+ }
- /**
+ /**
* Set the flow
*
* @param flow
@@ -282,9 +284,9 @@
}
if (exchange.getRole() == Role.PROVIDER) {
- getSubscriptionManager().dispatchToSubscribers(exchange);
+ getSubscriptionManager().dispatchToSubscribers(exchange);
}
-
+
if (!foundRoute) {
boolean throwException = true;
ActivationSpec activationSpec = exchange.getActivationSpec();
@@ -292,8 +294,8 @@
throwException = activationSpec.isFailIfNoDestinationEndpoint();
}
if (throwException) {
- throw new MessagingException("Could not find route for exchange: " + exchange + " for service: " + exchange.getService() + " and interface: "
- + exchange.getInterfaceName());
+ throw new MessagingException("Could not find route for exchange: " + exchange + " for service: " + exchange.getService()
+ + " and interface: " + exchange.getInterfaceName());
} else if (exchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
exchange.handleAccept();
ComponentContextImpl ctx = (ComponentContextImpl) getSubscriptionManager().getContext();
@@ -303,15 +305,16 @@
}
}
}
-
+
protected void resolveAddress(MessageExchangeImpl exchange) throws JBIException {
ServiceEndpoint theEndpoint = exchange.getEndpoint();
if (theEndpoint != null) {
if (theEndpoint instanceof ExternalEndpoint) {
throw new JBIException("External endpoints can not be used for routing: should be an internal or dynamic endpoint.");
}
- if (theEndpoint instanceof AbstractServiceEndpoint == false) {
- throw new JBIException("Component-specific endpoints can not be used for routing: should be an internal or dynamic endpoint.");
+ if (!(theEndpoint instanceof AbstractServiceEndpoint)) {
+ throw new JBIException(
+ "Component-specific endpoints can not be used for routing: should be an internal or dynamic endpoint.");
}
}
// Resolve linked endpoints
@@ -330,7 +333,7 @@
if (theEndpoint == null) {
QName serviceName = exchange.getService();
QName interfaceName = exchange.getInterfaceName();
-
+
// check in order, ServiceName then InterfaceName
// check to see if there is a match on the serviceName
if (serviceName != null) {
@@ -338,7 +341,7 @@
endpoints = getMatchingEndpoints(endpoints, exchange);
theEndpoint = getServiceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
if (theEndpoint == null) {
- log.warn("ServiceName (" + serviceName + ") specified for routing, but can't find it registered");
+ LOG.warn("ServiceName (" + serviceName + ") specified for routing, but can't find it registered");
}
}
if (theEndpoint == null && interfaceName != null) {
@@ -346,7 +349,7 @@
endpoints = getMatchingEndpoints(endpoints, exchange);
theEndpoint = (InternalEndpoint) getInterfaceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
if (theEndpoint == null) {
- log.warn("InterfaceName (" + interfaceName + ") specified for routing, but can't find any matching components");
+ LOG.warn("InterfaceName (" + interfaceName + ") specified for routing, but can't find any matching components");
}
}
if (theEndpoint == null) {
@@ -359,8 +362,7 @@
try {
EndpointFilter filter = createEndpointFilter(context, exchange);
theEndpoint = (InternalEndpoint) destinationResolver.resolveEndpoint(context, exchange, filter);
- }
- catch (JBIException e) {
+ } catch (JBIException e) {
throw new MessagingException("Failed to resolve endpoint: " + e, e);
}
}
@@ -370,40 +372,42 @@
if (theEndpoint != null) {
exchange.setEndpoint(theEndpoint);
}
- if (log.isTraceEnabled()) {
- log.trace("Routing exchange " + exchange + " to: " + theEndpoint);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Routing exchange " + exchange + " to: " + theEndpoint);
}
}
/**
- * Filter the given endpoints by asking to the provider and consumer
- * if they are both ok to process the exchange.
+ * Filter the given endpoints by asking to the provider and consumer if they
+ * are both ok to process the exchange.
*
- * @param endpoints an array of internal endpoints to check
- * @param exchange the exchange that will be serviced
+ * @param endpoints
+ * an array of internal endpoints to check
+ * @param exchange
+ * the exchange that will be serviced
* @return an array of endpoints on which both consumer and provider agrees
*/
protected ServiceEndpoint[] getMatchingEndpoints(ServiceEndpoint[] endpoints, MessageExchangeImpl exchange) {
- List<ServiceEndpoint> filtered = new ArrayList<ServiceEndpoint>();
+ List<ServiceEndpoint> filtered = new ArrayList<ServiceEndpoint>();
ComponentMBeanImpl consumer = getRegistry().getComponent(exchange.getSourceId());
-
- for (int i = 0; i < endpoints.length; i++) {
- ComponentNameSpace id = ((InternalEndpoint) endpoints[i]).getComponentNameSpace();
+
+ for (int i = 0; i < endpoints.length; i++) {
+ ComponentNameSpace id = ((InternalEndpoint) endpoints[i]).getComponentNameSpace();
if (id != null) {
ComponentMBeanImpl provider = getRegistry().getComponent(id);
if (provider != null) {
- if (!consumer.getComponent().isExchangeWithProviderOkay(endpoints[i], exchange) ||
- !provider.getComponent().isExchangeWithConsumerOkay(endpoints[i], exchange)) {
- continue;
- }
+ if (!consumer.getComponent().isExchangeWithProviderOkay(endpoints[i], exchange)
+ || !provider.getComponent().isExchangeWithConsumerOkay(endpoints[i], exchange)) {
+ continue;
+ }
}
}
filtered.add(endpoints[i]);
- }
- return filtered.toArray(new ServiceEndpoint[filtered.size()]);
- }
+ }
+ return filtered.toArray(new ServiceEndpoint[filtered.size()]);
+ }
- /**
+ /**
* @return the default EndpointChooser
*/
public EndpointChooser getDefaultInterfaceChooser() {
@@ -443,7 +447,8 @@
}
/**
- * @param defaultFlowChooser the defaultFlowChooser to set
+ * @param defaultFlowChooser
+ * the defaultFlowChooser to set
*/
public void setDefaultFlowChooser(FlowChooser defaultFlowChooser) {
this.defaultFlowChooser = defaultFlowChooser;
@@ -501,8 +506,7 @@
Component component = context.getComponent();
if (exchange.getRole() == Role.PROVIDER) {
return new ConsumerComponentEndpointFilter(component);
- }
- else {
+ } else {
return new ProducerComponentEndpointFilter(component);
}
}
@@ -521,8 +525,8 @@
return OperationInfoHelper.join(super.getOperationInfos(), helper.getOperationInfos());
}
- public JBIContainer getContainer() {
- return container;
- }
+ public JBIContainer getContainer() {
+ return container;
+ }
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/SubscriptionManager.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/SubscriptionManager.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/SubscriptionManager.java Fri Aug 10 07:37:46 2007
@@ -16,14 +16,9 @@
*/
package org.apache.servicemix.jbi.nmr;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.JbiConstants;
-import org.apache.servicemix.MessageExchangeListener;
-import org.apache.servicemix.components.util.ComponentSupport;
-import org.apache.servicemix.jbi.framework.Registry;
-import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
-import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import javax.jbi.JBIException;
import javax.jbi.messaging.DeliveryChannel;
@@ -32,9 +27,14 @@
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.components.util.ComponentSupport;
+import org.apache.servicemix.jbi.framework.Registry;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
/**
* Handles publish/subscribe style messaging in the NMR.
@@ -43,94 +43,99 @@
* @version $Revision$
*/
public class SubscriptionManager extends ComponentSupport implements MessageExchangeListener {
-
+
public static final String COMPONENT_NAME = "#SubscriptionManager#";
-
+
+ private static final Log LOG = LogFactory.getLog(SubscriptionManager.class);
+
+ // SM-229: Avoid StackOverflowException
+ private static final String FROM_SUBSCRIPTION_MANAGER = "org.apache.servicemix.jbi.nmr.from_subman";
+
private Registry registry;
+
private String flowName;
- private static Log log = LogFactory.getLog(SubscriptionManager.class);
-
- // SM-229: Avoid StackOverflowException
- private static final String FROM_SUBSCRIPTION_MANAGER = "org.apache.servicemix.jbi.nmr.from_subman";
-
+
/**
* Initialize the SubscriptionManager
+ *
* @param broker
* @throws JBIException
*/
- public void init(Broker broker, Registry registry) throws JBIException {
- this.registry = registry;
+ public void init(Broker broker, Registry reg) throws JBIException {
+ this.registry = reg;
broker.getContainer().activateComponent(this, COMPONENT_NAME);
}
/**
* Dispatches the given exchange to all matching subscribers
- * @param exchange
+ *
+ * @param exchange
* @return true if dispatched to a matching subscriber(s)
*
- * @throws JBIException
+ * @throws JBIException
*/
protected boolean dispatchToSubscribers(MessageExchangeImpl exchange) throws JBIException {
- Boolean source = (Boolean) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
- if (source == null || !source.booleanValue()) {
- List<InternalEndpoint> list = registry.getMatchingSubscriptionEndpoints(exchange);
- if (list != null) {
- for (int i = 0; i < list.size(); i++) {
- InternalEndpoint endpoint = list.get(i);
- dispatchToSubscriber(exchange, endpoint);
- }
- }
- return list != null && !list.isEmpty();
- } else {
- return false;
- }
+ Boolean source = (Boolean) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
+ if (source == null || !source.booleanValue()) {
+ List<InternalEndpoint> list = registry.getMatchingSubscriptionEndpoints(exchange);
+ if (list != null) {
+ for (int i = 0; i < list.size(); i++) {
+ InternalEndpoint endpoint = list.get(i);
+ dispatchToSubscriber(exchange, endpoint);
+ }
+ }
+ return list != null && !list.isEmpty();
+ } else {
+ return false;
+ }
}
/**
* Dispatches the given message exchange to the given endpoint
- * @param exchange
- * @param endpoint
- * @throws JBIException
+ *
+ * @param exchange
+ * @param endpoint
+ * @throws JBIException
*/
protected void dispatchToSubscriber(MessageExchangeImpl exchange, InternalEndpoint endpoint) throws JBIException {
- if (log.isDebugEnabled() && endpoint != null) {
- log.debug("Subscription Endpoint: "+endpoint.getEndpointName());
+ if (LOG.isDebugEnabled() && endpoint != null) {
+ LOG.debug("Subscription Endpoint: " + endpoint.getEndpointName());
}
- // SM-229: Avoid StackOverflowException
- Boolean source = (Boolean) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
- if (source == null || !source.booleanValue()) {
- DeliveryChannel channel = getDeliveryChannel();
- InOnly me = channel.createExchangeFactory().createInOnlyExchange();
- // SM-229: Avoid StackOverflowException
- me.setProperty(FROM_SUBSCRIPTION_MANAGER,Boolean.TRUE);
- NormalizedMessage in = me.createMessage();
- getMessageTransformer().transform(me, exchange.getInMessage(), in);
- me.setInMessage(in);
- me.setEndpoint(endpoint);
- Set names = exchange.getPropertyNames();
- for (Iterator iter = names.iterator(); iter.hasNext();) {
- String name = (String) iter.next();
- me.setProperty(name, exchange.getProperty(name));
- }
- if (Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC))) {
- channel.sendSync(me);
- } else {
- channel.send(me);
- }
- }
- }
-
- public String getFlowName() {
- return flowName;
- }
-
- public void setFlowName(String flowName) {
- this.flowName = flowName;
- }
+ // SM-229: Avoid StackOverflowException
+ Boolean source = (Boolean) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
+ if (source == null || !source.booleanValue()) {
+ DeliveryChannel channel = getDeliveryChannel();
+ InOnly me = channel.createExchangeFactory().createInOnlyExchange();
+ // SM-229: Avoid StackOverflowException
+ me.setProperty(FROM_SUBSCRIPTION_MANAGER, Boolean.TRUE);
+ NormalizedMessage in = me.createMessage();
+ getMessageTransformer().transform(me, exchange.getInMessage(), in);
+ me.setInMessage(in);
+ me.setEndpoint(endpoint);
+ Set names = exchange.getPropertyNames();
+ for (Iterator iter = names.iterator(); iter.hasNext();) {
+ String name = (String) iter.next();
+ me.setProperty(name, exchange.getProperty(name));
+ }
+ if (Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC))) {
+ channel.sendSync(me);
+ } else {
+ channel.send(me);
+ }
+ }
+ }
+
+ public String getFlowName() {
+ return flowName;
+ }
+
+ public void setFlowName(String flowName) {
+ this.flowName = flowName;
+ }
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
// We should only receive done exchanges from subscribers
// but we need that so that they can be dequeued
}
-
+
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java Fri Aug 10 07:37:46 2007
@@ -22,8 +22,8 @@
import javax.jbi.JBIException;
import javax.jbi.management.LifeCycleMBean;
import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
@@ -48,110 +48,106 @@
* @version $Revision$
*/
public abstract class AbstractFlow extends BaseLifeCycle implements Flow {
-
+
protected final Log log = LogFactory.getLog(getClass());
-
protected Broker broker;
protected ExecutorFactory executorFactory;
private ReadWriteLock lock = new ReentrantReadWriteLock();
- private Thread suspendThread = null;
+ private Thread suspendThread;
private String name;
/**
* Initialize the Region
*
- * @param broker
+ * @param br
* @throws JBIException
*/
- public void init(Broker broker) throws JBIException {
- this.broker = broker;
- this.executorFactory = broker.getContainer().getExecutorFactory();
+ public void init(Broker br) throws JBIException {
+ this.broker = br;
+ this.executorFactory = br.getContainer().getExecutorFactory();
// register self with the management context
- ObjectName objectName = broker.getContainer().getManagementContext().createObjectName(this);
+ ObjectName objectName = br.getContainer().getManagementContext().createObjectName(this);
try {
- broker.getContainer().getManagementContext().registerMBean(objectName, this, LifeCycleMBean.class);
- }
- catch (JMException e) {
+ br.getContainer().getManagementContext().registerMBean(objectName, this, LifeCycleMBean.class);
+ } catch (JMException e) {
throw new JBIException("Failed to register MBean with the ManagementContext", e);
}
}
-
+
/**
* start the flow
* @throws JBIException
*/
- public void start() throws JBIException{
+ public void start() throws JBIException {
super.start();
}
-
-
+
/**
* stop the flow
* @throws JBIException
*/
- public void stop() throws JBIException{
+ public void stop() throws JBIException {
if (log.isDebugEnabled()) {
log.debug("Called Flow stop");
}
- if (suspendThread != null){
+ if (suspendThread != null) {
suspendThread.interrupt();
}
super.stop();
}
-
+
/**
* shutDown the flow
* @throws JBIException
*/
- public void shutDown() throws JBIException{
+ public void shutDown() throws JBIException {
if (log.isDebugEnabled()) {
log.debug("Called Flow shutdown");
}
broker.getContainer().getManagementContext().unregisterMBean(this);
super.shutDown();
}
-
+
/**
* Distribute an ExchangePacket
* @param packet
* @throws JBIException
*/
- public void send(MessageExchange me) throws JBIException{
+ public void send(MessageExchange me) throws JBIException {
if (log.isDebugEnabled()) {
log.debug("Called Flow send");
}
- // do send
+ // do send
try {
lock.readLock().lock();
doSend((MessageExchangeImpl) me);
- } finally{
+ } finally {
lock.readLock().unlock();
}
}
-
+
/**
* suspend the flow to prevent any message exchanges
*/
- public synchronized void suspend(){
+ public synchronized void suspend() {
if (log.isDebugEnabled()) {
log.debug("Called Flow suspend");
}
lock.writeLock().lock();
suspendThread = Thread.currentThread();
}
-
-
+
/**
* resume message exchange processing
*/
- public synchronized void resume(){
+ public synchronized void resume() {
if (log.isDebugEnabled()) {
log.debug("Called Flow resume");
}
lock.writeLock().unlock();
suspendThread = null;
}
-
+
/**
* Do the Flow specific routing
* @param packet
@@ -175,12 +171,11 @@
} else {
throw new MessagingException("Component " + id.getName() + " is shut down");
}
- }
- else {
+ } else {
throw new MessagingException("No component named " + id.getName() + " - Couldn't route MessageExchange " + me);
}
}
-
+
/**
* Get an array of MBeanAttributeInfo
*
@@ -200,17 +195,17 @@
*/
protected boolean isPersistent(MessageExchange me) {
ExchangePacket packet = ((MessageExchangeImpl) me).getPacket();
- if (packet.getPersistent() != null) {
- return packet.getPersistent().booleanValue();
- } else {
- return broker.getContainer().isPersistent();
- }
+ if (packet.getPersistent() != null) {
+ return packet.getPersistent().booleanValue();
+ } else {
+ return broker.getContainer().isPersistent();
+ }
}
protected boolean isTransacted(MessageExchange me) {
return me.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME) != null;
}
-
+
protected boolean isSynchronous(MessageExchange me) {
Boolean sync = (Boolean) me.getProperty(JbiConstants.SEND_SYNC);
return sync != null && sync.booleanValue();
@@ -222,7 +217,7 @@
ServiceEndpoint se = me.getEndpoint();
if (se instanceof InternalEndpoint) {
return ((InternalEndpoint) se).isClustered();
- // Unknown: assume this is not clustered
+ // Unknown: assume this is not clustered
} else {
return false;
}
@@ -232,7 +227,7 @@
return !source.equals(destination);
}
}
-
+
public Broker getBroker() {
return broker;
}
@@ -244,23 +239,23 @@
public String getType() {
return "Flow";
}
-
+
/**
* Get the name of the item
* @return the name
*/
public String getName() {
if (this.name == null) {
- String name = super.getName();
- if (name.endsWith("Flow")) {
- name = name.substring(0, name.length() - 4);
+ String n = super.getName();
+ if (n.endsWith("Flow")) {
+ n = n.substring(0, n.length() - 4);
}
- return name;
+ return n;
} else {
return this.name;
}
}
-
+
public void setName(String name) {
this.name = name;
}
@@ -271,5 +266,5 @@
public ExecutorFactory getExecutorFactory() {
return executorFactory;
}
-
+
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java Fri Aug 10 07:37:46 2007
@@ -40,7 +40,8 @@
}
if (foundFlow == null) {
throw new MessagingException("Flow '" + flow + "' was specified but not found");
- } if (foundFlow.canHandle(exchange)) {
+ }
+ if (foundFlow.canHandle(exchange)) {
return foundFlow;
} else {
throw new MessagingException("Flow '" + flow + "' was specified but not able to handle exchange");
@@ -55,5 +56,5 @@
}
return null;
}
-
+
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java Fri Aug 10 07:37:46 2007
@@ -16,12 +16,12 @@
*/
package org.apache.servicemix.jbi.nmr.flow;
-import org.apache.servicemix.jbi.nmr.Broker;
-
import javax.jbi.JBIException;
import javax.jbi.management.LifeCycleMBean;
import javax.jbi.messaging.MessageExchange;
+import org.apache.servicemix.jbi.nmr.Broker;
+
/**
* A Flow provides different dispatch policies within the NMR
*
@@ -34,49 +34,49 @@
* @param broker
* @throws JBIException
*/
- public void init(Broker broker) throws JBIException;
+ void init(Broker broker) throws JBIException;
/**
* The description of Flow
* @return the description
*/
- public String getDescription();
+ String getDescription();
/**
* The unique name of Flow
* @return the name
*/
- public String getName();
+ String getName();
/**
* Distribute an ExchangePacket
* @param packet
* @throws JBIException
*/
- public void send(MessageExchange me) throws JBIException;
+ void send(MessageExchange me) throws JBIException;
/**
* suspend the flow to prevent any message exchanges
*/
- public void suspend();
+ void suspend();
/**
* resume message exchange processing
*/
- public void resume();
+ void resume();
/**
* Get the broker associated with this flow
*
*/
- public Broker getBroker();
+ Broker getBroker();
/**
* Check if the flow can support the requested QoS for this exchange
* @param me the exchange to check
* @return true if this flow can handle the given exchange
*/
- public boolean canHandle(MessageExchange me);
+ boolean canHandle(MessageExchange me);
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java Fri Aug 10 07:37:46 2007
@@ -19,22 +19,28 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
+
import javax.jbi.JBIException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.finder.FactoryFinder;
import org.apache.servicemix.jbi.util.IntrospectionSupport;
import org.apache.servicemix.jbi.util.URISupport;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* Find a Flow by Name
*
* @version $Revision$
*/
-public class FlowProvider{
- private static final Log log=LogFactory.getLog(FlowProvider.class);
- private static FactoryFinder finder=new FactoryFinder("META-INF/services/org/apache/servicemix/jbi/nmr/flow/");
+public final class FlowProvider {
+
+ private static final Log LOG = LogFactory.getLog(FlowProvider.class);
+
+ private static final FactoryFinder FINDER = new FactoryFinder("META-INF/services/org/apache/servicemix/jbi/nmr/flow/");
+
+ private FlowProvider() {
+ }
/**
* Locate a Flow
@@ -43,54 +49,54 @@
* @return the Flow
* @throws JBIException
*/
- public static Flow getFlow(String flow) throws JBIException{
+ public static Flow getFlow(String flow) throws JBIException {
Object value;
- String flowName=getFlowName(flow);
- try{
- value=finder.newInstance(flowName);
- if(value!=null&&value instanceof Flow){
- String query=getQuery(flow);
- if(query!=null){
- Map map=URISupport.parseQuery(query);
- if(map!=null&&!map.isEmpty()){
- IntrospectionSupport.setProperties(value,map);
+ String flowName = getFlowName(flow);
+ try {
+ value = FINDER.newInstance(flowName);
+ if (value != null && value instanceof Flow) {
+ String query = getQuery(flow);
+ if (query != null) {
+ Map map = URISupport.parseQuery(query);
+ if (map != null && !map.isEmpty()) {
+ IntrospectionSupport.setProperties(value, map);
}
}
return (Flow) value;
}
- throw new JBIException("No implementation found for: "+flow);
- }catch(IllegalAccessException e){
- log.error("getFlow("+flow+" failed: "+e,e);
+ throw new JBIException("No implementation found for: " + flow);
+ } catch (IllegalAccessException e) {
+ LOG.error("getFlow(" + flow + " failed: " + e, e);
throw new JBIException(e);
- }catch(InstantiationException e){
- log.error("getFlow("+flow+" failed: "+e,e);
+ } catch (InstantiationException e) {
+ LOG.error("getFlow(" + flow + " failed: " + e, e);
throw new JBIException(e);
- }catch(IOException e){
- log.error("getFlow("+flow+" failed: "+e,e);
+ } catch (IOException e) {
+ LOG.error("getFlow(" + flow + " failed: " + e, e);
throw new JBIException(e);
- }catch(ClassNotFoundException e){
- log.error("getFlow("+flow+" failed: "+e,e);
+ } catch (ClassNotFoundException e) {
+ LOG.error("getFlow(" + flow + " failed: " + e, e);
throw new JBIException(e);
- }catch(URISyntaxException e){
- log.error("getFlow("+flow+" failed: "+e,e);
+ } catch (URISyntaxException e) {
+ LOG.error("getFlow(" + flow + " failed: " + e, e);
throw new JBIException(e);
}
}
- public static String getFlowName(String str){
- String result=str;
- int index=str.indexOf('?');
- if(index>=0){
- result=str.substring(0,index);
+ public static String getFlowName(String str) {
+ String result = str;
+ int index = str.indexOf('?');
+ if (index >= 0) {
+ result = str.substring(0, index);
}
return result;
}
- protected static String getQuery(String str){
- String result=null;
- int index=str.indexOf('?');
- if(index>=0&&(index+1)<str.length()){
- result=str.substring(index+1);
+ protected static String getQuery(String str) {
+ String result = null;
+ int index = str.indexOf('?');
+ if (index >= 0 && (index + 1) < str.length()) {
+ result = str.substring(index + 1);
}
return result;
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java Fri Aug 10 07:37:46 2007
@@ -26,8 +26,8 @@
import javax.jbi.JBIException;
import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -81,14 +81,16 @@
import org.jencks.factory.ConnectionManagerFactoryBean;
/**
- * Use for message routing among a network of containers. All routing/registration happens automatically.
+ * Use for message routing among a network of containers. All
+ * routing/registration happens automatically.
*
* @version $Revision$
* @org.apache.xbean.XBean element="jcaFlow"
*/
public class JCAFlow extends AbstractFlow implements MessageListener {
-
+
private static final String INBOUND_PREFIX = "org.apache.servicemix.jca.";
+
private String jmsURL = "tcp://localhost:61616";
private ActiveMQConnectionFactory connectionFactory;
private ConnectionFactory managedConnectionFactory;
@@ -107,11 +109,11 @@
public JCAFlow() {
}
-
+
public JCAFlow(String jmsURL) {
- this.jmsURL = jmsURL;
+ this.jmsURL = jmsURL;
}
-
+
/**
* The type of Flow
*
@@ -133,7 +135,8 @@
/**
* Sets the JMS URL for this flow
*
- * @param jmsURL The jmsURL to set.
+ * @param jmsURL
+ * The jmsURL to set.
*/
public void setJmsURL(String jmsURL) {
this.jmsURL = jmsURL;
@@ -151,7 +154,8 @@
/**
* Sets the ConnectionFactory for this flow
*
- * @param connectionFactory The connectionFactory to set.
+ * @param connectionFactory
+ * The connectionFactory to set.
*/
public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
@@ -169,16 +173,17 @@
/**
* Sets the Broadcast Destination Name for this flow
*
- * @param broadcastDestinationName The broadcastDestinationName to set.
+ * @param broadcastDestinationName
+ * The broadcastDestinationName to set.
*/
public void setBroadcastDestinationName(String broadcastDestinationName) {
this.broadcastDestinationName = broadcastDestinationName;
}
public TransactionManager getTransactionManager() {
- return (TransactionManager) broker.getContainer().getTransactionManager();
+ return (TransactionManager) broker.getContainer().getTransactionManager();
}
-
+
/**
* Initialize the Region
*
@@ -204,6 +209,7 @@
public void componentStarted(ComponentEvent event) {
onComponentStarted(event);
}
+
public void componentStopped(ComponentEvent event) {
onComponentStopped(event);
}
@@ -213,13 +219,13 @@
if (connectionFactory == null) {
connectionFactory = new ActiveMQConnectionFactory(jmsURL);
}
-
- // Inbound connector
+
+ // Inbound connector
ActiveMQDestination dest = new ActiveMQQueue(INBOUND_PREFIX + broker.getContainer().getName());
containerConnector = new Connector(dest, this, true);
containerConnector.start();
-
- // Outbound connector
+
+ // Outbound connector
ActiveMQResourceAdapter outboundRa = new ActiveMQResourceAdapter();
outboundRa.setConnectionFactory(connectionFactory);
ActiveMQManagedConnectionFactory mcf = new ActiveMQManagedConnectionFactory();
@@ -229,8 +235,7 @@
// Inbound broadcast
broadcastTopic = new ActiveMQTopic(broadcastDestinationName);
advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error("Failed to initialize JCAFlow", e);
throw new JBIException(e);
}
@@ -266,9 +271,9 @@
}
}
};
- broadcastConnector = new Connector(broadcastTopic, listener, false);
+ broadcastConnector = new Connector(broadcastTopic, listener, false);
broadcastConnector.start();
-
+
listener = new MessageListener() {
public void onMessage(Message message) {
if (started.get()) {
@@ -278,8 +283,7 @@
};
advisoryConnector = new Connector(advisoryTopic, listener, false);
advisoryConnector.start();
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
}
}
@@ -315,17 +319,17 @@
broker.getContainer().removeListener(componentListener);
// Destroy connectors
while (!connectorMap.isEmpty()) {
- Connector connector = connectorMap.remove(connectorMap.keySet().iterator().next());
- try {
- connector.stop();
- } catch (Exception e) {
- log.debug("Error closing jca connector", e);
- }
+ Connector connector = connectorMap.remove(connectorMap.keySet().iterator().next());
+ try {
+ connector.stop();
+ } catch (Exception e) {
+ log.debug("Error closing jca connector", e);
+ }
}
try {
- containerConnector.stop();
- } catch (Exception e) {
- log.debug("Error closing jca connector", e);
+ containerConnector.stop();
+ } catch (Exception e) {
+ log.debug("Error closing jca connector", e);
}
}
@@ -340,7 +344,9 @@
/**
* Check if the flow can support the requested QoS for this exchange
- * @param me the exchange to check
+ *
+ * @param me
+ * the exchange to check
* @return true if this flow can handle the given exchange
*/
public boolean canHandle(MessageExchange me) {
@@ -349,14 +355,14 @@
}
return true;
}
-
+
public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
if (!started.get()) {
return;
}
try {
String key = EndpointSupport.getKey(event.getEndpoint());
- if (!connectorMap.containsKey(key)){
+ if (!connectorMap.containsKey(key)) {
ActiveMQDestination dest = new ActiveMQQueue(INBOUND_PREFIX + key);
Connector connector = new Connector(dest, this, true);
connector.start();
@@ -371,9 +377,9 @@
log.error("Cannot create consumer for " + event.getEndpoint(), e);
}
}
-
+
public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
- try{
+ try {
String key = EndpointSupport.getKey(event.getEndpoint());
Connector connector = connectorMap.remove(key);
if (connector != null) {
@@ -388,7 +394,7 @@
log.error("Cannot destroy consumer for " + event, e);
}
}
-
+
public void onComponentStarted(ComponentEvent event) {
if (!started.get()) {
return;
@@ -405,12 +411,12 @@
log.error("Cannot create consumer for component " + event.getComponent().getName(), e);
}
}
-
+
public void onComponentStopped(ComponentEvent event) {
try {
String key = event.getComponent().getName();
Connector connector = connectorMap.remove(key);
- if (connector != null){
+ if (connector != null) {
connector.stop();
}
} catch (Exception e) {
@@ -437,7 +443,7 @@
protected void doSend(MessageExchangeImpl me) throws MessagingException {
doRouting(me);
}
-
+
/**
* Distribute an ExchangePacket
*
@@ -460,9 +466,12 @@
if (me.getSourceId() == null) {
throw new IllegalStateException("No sourceId set on the exchange");
} else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
- // If the consumer is stateless and has specified a sender endpoint,
- // this exchange will be sent to the given endpoint queue, so that
- // This property must have been created using EndpointSupport.getKey
+ // If the consumer is stateless and has specified a sender
+ // endpoint,
+ // this exchange will be sent to the given endpoint queue,
+ // so that
+ // This property must have been created using
+ // EndpointSupport.getKey
// fail-over and load-balancing can be achieved
if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT);
@@ -496,33 +505,32 @@
if (message != null && started.get()) {
ObjectMessage objMsg = (ObjectMessage) message;
final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
- // Hack for redelivery: AMQ is too optimized and the object is the same upon redelivery
- // so that there are side effect (the exchange state may have been modified)
+ // Hack for redelivery: AMQ is too optimized and the object is
+ // the same upon redelivery
+ // so that there are side effect (the exchange state may have
+ // been modified)
// See http://jira.activemq.org/jira/browse/AMQ-519
- //me = (MessageExchangeImpl) ((ActiveMQObjectMessage) ((ActiveMQObjectMessage) message).copy()).getObject();
+ // me = (MessageExchangeImpl) ((ActiveMQObjectMessage)
+ // ((ActiveMQObjectMessage) message).copy()).getObject();
TransactionManager tm = (TransactionManager) getTransactionManager();
if (tm != null) {
me.setTransactionContext(tm.getTransaction());
}
if (me.getDestinationId() == null) {
ServiceEndpoint se = me.getEndpoint();
- se = broker.getContainer().getRegistry()
- .getInternalEndpoint(se.getServiceName(), se.getEndpointName());
+ se = broker.getContainer().getRegistry().getInternalEndpoint(se.getServiceName(), se.getEndpointName());
me.setEndpoint(se);
me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
}
super.doRouting(me);
}
- }
- catch (JMSException jmsEx) {
+ } catch (JMSException jmsEx) {
log.error("Caught an exception unpacking JMS Message: ", jmsEx);
- }
- catch (MessagingException e) {
+ } catch (MessagingException e) {
log.error("Caught an exception routing ExchangePacket: ", e);
- }
- catch (SystemException e) {
+ } catch (SystemException e) {
log.error("Caught an exception acessing transaction context: ", e);
- }
+ }
}
protected void onAdvisoryMessage(Object obj) {
@@ -532,8 +540,7 @@
ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
for (int i = 0; i < endpoints.length; i++) {
if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
- onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
- EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
+ onInternalEndpointRegistered(new EndpointEvent(endpoints[i], EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
}
}
} else if (obj instanceof RemoveInfo) {
@@ -544,52 +551,57 @@
}
private void removeAllPackets(String containerName) {
- //TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
+ // TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
}
- public ConnectionManager getConnectionManager() throws Exception {
- if (connectionManager == null) {
- ConnectionManagerFactoryBean cmfb = new ConnectionManagerFactoryBean();
- cmfb.setTransactionManager((TransactionManager) broker.getContainer().getTransactionManager());
- cmfb.setTransaction("xa");
- cmfb.afterPropertiesSet();
- connectionManager = (ConnectionManager) cmfb.getObject();
- }
- return connectionManager;
- }
-
- public void setConnectionManager(ConnectionManager connectionManager) {
- this.connectionManager = connectionManager;
- }
+ public ConnectionManager getConnectionManager() throws Exception {
+ if (connectionManager == null) {
+ ConnectionManagerFactoryBean cmfb = new ConnectionManagerFactoryBean();
+ cmfb.setTransactionManager((TransactionManager) broker.getContainer().getTransactionManager());
+ cmfb.setTransaction("xa");
+ cmfb.afterPropertiesSet();
+ connectionManager = (ConnectionManager) cmfb.getObject();
+ }
+ return connectionManager;
+ }
- public String toString(){
+ public void setConnectionManager(ConnectionManager connectionManager) {
+ this.connectionManager = connectionManager;
+ }
+
+ public String toString() {
return broker.getContainer().getName() + " JCAFlow";
}
-
- private void sendJmsMessage(Destination dest, Serializable object, boolean persistent, boolean transacted) throws JMSException, SystemException {
+
+ private void sendJmsMessage(Destination dest, Serializable object, boolean persistent, boolean transacted) throws JMSException,
+ SystemException {
if (transacted) {
TransactionManager tm = (TransactionManager) getBroker().getContainer().getTransactionManager();
if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
return;
}
}
- Connection connection = managedConnectionFactory.createConnection();
- try {
- Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
- ObjectMessage msg = session.createObjectMessage(object);
- MessageProducer producer = session.createProducer(dest);
- producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- producer.send(msg);
- } finally {
- connection.close();
- }
+ Connection connection = managedConnectionFactory.createConnection();
+ try {
+ Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ ObjectMessage msg = session.createObjectMessage(object);
+ MessageProducer producer = session.createProducer(dest);
+ producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ producer.send(msg);
+ } finally {
+ connection.close();
+ }
}
-
+
class Connector {
private ActiveMQResourceAdapter ra;
+
private MessageEndpointFactory endpointFactory;
+
private ActiveMQActivationSpec spec;
+
private Executor executor;
+
public Connector(ActiveMQDestination destination, MessageListener listener, boolean transacted) {
ra = new ActiveMQResourceAdapter();
ra.setConnectionFactory(connectionFactory);
@@ -599,6 +611,7 @@
spec = new ActiveMQActivationSpec();
spec.setActiveMQDestination(destination);
}
+
public void start() throws ResourceException {
ExecutorFactory factory = broker.getContainer().getExecutorFactory();
executor = factory.createExecutor("flow.jca." + spec.getDestination());
@@ -607,28 +620,33 @@
spec.setResourceAdapter(ra);
ra.endpointActivation(endpointFactory, spec);
}
+
public void stop() {
ra.endpointDeactivation(endpointFactory, spec);
ra.stop();
executor.shutdown();
}
}
-
+
class SimpleBootstrapContext implements BootstrapContext {
private final WorkManager workManager;
+
public SimpleBootstrapContext(WorkManager workManager) {
this.workManager = workManager;
}
+
public Timer createTimer() throws UnavailableException {
throw new UnsupportedOperationException();
}
+
public WorkManager getWorkManager() {
return workManager;
}
+
public XATerminator getXATerminator() {
throw new UnsupportedOperationException();
}
-
+
}
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java Fri Aug 10 07:37:46 2007
@@ -24,8 +24,8 @@
import javax.jbi.JBIException;
import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -64,32 +64,20 @@
private static final String INBOUND_PREFIX = "org.apache.servicemix.jms.";
- private String userName;
-
- private String password;
-
- ConnectionFactory connectionFactory;
-
+ protected ConnectionFactory connectionFactory;
protected Connection connection;
+ protected AtomicBoolean started = new AtomicBoolean(false);
+ protected MessageConsumer monitorMessageConsumer;
+ protected Set<String> subscriberSet = new CopyOnWriteArraySet<String>();
+ private String userName;
+ private String password;
private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
-
private MessageConsumer broadcastConsumer;
-
- protected Set<String> subscriberSet = new CopyOnWriteArraySet<String>();
-
private Map<String, MessageConsumer> consumerMap = new ConcurrentHashMap<String, MessageConsumer>();
-
- AtomicBoolean started = new AtomicBoolean(false);
-
private EndpointListener endpointListener;
-
private ComponentListener componentListener;
-
private Executor executor;
-
- protected MessageConsumer monitorMessageConsumer = null;
-
private String jmsURL = "peer://org.apache.servicemix?persistent=false";
/**
@@ -229,7 +217,7 @@
}
}
- abstract protected ConnectionFactory createConnectionFactoryFromUrl(String jmsURL);
+ protected abstract ConnectionFactory createConnectionFactoryFromUrl(String url);
/*
* The following abstract methods have to be implemented by specialized JMS
@@ -238,7 +226,7 @@
protected abstract void onConsumerMonitorMessage(Message message);
- abstract public void startConsumerMonitor() throws JMSException;
+ public abstract void startConsumerMonitor() throws JMSException;
public void stopConsumerMonitor() throws JMSException {
monitorMessageConsumer.close();
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java Fri Aug 10 07:37:46 2007
@@ -49,8 +49,9 @@
* node is added or removed
*/
protected void onConsumerMonitorMessage(Message advisoryMessage) {
- if (!started.get())
+ if (!started.get()) {
return;
+ }
Object obj = ((ActiveMQMessage) advisoryMessage).getDataStructure();
if (obj instanceof ConsumerInfo) {
ConsumerInfo info = (ConsumerInfo) obj;
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java Fri Aug 10 07:37:46 2007
@@ -48,8 +48,8 @@
try {
Class connFactoryClass = Class.forName("com.tibco.tibjms.TibjmsConnectionFactory");
if (jmsURL != null) {
- Constructor cns = connFactoryClass.getConstructor(new Class[] { String.class });
- ConnectionFactory connFactory = (ConnectionFactory) cns.newInstance(new Object[] { jmsURL });
+ Constructor cns = connFactoryClass.getConstructor(new Class[] {String.class });
+ ConnectionFactory connFactory = (ConnectionFactory) cns.newInstance(new Object[] {jmsURL });
return connFactory;
} else {
ConnectionFactory connFactory = (ConnectionFactory) connFactoryClass.newInstance();
@@ -65,8 +65,9 @@
}
public void onConsumerMonitorMessage(Message message) {
- if (!started.get())
+ if (!started.get()) {
return;
+ }
try {
String connectionId = "" + message.getLongProperty(PROPERTY_NAME_CONN_CONNID);
String targetDestName = message.getStringProperty(PROPERTY_NAME_TARGET_DEST_NAME);