You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/09 18:40:08 UTC
svn commit: r564271 [10/18] - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/
activemq-core/src/main/java/org/apache/activemq/advisory/
activemq-core/src/main/java/org/apache/activemq/blob/
activemq-core/src/main/java/org/apache/ac...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportListener.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportListener.java Thu Aug 9 09:37:49 2007
@@ -30,24 +30,24 @@
* called to process a command
* @param command
*/
- public void onCommand(Object command);
+ void onCommand(Object command);
/**
* An unrecoverable exception has occured on the transport
* @param error
*/
- public void onException(IOException error);
+ void onException(IOException error);
/**
* The transport has suffered an interuption from which it hopes to recover
*
*/
- public void transportInterupted();
+ void transportInterupted();
/**
* The transport has resumed after an interuption
*
*/
- public void transportResumed();
+ void transportResumed();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java Thu Aug 9 09:37:49 2007
@@ -33,7 +33,7 @@
this(next, LogFactory.getLog(TransportLogger.class.getName() + ".Connection:" + getNextId()));
}
- synchronized private static int getNextId() {
+ private static synchronized int getNextId() {
return ++lastId;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServer.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServer.java Thu Aug 9 09:37:49 2007
@@ -36,7 +36,7 @@
*
* @param acceptListener
*/
- public void setAcceptListener(TransportAcceptListener acceptListener);
+ void setAcceptListener(TransportAcceptListener acceptListener);
/**
* Associates a broker info with the transport server so that the transport
@@ -44,15 +44,15 @@
*
* @param brokerInfo
*/
- public void setBrokerInfo(BrokerInfo brokerInfo);
+ void setBrokerInfo(BrokerInfo brokerInfo);
- public URI getConnectURI();
+ URI getConnectURI();
/**
* @return The socket address that this transport is accepting connections
* on or null if this does not or is not currently accepting
* connections on a socket.
*/
- public InetSocketAddress getSocketAddress();
+ InetSocketAddress getSocketAddress();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java Thu Aug 9 09:37:49 2007
@@ -30,7 +30,7 @@
* @version $Revision: 1.1 $
*/
public abstract class TransportServerThreadSupport extends TransportServerSupport implements Runnable {
- private static final Log log = LogFactory.getLog(TransportServerThreadSupport.class);
+ private static final Log LOG = LogFactory.getLog(TransportServerThreadSupport.class);
private boolean daemon = true;
private boolean joinOnStop = true;
@@ -69,7 +69,7 @@
}
protected void doStart() throws Exception {
- log.info("Listening for connections at: " + getConnectURI());
+ LOG.info("Listening for connections at: " + getConnectURI());
runner = new Thread(null, this, "ActiveMQ Transport Server: " + toString(), stackSize);
runner.setDaemon(daemon);
runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java Thu Aug 9 09:37:49 2007
@@ -28,7 +28,7 @@
* @version $Revision: 1.1 $
*/
public abstract class TransportSupport extends ServiceSupport implements Transport {
- private static final Log log = LogFactory.getLog(TransportSupport.class);
+ private static final Log LOG = LogFactory.getLog(TransportSupport.class);
TransportListener transportListener;
@@ -82,7 +82,7 @@
if (transportListener != null) {
transportListener.onCommand(command);
} else {
- log.error("No transportListener available to process inbound command: " + command);
+ LOG.error("No transportListener available to process inbound command: " + command);
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java Thu Aug 9 09:37:49 2007
@@ -80,8 +80,9 @@
public void oneway(Object command) throws IOException {
try {
- if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS))
+ if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgent.java Thu Aug 9 09:37:49 2007
@@ -36,7 +36,7 @@
* Sets the discovery listener
* @param listener
*/
- public void setDiscoveryListener(DiscoveryListener listener);
+ void setDiscoveryListener(DiscoveryListener listener);
/**
* register a service
@@ -57,6 +57,6 @@
void setGroup(String group);
- public void setBrokerName(String brokerName);
+ void setBrokerName(String brokerName);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java Thu Aug 9 09:37:49 2007
@@ -25,8 +25,8 @@
public abstract class DiscoveryAgentFactory {
- static final private FactoryFinder discoveryAgentFinder = new FactoryFinder("META-INF/services/org/apache/activemq/transport/discoveryagent/");
- static final private ConcurrentHashMap discoveryAgentFactorys = new ConcurrentHashMap();
+ private static final FactoryFinder DISCOVERY_AGENT_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/discoveryagent/");
+ private static final ConcurrentHashMap DISCOVERY_AGENT_FACTORYS = new ConcurrentHashMap();
/**
* @param uri
@@ -35,14 +35,15 @@
*/
private static DiscoveryAgentFactory findDiscoveryAgentFactory(URI uri) throws IOException {
String scheme = uri.getScheme();
- if (scheme == null)
+ if (scheme == null) {
throw new IOException("DiscoveryAgent scheme not specified: [" + uri + "]");
- DiscoveryAgentFactory daf = (DiscoveryAgentFactory)discoveryAgentFactorys.get(scheme);
+ }
+ DiscoveryAgentFactory daf = (DiscoveryAgentFactory)DISCOVERY_AGENT_FACTORYS.get(scheme);
if (daf == null) {
// Try to load if from a META-INF property.
try {
- daf = (DiscoveryAgentFactory)discoveryAgentFinder.newInstance(scheme);
- discoveryAgentFactorys.put(scheme, daf);
+ daf = (DiscoveryAgentFactory)DISCOVERY_AGENT_FINDER.newInstance(scheme);
+ DISCOVERY_AGENT_FACTORYS.put(scheme, daf);
} catch (Throwable e) {
throw IOExceptionSupport.create("DiscoveryAgent scheme NOT recognized: [" + scheme + "]", e);
}
@@ -56,7 +57,7 @@
}
- abstract protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException;
+ protected abstract DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException;
// {
// try {
// String type = ( uri.getScheme() == null ) ? uri.getPath() :
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java Thu Aug 9 09:37:49 2007
@@ -20,12 +20,13 @@
/**
* A listener of services being added or removed from a network
- *
+ *
* @version $Revision$
*/
public interface DiscoveryListener {
-
- public void onServiceAdd(DiscoveryEvent event);
- public void onServiceRemove(DiscoveryEvent event);
-
+
+ void onServiceAdd(DiscoveryEvent event);
+
+ void onServiceRemove(DiscoveryEvent event);
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java Thu Aug 9 09:37:49 2007
@@ -35,7 +35,7 @@
*/
public class DiscoveryTransport extends TransportFilter implements DiscoveryListener {
- private static final Log log = LogFactory.getLog(DiscoveryTransport.class);
+ private static final Log LOG = LogFactory.getLog(DiscoveryTransport.class);
private final CompositeTransport next;
private DiscoveryAgent discoveryAgent;
@@ -58,10 +58,10 @@
}
public void stop() throws Exception {
- ServiceStopper ss = new ServiceStopper();
- ss.stop(discoveryAgent);
- ss.stop(next);
- ss.throwFirstException();
+ ServiceStopper ss = new ServiceStopper();
+ ss.stop(discoveryAgent);
+ ss.stop(next);
+ ss.throwFirstException();
}
public void onServiceAdd(DiscoveryEvent event) {
@@ -70,18 +70,18 @@
try {
URI uri = new URI(url);
serviceURIs.put(event.getServiceName(), uri);
- log.info("Adding new broker connection URL: " + uri );
- next.add(new URI[]{uri});
+ LOG.info("Adding new broker connection URL: " + uri);
+ next.add(new URI[] {uri});
} catch (URISyntaxException e) {
- log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
+ LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
}
}
}
public void onServiceRemove(DiscoveryEvent event) {
- URI uri = (URI) serviceURIs.get(event.getServiceName());
+ URI uri = (URI)serviceURIs.get(event.getServiceName());
if (uri != null) {
- next.remove(new URI[]{uri});
+ next.remove(new URI[] {uri});
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java Thu Aug 9 09:37:49 2007
@@ -47,7 +47,7 @@
* @version $Revision$
*/
public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
- private static final Log log = LogFactory.getLog(MulticastDiscoveryAgent.class);
+ private static final Log LOG = LogFactory.getLog(MulticastDiscoveryAgent.class);
public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
private static final String TYPE_SUFFIX = "ActiveMQ-4.";
private static final String ALIVE = "alive.";
@@ -77,24 +77,25 @@
this.lastHeartBeat = System.currentTimeMillis();
}
- synchronized public void updateHeartBeat() {
+ public synchronized void updateHeartBeat() {
lastHeartBeat = System.currentTimeMillis();
// Consider that the broker recovery has succeeded if it has not
// failed in 60 seconds.
if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
- if (log.isDebugEnabled())
- log.debug("I now think that the " + service + " service has recovered.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("I now think that the " + service + " service has recovered.");
+ }
failureCount = 0;
recoveryTime = 0;
}
}
- synchronized public long getLastHeartBeat() {
+ public synchronized long getLastHeartBeat() {
return lastHeartBeat;
}
- synchronized public boolean markFailed() {
+ public synchronized boolean markFailed() {
if (!failed) {
failed = true;
failureCount++;
@@ -104,16 +105,15 @@
reconnectDelay = initialReconnectDelay;
} else {
reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
- if (reconnectDelay > maxReconnectDelay)
+ if (reconnectDelay > maxReconnectDelay) {
reconnectDelay = maxReconnectDelay;
+ }
}
- if (log.isDebugEnabled())
- log
- .debug("Remote failure of "
- + service
- + " while still receiving multicast advertisements. Advertising events will be suppressed for "
- + reconnectDelay + " ms, the current failure count is: " + failureCount);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements. Advertising events will be suppressed for " + reconnectDelay
+ + " ms, the current failure count is: " + failureCount);
+ }
recoveryTime = System.currentTimeMillis() + reconnectDelay;
return true;
@@ -125,24 +125,27 @@
* @return true if this broker is marked failed and it is now the right
* time to start recovery.
*/
- synchronized public boolean doRecovery() {
- if (!failed)
+ public synchronized boolean doRecovery() {
+ if (!failed) {
return false;
+ }
// Are we done trying to recover this guy?
if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
- if (log.isDebugEnabled())
- log.debug("Max reconnect attempts of the " + service + " service has been reached.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Max reconnect attempts of the " + service + " service has been reached.");
+ }
return false;
}
// Is it not yet time?
- if (System.currentTimeMillis() < recoveryTime)
+ if (System.currentTimeMillis() < recoveryTime) {
return false;
+ }
- if (log.isDebugEnabled())
- log.debug("Resuming event advertisement of the " + service + " service.");
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Resuming event advertisement of the " + service + " service.");
+ }
failed = false;
return true;
}
@@ -169,15 +172,13 @@
private AtomicBoolean started = new AtomicBoolean(false);
private boolean reportAdvertizeFailed = true;
- private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
- new LinkedBlockingQueue(), new ThreadFactory() {
- public Thread newThread(Runnable runable) {
- Thread t = new Thread(runable,
- "Multicast Discovery Agent Notifier");
- t.setDaemon(true);
- return t;
- }
- });
+ private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
+ public Thread newThread(Runnable runable) {
+ Thread t = new Thread(runable, "Multicast Discovery Agent Notifier");
+ t.setDaemon(true);
+ return t;
+ }
+ });
/**
* Set the discovery listener
@@ -298,11 +299,11 @@
throw new IOException("You must specify a group to discover");
}
if (brokerName == null || brokerName.length() == 0) {
- log.warn("brokerName not set");
+ LOG.warn("brokerName not set");
}
String type = getType();
if (!type.endsWith(".")) {
- log.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
+ LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
type += ".";
}
if (discoveryURI == null) {
@@ -354,7 +355,7 @@
// ignore
} catch (IOException e) {
if (started.get()) {
- log.error("failed to process packet: " + e);
+ LOG.error("failed to process packet: " + e);
}
}
}
@@ -408,9 +409,9 @@
// same error over and over.
if (reportAdvertizeFailed) {
reportAdvertizeFailed = false;
- log.error("Failed to advertise our service: " + payload, e);
+ LOG.error("Failed to advertise our service: " + payload, e);
if ("Operation not permitted".equals(e.getMessage())) {
- log.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup. "
+ LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup. "
+ "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java Thu Aug 9 09:37:49 2007
@@ -44,7 +44,7 @@
* @version $Revision$
*/
public class RendezvousDiscoveryAgent implements DiscoveryAgent, ServiceListener {
- private static final Log log = LogFactory.getLog(RendezvousDiscoveryAgent.class);
+ private static final Log LOG = LogFactory.getLog(RendezvousDiscoveryAgent.class);
private static final String TYPE_SUFFIX = "ActiveMQ-4.";
@@ -66,14 +66,14 @@
}
String type = getType();
if (!type.endsWith(".")) {
- log.warn("The type '" + type + "' should end with '.' to be a valid Rendezvous type");
+ LOG.warn("The type '" + type + "' should end with '.' to be a valid Rendezvous type");
type += ".";
}
try {
// force lazy construction
getJmdns();
if (listener != null) {
- log.info("Discovering service of type: " + type);
+ LOG.info("Discovering service of type: " + type);
jmdns.addServiceListener(type, this);
}
} catch (IOException e) {
@@ -112,20 +112,22 @@
// ServiceListener interface
// -------------------------------------------------------------------------
public void addService(JmDNS jmDNS, String type, String name) {
- if (log.isDebugEnabled()) {
- log.debug("addService with type: " + type + " name: " + name);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addService with type: " + type + " name: " + name);
}
- if (listener != null)
+ if (listener != null) {
listener.onServiceAdd(new DiscoveryEvent(name));
+ }
jmDNS.requestServiceInfo(type, name);
}
public void removeService(JmDNS jmDNS, String type, String name) {
- if (log.isDebugEnabled()) {
- log.debug("removeService with type: " + type + " name: " + name);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("removeService with type: " + type + " name: " + name);
}
- if (listener != null)
+ if (listener != null) {
listener.onServiceRemove(new DiscoveryEvent(name));
+ }
}
public void serviceAdded(ServiceEvent event) {
@@ -195,8 +197,8 @@
String type = getType();
- if (log.isDebugEnabled()) {
- log.debug("Registering service type: " + type + " name: " + name + " details: " + map);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Registering service type: " + type + " name: " + name + " details: " + map);
}
return new ServiceInfo(type, name + "." + type, port, weight, priority, "");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java Thu Aug 9 09:37:49 2007
@@ -132,8 +132,9 @@
synchronized (sleepMutex) {
try {
- if (!running.get())
+ if (!running.get()) {
return;
+ }
sleepMutex.wait(event.reconnectDelay);
} catch (InterruptedException ie) {
@@ -147,8 +148,9 @@
} else {
// Exponential increment of reconnect delay.
event.reconnectDelay *= backOffMultiplier;
- if (event.reconnectDelay > maxReconnectDelay)
+ if (event.reconnectDelay > maxReconnectDelay) {
event.reconnectDelay = maxReconnectDelay;
+ }
}
} else {
@@ -156,8 +158,9 @@
event.reconnectDelay = initialReconnectDelay;
}
- if (!running.get())
+ if (!running.get()) {
return;
+ }
event.connectTime = System.currentTimeMillis();
event.failed.set(false);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Aug 9 09:37:49 2007
@@ -52,7 +52,7 @@
*/
public class FailoverTransport implements CompositeTransport {
- private static final Log log = LogFactory.getLog(FailoverTransport.class);
+ private static final Log LOG = LogFactory.getLog(FailoverTransport.class);
private TransportListener transportListener;
private boolean disposed;
@@ -166,7 +166,7 @@
for (int i = 0; iter.hasNext() && connectedTransport == null && !disposed; i++) {
URI uri = (URI)iter.next();
try {
- log.debug("Attempting connect to: " + uri);
+ LOG.debug("Attempting connect to: " + uri);
Transport t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
t.start();
@@ -175,7 +175,7 @@
restoreTransport(t);
}
- log.debug("Connection established");
+ LOG.debug("Connection established");
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport = t;
@@ -184,18 +184,18 @@
if (transportListener != null) {
transportListener.transportResumed();
}
- log.info("Successfully reconnected to " + uri);
+ LOG.info("Successfully reconnected to " + uri);
return false;
} catch (Exception e) {
failure = e;
- log.debug("Connect fail to: " + uri + ", reason: " + e);
+ LOG.debug("Connect fail to: " + uri + ", reason: " + e);
}
}
}
}
if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
- log.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
+ LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
connectionFailure = failure;
reconnectMutex.notifyAll();
return false;
@@ -204,7 +204,7 @@
if (!disposed) {
- log.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
+ LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
synchronized (sleepMutex) {
try {
sleepMutex.wait(reconnectDelay);
@@ -216,8 +216,9 @@
if (useExponentialBackOff) {
// Exponential increment of reconnect delay.
reconnectDelay *= backOffMultiplier;
- if (reconnectDelay > maxReconnectDelay)
+ if (reconnectDelay > maxReconnectDelay) {
reconnectDelay = maxReconnectDelay;
+ }
}
}
return !disposed;
@@ -231,7 +232,7 @@
transportListener.transportInterupted();
}
synchronized (reconnectMutex) {
- log.warn("Transport failed, attempting to automatically reconnect due to: " + e, e);
+ LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e, e);
if (connectedTransport != null) {
initialized = false;
ServiceSupport.dispose(connectedTransport);
@@ -244,9 +245,10 @@
public void start() throws Exception {
synchronized (reconnectMutex) {
- log.debug("Started.");
- if (started)
+ LOG.debug("Started.");
+ if (started) {
return;
+ }
started = true;
if (connectedTransport != null) {
stateTracker.restore(connectedTransport);
@@ -256,9 +258,10 @@
public void stop() throws Exception {
synchronized (reconnectMutex) {
- log.debug("Stopped.");
- if (!started)
+ LOG.debug("Stopped.");
+ if (!started) {
return;
+ }
started = false;
disposed = true;
@@ -348,12 +351,12 @@
// Wait for transport to be connected.
while (connectedTransport == null && !disposed && connectionFailure == null) {
- log.trace("Waiting for transport to reconnect.");
+ LOG.trace("Waiting for transport to reconnect.");
try {
reconnectMutex.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- log.debug("Interupted: " + e, e);
+ LOG.debug("Interupted: " + e, e);
}
}
@@ -408,7 +411,7 @@
return;
} catch (IOException e) {
- log.debug("Send oneway attempt: " + i + " failed.");
+ LOG.debug("Send oneway attempt: " + i + " failed.");
handleTransportFailure(e);
}
}
@@ -420,8 +423,9 @@
}
if (!disposed) {
if (error != null) {
- if (error instanceof IOException)
+ if (error instanceof IOException) {
throw (IOException)error;
+ }
throw IOExceptionSupport.create(error);
}
}
@@ -441,8 +445,9 @@
public void add(URI u[]) {
for (int i = 0; i < u.length; i++) {
- if (!uris.contains(u[i]))
+ if (!uris.contains(u[i])) {
uris.add(u[i]);
+ }
}
reconnect();
}
@@ -457,17 +462,18 @@
public void add(String u) {
try {
URI uri = new URI(u);
- if (!uris.contains(uri))
+ if (!uris.contains(uri)) {
uris.add(uri);
+ }
reconnect();
} catch (Exception e) {
- log.error("Failed to parse URI: " + u);
+ LOG.error("Failed to parse URI: " + u);
}
}
public void reconnect() {
- log.debug("Waking up reconnect task");
+ LOG.debug("Waking up reconnect task");
try {
reconnectTask.wakeup();
} catch (InterruptedException e) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Aug 9 09:37:49 2007
@@ -52,7 +52,7 @@
*/
public class FanoutTransport implements CompositeTransport {
- private static final Log log = LogFactory.getLog(FanoutTransport.class);
+ private static final Log LOG = LogFactory.getLog(FanoutTransport.class);
private TransportListener transportListener;
private boolean disposed;
@@ -126,10 +126,11 @@
public void onException(IOException error) {
try {
synchronized (reconnectMutex) {
- if (transport == null)
+ if (transport == null) {
return;
+ }
- log.debug("Transport failed, starting up reconnect task", error);
+ LOG.debug("Transport failed, starting up reconnect task", error);
ServiceSupport.dispose(transport);
transport = null;
@@ -198,10 +199,10 @@
URI uri = fanoutHandler.uri;
try {
- log.debug("Stopped: " + this);
- log.debug("Attempting connect to: " + uri);
+ LOG.debug("Stopped: " + this);
+ LOG.debug("Attempting connect to: " + uri);
Transport t = TransportFactory.compositeConnect(uri);
- log.debug("Connection established");
+ LOG.debug("Connection established");
fanoutHandler.transport = t;
fanoutHandler.reconnectDelay = 10;
fanoutHandler.connectFailures = 0;
@@ -214,10 +215,10 @@
restoreTransport(fanoutHandler);
}
} catch (Exception e) {
- log.debug("Connect fail to: " + uri + ", reason: " + e);
+ LOG.debug("Connect fail to: " + uri + ", reason: " + e);
if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
- log.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
+ LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
connectionFailure = e;
reconnectMutex.notifyAll();
return false;
@@ -226,8 +227,9 @@
if (useExponentialBackOff) {
// Exponential increment of reconnect delay.
fanoutHandler.reconnectDelay *= backOffMultiplier;
- if (fanoutHandler.reconnectDelay > maxReconnectDelay)
+ if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
fanoutHandler.reconnectDelay = maxReconnectDelay;
+ }
}
fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
@@ -251,7 +253,7 @@
try {
long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
if (reconnectDelay > 0) {
- log.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
+ LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
Thread.sleep(reconnectDelay);
}
} catch (InterruptedException e1) {
@@ -262,9 +264,10 @@
public void start() throws Exception {
synchronized (reconnectMutex) {
- log.debug("Started.");
- if (started)
+ LOG.debug("Started.");
+ if (started) {
return;
+ }
started = true;
for (Iterator iter = transports.iterator(); iter.hasNext();) {
FanoutTransportHandler th = (FanoutTransportHandler)iter.next();
@@ -279,8 +282,9 @@
synchronized (reconnectMutex) {
ServiceStopper ss = new ServiceStopper();
- if (!started)
+ if (!started) {
return;
+ }
started = false;
disposed = true;
@@ -291,7 +295,7 @@
}
}
- log.debug("Stopped: " + this);
+ LOG.debug("Stopped: " + this);
ss.throwFirstException();
}
reconnectTask.shutdown();
@@ -346,7 +350,7 @@
// Wait for transport to be connected.
while (connectedCount != minAckCount && !disposed && connectionFailure == null) {
- log.debug("Waiting for at least " + minAckCount + " transports to be connected.");
+ LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
reconnectMutex.wait(1000);
}
@@ -364,8 +368,9 @@
error = new IOException("Unexpected failure.");
}
- if (error instanceof IOException)
+ if (error instanceof IOException) {
throw (IOException)error;
+ }
throw IOExceptionSupport.create(error);
}
@@ -377,7 +382,7 @@
try {
th.transport.oneway(command);
} catch (IOException e) {
- log.debug("Send attempt: failed.");
+ LOG.debug("Send attempt: failed.");
th.onException(e);
}
}
@@ -386,7 +391,7 @@
try {
primary.transport.oneway(command);
} catch (IOException e) {
- log.debug("Send attempt: failed.");
+ LOG.debug("Send attempt: failed.");
primary.onException(e);
}
}
@@ -426,7 +431,7 @@
}
public void reconnect() {
- log.debug("Waking up reconnect task");
+ LOG.debug("Waking up reconnect task");
try {
reconnectTask.wakeup();
} catch (InterruptedException e) {
@@ -453,8 +458,9 @@
FanoutTransportHandler th = (FanoutTransportHandler)iter.next();
if (th.transport != null) {
Object rc = th.transport.narrow(target);
- if (rc != null)
+ if (rc != null) {
return rc;
+ }
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java Thu Aug 9 09:37:49 2007
@@ -39,12 +39,13 @@
/**
*/
- synchronized public void setTransportListener(TransportListener channelListener) {
+ public synchronized void setTransportListener(TransportListener channelListener) {
this.transportListener = channelListener;
- if (channelListener == null)
+ if (channelListener == null) {
getNext().setTransportListener(null);
- else
+ } else {
getNext().setTransportListener(this);
+ }
}
/**
@@ -52,10 +53,12 @@
* @throws IOException if the next channel has not been set.
*/
public void start() throws Exception {
- if (getNext() == null)
+ if (getNext() == null) {
throw new IOException("The next channel has not been set.");
- if (transportListener == null)
+ }
+ if (transportListener == null) {
throw new IOException("The command listener has not been set.");
+ }
getNext().start();
}
@@ -73,14 +76,14 @@
/**
* @return Returns the getNext().
*/
- synchronized public Transport getNext() {
+ public synchronized Transport getNext() {
return next;
}
/**
* @return Returns the packetListener.
*/
- synchronized public TransportListener getTransportListener() {
+ public synchronized TransportListener getTransportListener() {
return transportListener;
}
@@ -115,7 +118,7 @@
return getNext().narrow(target);
}
- synchronized public void setNext(Transport next) {
+ public synchronized void setNext(Transport next) {
this.next = next;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java Thu Aug 9 09:37:49 2007
@@ -42,7 +42,7 @@
*/
public class MulticastTransport extends UdpTransport {
- private static final Log log = LogFactory.getLog(MulticastTransport.class);
+ private static final Log LOG = LogFactory.getLog(MulticastTransport.class);
private static final int DEFAULT_IDLE_TIME = 5000;
@@ -109,7 +109,7 @@
socket.setLoopbackMode(loopBackMode);
socket.setTimeToLive(timeToLive);
- log.debug("Joining multicast address: " + getMulticastAddress());
+ LOG.debug("Joining multicast address: " + getMulticastAddress());
socket.joinGroup(getMulticastAddress());
socket.setSoTimeout((int)keepAliveInterval);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Thu Aug 9 09:37:49 2007
@@ -31,7 +31,7 @@
public class NIOOutputStream extends OutputStream {
- private final static int BUFFER_SIZE = 8192;
+ private static final int BUFFER_SIZE = 8192;
private final WritableByteChannel out;
private final byte[] buffer;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Thu Aug 9 09:37:49 2007
@@ -101,8 +101,9 @@
break;
}
- if (currentBuffer.hasRemaining())
+ if (currentBuffer.hasRemaining()) {
continue;
+ }
// Are we trying to figure out the size of the next frame?
if (nextFrameSize == -1) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java Thu Aug 9 09:37:49 2007
@@ -32,19 +32,9 @@
*
* @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
*/
-final public class SelectorManager {
+public final class SelectorManager {
- static final public SelectorManager singleton = new SelectorManager();
-
- static SelectorManager getInstance() {
- return singleton;
- }
-
- public interface Listener {
- public void onSelect(SelectorSelection selector);
-
- public void onError(SelectorSelection selection, Throwable error);
- }
+ public static final SelectorManager SINGLETON = new SelectorManager();
private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
public Thread newThread(Runnable r) {
@@ -56,6 +46,17 @@
private Executor channelExecutor = selectorExecutor;
private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
private int maxChannelsPerWorker = 64;
+
+ static SelectorManager getInstance() {
+ return SINGLETON;
+ }
+
+ public interface Listener {
+ void onSelect(SelectorSelection selector);
+
+ void onError(SelectorSelection selection, Throwable error);
+ }
+
public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
throws IOException {
@@ -76,11 +77,11 @@
freeWorkers.remove(worker);
}
- synchronized public void onWorkerEmptyEvent(SelectorWorker worker) {
+ public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
freeWorkers.remove(worker);
}
- synchronized public void onWorkerNotFullEvent(SelectorWorker worker) {
+ public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
freeWorkers.add(worker);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java Thu Aug 9 09:37:49 2007
@@ -25,7 +25,7 @@
/**
* @author chirino
*/
-final public class SelectorSelection {
+public final class SelectorSelection {
private final SelectorWorker worker;
private final SelectionKey key;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java Thu Aug 9 09:37:49 2007
@@ -25,13 +25,13 @@
public class SelectorWorker implements Runnable {
- private final static AtomicInteger NEXT_ID = new AtomicInteger();
+ private static final AtomicInteger NEXT_ID = new AtomicInteger();
final SelectorManager manager;
final Selector selector;
final int id = NEXT_ID.getAndIncrement();
final AtomicInteger useCounter = new AtomicInteger();
- final private int maxChannelsPerWorker;
+ private final int maxChannelsPerWorker;
public SelectorWorker(SelectorManager manager) throws IOException {
this.manager = manager;
@@ -69,11 +69,13 @@
while (isRunning()) {
int count = selector.select(10);
- if (count == 0)
+ if (count == 0) {
continue;
+ }
- if (!isRunning())
+ if (!isRunning()) {
return;
+ }
// Get a java.util.Set containing the SelectionKey objects
// for all channels that are ready for I/O.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java Thu Aug 9 09:37:49 2007
@@ -37,13 +37,10 @@
public class PeerTransportFactory extends TransportFactory {
- final public static ConcurrentHashMap brokers = new ConcurrentHashMap();
-
- final public static ConcurrentHashMap connectors = new ConcurrentHashMap();
-
- final public static ConcurrentHashMap servers = new ConcurrentHashMap();
-
- private IdGenerator idGenerator = new IdGenerator("peer-");
+ public static final ConcurrentHashMap BROKERS = new ConcurrentHashMap();
+ public static final ConcurrentHashMap CONNECTORS = new ConcurrentHashMap();
+ public static final ConcurrentHashMap SERVERS = new ConcurrentHashMap();
+ private static final IdGenerator ID_GENERATOR = new IdGenerator("peer-");
public Transport doConnect(URI location) throws Exception {
VMTransportFactory vmTransportFactory = createTransportFactory(location);
@@ -69,7 +66,7 @@
group = "default";
}
if (broker == null || broker.length() == 0) {
- broker = idGenerator.generateSanitizedId();
+ broker = ID_GENERATOR.generateSanitizedId();
}
final Map brokerOptions = new HashMap(URISupport.parseParamters(location));
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java Thu Aug 9 09:37:49 2007
@@ -29,7 +29,7 @@
*/
public class DefaultReplayBuffer implements ReplayBuffer {
- private static final Log log = LogFactory.getLog(DefaultReplayBuffer.class);
+ private static final Log LOG = LogFactory.getLog(DefaultReplayBuffer.class);
private final int size;
private ReplayBufferListener listener;
@@ -43,8 +43,8 @@
}
public void addBuffer(int commandId, Object buffer) {
- if (log.isDebugEnabled()) {
- log.debug("Adding command ID: " + commandId + " to replay buffer: " + this + " object: " + buffer);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding command ID: " + commandId + " to replay buffer: " + this + " object: " + buffer);
}
synchronized (lock) {
int max = size - 1;
@@ -65,8 +65,8 @@
if (replayer == null) {
throw new IllegalArgumentException("No Replayer parameter specified");
}
- if (log.isDebugEnabled()) {
- log.debug("Buffer: " + this + " replaying messages from: " + fromCommandId + " to: " + toCommandId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Buffer: " + this + " replaying messages from: " + fromCommandId + " to: " + toCommandId);
}
for (int i = fromCommandId; i <= toCommandId; i++) {
Object buffer = null;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java Thu Aug 9 09:37:49 2007
@@ -38,7 +38,7 @@
* @version $Revision$
*/
public class ReliableTransport extends ResponseCorrelator {
- private static final Log log = LogFactory.getLog(ReliableTransport.class);
+ private static final Log LOG = LogFactory.getLog(ReliableTransport.class);
private ReplayStrategy replayStrategy;
private SortedSet commands = new TreeSet(new CommandIdComparator());
@@ -132,8 +132,8 @@
if (keep) {
// lets add it to the list for later on
- if (log.isDebugEnabled()) {
- log.debug("Received out of order command which is being buffered for later: " + command);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received out of order command which is being buffered for later: " + command);
}
commands.add(command);
}
@@ -261,7 +261,7 @@
* Lets attempt to replay the request as a command may have disappeared
*/
protected void onMissingResponse(Command command, FutureResponse response) {
- log.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message");
+ LOG.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message");
int commandId = command.getCommandId();
requestReplay(commandId, commandId);
@@ -276,8 +276,8 @@
if (replayer == null) {
onException(new IOException("Cannot replay commands. No replayer property configured"));
}
- if (log.isDebugEnabled()) {
- log.debug("Processing replay command: " + command);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing replay command: " + command);
}
getReplayBuffer().replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java Thu Aug 9 09:37:49 2007
@@ -31,9 +31,9 @@
* Submit a buffer for caching around for a period of time, during which time it can be replayed
* to users interested in it.
*/
- public void addBuffer(int commandId, Object buffer);
+ void addBuffer(int commandId, Object buffer);
- public void setReplayBufferListener(ReplayBufferListener bufferPoolAdapter);
+ void setReplayBufferListener(ReplayBufferListener bufferPoolAdapter);
- public void replayMessages(int fromCommandId, int toCommandId, Replayer replayer) throws IOException;
+ void replayMessages(int fromCommandId, int toCommandId, Replayer replayer) throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java Thu Aug 9 09:37:49 2007
@@ -27,5 +27,5 @@
* Indications that the buffer has been discarded and so could be
* re-introduced into some pool
*/
- public void onBufferDiscarded(int commandId, Object buffer);
+ void onBufferDiscarded(int commandId, Object buffer);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java Thu Aug 9 09:37:49 2007
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.transport.stomp;
import java.io.IOException;
@@ -17,19 +33,23 @@
* from one to the other
*/
public interface FrameTranslator {
- public ActiveMQMessage convertFrame(StompFrame frame) throws JMSException, ProtocolException;
+ ActiveMQMessage convertFrame(StompFrame frame) throws JMSException, ProtocolException;
- public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException;
+ StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException;
- public String convertDestination(Destination d);
+ String convertDestination(Destination d);
- public ActiveMQDestination convertDestination(String name) throws ProtocolException;
+ ActiveMQDestination convertDestination(String name) throws ProtocolException;
/**
* Helper class which holds commonly needed functions used when implementing
* FrameTranslators
*/
- public final static class Helper {
+ static final class Helper {
+
+ private Helper() {
+ }
+
public static void copyStandardHeadersFromMessageToFrame(ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException {
final Map headers = command.getHeaders();
headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(message.getDestination()));
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java Thu Aug 9 09:37:49 2007
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.transport.stomp;
import java.io.IOException;
@@ -67,24 +83,24 @@
if (d == null) {
return null;
}
- ActiveMQDestination amq_d = (ActiveMQDestination)d;
- String p_name = amq_d.getPhysicalName();
+ ActiveMQDestination activeMQDestination = (ActiveMQDestination)d;
+ String physicalName = activeMQDestination.getPhysicalName();
StringBuffer buffer = new StringBuffer();
- if (amq_d.isQueue()) {
- if (amq_d.isTemporary()) {
+ if (activeMQDestination.isQueue()) {
+ if (activeMQDestination.isTemporary()) {
buffer.append("/temp-queue/");
} else {
buffer.append("/queue/");
}
} else {
- if (amq_d.isTemporary()) {
+ if (activeMQDestination.isTemporary()) {
buffer.append("/temp-topic/");
} else {
buffer.append("/topic/");
}
}
- buffer.append(p_name);
+ buffer.append(physicalName);
return buffer.toString();
}
@@ -92,17 +108,17 @@
if (name == null) {
return null;
} else if (name.startsWith("/queue/")) {
- String q_name = name.substring("/queue/".length(), name.length());
- return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE);
+ String qName = name.substring("/queue/".length(), name.length());
+ return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
} else if (name.startsWith("/topic/")) {
- String t_name = name.substring("/topic/".length(), name.length());
- return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE);
+ String tName = name.substring("/topic/".length(), name.length());
+ return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
} else if (name.startsWith("/temp-queue/")) {
- String t_name = name.substring("/temp-queue/".length(), name.length());
- return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TEMP_QUEUE_TYPE);
+ String tName = name.substring("/temp-queue/".length(), name.length());
+ return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
} else if (name.startsWith("/temp-topic/")) {
- String t_name = name.substring("/temp-topic/".length(), name.length());
- return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TEMP_TOPIC_TYPE);
+ String tName = name.substring("/temp-topic/".length(), name.length());
+ return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
} else {
throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
+ "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Aug 9 09:37:49 2007
@@ -56,8 +56,9 @@
*/
public class ProtocolConverter {
- private static final IdGenerator connectionIdGenerator = new IdGenerator();
- private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
+ private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+
+ private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1);
@@ -129,26 +130,27 @@
}
String action = command.getAction();
- if (action.startsWith(Stomp.Commands.SEND))
+ if (action.startsWith(Stomp.Commands.SEND)) {
onStompSend(command);
- else if (action.startsWith(Stomp.Commands.ACK))
+ } else if (action.startsWith(Stomp.Commands.ACK)) {
onStompAck(command);
- else if (action.startsWith(Stomp.Commands.BEGIN))
+ } else if (action.startsWith(Stomp.Commands.BEGIN)) {
onStompBegin(command);
- else if (action.startsWith(Stomp.Commands.COMMIT))
+ } else if (action.startsWith(Stomp.Commands.COMMIT)) {
onStompCommit(command);
- else if (action.startsWith(Stomp.Commands.ABORT))
+ } else if (action.startsWith(Stomp.Commands.ABORT)) {
onStompAbort(command);
- else if (action.startsWith(Stomp.Commands.SUBSCRIBE))
+ } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
onStompSubscribe(command);
- else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE))
+ } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
onStompUnsubscribe(command);
- else if (action.startsWith(Stomp.Commands.CONNECT))
+ } else if (action.startsWith(Stomp.Commands.CONNECT)) {
onStompConnect(command);
- else if (action.startsWith(Stomp.Commands.DISCONNECT))
+ } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
onStompDisconnect(command);
- else
+ } else {
throw new ProtocolException("Unknown STOMP action: " + action);
+ }
} catch (ProtocolException e) {
@@ -169,8 +171,9 @@
StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
sendToStomp(errorMessage);
- if (e.isFatal())
+ if (e.isFatal()) {
getTransportFilter().onException(e);
+ }
}
}
@@ -189,8 +192,9 @@
if (stompTx != null) {
TransactionId activemqTx = (TransactionId)transactions.get(stompTx);
- if (activemqTx == null)
+ if (activemqTx == null) {
throw new ProtocolException("Invalid transaction id: " + stompTx);
+ }
message.setTransactionId(activemqTx);
}
@@ -210,15 +214,17 @@
Map headers = command.getHeaders();
String messageId = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
- if (messageId == null)
+ if (messageId == null) {
throw new ProtocolException("ACK received without a message-id to acknowledge!");
+ }
TransactionId activemqTx = null;
String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
if (stompTx != null) {
activemqTx = (TransactionId)transactions.get(stompTx);
- if (activemqTx == null)
+ if (activemqTx == null) {
throw new ProtocolException("Invalid transaction id: " + stompTx);
+ }
}
boolean acked = false;
@@ -233,8 +239,9 @@
}
}
- if (!acked)
+ if (!acked) {
throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
+ }
}
@@ -318,7 +325,7 @@
String subscriptionId = (String)headers.get(Stomp.Headers.Subscribe.ID);
String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
- ActiveMQDestination actual_dest = frameTranslator.convertDestination(destination);
+ ActiveMQDestination actualDest = frameTranslator.convertDestination(destination);
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setPrefetchSize(1000);
@@ -332,7 +339,7 @@
consumerInfo.setDestination(frameTranslator.convertDestination(destination));
StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
- stompSubscription.setDestination(actual_dest);
+ stompSubscription.setDestination(actualDest);
String ackMode = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
@@ -352,8 +359,9 @@
ActiveMQDestination destination = null;
Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
- if (o != null)
+ if (o != null) {
destination = frameTranslator.convertDestination((String)o);
+ }
String subscriptionId = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
@@ -396,10 +404,11 @@
IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
connectionInfo.setConnectionId(connectionId);
- if (clientId != null)
+ if (clientId != null) {
connectionInfo.setClientId(clientId);
- else
+ } else {
connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
+ }
connectionInfo.setResponseRequired(true);
connectionInfo.setUserName(login);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Thu Aug 9 09:37:49 2007
@@ -97,8 +97,9 @@
String id = (String)entry.getKey();
MessageId msgid = (MessageId)entry.getValue();
- if (ack.getFirstMessageId() == null)
+ if (ack.getFirstMessageId() == null) {
ack.setFirstMessageId(msgid);
+ }
iter.remove();
count++;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java Thu Aug 9 09:37:49 2007
@@ -36,7 +36,7 @@
* @author <a href="http://hiramchirino.com">chirino</a>
*/
public class StompTransportFilter extends TransportFilter {
- static final private Log log = LogFactory.getLog(StompTransportFilter.class);
+ private static final Log LOG = LogFactory.getLog(StompTransportFilter.class);
private final ProtocolConverter protocolConverter;
private final Object sendToActiveMQMutex = new Object();
@@ -64,7 +64,7 @@
public void onCommand(Object command) {
try {
if (trace) {
- log.trace("Received: \n" + command);
+ LOG.trace("Received: \n" + command);
}
protocolConverter.onStompCommad((StompFrame)command);
} catch (IOException e) {
@@ -82,7 +82,7 @@
public void sendToStomp(StompFrame command) throws IOException {
if (trace) {
- log.trace("Sending: \n" + command);
+ LOG.trace("Sending: \n" + command);
}
synchronized (sendToStompMutex) {
next.oneway(command);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Thu Aug 9 09:37:49 2007
@@ -108,13 +108,14 @@
String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
if (line != null && line.trim().length() > 0) {
- if (headers.size() > MAX_HEADERS)
+ if (headers.size() > MAX_HEADERS) {
throw new ProtocolException("The maximum number of headers was exceeded", true);
+ }
try {
- int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
- String name = line.substring(0, seperator_index).trim();
- String value = line.substring(seperator_index + 1, line.length()).trim();
+ int seperatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
+ String name = line.substring(0, seperatorIndex).trim();
+ String value = line.substring(seperatorIndex + 1, line.length()).trim();
headers.put(name, value);
} catch (Exception e) {
throw new ProtocolException("Unable to parser header line [" + line + "]", true);
@@ -137,8 +138,9 @@
throw new ProtocolException("Specified content-length is not a valid integer", true);
}
- if (length > MAX_DATA_LENGTH)
+ if (length > MAX_DATA_LENGTH) {
throw new ProtocolException("The maximum data length was exceeded", true);
+ }
data = new byte[length];
in.readFully(data);
@@ -182,8 +184,9 @@
byte b;
ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
while ((b = in.readByte()) != '\n') {
- if (baos.size() > maxLength)
+ if (baos.size() > maxLength) {
throw new ProtocolException(errorMessage, true);
+ }
baos.write(b);
}
baos.close();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java Thu Aug 9 09:37:49 2007
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -61,7 +61,7 @@
private SSLContext sslContext = null;
// The log this uses.,
- private static final Log log = LogFactory.getLog(SslTransportFactory.class);
+ private static final Log LOG = LogFactory.getLog(SslTransportFactory.class);
/**
* Constructor. Nothing special.
@@ -130,7 +130,7 @@
String localString = location.getScheme() + ":/" + path;
localLocation = new URI(localString);
} catch (Exception e) {
- log.warn("path isn't a valid local location for SslTransport to use", e);
+ LOG.warn("path isn't a valid local location for SslTransport to use", e);
}
}
SocketFactory socketFactory = createSocketFactory();
@@ -165,8 +165,9 @@
protected ServerSocketFactory createServerSocketFactory() {
if (sslContext == null) {
return SSLServerSocketFactory.getDefault();
- } else
+ } else {
return sslContext.getServerSocketFactory();
+ }
}
/**
@@ -178,8 +179,9 @@
protected SocketFactory createSocketFactory() {
if (sslContext == null) {
return SSLSocketFactory.getDefault();
- } else
+ } else {
return sslContext.getSocketFactory();
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java Thu Aug 9 09:37:49 2007
@@ -47,15 +47,17 @@
byte[] buffer = internalBuffer;
count = position = 0;
int n = in.read(buffer, position, buffer.length - position);
- if (n > 0)
+ if (n > 0) {
count = n + position;
+ }
}
public int read() throws IOException {
if (position >= count) {
fill();
- if (position >= count)
+ if (position >= count) {
return -1;
+ }
}
return internalBuffer[position++] & 0xff;
}
@@ -68,8 +70,9 @@
}
fill();
avail = count - position;
- if (avail <= 0)
+ if (avail <= 0) {
return -1;
+ }
}
int cnt = (avail < len) ? avail : len;
System.arraycopy(internalBuffer, position, b, off, cnt);
@@ -86,15 +89,18 @@
int n = 0;
for (;;) {
int nread = readStream(b, off + n, len - n);
- if (nread <= 0)
+ if (nread <= 0) {
return (n == 0) ? nread : n;
+ }
n += nread;
- if (n >= len)
+ if (n >= len) {
return n;
+ }
// if not closed but no bytes available, return
InputStream input = in;
- if (input != null && input.available() <= 0)
+ if (input != null && input.available() <= 0) {
return n;
+ }
}
}
@@ -120,7 +126,8 @@
}
public void close() throws IOException {
- if (in != null)
+ if (in != null) {
in.close();
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java Thu Aug 9 09:37:49 2007
@@ -29,7 +29,7 @@
*/
public class TcpBufferedOutputStream extends FilterOutputStream {
- private final static int BUFFER_SIZE = 8192;
+ private static final int BUFFER_SIZE = 8192;
private byte[] buffer;
private int bufferlen;
private int count;
@@ -123,7 +123,7 @@
*
* @throws IOException
*/
- private final void checkClosed() throws IOException {
+ private void checkClosed() throws IOException {
if (closed) {
throw new EOFException("Cannot write to the stream any more it has already been closed");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Aug 9 09:37:49 2007
@@ -49,7 +49,7 @@
* @version $Revision$
*/
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
- private static final Log log = LogFactory.getLog(TcpTransport.class);
+ private static final Log LOG = LogFactory.getLog(TcpTransport.class);
protected final URI remoteLocation;
protected final URI localLocation;
@@ -130,7 +130,7 @@
* reads packets from a Socket
*/
public void run() {
- log.trace("TCP consumer thread starting");
+ LOG.trace("TCP consumer thread starting");
try {
while (!isStopped()) {
doRun();
@@ -284,8 +284,8 @@
sock.setReceiveBufferSize(socketBufferSize);
sock.setSendBufferSize(socketBufferSize);
} catch (SocketException se) {
- log.warn("Cannot set socket buffer size = " + socketBufferSize);
- log.debug("Cannot set socket buffer size. Reason: " + se, se);
+ LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
+ LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
}
sock.setSoTimeout(soTimeout);
@@ -352,8 +352,8 @@
}
protected void doStop(ServiceStopper stopper) throws Exception {
- if (log.isDebugEnabled()) {
- log.debug("Stopping transport " + this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping transport " + this);
}
// Closing the streams flush the sockets before closing.. if the socket