You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/19 01:41:02 UTC

svn commit: r1686342 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/...

Author: rgodfrey
Date: Thu Jun 18 23:41:01 2015
New Revision: 1686342

URL: http://svn.apache.org/r1686342
Log:
QPID-6660 : Remove/Refactor unnecessary interfaces/classes

Added:
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java   (with props)
    qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java
      - copied, changed from r1686091, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
Removed:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/nclient/
    qpid/java/trunk/common/src/main/java/org/apache/qpid/api/
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Binding.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Transport.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
    qpid/java/trunk/doc/book/src/jms-client-0-8/JMS-Client-System-Properties.xml

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Thu Jun 18 23:41:01 2015
@@ -163,11 +163,6 @@ public class MultiVersionProtocolEngine
         _delegate.received(msg);
     }
 
-    public void exception(Throwable t)
-    {
-        _delegate.exception(t);
-    }
-
     public long getConnectionId()
     {
         return _delegate.getConnectionId();
@@ -327,11 +322,6 @@ public class MultiVersionProtocolEngine
             _logger.error("Error processing incoming data, could not negotiate a common protocol");
         }
 
-        public void exception(Throwable t)
-        {
-            _logger.error("Error establishing session", t);
-        }
-
         public void closed()
         {
 
@@ -617,11 +607,6 @@ public class MultiVersionProtocolEngine
         {
         }
 
-        public void exception(Throwable t)
-        {
-            _logger.error("Error establishing session", t);
-        }
-
         public void closed()
         {
             try

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Thu Jun 18 23:41:01 2015
@@ -36,15 +36,15 @@ import org.apache.qpid.configuration.Com
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.io.AbstractNetworkTransport;
 import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
+import org.apache.qpid.transport.network.io.IoNetworkTransport;
 
 import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
 
 public class NonBlockingNetworkTransport
 {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
     private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
                                                           CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
     private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Thu Jun 18 23:41:01 2015
@@ -89,4 +89,5 @@ public interface ProtocolEngine extends
    AggregateTicker getAggregateTicker();
 
    void encryptedTransport();
+
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Thu Jun 18 23:41:01 2015
@@ -1212,11 +1212,6 @@ public class AMQProtocolEngine implement
         writeFrame(HeartbeatBody.FRAME);
     }
 
-    public void exception(Throwable throwable)
-    {
-        // noop - exception method is not used by new i/o layer
-    }
-
     public long getReadBytes()
     {
         return _readBytes;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java Thu Jun 18 23:41:01 2015
@@ -75,6 +75,8 @@ import org.apache.qpid.transport.network
 public class ProtocolEngine_1_0_0 implements ProtocolEngine, FrameOutputHandler
 {
 
+    public static Logger LOGGER = LoggerFactory.getLogger(ProtocolEngine_1_0_0.class);
+
     public static final long CLOSE_REPONSE_TIMEOUT = 10000l;
     private final AmqpPort<?> _port;
     private final Transport _transport;
@@ -484,20 +486,16 @@ public class ProtocolEngine_1_0_0 implem
         }
         catch(RuntimeException e)
         {
-            exception(e);
+            LOGGER.error("Exception while processing incoming data", e);
+            _network.close();
         }
      }
 
-    public void exception(Throwable throwable)
-    {
-        // noop - exception method is not used by new i/o layer
-    }
 
     public void closed()
     {
         try
         {
-            // todo
             try
             {
                 _endpoint.inputClosed();
@@ -512,7 +510,9 @@ public class ProtocolEngine_1_0_0 implem
         }
         catch(RuntimeException e)
         {
-            exception(e);
+
+            LOGGER.error("Exception while closing", e);
+            _network.close();
         }
     }
 

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Jun 18 23:41:01 2015
@@ -56,11 +56,10 @@ import org.apache.qpid.jms.ChannelLimitR
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.io.IoNetworkTransport;
 import org.apache.qpid.transport.network.security.SecurityLayer;
 import org.apache.qpid.transport.network.security.SecurityLayerFactory;
 
@@ -139,7 +138,7 @@ public class AMQConnectionDelegate_8_0 i
 
         SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);
 
-        OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
+        IoNetworkTransport transport = new IoNetworkTransport();
 
         ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler()));
 
