You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/16 12:41:14 UTC
svn commit: r1685756 [1/2] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/plugin/
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-core/src/test/java/...
Author: rgodfrey
Date: Tue Jun 16 10:41:13 2015
New Revision: 1685756
URL: http://svn.apache.org/r1685756
Log:
QPID-6429 : [Java Broker] Move ProtocolEngine from common to broker-core and collapse ServerProtocolEngine and ProtocolEngine into a single interface (work by Lorenz Quack & Rob Godfrey)
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
- copied, changed from r1685649, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java
- copied, changed from r1685649, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
- copied, changed from r1685649, qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngineFactory.java
- copied, changed from r1685649, qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Transport.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java Tue Jun 16 10:41:13 2015
@@ -19,7 +19,7 @@ package org.apache.qpid.server.plugin;/*
*
*/
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
@@ -31,11 +31,12 @@ public interface ProtocolEngineCreator e
{
Protocol getVersion();
byte[] getHeaderIdentifier();
- ServerProtocolEngine newProtocolEngine(Broker<?> broker,
- NetworkConnection network,
- AmqpPort<?> port,
- Transport transport,
- long id, final AggregateTicker aggregateTicker);
+ ProtocolEngine newProtocolEngine(Broker<?> broker,
+ NetworkConnection network,
+ AmqpPort<?> port,
+ Transport transport,
+ long id,
+ final AggregateTicker aggregateTicker);
byte[] getSuggestedAlternativeHeader();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Tue Jun 16 10:41:13 2015
@@ -31,6 +31,7 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.util.Deletable;
public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends AMQSessionModel<S,T>> extends StatisticsGatherer, Deletable<T>
@@ -110,7 +111,7 @@ public interface AMQConnectionModel<T ex
boolean isMessageAssignmentSuspended();
- ServerProtocolEngine getProtocolEngine();
+ ProtocolEngine getProtocolEngine();
void setScheduler(NetworkConnectionScheduler networkConnectionScheduler);
}
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (from r1685649, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java&r1=1685649&r2=1685756&rev=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Tue Jun 16 10:41:13 2015
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.server.transport;
import java.net.InetSocketAddress;
@@ -46,7 +46,7 @@ import org.apache.qpid.transport.ByteBuf
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
-public class MultiVersionProtocolEngine implements ServerProtocolEngine
+public class MultiVersionProtocolEngine implements ProtocolEngine
{
private static final Logger _logger = LoggerFactory.getLogger(MultiVersionProtocolEngine.class);
@@ -63,8 +63,8 @@ public class MultiVersionProtocolEngine
private ByteBufferSender _sender;
private final Protocol _defaultSupportedReply;
- private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
- private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
+ private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+ private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
private final AggregateTicker _aggregateTicker = new AggregateTicker();
public MultiVersionProtocolEngine(final Broker<?> broker,
@@ -239,7 +239,7 @@ public class MultiVersionProtocolEngine
}
@Override
- public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ public void setWorkListener(final Action<ProtocolEngine> listener)
{
_workListener.set(listener);
_delegate.setWorkListener(listener);
@@ -257,7 +257,7 @@ public class MultiVersionProtocolEngine
return _aggregateTicker;
}
- private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
+ private class ClosedDelegateProtocolEngine implements ProtocolEngine
{
@Override
@@ -291,7 +291,7 @@ public class MultiVersionProtocolEngine
}
@Override
- public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ public void setWorkListener(final Action<ProtocolEngine> listener)
{
}
@@ -400,7 +400,7 @@ public class MultiVersionProtocolEngine
}
- private class SelfDelegateProtocolEngine implements ServerProtocolEngine
+ private class SelfDelegateProtocolEngine implements ProtocolEngine
{
private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
private long _lastReadTime = System.currentTimeMillis();
@@ -456,7 +456,7 @@ public class MultiVersionProtocolEngine
}
@Override
- public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ public void setWorkListener(final Action<ProtocolEngine> listener)
{
}
@@ -497,7 +497,7 @@ public class MultiVersionProtocolEngine
_header.get(headerBytes);
- ServerProtocolEngine newDelegate = null;
+ ProtocolEngine newDelegate = null;
byte[] supportedReplyBytes = null;
byte[] defaultSupportedReplyBytes = null;
Protocol supportedReplyVersion = null;
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java (from r1685649, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java&r1=1685649&r2=1685756&rev=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngineFactory.java Tue Jun 16 10:41:13 2015
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.server.transport;
import java.net.SocketAddress;
import java.util.ArrayList;
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.server.logging.messages.PortMessages;
import org.apache.qpid.server.logging.subjects.PortLogSubject;
import org.apache.qpid.server.model.Broker;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Tue Jun 16 10:41:13 2015
@@ -34,7 +34,6 @@ import org.apache.qpid.server.model.port
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
@@ -54,7 +53,7 @@ public class NonBlockingConnection imple
private final String _remoteSocketAddress;
private final AtomicBoolean _closed = new AtomicBoolean(false);
- private final ServerProtocolEngine _protocolEngine;
+ private final ProtocolEngine _protocolEngine;
private final Runnable _onTransportEncryptionAction;
private final int _receiveBufferSize;
@@ -69,7 +68,7 @@ public class NonBlockingConnection imple
private final AmqpPort _port;
public NonBlockingConnection(SocketChannel socketChannel,
- ServerProtocolEngine protocolEngine,
+ ProtocolEngine protocolEngine,
int receiveBufferSize,
final Set<TransportEncryption> encryptionSet,
final Runnable onTransportEncryptionAction,
@@ -88,10 +87,10 @@ public class NonBlockingConnection imple
_remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
_port = port;
- protocolEngine.setWorkListener(new Action<ServerProtocolEngine>()
+ protocolEngine.setWorkListener(new Action<ProtocolEngine>()
{
@Override
- public void performAction(final ServerProtocolEngine object)
+ public void performAction(final ProtocolEngine object)
{
_scheduler.wakeup();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Tue Jun 16 10:41:13 2015
@@ -28,17 +28,11 @@ import java.nio.channels.SocketChannel;
import java.util.EnumSet;
import java.util.Set;
-import javax.net.ssl.SSLContext;
-
import org.apache.qpid.server.model.port.AmqpPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.TransportEncryption;
@@ -139,9 +133,8 @@ public class NonBlockingNetworkTransport
{
socketChannel = serverSocketChannel.accept();
- final ServerProtocolEngine engine =
- (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
- .getRemoteSocketAddress());
+ final ProtocolEngine engine =
+ _factory.newProtocolEngine(socketChannel.socket().getRemoteSocketAddress());
if(engine != null)
{
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (from r1685649, qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java&r1=1685649&r2=1685756&rev=1685756&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Tue Jun 16 10:41:13 2015
@@ -18,12 +18,16 @@
* under the License.
*
*/
-package org.apache.qpid.protocol;
+package org.apache.qpid.server.transport;
import java.net.SocketAddress;
+import javax.security.auth.Subject;
+
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
@@ -55,8 +59,34 @@ public interface ProtocolEngine extends
// Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
void readerIdle();
- void encryptedTransport();
+ void setNetworkConnection(NetworkConnection network, ByteBufferSender sender);
+
+ /**
+ * Gets the connection ID associated with this ProtocolEngine
+ */
+ long getConnectionId();
+
+ Subject getSubject();
+
+ boolean isTransportBlockedForWriting();
+
+ void setTransportBlockedForWriting(boolean blocked);
+
+ void setMessageAssignmentSuspended(boolean value);
- public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender);
+ boolean isMessageAssignmentSuspended();
+ void processPending();
+
+ boolean hasWork();
+
+ void clearWork();
+
+ void notifyWork();
+
+ void setWorkListener(Action<ProtocolEngine> listener);
+
+ AggregateTicker getAggregateTicker();
+
+ void encryptedTransport();
}
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngineFactory.java (from r1685649, qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngineFactory.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngineFactory.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java&r1=1685649&r2=1685756&rev=1685756&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngineFactory.java Tue Jun 16 10:41:13 2015
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.protocol;
+package org.apache.qpid.server.transport;
import java.net.SocketAddress;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Tue Jun 16 10:41:13 2015
@@ -27,7 +27,6 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.transport.network.TransportEncryption;
class TCPandSSLTransport implements AcceptingTransport
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Tue Jun 16 10:41:13 2015
@@ -53,7 +53,7 @@ import org.apache.qpid.server.model.port
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -666,7 +666,7 @@ public class MockConsumer implements Con
}
@Override
- public ServerProtocolEngine getProtocolEngine()
+ public ProtocolEngine getProtocolEngine()
{
return null;
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java Tue Jun 16 10:41:13 2015
@@ -20,10 +20,7 @@
package org.apache.qpid.server.transport;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngine;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.TransportEncryption;
@@ -33,7 +30,6 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java Tue Jun 16 10:41:13 2015
@@ -21,18 +21,18 @@
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
- private final ServerProtocolEngine _serverProtocolEngine;
+ private final ProtocolEngine _protocolEngine;
private volatile long _bytesCredit;
private volatile long _messageCredit;
- public CreditCreditManager(long bytesCredit, long messageCredit, final ServerProtocolEngine serverProtocolEngine)
+ public CreditCreditManager(long bytesCredit, long messageCredit, final ProtocolEngine protocolEngine)
{
- _serverProtocolEngine = serverProtocolEngine;
+ _protocolEngine = protocolEngine;
_bytesCredit = bytesCredit;
_messageCredit = messageCredit;
setSuspended(!hasCredit());
@@ -86,12 +86,12 @@ public class CreditCreditManager extends
public synchronized boolean hasCredit()
{
// Note !=, if credit is < 0 that indicates infinite credit
- return (_bytesCredit != 0L && _messageCredit != 0L && !_serverProtocolEngine.isTransportBlockedForWriting());
+ return (_bytesCredit != 0L && _messageCredit != 0L && !_protocolEngine.isTransportBlockedForWriting());
}
public synchronized boolean useCreditForMessage(long msgSize)
{
- if (_serverProtocolEngine.isTransportBlockedForWriting())
+ if (_protocolEngine.isTransportBlockedForWriting())
{
setSuspended(true);
return false;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Tue Jun 16 10:41:13 2015
@@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
@@ -66,11 +66,11 @@ public class ProtocolEngineCreator_0_10
return AMQP_0_10_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
- NetworkConnection network,
- AmqpPort<?> port,
- Transport transport,
- long id, final AggregateTicker aggregateTicker)
+ public ProtocolEngine newProtocolEngine(Broker<?> broker,
+ NetworkConnection network,
+ AmqpPort<?> port,
+ Transport transport,
+ long id, final AggregateTicker aggregateTicker)
{
String fqdn = null;
SocketAddress address = network.getLocalAddress();
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Tue Jun 16 10:41:13 2015
@@ -32,7 +32,7 @@ import javax.security.auth.Subject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Consumer;
@@ -48,7 +48,7 @@ import org.apache.qpid.transport.network
import org.apache.qpid.transport.network.NetworkConnection;
-public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine
+public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine
{
public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
private static final Logger _logger = LoggerFactory.getLogger(ProtocolEngine_0_10.class);
@@ -68,7 +68,7 @@ public class ProtocolEngine_0_10 extend
private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
private final AtomicBoolean _stateChanged = new AtomicBoolean();
- private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
+ private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
public ProtocolEngine_0_10(ServerConnection conn,
@@ -327,7 +327,7 @@ public class ProtocolEngine_0_10 extend
{
_stateChanged.set(true);
- final Action<ServerProtocolEngine> listener = _workListener.get();
+ final Action<ProtocolEngine> listener = _workListener.get();
if(listener != null)
{
listener.performAction(this);
@@ -341,7 +341,7 @@ public class ProtocolEngine_0_10 extend
}
@Override
- public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ public void setWorkListener(final Action<ProtocolEngine> listener)
{
_workListener.set(listener);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Tue Jun 16 10:41:13 2015
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
@@ -106,7 +106,7 @@ public class ServerConnection extends Co
private int _messageCompressionThreshold;
private final int _maxMessageSize;
- private ProtocolEngine_0_10 _serverProtocolEngine;
+ private ProtocolEngine_0_10 _protocolEngine;
private boolean _ignoreFutureInput;
private boolean _ignoreAllButConnectionCloseOk;
@@ -213,20 +213,20 @@ public class ServerConnection extends Co
}
@Override
- public ServerProtocolEngine getProtocolEngine()
+ public ProtocolEngine getProtocolEngine()
{
- return _serverProtocolEngine;
+ return _protocolEngine;
}
@Override
public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
{
- _serverProtocolEngine.setScheduler(networkConnectionScheduler);
+ _protocolEngine.setScheduler(networkConnectionScheduler);
}
public void setProtocolEngine(final ProtocolEngine_0_10 serverProtocolEngine)
{
- _serverProtocolEngine = serverProtocolEngine;
+ _protocolEngine = serverProtocolEngine;
}
public VirtualHostImpl<?,?,?> getVirtualHost()
@@ -756,14 +756,14 @@ public class ServerConnection extends Co
@Override
public void notifyWork()
{
- _serverProtocolEngine.notifyWork();
+ _protocolEngine.notifyWork();
}
@Override
public boolean isMessageAssignmentSuspended()
{
- return _serverProtocolEngine.isMessageAssignmentSuspended();
+ return _protocolEngine.isMessageAssignmentSuspended();
}
public void processPending()
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Tue Jun 16 10:41:13 2015
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.store.MessageHandle;
@@ -255,8 +255,8 @@ public class ServerSessionDelegate exten
}
else
{
- ServerProtocolEngine serverProtocolEngine = getServerConnection(session).getProtocolEngine();
- FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, serverProtocolEngine);
+ ProtocolEngine protocolEngine = getServerConnection(session).getProtocolEngine();
+ FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, protocolEngine);
FilterManager filterManager = null;
try
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java Tue Jun 16 10:41:13 2015
@@ -24,13 +24,13 @@ package org.apache.qpid.server.protocol.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
private static final Logger LOGGER = LoggerFactory.getLogger(WindowCreditManager.class);
- private final ServerProtocolEngine _serverProtocolEngine;
+ private final ProtocolEngine _protocolEngine;
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
@@ -40,9 +40,9 @@ public class WindowCreditManager extends
public WindowCreditManager(long bytesCreditLimit,
long messageCreditLimit,
- ServerProtocolEngine serverProtocolEngine)
+ ProtocolEngine protocolEngine)
{
- _serverProtocolEngine = serverProtocolEngine;
+ _protocolEngine = protocolEngine;
_bytesCreditLimit = bytesCreditLimit;
_messageCreditLimit = messageCreditLimit;
setSuspended(!hasCredit());
@@ -110,12 +110,12 @@ public class WindowCreditManager extends
{
return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
&& (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed)
- && !_serverProtocolEngine.isTransportBlockedForWriting();
+ && !_protocolEngine.isTransportBlockedForWriting();
}
public synchronized boolean useCreditForMessage(final long msgSize)
{
- if (_serverProtocolEngine.isTransportBlockedForWriting())
+ if (_protocolEngine.isTransportBlockedForWriting())
{
setSuspended(true);
return false;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Tue Jun 16 10:41:13 2015
@@ -28,8 +28,6 @@ import java.util.List;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java Tue Jun 16 10:41:13 2015
@@ -23,19 +23,19 @@ package org.apache.qpid.server.protocol.
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.test.utils.QpidTestCase;
public class WindowCreditManagerTest extends QpidTestCase
{
private WindowCreditManager _creditManager;
- private ServerProtocolEngine _protocolEngine;
+ private ProtocolEngine _protocolEngine;
protected void setUp() throws Exception
{
super.setUp();
- _protocolEngine = mock(ServerProtocolEngine.class);
+ _protocolEngine = mock(ProtocolEngine.class);
when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false);
_creditManager = new WindowCreditManager(0l, 0l, _protocolEngine);
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Tue Jun 16 10:41:13 2015
@@ -61,7 +61,7 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -98,7 +98,7 @@ import org.apache.qpid.transport.Transpo
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
-public class AMQProtocolEngine implements ServerProtocolEngine,
+public class AMQProtocolEngine implements ProtocolEngine,
AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
ServerMethodProcessor<ServerChannelMethodProcessor>
{
@@ -130,7 +130,7 @@ public class AMQProtocolEngine implement
private final AmqpPort<?> _port;
private final long _creationTime;
private final AtomicBoolean _stateChanged = new AtomicBoolean();
- private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
+ private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
private AMQShortString _contextKey;
@@ -275,7 +275,7 @@ public class AMQProtocolEngine implement
}
@Override
- public ServerProtocolEngine getProtocolEngine()
+ public ProtocolEngine getProtocolEngine()
{
return this;
}
@@ -2068,7 +2068,7 @@ public class AMQProtocolEngine implement
{
_stateChanged.set(true);
- final Action<ServerProtocolEngine> listener = _workListener.get();
+ final Action<ProtocolEngine> listener = _workListener.get();
if(listener != null)
{
@@ -2083,7 +2083,7 @@ public class AMQProtocolEngine implement
}
@Override
- public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ public void setWorkListener(final Action<ProtocolEngine> listener)
{
_workListener.set(listener);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java Tue Jun 16 10:41:13 2015
@@ -19,16 +19,16 @@
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class NoAckCreditManager extends AbstractFlowCreditManager
{
- private final ServerProtocolEngine _serverProtocolEngine;
+ private final ProtocolEngine _protocolEngine;
- public NoAckCreditManager(ServerProtocolEngine serverProtocolEngine)
+ public NoAckCreditManager(ProtocolEngine protocolEngine)
{
- _serverProtocolEngine = serverProtocolEngine;
+ _protocolEngine = protocolEngine;
}
@Override
@@ -40,7 +40,7 @@ public class NoAckCreditManager extends
@Override
public boolean hasCredit()
{
- return !_serverProtocolEngine.isTransportBlockedForWriting();
+ return !_protocolEngine.isTransportBlockedForWriting();
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java Tue Jun 16 10:41:13 2015
@@ -21,14 +21,14 @@
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
import org.apache.qpid.server.flow.FlowCreditManager;
public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
- private final ServerProtocolEngine _protocolEngine;
+ private final ProtocolEngine _protocolEngine;
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
@@ -37,7 +37,7 @@ public class Pre0_10CreditManager extend
public Pre0_10CreditManager(long bytesCreditLimit,
long messageCreditLimit,
- ServerProtocolEngine protocolEngine)
+ ProtocolEngine protocolEngine)
{
_protocolEngine = protocolEngine;
_bytesCreditLimit = bytesCreditLimit;
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java Tue Jun 16 10:41:13 2015
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
@@ -59,11 +59,11 @@ public class ProtocolEngineCreator_0_8 i
return AMQP_0_8_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
- NetworkConnection network,
- AmqpPort<?> port,
- Transport transport,
- long id, final AggregateTicker aggregateTicker)
+ public ProtocolEngine newProtocolEngine(Broker<?> broker,
+ NetworkConnection network,
+ AmqpPort<?> port,
+ Transport transport,
+ long id, final AggregateTicker aggregateTicker)
{
return new AMQProtocolEngine(broker, network, id, port, transport, aggregateTicker);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java Tue Jun 16 10:41:13 2015
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
@@ -59,11 +59,11 @@ public class ProtocolEngineCreator_0_9 i
return AMQP_0_9_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
- NetworkConnection network,
- AmqpPort<?> port,
- Transport transport,
- long id, final AggregateTicker aggregateTicker)
+ public ProtocolEngine newProtocolEngine(Broker<?> broker,
+ NetworkConnection network,
+ AmqpPort<?> port,
+ Transport transport,
+ long id, final AggregateTicker aggregateTicker)
{
return new AMQProtocolEngine(broker, network, id, port, transport, aggregateTicker);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java Tue Jun 16 10:41:13 2015
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
@@ -60,11 +60,11 @@ public class ProtocolEngineCreator_0_9_1
return AMQP_0_9_1_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
- NetworkConnection network,
- AmqpPort<?> port,
- Transport transport,
- long id, final AggregateTicker aggregateTicker)
+ public ProtocolEngine newProtocolEngine(Broker<?> broker,
+ NetworkConnection network,
+ AmqpPort<?> port,
+ Transport transport,
+ long id, final AggregateTicker aggregateTicker)
{
return new AMQProtocolEngine(broker, network, id, port, transport, aggregateTicker);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Tue Jun 16 10:41:13 2015
@@ -54,13 +54,12 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.transport.NetworkConnectionScheduler;
-import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -424,7 +423,7 @@ public class Connection_1_0 implements C
}
@Override
- public ServerProtocolEngine getProtocolEngine()
+ public ProtocolEngine getProtocolEngine()
{
return _protocolEngine;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Jun 16 10:41:13 2015
@@ -43,7 +43,7 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
@@ -297,7 +297,7 @@ class ConsumerTarget_1_0 extends Abstrac
synchronized (_link.getLock())
{
- ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ ProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
if(!hasCredit && getState() == State.ACTIVE)
{
@@ -335,7 +335,7 @@ class ConsumerTarget_1_0 extends Abstrac
{
synchronized(_link.getLock())
{
- ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ ProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
{
updateState(State.SUSPENDED, State.ACTIVE);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java Tue Jun 16 10:41:13 2015
@@ -33,7 +33,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
import org.apache.qpid.transport.network.AggregateTicker;
@@ -70,11 +70,11 @@ public class ProtocolEngineCreator_1_0_0
return AMQP_1_0_0_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
- NetworkConnection network,
- AmqpPort<?> port,
- Transport transport,
- long id, final AggregateTicker aggregateTicker)
+ public ProtocolEngine newProtocolEngine(Broker<?> broker,
+ NetworkConnection network,
+ AmqpPort<?> port,
+ Transport transport,
+ long id, final AggregateTicker aggregateTicker)
{
final AuthenticationProvider<?> authenticationProvider = port.getAuthenticationProvider();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java Tue Jun 16 10:41:13 2015
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
@@ -59,11 +59,11 @@ public class ProtocolEngineCreator_1_0_0
return AMQP_SASL_1_0_0_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
- NetworkConnection network,
- AmqpPort<?> port,
- Transport transport,
- long id, final AggregateTicker aggregateTicker)
+ public ProtocolEngine newProtocolEngine(Broker<?> broker,
+ NetworkConnection network,
+ AmqpPort<?> port,
+ Transport transport,
+ long id, final AggregateTicker aggregateTicker)
{
return new ProtocolEngine_1_0_0(network, broker, id, port, transport, aggregateTicker, true);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java Tue Jun 16 10:41:13 2015
@@ -53,7 +53,7 @@ import org.apache.qpid.amqp_1_0.type.Sym
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Consumer;
@@ -72,7 +72,7 @@ import org.apache.qpid.transport.ByteBuf
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
-public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHandler
+public class ProtocolEngine_1_0_0 implements ProtocolEngine, FrameOutputHandler
{
public static final long CLOSE_REPONSE_TIMEOUT = 10000l;
@@ -90,7 +90,7 @@ public class ProtocolEngine_1_0_0 implem
private ConnectionEndpoint _endpoint;
private long _connectionId;
private final AtomicBoolean _stateChanged = new AtomicBoolean();
- private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
+ private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
private static final ByteBuffer SASL_LAYER_HEADER =
@@ -653,7 +653,7 @@ public class ProtocolEngine_1_0_0 implem
{
_stateChanged.set(true);
- final Action<ServerProtocolEngine> listener = _workListener.get();
+ final Action<ProtocolEngine> listener = _workListener.get();
if(listener != null)
{
listener.performAction(this);
@@ -667,7 +667,7 @@ public class ProtocolEngine_1_0_0 implem
}
@Override
- public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ public void setWorkListener(final Action<ProtocolEngine> listener)
{
_workListener.set(listener);
}
Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Tue Jun 16 10:41:13 2015
@@ -45,13 +45,13 @@ import org.eclipse.jetty.util.ssl.SslCon
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketHandler;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.transport.ProtocolEngineFactory;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Jun 16 10:41:13 2015
@@ -64,12 +64,13 @@ import org.apache.qpid.framing.ProtocolV
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.TransportActivity;
import org.apache.qpid.util.BytesDataOutput;
/**
@@ -117,7 +118,7 @@ import org.apache.qpid.util.BytesDataOut
* held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
-public class AMQProtocolHandler implements ProtocolEngine
+public class AMQProtocolHandler implements ByteBufferReceiver, TransportActivity
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
@@ -316,11 +317,6 @@ public class AMQProtocolHandler implemen
}
}
- @Override
- public void encryptedTransport()
- {
- }
-
public void readerIdle()
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java Tue Jun 16 10:41:13 2015
@@ -20,18 +20,13 @@
*/
package org.apache.qpid.client.transport;
-import java.net.BindException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.security.Principal;
import java.security.cert.Certificate;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.NetworkConnection;
/**
@@ -53,13 +48,6 @@ public class TestNetworkConnection imple
}
-
- public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
- NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException
- {
-
- }
-
public SocketAddress getLocalAddress()
{
return (_localAddress != null) ? _localAddress : new InetSocketAddress(_localHost, _port);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Transport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Transport.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Transport.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Transport.java Tue Jun 16 10:41:13 2015
@@ -58,12 +58,6 @@ public class Transport
{
}
- public static IncomingNetworkTransport getIncomingTransportInstance()
- {
- return (IncomingNetworkTransport) loadTransportClass(
- System.getProperty(QPID_BROKER_TRANSPORT_PROPNAME, IO_TRANSPORT_CLASSNAME));
- }
-
public static OutgoingNetworkTransport getOutgoingTransportInstance(
final ProtocolVersion protocolVersion)
{
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java Tue Jun 16 10:41:13 2015
@@ -23,42 +23,24 @@ package org.apache.qpid.transport.networ
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.ServerSocket;
import java.net.Socket;
-import java.net.SocketException;
-import java.util.Set;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.network.TransportActivity;
-import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-// TODO we are no longer using the IncomingNetworkTransport
-public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport
{
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
- private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
- CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
- private Socket _socket;
private NetworkConnection _connection;
- private AcceptingThread _acceptor;
public NetworkConnection connect(ConnectionSettings settings,
ByteBufferReceiver delegate,
@@ -67,28 +49,25 @@ public abstract class AbstractNetworkTra
int sendBufferSize = settings.getWriteBufferSize();
int receiveBufferSize = settings.getReadBufferSize();
+ final Socket socket;
try
{
- _socket = new Socket();
- _socket.setReuseAddress(true);
- _socket.setTcpNoDelay(settings.isTcpNodelay());
- _socket.setSendBufferSize(sendBufferSize);
- _socket.setReceiveBufferSize(receiveBufferSize);
+ socket = new Socket();
+ socket.setReuseAddress(true);
+ socket.setTcpNoDelay(settings.isTcpNodelay());
+ socket.setSendBufferSize(sendBufferSize);
+ socket.setReceiveBufferSize(receiveBufferSize);
if(LOGGER.isDebugEnabled())
{
- LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
- LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
- LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
+ LOGGER.debug("SO_RCVBUF : " + socket.getReceiveBufferSize());
+ LOGGER.debug("SO_SNDBUF : " + socket.getSendBufferSize());
+ LOGGER.debug("TCP_NODELAY : " + socket.getTcpNoDelay());
}
InetAddress address = InetAddress.getByName(settings.getHost());
- _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
- }
- catch (SocketException e)
- {
- throw new TransportException("Error connecting to broker", e);
+ socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
}
catch (IOException e)
{
@@ -98,7 +77,7 @@ public abstract class AbstractNetworkTra
try
{
IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
- _connection = createNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+ _connection = createNetworkConnection(socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
ticker.setConnection(_connection);
_connection.start();
}
@@ -106,7 +85,7 @@ public abstract class AbstractNetworkTra
{
try
{
- _socket.close();
+ socket.close();
}
catch(IOException ioe)
{
@@ -125,10 +104,6 @@ public abstract class AbstractNetworkTra
{
_connection.close();
}
- if(_acceptor != null)
- {
- _acceptor.close();
- }
}
public NetworkConnection getConnection()
@@ -136,27 +111,6 @@ public abstract class AbstractNetworkTra
return _connection;
}
- public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext, final Set<TransportEncryption> encryptionSet)
- {
- try
- {
- _acceptor = new AcceptingThread(config, factory, sslContext);
- _acceptor.setDaemon(false);
- _acceptor.start();
- }
- catch (IOException e)
- {
- throw new TransportException("Failed to start AMQP on port : " + config, e);
- }
- }
-
- public int getAcceptingPort()
- {
- return _acceptor == null ? -1 : _acceptor.getPort();
- }
-
protected abstract NetworkConnection createNetworkConnection(Socket socket,
ByteBufferReceiver engine,
Integer sendBufferSize,
@@ -164,179 +118,4 @@ public abstract class AbstractNetworkTra
int timeout,
IdleTimeoutTicker ticker);
- private class AcceptingThread extends Thread
- {
- private volatile boolean _closed = false;
- private NetworkTransportConfiguration _config;
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContext;
- private ServerSocket _serverSocket;
- private int _timeout;
-
- private AcceptingThread(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext) throws IOException
- {
- _config = config;
- _factory = factory;
- _sslContext = sslContext;
- _timeout = TIMEOUT;
-
- InetSocketAddress address = config.getAddress();
-
- if(sslContext == null)
- {
- _serverSocket = new ServerSocket();
- }
- else
- {
- SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
- _serverSocket = socketFactory.createServerSocket();
-
- SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
-
- SSLUtil.removeSSLv3Support(sslServerSocket);
-
- if(config.needClientAuth())
- {
- sslServerSocket.setNeedClientAuth(true);
- }
- else if(config.wantClientAuth())
- {
- sslServerSocket.setWantClientAuth(true);
- }
-
- }
-
- _serverSocket.setReuseAddress(true);
- _serverSocket.bind(address);
- }
-
-
- /**
- Close the underlying ServerSocket if it has not already been closed.
- */
- public void close()
- {
- LOGGER.debug("Shutting down the Acceptor");
- _closed = true;
-
- if (!_serverSocket.isClosed())
- {
- try
- {
- _serverSocket.close();
- }
- catch (IOException e)
- {
- throw new TransportException(e);
- }
- }
- }
-
- private int getPort()
- {
- return _serverSocket.getLocalPort();
- }
-
- @Override
- public void run()
- {
- try
- {
- while (!_closed)
- {
- Socket socket = null;
- try
- {
- socket = _serverSocket.accept();
-
- ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
-
- if(engine != null)
- {
- socket.setTcpNoDelay(_config.getTcpNoDelay());
- socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
-
- final Integer sendBufferSize = _config.getSendBufferSize();
- final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
- socket.setSendBufferSize(sendBufferSize);
- socket.setReceiveBufferSize(receiveBufferSize);
-
-
- final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
-
- NetworkConnection connection =
- createNetworkConnection(socket,
- engine,
- sendBufferSize,
- receiveBufferSize,
- _timeout,
- ticker);
-
- connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
-
- ticker.setConnection(connection);
-
- engine.setNetworkConnection(connection, connection.getSender());
-
- connection.start();
- }
- else
- {
- socket.close();
- }
- }
- catch(RuntimeException e)
- {
- LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socket);
- }
- catch(IOException e)
- {
- if(!_closed)
- {
- LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socket);
- try
- {
- //Delay to avoid tight spinning the loop during issues such as too many open files
- Thread.sleep(1000);
- }
- catch (InterruptedException ie)
- {
- LOGGER.debug("Stopping acceptor due to interrupt request");
- _closed = true;
- }
- }
- }
- }
- }
- finally
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Acceptor exiting, no new connections will be accepted on address "
- + _config.getAddress());
- }
- }
- }
-
- private void closeSocketIfNecessary(final Socket socket)
- {
- if(socket != null)
- {
- try
- {
- socket.close();
- }
- catch (IOException e)
- {
- LOGGER.debug("Exception while closing socket", e);
- }
- }
- }
-
- }
}
Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java?rev=1685756&r1=1685755&r2=1685756&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java Tue Jun 16 10:41:13 2015
@@ -21,16 +21,10 @@
package org.apache.qpid.transport.network;
-import java.util.Set;
-
-import javax.net.ssl.SSLContext;
-
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.io.IoNetworkTransport;
@@ -71,22 +65,6 @@ public class TransportTest extends QpidT
assertTrue(networkTransport instanceof IoNetworkTransport);
}
- public void testDefaultGetIncomingTransport() throws Exception
- {
- final IncomingNetworkTransport networkTransport = Transport.getIncomingTransportInstance();
- assertNotNull(networkTransport);
- assertTrue(networkTransport instanceof IoNetworkTransport);
- }
-
- public void testOverriddenGetIncomingTransport() throws Exception
- {
- setTestSystemProperty(Transport.QPID_BROKER_TRANSPORT_PROPNAME, TestIncomingNetworkTransport.class.getName());
-
- final IncomingNetworkTransport networkTransport = Transport.getIncomingTransportInstance();
- assertNotNull(networkTransport);
- assertTrue(networkTransport instanceof TestIncomingNetworkTransport);
- }
-
public void testInvalidOutgoingTransportClassName() throws Exception
{
setTestSystemProperty(Transport.QPID_TRANSPORT_PROPNAME, "invalid");
@@ -136,31 +114,4 @@ public class TransportTest extends QpidT
}
}
- public static class TestIncomingNetworkTransport implements IncomingNetworkTransport
- {
-
- public void close()
- {
- throw new UnsupportedOperationException();
- }
-
- public NetworkConnection getConnection()
- {
- throw new UnsupportedOperationException();
- }
-
- public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext,
- final Set<TransportEncryption> encryptionSet)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getAcceptingPort()
- {
- return -1;
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org