You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/16 18:13:21 UTC
svn commit: r754959 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
Author: chirino
Date: Mon Mar 16 17:13:20 2009
New Revision: 754959
URL: http://svn.apache.org/viewvc?rev=754959&view=rev
Log:
Allow the negociation protocol to be used when not inserted as a transport filter.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java?rev=754959&r1=754958&r2=754959&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java Mon Mar 16 17:13:20 2009
@@ -71,15 +71,19 @@
public void start() throws Exception {
super.start();
if (firstStart.compareAndSet(true, false)) {
- try {
- WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending: " + info);
- }
- sendWireFormat(info);
- } finally {
- wireInfoSentDownLatch.countDown();
+ sendWireFormat();
+ }
+ }
+
+ public void sendWireFormat() throws IOException {
+ try {
+ WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending: " + info);
}
+ sendWireFormat(info);
+ } finally {
+ wireInfoSentDownLatch.countDown();
}
}
@@ -104,43 +108,47 @@
Command command = (Command)o;
if (command.isWireFormatInfo()) {
WireFormatInfo info = (WireFormatInfo)command;
+ negociate(info);
+ }
+ getTransportListener().onCommand(command);
+ }
+
+ public void negociate(WireFormatInfo info) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received WireFormat: " + info);
+ }
+
+ try {
+ wireInfoSentDownLatch.await();
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Received WireFormat: " + info);
+ LOG.debug(this + " before negotiation: " + wireFormat);
+ }
+ if (!info.isValid()) {
+ onException(new IOException("Remote wire format magic is invalid"));
+ } else if (info.getVersion() < minimumVersion) {
+ onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")"));
}
- try {
- wireInfoSentDownLatch.await();
+ wireFormat.renegotiateWireFormat(info);
+ Socket socket = next.narrow(Socket.class);
+ if (socket != null) {
+ socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled());
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug(this + " before negotiation: " + wireFormat);
- }
- if (!info.isValid()) {
- onException(new IOException("Remote wire format magic is invalid"));
- } else if (info.getVersion() < minimumVersion) {
- onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")"));
- }
-
- wireFormat.renegotiateWireFormat(info);
- Socket socket = next.narrow(Socket.class);
- if (socket != null) {
- socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled());
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(this + " after negotiation: " + wireFormat);
- }
-
- } catch (IOException e) {
- onException(e);
- } catch (InterruptedException e) {
- onException((IOException)new InterruptedIOException().initCause(e));
- } catch (Exception e) {
- onException(IOExceptionSupport.create(e));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + " after negotiation: " + wireFormat);
}
- readyCountDownLatch.countDown();
- onWireFormatNegotiated(info);
+
+ } catch (IOException e) {
+ onException(e);
+ } catch (InterruptedException e) {
+ onException((IOException)new InterruptedIOException().initCause(e));
+ } catch (Exception e) {
+ onException(IOExceptionSupport.create(e));
}
- getTransportListener().onCommand(command);
+ readyCountDownLatch.countDown();
+ onWireFormatNegotiated(info);
}
public void onException(IOException error) {