@@ -524,12 +523,12 @@ public class AMQConnectionDelegate_8_0 i
     }
 
 
-    private static class ReceiverClosedWaiter implements ByteBufferReceiver
+    private static class ReceiverClosedWaiter implements ExceptionHandlingByteBufferReceiver
     {
         private final CountDownLatch _closedWatcher;
-        private final ByteBufferReceiver _receiver;
+        private final ExceptionHandlingByteBufferReceiver _receiver;
 
-        public ReceiverClosedWaiter(ByteBufferReceiver receiver)
+        public ReceiverClosedWaiter(ExceptionHandlingByteBufferReceiver receiver)
         {
             _receiver = receiver;
             _closedWatcher = new CountDownLatch(1);

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Jun 18 23:41:01 2015
@@ -68,6 +68,7 @@ import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.TransportActivity;
@@ -118,7 +119,7 @@ import org.apache.qpid.util.BytesDataOut
  * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
  * that lifecycles of the fields match lifecycles of their containing objects.
  */
-public class AMQProtocolHandler implements ByteBufferReceiver, TransportActivity
+public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, TransportActivity
 {
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java Thu Jun 18 23:41:01 2015
@@ -26,7 +26,5 @@ public interface ByteBufferReceiver
 {
     void received(ByteBuffer msg);
 
-    void exception(Throwable t);
-
     void closed();
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java Thu Jun 18 23:41:01 2015
@@ -40,15 +40,13 @@ import java.util.concurrent.atomic.Atomi
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslServer;
 
-import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
 import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
 import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.io.IoNetworkTransport;
 import org.apache.qpid.transport.network.security.SecurityLayer;
 import org.apache.qpid.transport.network.security.SecurityLayerFactory;
 import org.apache.qpid.transport.util.Logger;
@@ -244,10 +242,10 @@ public class Connection extends Connecti
 
             securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
 
-            OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
+            IoNetworkTransport transport = new IoNetworkTransport();
             final InputHandler inputHandler = new InputHandler(new Assembler(this));
             addFrameSizeObserver(inputHandler);
-            ByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
+            ExceptionHandlingByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
             if(secureReceiver instanceof ConnectionListener)
             {
                 addConnectionListener((ConnectionListener)secureReceiver);

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java?rev=1686342&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java Thu Jun 18 23:41:01 2015
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+public interface ExceptionHandlingByteBufferReceiver extends ByteBufferReceiver
+{
+    void exception(Throwable t);
+}

Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Thu Jun 18 23:41:01 2015
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ServerDelegate extends ConnectionDelegate
 {
+    public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
     protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class);
 
     private List<Object> _locales;
@@ -145,7 +146,7 @@ public class ServerDelegate extends Conn
 
     protected int getFrameMax()
     {
-        return org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE;
+        return MAX_FRAME_SIZE;
     }
 
     protected void secure(final Connection conn, final byte[] response)

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java Thu Jun 18 23:41:01 2015
@@ -31,6 +31,7 @@ import java.nio.ByteOrder;
 
 import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.Constant;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.FrameSizeObserver;
 import org.apache.qpid.transport.NetworkEventReceiver;
 import org.apache.qpid.transport.ProtocolError;
@@ -44,7 +45,7 @@ import org.apache.qpid.transport.Segment
  * @author Rafael H. Schloming
  */
 
-public class InputHandler implements ByteBufferReceiver, FrameSizeObserver
+public class InputHandler implements ExceptionHandlingByteBufferReceiver, FrameSizeObserver
 {
 
     private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java Thu Jun 18 23:41:01 2015
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.Ticker;
 
@@ -50,7 +51,7 @@ public class IoNetworkConnection impleme
     private final Object _lock = new Object();
     private Certificate _certificate;
 
-    public IoNetworkConnection(Socket socket, ByteBufferReceiver delegate,
+    public IoNetworkConnection(Socket socket, ExceptionHandlingByteBufferReceiver delegate,
             int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
     {
         _socket = socket;

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Thu Jun 18 23:41:01 2015
@@ -20,17 +20,31 @@
  */
 package org.apache.qpid.transport.network.io;
 
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 
-import org.apache.qpid.transport.ByteBufferReceiver;
+import org.slf4j.LoggerFactory;
 
-public class IoNetworkTransport extends AbstractNetworkTransport
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.TransportActivity;
+
+public class IoNetworkTransport
 {
 
 
-    @Override
+    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
+    private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+                                                              CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+    private NetworkConnection _connection;
+
     protected IoNetworkConnection createNetworkConnection(final Socket socket,
-                                                       final ByteBufferReceiver engine,
+                                                       final ExceptionHandlingByteBufferReceiver engine,
                                                        final Integer sendBufferSize,
                                                        final Integer receiveBufferSize,
                                                        final int timeout,
@@ -40,4 +54,72 @@ public class IoNetworkTransport extends
                                 ticker);
     }
 
+    public NetworkConnection connect(ConnectionSettings settings,
+                                     ExceptionHandlingByteBufferReceiver delegate,
+                                     TransportActivity transportActivity)
+    {
+        int sendBufferSize = settings.getWriteBufferSize();
+        int receiveBufferSize = settings.getReadBufferSize();
+
+        final Socket socket;
+        try
+        {
+            socket = new Socket();
+            socket.setReuseAddress(true);
+            socket.setTcpNoDelay(settings.isTcpNodelay());
+            socket.setSendBufferSize(sendBufferSize);
+            socket.setReceiveBufferSize(receiveBufferSize);
+
+            if(LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("SO_RCVBUF : " + socket.getReceiveBufferSize());
+                LOGGER.debug("SO_SNDBUF : " + socket.getSendBufferSize());
+                LOGGER.debug("TCP_NODELAY : " + socket.getTcpNoDelay());
+            }
+
+            InetAddress address = InetAddress.getByName(settings.getHost());
+
+            socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Error connecting to broker", e);
+        }
+
+        try
+        {
+            IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
+            _connection = createNetworkConnection(socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+            ticker.setConnection(_connection);
+            _connection.start();
+        }
+        catch(Exception e)
+        {
+            try
+            {
+                socket.close();
+            }
+            catch(IOException ioe)
+            {
+                //ignored, throw based on original exception
+            }
+
+            throw new TransportException("Error creating network connection", e);
+        }
+
+        return _connection;
+    }
+
+    public void close()
+    {
+        if(_connection != null)
+        {
+            _connection.close();
+        }
+    }
+
+    public NetworkConnection getConnection()
+    {
+        return _connection;
+    }
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Thu Jun 18 23:41:01 2015
@@ -32,6 +32,7 @@ import javax.net.ssl.SSLSocket;
 
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.util.Logger;
@@ -47,7 +48,7 @@ final class IoReceiver implements Runnab
 
     private static final Logger log = Logger.get(IoReceiver.class);
 
-    private final ByteBufferReceiver receiver;
+    private final ExceptionHandlingByteBufferReceiver receiver;
     private final int bufferSize;
     private final Socket socket;
     private final long timeout;
@@ -61,7 +62,7 @@ final class IoReceiver implements Runnab
         shutdownBroken = SystemUtils.isWindows();
     }
 
-    public IoReceiver(Socket socket, ByteBufferReceiver receiver, int bufferSize, long timeout)
+    public IoReceiver(Socket socket, ExceptionHandlingByteBufferReceiver receiver, int bufferSize, long timeout)
     {
         this.receiver = receiver;
         this.bufferSize = bufferSize;

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java Thu Jun 18 23:41:01 2015
@@ -22,12 +22,13 @@ package org.apache.qpid.transport.networ
 
 import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 
 public interface SecurityLayer
 {
 
     public ByteBufferSender sender(ByteBufferSender delegate);
-    public ByteBufferReceiver receiver(ByteBufferReceiver delegate);
+    public ExceptionHandlingByteBufferReceiver receiver(ExceptionHandlingByteBufferReceiver delegate);
     public String getUserID();
 
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java Thu Jun 18 23:41:01 2015
@@ -29,6 +29,7 @@ import org.apache.qpid.ssl.SSLContextFac
 import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
 import org.apache.qpid.transport.network.security.sasl.SASLSender;
@@ -117,7 +118,7 @@ public class SecurityLayerFactory
             return sender;
         }
 
-        public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
+        public ExceptionHandlingByteBufferReceiver receiver(ExceptionHandlingByteBufferReceiver delegate)
         {
             SSLReceiver receiver = new SSLReceiver(_engine, _layer.receiver(delegate), _sslStatus);
             receiver.setHostname(_hostname);
@@ -147,7 +148,7 @@ public class SecurityLayerFactory
             return sender;
         }
 
-        public SASLReceiver receiver(ByteBufferReceiver delegate)
+        public SASLReceiver receiver(ExceptionHandlingByteBufferReceiver delegate)
         {
             SASLReceiver receiver = new SASLReceiver(_layer.receiver(delegate));
             return receiver;
@@ -174,7 +175,7 @@ public class SecurityLayerFactory
             return delegate;
         }
 
-        public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
+        public ExceptionHandlingByteBufferReceiver receiver(ExceptionHandlingByteBufferReceiver delegate)
         {
             return delegate;
         }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java Thu Jun 18 23:41:01 2015
@@ -26,16 +26,18 @@ import java.nio.ByteBuffer;
 import javax.security.sasl.SaslException;
 
 import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.util.Logger;
 
-public class SASLReceiver extends SASLEncryptor implements ByteBufferReceiver {
+public class SASLReceiver extends SASLEncryptor implements ExceptionHandlingByteBufferReceiver
+{
 
-    private ByteBufferReceiver delegate;
+    private ExceptionHandlingByteBufferReceiver delegate;
     private byte[] netData;
     private static final Logger log = Logger.get(SASLReceiver.class);
     
-    public SASLReceiver(ByteBufferReceiver delegate)
+    public SASLReceiver(ExceptionHandlingByteBufferReceiver delegate)
     {
         this.delegate = delegate;
     }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java Thu Jun 18 23:41:01 2015
@@ -29,15 +29,16 @@ import javax.net.ssl.SSLEngineResult.Sta
 import javax.net.ssl.SSLException;
 
 import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.security.SSLStatus;
 import org.apache.qpid.transport.util.Logger;
 
-public class SSLReceiver implements ByteBufferReceiver
+public class SSLReceiver implements ExceptionHandlingByteBufferReceiver
 {
     private static final Logger log = Logger.get(SSLReceiver.class);
 
-    private final ByteBufferReceiver delegate;
+    private final ExceptionHandlingByteBufferReceiver delegate;
     private final SSLEngine engine;
     private final int sslBufSize;
     private final ByteBuffer localBuffer;
@@ -47,7 +48,7 @@ public class SSLReceiver implements Byte
 
     private String _hostname;
 
-    public SSLReceiver(final SSLEngine engine, final ByteBufferReceiver delegate, final SSLStatus sslStatus)
+    public SSLReceiver(final SSLEngine engine, final ExceptionHandlingByteBufferReceiver delegate, final SSLStatus sslStatus)
     {
         this.engine = engine;
         this.delegate = delegate;

Copied: qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java (from r1686091, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java?p2=qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java&r1=1686091&r2=1686342&rev=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java Thu Jun 18 23:41:01 2015
@@ -20,13 +20,12 @@
  */
 package org.apache.qpid.transport.network;
 
-import org.apache.qpid.transport.Binding;
-import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.ConnectionListener;
 import org.apache.qpid.transport.Constant;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
 import org.apache.qpid.transport.network.security.sasl.SASLSender;
 
@@ -36,10 +35,9 @@ import org.apache.qpid.transport.network
  */
 
 public abstract class ConnectionBinding
-    implements Binding<Connection>
 {
 
-    public static Binding<Connection> get(final Connection connection)
+    public static ConnectionBinding get(final Connection connection)
     {
         return new ConnectionBinding()
         {
@@ -50,7 +48,7 @@ public abstract class ConnectionBinding
         };
     }
 
-    public static Binding<Connection> get(final ConnectionDelegate delegate)
+    public static ConnectionBinding get(final ConnectionDelegate delegate)
     {
         return new ConnectionBinding()
         {
@@ -63,8 +61,6 @@ public abstract class ConnectionBinding
         };
     }
 
-    public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
     public abstract Connection connection();
 
     public Connection endpoint(ByteBufferSender sender)
@@ -85,7 +81,7 @@ public abstract class ConnectionBinding
         return conn;
     }
 
-    public ByteBufferReceiver receiver(Connection conn)
+    public ExceptionHandlingByteBufferReceiver receiver(Connection conn)
     {
         final InputHandler inputHandler = new InputHandler(new Assembler(conn));
         conn.addFrameSizeObserver(inputHandler);

Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java Thu Jun 18 23:41:01 2015
@@ -29,7 +29,7 @@ import java.net.SocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.network.ConnectionBinding;
 
 
 /**
@@ -37,16 +37,16 @@ import org.apache.qpid.transport.Binding
  *
  */
 
-public class IoAcceptor<E> extends Thread
+public class IoAcceptor extends Thread
 {
     private static final Logger _logger = LoggerFactory.getLogger(IoAcceptor.class);
 
     private volatile boolean _closed = false;
 
     private ServerSocket socket;
-    private Binding<E> binding;
+    private ConnectionBinding binding;
 
-    public IoAcceptor(SocketAddress address, Binding<E> binding)
+    public IoAcceptor(SocketAddress address, ConnectionBinding binding)
         throws IOException
     {
         socket = new ServerSocket();
@@ -70,7 +70,7 @@ public class IoAcceptor<E> extends Threa
         }
     }
 
-    public IoAcceptor(String host, int port, Binding<E> binding)
+    public IoAcceptor(String host, int port, ConnectionBinding binding)
         throws IOException
     {
         this(new InetSocketAddress(host, port), binding);
@@ -83,7 +83,7 @@ public class IoAcceptor<E> extends Threa
             try
             {
                 Socket sock = socket.accept();
-                IoTransport<E> transport = new IoTransport<E>(sock, binding);
+                IoTransport transport = new IoTransport(sock, binding);
             }
             catch (IOException e)
             {

Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java Thu Jun 18 23:41:01 2015
@@ -21,8 +21,9 @@ package org.apache.qpid.transport.networ
 
 import java.net.Socket;
 
-import org.apache.qpid.transport.Binding;
 import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.network.ConnectionBinding;
 import org.apache.qpid.transport.util.Logger;
 
 /**
@@ -34,7 +35,7 @@ import org.apache.qpid.transport.util.Lo
  * SO_RCVBUF    - amqj.receiveBufferSize
  * SO_SNDBUF    - amqj.sendBufferSize
  */
-public final class IoTransport<E>
+public final class IoTransport
 {
 
 
@@ -48,17 +49,17 @@ public final class IoTransport<E>
 
     private Socket socket;
     private ByteBufferSender sender;
-    private E endpoint;
+    private Connection endpoint;
     private IoReceiver receiver;
     private long timeout = 60000;
 
-    IoTransport(Socket socket, Binding<E> binding)
+    IoTransport(Socket socket, ConnectionBinding binding)
     {
         this.socket = socket;
         setupTransport(socket, binding);
     }
 
-    private void setupTransport(Socket socket, Binding<E> binding)
+    private void setupTransport(Socket socket, ConnectionBinding binding)
     {
         IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
         ios.initiate();

Modified: qpid/java/trunk/doc/book/src/jms-client-0-8/JMS-Client-System-Properties.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/doc/book/src/jms-client-0-8/JMS-Client-System-Properties.xml?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/doc/book/src/jms-client-0-8/JMS-Client-System-Properties.xml (original)
+++ qpid/java/trunk/doc/book/src/jms-client-0-8/JMS-Client-System-Properties.xml Thu Jun 18 23:41:01 2015
@@ -383,14 +383,6 @@
 				</row>
 			</thead>
 			<tbody>
-				<row>
-					<entry>qpid.transport</entry>
-					<entry>string</entry>
-					<entry>org.apache.qpid.transport.network.io.IoNetworkTransport</entry>
-					<entry><para>The transport implementation to be used.</para><para>A user could
-							specify an alternative transport mechanism that implements the interface
-								<varname>org.apache.qpid.transport.network.OutgoingNetworkTransport</varname>.</para></entry>
-				</row>
 				<row id="JMS-Client-0-8-System-Properties-SyncOpTimeout">
 					<entry>qpid.sync_op_timeout</entry>
 					<entry>long</entry>



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org