You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/16 18:13:21 UTC

svn commit: r754959 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java

Author: chirino
Date: Mon Mar 16 17:13:20 2009
New Revision: 754959

URL: http://svn.apache.org/viewvc?rev=754959&view=rev
Log:
Allow the negotiation protocol to be used when not inserted as a transport filter.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java?rev=754959&r1=754958&r2=754959&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java Mon Mar 16 17:13:20 2009
@@ -71,15 +71,19 @@
     public void start() throws Exception {
         super.start();
         if (firstStart.compareAndSet(true, false)) {
-            try {
-                WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Sending: " + info);
-                }
-                sendWireFormat(info);
-            } finally {
-                wireInfoSentDownLatch.countDown();
+            sendWireFormat();
+        }
+    }
+
+    public void sendWireFormat() throws IOException {
+        try {
+            WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sending: " + info);
             }
+            sendWireFormat(info);
+        } finally {
+            wireInfoSentDownLatch.countDown();
         }
     }
 
@@ -104,43 +108,47 @@
         Command command = (Command)o;
         if (command.isWireFormatInfo()) {
             WireFormatInfo info = (WireFormatInfo)command;
+            negociate(info);
+        }
+        getTransportListener().onCommand(command);
+    }
+
+    public void negociate(WireFormatInfo info) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Received WireFormat: " + info);
+        }
+
+        try {
+            wireInfoSentDownLatch.await();
+
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Received WireFormat: " + info);
+                LOG.debug(this + " before negotiation: " + wireFormat);
+            }
+            if (!info.isValid()) {
+                onException(new IOException("Remote wire format magic is invalid"));
+            } else if (info.getVersion() < minimumVersion) {
+                onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")"));
             }
 
-            try {
-                wireInfoSentDownLatch.await();
+            wireFormat.renegotiateWireFormat(info);
+            Socket socket = next.narrow(Socket.class);
+            if (socket != null) {
+                socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled());
+            }
 
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(this + " before negotiation: " + wireFormat);
-                }
-                if (!info.isValid()) {
-                    onException(new IOException("Remote wire format magic is invalid"));
-                } else if (info.getVersion() < minimumVersion) {
-                    onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")"));
-                }
-
-                wireFormat.renegotiateWireFormat(info);
-                Socket socket = next.narrow(Socket.class);
-                if (socket != null) {
-                    socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled());
-                }
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(this + " after negotiation: " + wireFormat);
-                }
-
-            } catch (IOException e) {
-                onException(e);
-            } catch (InterruptedException e) {
-                onException((IOException)new InterruptedIOException().initCause(e));
-            } catch (Exception e) {
-                onException(IOExceptionSupport.create(e));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(this + " after negotiation: " + wireFormat);
             }
-            readyCountDownLatch.countDown();
-            onWireFormatNegotiated(info);
+
+        } catch (IOException e) {
+            onException(e);
+        } catch (InterruptedException e) {
+            onException((IOException)new InterruptedIOException().initCause(e));
+        } catch (Exception e) {
+            onException(IOExceptionSupport.create(e));
         }
-        getTransportListener().onCommand(command);
+        readyCountDownLatch.countDown();
+        onWireFormatNegotiated(info);
     }
 
     public void onException(IOException error) {