You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/19 01:41:02 UTC
svn commit: r1686342 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/...
Author: rgodfrey
Date: Thu Jun 18 23:41:01 2015
New Revision: 1686342
URL: http://svn.apache.org/r1686342
Log:
QPID-6660 : Remove/Refactor unnecessary interfaces/classes
Added:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java (with props)
qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java
- copied, changed from r1686091, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
Removed:
qpid/java/trunk/client/src/main/java/org/apache/qpid/nclient/
qpid/java/trunk/common/src/main/java/org/apache/qpid/api/
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Binding.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Transport.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
qpid/java/trunk/doc/book/src/jms-client-0-8/JMS-Client-System-Properties.xml
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Thu Jun 18 23:41:01 2015
@@ -163,11 +163,6 @@ public class MultiVersionProtocolEngine
_delegate.received(msg);
}
- public void exception(Throwable t)
- {
- _delegate.exception(t);
- }
-
public long getConnectionId()
{
return _delegate.getConnectionId();
@@ -327,11 +322,6 @@ public class MultiVersionProtocolEngine
_logger.error("Error processing incoming data, could not negotiate a common protocol");
}
- public void exception(Throwable t)
- {
- _logger.error("Error establishing session", t);
- }
-
public void closed()
{
@@ -617,11 +607,6 @@ public class MultiVersionProtocolEngine
{
}
- public void exception(Throwable t)
- {
- _logger.error("Error establishing session", t);
- }
-
public void closed()
{
try
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Thu Jun 18 23:41:01 2015
@@ -36,15 +36,15 @@ import org.apache.qpid.configuration.Com
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.io.AbstractNetworkTransport;
import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
+import org.apache.qpid.transport.network.io.IoNetworkTransport;
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
public class NonBlockingNetworkTransport
{
- private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Thu Jun 18 23:41:01 2015
@@ -89,4 +89,5 @@ public interface ProtocolEngine extends
AggregateTicker getAggregateTicker();
void encryptedTransport();
+
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Thu Jun 18 23:41:01 2015
@@ -1212,11 +1212,6 @@ public class AMQProtocolEngine implement
writeFrame(HeartbeatBody.FRAME);
}
- public void exception(Throwable throwable)
- {
- // noop - exception method is not used by new i/o layer
- }
-
public long getReadBytes()
{
return _readBytes;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java Thu Jun 18 23:41:01 2015
@@ -75,6 +75,8 @@ import org.apache.qpid.transport.network
public class ProtocolEngine_1_0_0 implements ProtocolEngine, FrameOutputHandler
{
+ public static Logger LOGGER = LoggerFactory.getLogger(ProtocolEngine_1_0_0.class);
+
public static final long CLOSE_REPONSE_TIMEOUT = 10000l;
private final AmqpPort<?> _port;
private final Transport _transport;
@@ -484,20 +486,16 @@ public class ProtocolEngine_1_0_0 implem
}
catch(RuntimeException e)
{
- exception(e);
+ LOGGER.error("Exception while processing incoming data", e);
+ _network.close();
}
}
- public void exception(Throwable throwable)
- {
- // noop - exception method is not used by new i/o layer
- }
public void closed()
{
try
{
- // todo
try
{
_endpoint.inputClosed();
@@ -512,7 +510,9 @@ public class ProtocolEngine_1_0_0 implem
}
catch(RuntimeException e)
{
- exception(e);
+
+ LOGGER.error("Exception while closing", e);
+ _network.close();
}
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Jun 18 23:41:01 2015
@@ -56,11 +56,10 @@ import org.apache.qpid.jms.ChannelLimitR
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.Session;
import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.io.IoNetworkTransport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.network.security.SecurityLayerFactory;
@@ -139,7 +138,7 @@ public class AMQConnectionDelegate_8_0 i
SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);
- OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
+ IoNetworkTransport transport = new IoNetworkTransport();
ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler()));
@@ -524,12 +523,12 @@ public class AMQConnectionDelegate_8_0 i
}
- private static class ReceiverClosedWaiter implements ByteBufferReceiver
+ private static class ReceiverClosedWaiter implements ExceptionHandlingByteBufferReceiver
{
private final CountDownLatch _closedWatcher;
- private final ByteBufferReceiver _receiver;
+ private final ExceptionHandlingByteBufferReceiver _receiver;
- public ReceiverClosedWaiter(ByteBufferReceiver receiver)
+ public ReceiverClosedWaiter(ExceptionHandlingByteBufferReceiver receiver)
{
_receiver = receiver;
_closedWatcher = new CountDownLatch(1);
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Jun 18 23:41:01 2015
@@ -68,6 +68,7 @@ import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
@@ -118,7 +119,7 @@ import org.apache.qpid.util.BytesDataOut
* held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
-public class AMQProtocolHandler implements ByteBufferReceiver, TransportActivity
+public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, TransportActivity
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java Thu Jun 18 23:41:01 2015
@@ -26,7 +26,5 @@ public interface ByteBufferReceiver
{
void received(ByteBuffer msg);
- void exception(Throwable t);
-
void closed();
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java Thu Jun 18 23:41:01 2015
@@ -40,15 +40,13 @@ import java.util.concurrent.atomic.Atomi
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
-import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.io.IoNetworkTransport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import org.apache.qpid.transport.util.Logger;
@@ -244,10 +242,10 @@ public class Connection extends Connecti
securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
- OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
+ IoNetworkTransport transport = new IoNetworkTransport();
final InputHandler inputHandler = new InputHandler(new Assembler(this));
addFrameSizeObserver(inputHandler);
- ByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
+ ExceptionHandlingByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
if(secureReceiver instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureReceiver);
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java?rev=1686342&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java Thu Jun 18 23:41:01 2015
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+public interface ExceptionHandlingByteBufferReceiver extends ByteBufferReceiver
+{
+ void exception(Throwable t);
+}
Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ExceptionHandlingByteBufferReceiver.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Thu Jun 18 23:41:01 2015
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
*/
public class ServerDelegate extends ConnectionDelegate
{
+ public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class);
private List<Object> _locales;
@@ -145,7 +146,7 @@ public class ServerDelegate extends Conn
protected int getFrameMax()
{
- return org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE;
+ return MAX_FRAME_SIZE;
}
protected void secure(final Connection conn, final byte[] response)
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java Thu Jun 18 23:41:01 2015
@@ -31,6 +31,7 @@ import java.nio.ByteOrder;
import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.Constant;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.NetworkEventReceiver;
import org.apache.qpid.transport.ProtocolError;
@@ -44,7 +45,7 @@ import org.apache.qpid.transport.Segment
* @author Rafael H. Schloming
*/
-public class InputHandler implements ByteBufferReceiver, FrameSizeObserver
+public class InputHandler implements ExceptionHandlingByteBufferReceiver, FrameSizeObserver
{
private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java Thu Jun 18 23:41:01 2015
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
@@ -50,7 +51,7 @@ public class IoNetworkConnection impleme
private final Object _lock = new Object();
private Certificate _certificate;
- public IoNetworkConnection(Socket socket, ByteBufferReceiver delegate,
+ public IoNetworkConnection(Socket socket, ExceptionHandlingByteBufferReceiver delegate,
int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
{
_socket = socket;
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Thu Jun 18 23:41:01 2015
@@ -20,17 +20,31 @@
*/
package org.apache.qpid.transport.network.io;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.Socket;
-import org.apache.qpid.transport.ByteBufferReceiver;
+import org.slf4j.LoggerFactory;
-public class IoNetworkTransport extends AbstractNetworkTransport
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.TransportActivity;
+
+public class IoNetworkTransport
{
- @Override
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
+ private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+ CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+ private NetworkConnection _connection;
+
protected IoNetworkConnection createNetworkConnection(final Socket socket,
- final ByteBufferReceiver engine,
+ final ExceptionHandlingByteBufferReceiver engine,
final Integer sendBufferSize,
final Integer receiveBufferSize,
final int timeout,
@@ -40,4 +54,72 @@ public class IoNetworkTransport extends
ticker);
}
+ public NetworkConnection connect(ConnectionSettings settings,
+ ExceptionHandlingByteBufferReceiver delegate,
+ TransportActivity transportActivity)
+ {
+ int sendBufferSize = settings.getWriteBufferSize();
+ int receiveBufferSize = settings.getReadBufferSize();
+
+ final Socket socket;
+ try
+ {
+ socket = new Socket();
+ socket.setReuseAddress(true);
+ socket.setTcpNoDelay(settings.isTcpNodelay());
+ socket.setSendBufferSize(sendBufferSize);
+ socket.setReceiveBufferSize(receiveBufferSize);
+
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("SO_RCVBUF : " + socket.getReceiveBufferSize());
+ LOGGER.debug("SO_SNDBUF : " + socket.getSendBufferSize());
+ LOGGER.debug("TCP_NODELAY : " + socket.getTcpNoDelay());
+ }
+
+ InetAddress address = InetAddress.getByName(settings.getHost());
+
+ socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+
+ try
+ {
+ IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
+ _connection = createNetworkConnection(socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+ ticker.setConnection(_connection);
+ _connection.start();
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch(IOException ioe)
+ {
+ //ignored, throw based on original exception
+ }
+
+ throw new TransportException("Error creating network connection", e);
+ }
+
+ return _connection;
+ }
+
+ public void close()
+ {
+ if(_connection != null)
+ {
+ _connection.close();
+ }
+ }
+
+ public NetworkConnection getConnection()
+ {
+ return _connection;
+ }
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Thu Jun 18 23:41:01 2015
@@ -32,6 +32,7 @@ import javax.net.ssl.SSLSocket;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.util.Logger;
@@ -47,7 +48,7 @@ final class IoReceiver implements Runnab
private static final Logger log = Logger.get(IoReceiver.class);
- private final ByteBufferReceiver receiver;
+ private final ExceptionHandlingByteBufferReceiver receiver;
private final int bufferSize;
private final Socket socket;
private final long timeout;
@@ -61,7 +62,7 @@ final class IoReceiver implements Runnab
shutdownBroken = SystemUtils.isWindows();
}
- public IoReceiver(Socket socket, ByteBufferReceiver receiver, int bufferSize, long timeout)
+ public IoReceiver(Socket socket, ExceptionHandlingByteBufferReceiver receiver, int bufferSize, long timeout)
{
this.receiver = receiver;
this.bufferSize = bufferSize;
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java Thu Jun 18 23:41:01 2015
@@ -22,12 +22,13 @@ package org.apache.qpid.transport.networ
import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
public interface SecurityLayer
{
public ByteBufferSender sender(ByteBufferSender delegate);
- public ByteBufferReceiver receiver(ByteBufferReceiver delegate);
+ public ExceptionHandlingByteBufferReceiver receiver(ExceptionHandlingByteBufferReceiver delegate);
public String getUserID();
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java Thu Jun 18 23:41:01 2015
@@ -29,6 +29,7 @@ import org.apache.qpid.ssl.SSLContextFac
import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
@@ -117,7 +118,7 @@ public class SecurityLayerFactory
return sender;
}
- public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
+ public ExceptionHandlingByteBufferReceiver receiver(ExceptionHandlingByteBufferReceiver delegate)
{
SSLReceiver receiver = new SSLReceiver(_engine, _layer.receiver(delegate), _sslStatus);
receiver.setHostname(_hostname);
@@ -147,7 +148,7 @@ public class SecurityLayerFactory
return sender;
}
- public SASLReceiver receiver(ByteBufferReceiver delegate)
+ public SASLReceiver receiver(ExceptionHandlingByteBufferReceiver delegate)
{
SASLReceiver receiver = new SASLReceiver(_layer.receiver(delegate));
return receiver;
@@ -174,7 +175,7 @@ public class SecurityLayerFactory
return delegate;
}
- public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
+ public ExceptionHandlingByteBufferReceiver receiver(ExceptionHandlingByteBufferReceiver delegate)
{
return delegate;
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java Thu Jun 18 23:41:01 2015
@@ -26,16 +26,18 @@ import java.nio.ByteBuffer;
import javax.security.sasl.SaslException;
import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.util.Logger;
-public class SASLReceiver extends SASLEncryptor implements ByteBufferReceiver {
+public class SASLReceiver extends SASLEncryptor implements ExceptionHandlingByteBufferReceiver
+{
- private ByteBufferReceiver delegate;
+ private ExceptionHandlingByteBufferReceiver delegate;
private byte[] netData;
private static final Logger log = Logger.get(SASLReceiver.class);
- public SASLReceiver(ByteBufferReceiver delegate)
+ public SASLReceiver(ExceptionHandlingByteBufferReceiver delegate)
{
this.delegate = delegate;
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java Thu Jun 18 23:41:01 2015
@@ -29,15 +29,16 @@ import javax.net.ssl.SSLEngineResult.Sta
import javax.net.ssl.SSLException;
import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.security.SSLStatus;
import org.apache.qpid.transport.util.Logger;
-public class SSLReceiver implements ByteBufferReceiver
+public class SSLReceiver implements ExceptionHandlingByteBufferReceiver
{
private static final Logger log = Logger.get(SSLReceiver.class);
- private final ByteBufferReceiver delegate;
+ private final ExceptionHandlingByteBufferReceiver delegate;
private final SSLEngine engine;
private final int sslBufSize;
private final ByteBuffer localBuffer;
@@ -47,7 +48,7 @@ public class SSLReceiver implements Byte
private String _hostname;
- public SSLReceiver(final SSLEngine engine, final ByteBufferReceiver delegate, final SSLStatus sslStatus)
+ public SSLReceiver(final SSLEngine engine, final ExceptionHandlingByteBufferReceiver delegate, final SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;
Copied: qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java (from r1686091, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java?p2=qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java&r1=1686091&r2=1686342&rev=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java Thu Jun 18 23:41:01 2015
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.transport.network;
-import org.apache.qpid.transport.Binding;
-import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.Constant;
+import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
@@ -36,10 +35,9 @@ import org.apache.qpid.transport.network
*/
public abstract class ConnectionBinding
- implements Binding<Connection>
{
- public static Binding<Connection> get(final Connection connection)
+ public static ConnectionBinding get(final Connection connection)
{
return new ConnectionBinding()
{
@@ -50,7 +48,7 @@ public abstract class ConnectionBinding
};
}
- public static Binding<Connection> get(final ConnectionDelegate delegate)
+ public static ConnectionBinding get(final ConnectionDelegate delegate)
{
return new ConnectionBinding()
{
@@ -63,8 +61,6 @@ public abstract class ConnectionBinding
};
}
- public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
public abstract Connection connection();
public Connection endpoint(ByteBufferSender sender)
@@ -85,7 +81,7 @@ public abstract class ConnectionBinding
return conn;
}
- public ByteBufferReceiver receiver(Connection conn)
+ public ExceptionHandlingByteBufferReceiver receiver(Connection conn)
{
final InputHandler inputHandler = new InputHandler(new Assembler(conn));
conn.addFrameSizeObserver(inputHandler);
Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java Thu Jun 18 23:41:01 2015
@@ -29,7 +29,7 @@ import java.net.SocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.network.ConnectionBinding;
/**
@@ -37,16 +37,16 @@ import org.apache.qpid.transport.Binding
*
*/
-public class IoAcceptor<E> extends Thread
+public class IoAcceptor extends Thread
{
private static final Logger _logger = LoggerFactory.getLogger(IoAcceptor.class);
private volatile boolean _closed = false;
private ServerSocket socket;
- private Binding<E> binding;
+ private ConnectionBinding binding;
- public IoAcceptor(SocketAddress address, Binding<E> binding)
+ public IoAcceptor(SocketAddress address, ConnectionBinding binding)
throws IOException
{
socket = new ServerSocket();
@@ -70,7 +70,7 @@ public class IoAcceptor<E> extends Threa
}
}
- public IoAcceptor(String host, int port, Binding<E> binding)
+ public IoAcceptor(String host, int port, ConnectionBinding binding)
throws IOException
{
this(new InetSocketAddress(host, port), binding);
@@ -83,7 +83,7 @@ public class IoAcceptor<E> extends Threa
try
{
Socket sock = socket.accept();
- IoTransport<E> transport = new IoTransport<E>(sock, binding);
+ IoTransport transport = new IoTransport(sock, binding);
}
catch (IOException e)
{
Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java Thu Jun 18 23:41:01 2015
@@ -21,8 +21,9 @@ package org.apache.qpid.transport.networ
import java.net.Socket;
-import org.apache.qpid.transport.Binding;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.util.Logger;
/**
@@ -34,7 +35,7 @@ import org.apache.qpid.transport.util.Lo
* SO_RCVBUF - amqj.receiveBufferSize
* SO_SNDBUF - amqj.sendBufferSize
*/
-public final class IoTransport<E>
+public final class IoTransport
{
@@ -48,17 +49,17 @@ public final class IoTransport<E>
private Socket socket;
private ByteBufferSender sender;
- private E endpoint;
+ private Connection endpoint;
private IoReceiver receiver;
private long timeout = 60000;
- IoTransport(Socket socket, Binding<E> binding)
+ IoTransport(Socket socket, ConnectionBinding binding)
{
this.socket = socket;
setupTransport(socket, binding);
}
- private void setupTransport(Socket socket, Binding<E> binding)
+ private void setupTransport(Socket socket, ConnectionBinding binding)
{
IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
ios.initiate();
Modified: qpid/java/trunk/doc/book/src/jms-client-0-8/JMS-Client-System-Properties.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/doc/book/src/jms-client-0-8/JMS-Client-System-Properties.xml?rev=1686342&r1=1686341&r2=1686342&view=diff
==============================================================================
--- qpid/java/trunk/doc/book/src/jms-client-0-8/JMS-Client-System-Properties.xml (original)
+++ qpid/java/trunk/doc/book/src/jms-client-0-8/JMS-Client-System-Properties.xml Thu Jun 18 23:41:01 2015
@@ -383,14 +383,6 @@
</row>
</thead>
<tbody>
- <row>
- <entry>qpid.transport</entry>
- <entry>string</entry>
- <entry>org.apache.qpid.transport.network.io.IoNetworkTransport</entry>
- <entry><para>The transport implementation to be used.</para><para>A user could
- specify an alternative transport mechanism that implements the interface
- <varname>org.apache.qpid.transport.network.OutgoingNetworkTransport</varname>.</para></entry>
- </row>
<row id="JMS-Client-0-8-System-Properties-SyncOpTimeout">
<entry>qpid.sync_op_timeout</entry>
<entry>long</entry>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org