You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/02/13 23:15:53 UTC
svn commit: r1782895 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/configuration/
broker-core/src/main/java/org/apache/qpid/server/properties/
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-core/src/main/java/org/apache/qpid/server/thread/ ...
Author: kwall
Date: Mon Feb 13 23:15:52 2017
New Revision: 1782895
URL: http://svn.apache.org/viewvc?rev=1782895&view=rev
Log:
QPID-7603: [Java Broker] Remove client transport, client threading, and configuration/properties
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/Accessor.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/ClientProperties.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/Configured.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/QpidProperty.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/Validator.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/DefaultThreadFactory.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/LoggingUncaughtExceptionHandler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/QpidThreadExecutor.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/RealtimeThreadFactory.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/ThreadFactory.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/Threading.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ClientDelegate.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ConnectionSettings.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerDelegate.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/StreamReturnCode.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/Assembler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/Disassembler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/InputHandler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/io/IdleTimeoutTicker.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/io/IoNetworkConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/io/IoNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/io/IoReceiver.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/io/IoSender.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/SSLStatus.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/SecurityLayer.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/SecurityLayerFactory.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/sasl/SASLEncryptor.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/sasl/SASLReceiver.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/sasl/SASLSender.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/ssl/SSLReceiver.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/ssl/SSLSender.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/QpidPropertyTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/thread/ThreadFactoryTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/ConnectionSettingsTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/ConnectionTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/SessionTimeoutTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/network/ConnectionBinding.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/network/io/IdleTimeoutTickerTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/network/io/IoAcceptor.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/network/io/IoTransport.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/CommonProperties.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/properties/ConnectionStartProperties.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Connection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Session.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SessionDelegate.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/CommonProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/CommonProperties.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/CommonProperties.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/CommonProperties.java Mon Feb 13 23:15:52 2017
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
* Qpid build specific information like project name, version number, and source code repository revision number
* are captured by this class and exposed via public static methods.
*
- * @see ClientProperties
*/
public class CommonProperties
{
@@ -52,11 +51,6 @@ public class CommonProperties
public static final String IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME = "qpid.io_network_transport_timeout";
public static final int IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT = 60000;
- public static final String QPID_CLIENT_SECURITY_TLS_PROTOCOL_WHITE_LIST = "qpid.client.security.tls.protocolWhiteList";
- public static final String QPID_CLIENT_SECURITY_TLS_PROTOCOL_BLACK_LIST = "qpid.client.security.tls.protocolBlackList";
- public static final String QPID_CLIENT_SECURITY_TLS_CIPHER_SUITE_WHITE_LIST = "qpid.client.security.tls.cipherSuiteWhiteList";
- public static final String QPID_CLIENT_SECURITY_TLS_CIPHER_SUITE_BLACK_LIST = "qpid.client.security.tls.cipherSuiteBlackList";
-
public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST = "qpid.security.tls.protocolWhiteList";
public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST_DEFAULT = "TLSv1\\.[0-9]+";
public static final String QPID_SECURITY_TLS_PROTOCOL_BLACK_LIST = "qpid.security.tls.protocolBlackList";
@@ -67,9 +61,6 @@ public class CommonProperties
public static final String QPID_SECURITY_TLS_CIPHER_SUITE_BLACK_LIST = "qpid.security.tls.cipherSuiteBlackList";
public static final String QPID_SECURITY_TLS_CIPHER_SUITE_BLACK_LIST_DEFAULT = "";
- public static final String QPID_SECURITY_OBJECT_MESSAGE_CLASS_HIERARCHY_WHITE_LIST = "qpid.security.objectMessage.classHierarchyWhiteList";
- public static final String QPID_SECURITY_OBJECT_MESSAGE_CLASS_HIERARCHY_BLACK_LIST = "qpid.security.objectMessage.classHierarchyBlackList";
-
/** The name of the version properties file to load from the class path. */
public static final String VERSION_RESOURCE = "qpidbrokerversion.properties";
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/properties/ConnectionStartProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/properties/ConnectionStartProperties.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/properties/ConnectionStartProperties.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/properties/ConnectionStartProperties.java Mon Feb 13 23:15:52 2017
@@ -23,8 +23,6 @@ package org.apache.qpid.server.propertie
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.util.SystemUtils;
-
/**
* Constants for the various properties clients can
* set values for during the ConnectionStartOk reply.
@@ -50,58 +48,11 @@ public class ConnectionStartProperties
public static final String VERSION_0_8 = "version";
public static final String VERSION_0_10 = "qpid.client_version";
- public static final String PROCESS = "qpid.client_process";
-
public static final String PID = "qpid.client_pid";
- public static final String PLATFORM = "platform";
-
public static final String PRODUCT ="product";
- public static final String SESSION_FLOW = "qpid.session_flow";
-
public static final String QPID_CONFIRMED_PUBLISH_SUPPORTED = "qpid.confirmed_publish_supported";
public static final String QPID_QUEUE_LIFETIME_SUPPORTED = "qpid.queue_lifetime_supported";
-
- public static final int _pid;
-
- public static final String _platformInfo;
-
- static
- {
-
- _pid = SystemUtils.getProcessPidAsInt();
-
- if (_pid == -1)
- {
- LOGGER.warn("Unable to get the process's PID");
- }
-
- StringBuilder fullSystemInfo = new StringBuilder(System.getProperty("java.runtime.name"));
- fullSystemInfo.append(", ");
- fullSystemInfo.append(System.getProperty("java.runtime.version"));
- fullSystemInfo.append(", ");
- fullSystemInfo.append(System.getProperty("java.vendor"));
- fullSystemInfo.append(", ");
- fullSystemInfo.append(SystemUtils.getOSArch());
- fullSystemInfo.append(", ");
- fullSystemInfo.append(SystemUtils.getOSName());
- fullSystemInfo.append(", ");
- fullSystemInfo.append(SystemUtils.getOSVersion());
- fullSystemInfo.append(", ");
- fullSystemInfo.append(System.getProperty("sun.os.patch.level"));
-
- _platformInfo = fullSystemInfo.toString();
- }
-
- public static int getPID()
- {
- return _pid;
- }
-
- public static String getPlatformInfo()
- {
- return _platformInfo;
- }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Connection.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Connection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Connection.java Mon Feb 13 23:15:52 2017
@@ -24,36 +24,21 @@ import static org.apache.qpid.server.tra
import static org.apache.qpid.server.transport.Connection.State.CLOSING;
import static org.apache.qpid.server.transport.Connection.State.NEW;
import static org.apache.qpid.server.transport.Connection.State.OPEN;
-import static org.apache.qpid.server.transport.Connection.State.OPENING;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslServer;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.properties.ConnectionStartProperties;
-import org.apache.qpid.server.transport.network.Assembler;
-import org.apache.qpid.server.transport.network.Disassembler;
-import org.apache.qpid.server.transport.network.InputHandler;
import org.apache.qpid.server.transport.network.NetworkConnection;
-import org.apache.qpid.server.transport.network.TransportActivity;
-import org.apache.qpid.server.transport.network.io.IoNetworkTransport;
-import org.apache.qpid.server.transport.network.security.SecurityLayer;
-import org.apache.qpid.server.transport.network.security.SecurityLayerFactory;
import org.apache.qpid.server.transport.util.Waiter;
-import org.apache.qpid.server.util.Strings;
/**
@@ -71,38 +56,13 @@ public class Connection extends Connecti
{
private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class);
- //Usable channels are numbered 0 to <ChannelMax> - 1
- public static final int MAX_CHANNEL_MAX = 0xFFFF;
- public static final int MIN_USABLE_CHANNEL_NUM = 0;
private long _lastSendTime;
private long _lastReadTime;
private NetworkConnection _networkConnection;
private FrameSizeObserver _frameSizeObserver;
- private boolean _messageCompressionSupported;
- private final AtomicBoolean _redirecting = new AtomicBoolean();
- private boolean _virtualHostPropertiesSupported;
- private boolean _queueLifetimePolicySupported;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
- public static interface SessionFactory
- {
- Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay);
- }
-
- private static final class DefaultSessionFactory implements SessionFactory
- {
-
- public Session newSession(final Connection conn, final Binary name, final long expiry, final boolean isNoReplay)
- {
- return new Session(conn, name, expiry, isNoReplay);
- }
- }
-
- private static final SessionFactory DEFAULT_SESSION_FACTORY = new DefaultSessionFactory();
-
- private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY;
-
private ConnectionDelegate delegate;
private ProtocolEventSender sender;
@@ -111,19 +71,11 @@ public class Connection extends Connecti
private State state = NEW;
final private Object lock = new Object();
- private long timeout = 60000;
- private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>();
+ private long timeout = 60000; // TODO server side close does not require this
private ConnectionException error = null;
private int channelMax = 1;
private String locale;
- private SaslServer saslServer;
- private SaslClient saslClient;
- private int idleTimeout = 0;
- private Map<String,Object> _serverProperties;
- private String userID;
- private ConnectionSettings conSettings;
- private SecurityLayer securityLayer;
private final AtomicBoolean connectionLost = new AtomicBoolean(false);
@@ -137,16 +89,6 @@ public class Connection extends Connecti
this.delegate = delegate;
}
- public void addConnectionListener(ConnectionListener listener)
- {
- listeners.add(listener);
- }
-
- public List<ConnectionListener> getListeners()
- {
- return Collections.unmodifiableList(listeners);
- }
-
public ProtocolEventSender getSender()
{
return sender;
@@ -176,193 +118,6 @@ public class Connection extends Connecti
return locale;
}
- void setSaslServer(SaslServer saslServer)
- {
- this.saslServer = saslServer;
- }
-
- SaslServer getSaslServer()
- {
- return saslServer;
- }
-
- void setSaslClient(SaslClient saslClient)
- {
- this.saslClient = saslClient;
- }
-
- public SaslClient getSaslClient()
- {
- return saslClient;
- }
-
- public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs)
- {
- connect(host, port, vhost, username, password, ssl, saslMechs, null);
- }
-
- public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs, Map<String,Object> clientProps)
- {
- ConnectionSettings settings = new ConnectionSettings();
- settings.setHost(host);
- settings.setPort(port);
- settings.setVhost(vhost);
- settings.setUsername(username);
- settings.setPassword(password);
- settings.setUseSSL(ssl);
- settings.setSaslMechs(saslMechs);
- settings.setClientProperties(clientProps);
- connect(settings);
- }
-
- public void connect(ConnectionSettings settings)
- {
-
- synchronized (lock)
- {
- conSettings = settings;
- _redirecting.set(false);
- state = OPENING;
- userID = settings.getUsername();
- connectionLost.set(false);
-
- securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
-
- IoNetworkTransport transport = new IoNetworkTransport();
- final InputHandler inputHandler = new InputHandler(new Assembler(this), false);
- addFrameSizeObserver(inputHandler);
- ExceptionHandlingByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
- if(secureReceiver instanceof ConnectionListener)
- {
- addConnectionListener((ConnectionListener)secureReceiver);
- }
-
- _networkConnection = transport.connect(settings, secureReceiver, new ConnectionActivity());
-
-
- setRemoteAddress(_networkConnection.getRemoteAddress());
- setLocalAddress(_networkConnection.getLocalAddress());
-
- final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender());
- if(secureSender instanceof ConnectionListener)
- {
- addConnectionListener((ConnectionListener)secureSender);
- }
- Disassembler disassembler = new Disassembler(secureSender, Constant.MIN_MAX_FRAME_SIZE);
- sender = disassembler;
- addFrameSizeObserver(disassembler);
-
- send(new ProtocolHeader(1, 0, 10));
-
- Waiter w = new Waiter(lock, timeout);
- while (w.hasTime() && ((state == OPENING && error == null) || isRedirecting()))
- {
- w.await();
- }
-
- if (error != null)
- {
- ConnectionException t = error;
- error = null;
- try
- {
- close();
- }
- catch (ConnectionException ce)
- {
- if (!(t instanceof ProtocolVersionException))
- {
- throw ce;
- }
- }
- t.rethrow();
- }
-
- switch (state)
- {
- case OPENING:
- close();
- throw new ConnectionException("connect() timed out");
- case OPEN:
- case RESUMING:
- connectionLost.set(false);
- break;
- case CLOSED:
- throw new ConnectionException("connect() aborted");
- default:
- throw new IllegalStateException(String.valueOf(state));
- }
- }
-
- for (ConnectionListener listener: listeners)
- {
- listener.opened(this);
- }
- }
-
- public Session createSession()
- {
- return createSession(0);
- }
-
- public Session createSession(long expiry)
- {
- return createSession(expiry, false);
- }
-
- public Session createSession(long expiry, boolean isNoReplay)
- {
- return createSession(UUID.randomUUID().toString(), expiry, isNoReplay);
- }
-
- public Session createSession(String name)
- {
- return createSession(name, 0);
- }
-
- public Session createSession(String name, long expiry)
- {
- return createSession(Strings.toUTF8(name), expiry);
- }
-
- public Session createSession(String name, long expiry,boolean isNoReplay)
- {
- return createSession(new Binary(Strings.toUTF8(name)), expiry, isNoReplay);
- }
-
- public Session createSession(byte[] name, long expiry)
- {
- return createSession(new Binary(name), expiry);
- }
-
- public Session createSession(Binary name, long expiry)
- {
- return createSession(name, expiry, false);
- }
-
- public Session createSession(Binary name, long expiry, boolean isNoReplay)
- {
- synchronized (lock)
- {
- Waiter w = new Waiter(lock, timeout);
- while (w.hasTime() && state != OPEN && error == null)
- {
- w.await();
- }
-
- if (state != OPEN)
- {
- throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
- }
-
- Session ssn = _sessionFactory.newSession(this, name, expiry, isNoReplay);
- registerSession(ssn);
- map(ssn);
- ssn.attach();
- return ssn;
- }
- }
-
public void registerSession(Session ssn)
{
synchronized (lock)
@@ -379,13 +134,6 @@ public class Connection extends Connecti
}
}
- public void setSessionFactory(SessionFactory sessionFactory)
- {
- assert sessionFactory != null;
-
- _sessionFactory = sessionFactory;
- }
-
public ConnectionDelegate getConnectionDelegate()
{
return delegate;
@@ -542,12 +290,6 @@ public class Connection extends Connecti
return;
}
}
-
- for (ConnectionListener listener: listeners)
- {
- listener.exception(this, e);
- }
-
}
public void exception(Throwable t)
@@ -595,11 +337,6 @@ public class Connection extends Connecti
sender = null;
setState(CLOSED);
}
-
- for (ConnectionListener listener: listeners)
- {
- listener.closed(this);
- }
}
public void close()
@@ -672,50 +409,12 @@ public class Connection extends Connecti
}
}
- public String getUserID()
- {
- return userID;
- }
-
- public void setUserID(String id)
- {
- userID = id;
- }
-
- public void setServerProperties(final Map<String, Object> serverProperties)
- {
- _serverProperties = serverProperties == null ? Collections.<String, Object>emptyMap() : serverProperties;
- _messageCompressionSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED)));
- _virtualHostPropertiesSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED)));
- _queueLifetimePolicySupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED)));
-
- }
-
- public Map<String, Object> getServerProperties()
- {
- return _serverProperties;
- }
-
+ @Override
public String toString()
{
return String.format("conn:%x", System.identityHashCode(this));
}
- public ConnectionSettings getConnectionSettings()
- {
- return conSettings;
- }
-
- public SecurityLayer getSecurityLayer()
- {
- return securityLayer;
- }
-
- public boolean isConnectionResuming()
- {
- return connectionLost.get();
- }
-
protected boolean isConnectionLost()
{
return connectionLost.get();
@@ -731,14 +430,6 @@ public class Connection extends Connecti
return sessions.containsKey(new Binary(name));
}
- public void notifyFailoverRequired()
- {
- for (Session ssn : getChannels())
- {
- ssn.notifyFailoverRequired();
- }
- }
-
public SocketAddress getRemoteSocketAddress()
{
return _remoteAddress;
@@ -774,35 +465,6 @@ public class Connection extends Connecti
connectionHeartbeat();
}
- private class ConnectionActivity implements TransportActivity
- {
- @Override
- public long getLastReadTime()
- {
- return _lastReadTime;
- }
-
- @Override
- public long getLastWriteTime()
- {
- return _lastSendTime;
- }
-
- @Override
- public void writerIdle()
- {
- getConnectionDelegate().writerIdle(Connection.this);
- }
-
- @Override
- public void readerIdle()
- {
- LOGGER.error("Closing connection as no heartbeat or other activity detected within specified interval");
- _networkConnection.close();
- }
- }
-
-
public void setNetworkConnection(NetworkConnection network)
{
_networkConnection = network;
@@ -842,31 +504,6 @@ public class Connection extends Connecti
}
}
- public boolean isMessageCompressionSupported()
- {
- return _messageCompressionSupported;
- }
-
- public boolean isVirtualHostPropertiesSupported()
- {
- return _virtualHostPropertiesSupported;
- }
-
- public boolean isQueueLifetimePolicySupported()
- {
- return _queueLifetimePolicySupported;
- }
-
- public boolean isRedirecting()
- {
- return _redirecting.get();
- }
-
- public void setRedirecting(final boolean redirecting)
- {
- _redirecting.set(redirecting);
- }
-
public boolean isClosing()
{
synchronized (lock)
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Mon Feb 13 23:15:52 2017
@@ -30,17 +30,15 @@ import java.nio.channels.SocketChannel;
import java.util.EnumSet;
import java.util.Set;
-import org.apache.qpid.server.model.port.AmqpPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.transport.TransportException;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.transport.network.TransportEncryption;
-import static org.apache.qpid.server.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
public class NonBlockingNetworkTransport
{
+ public static final String WILDCARD_ADDRESS = "*";
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Session.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Session.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Session.java Mon Feb 13 23:15:52 2017
@@ -21,9 +21,6 @@
package org.apache.qpid.server.transport;
-import org.apache.qpid.server.configuration.ClientProperties;
-import org.apache.qpid.server.transport.network.Frame;
-import org.apache.qpid.server.transport.util.Waiter;
import static org.apache.qpid.server.transport.Option.COMPLETED;
import static org.apache.qpid.server.transport.Option.SYNC;
import static org.apache.qpid.server.transport.Option.TIMELY_REPLY;
@@ -44,12 +41,14 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.transport.network.Frame;
+import org.apache.qpid.server.transport.util.Waiter;
+
/**
* Session
*
@@ -86,18 +85,11 @@ public class Session extends SessionInvo
private Connection connection;
private Binary name;
- private long expiry;
private boolean closing;
private int channel;
private SessionDelegate delegate;
private SessionListener listener = new DefaultSessionListener();
- private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
- Long.getLong(LegacyClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
- ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
- private final long blockedSendTimeout = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE,
- ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
- private long blockedSendReportingPeriod = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
- ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
+ private final long timeout = 60000; // TODO server side close does not require this
private boolean autoSync = false;
@@ -122,8 +114,6 @@ public class Session extends SessionInvo
private State state = NEW;
- // transfer flow control
- private volatile boolean flowControl = false;
private Semaphore credit = new Semaphore(0);
private Thread resumer = null;
@@ -134,16 +124,6 @@ public class Session extends SessionInvo
private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
private boolean _isNoReplay = false;
- protected Session(Connection connection, Binary name, long expiry)
- {
- this(connection, new SessionDelegate(), name, expiry);
- }
-
- protected Session(Connection connection, Binary name, long expiry, boolean noReplay)
- {
- this(connection, new SessionDelegate(), name, expiry, noReplay);
- }
-
protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
this(connection, delegate, name, expiry,false);
@@ -154,7 +134,6 @@ public class Session extends SessionInvo
this.connection = connection;
this.delegate = delegate;
this.name = name;
- this.expiry = expiry;
this.closing = false;
this._isNoReplay = noReplay;
initReceiver();
@@ -170,11 +149,6 @@ public class Session extends SessionInvo
return name;
}
- void setExpiry(long expiry)
- {
- this.expiry = expiry;
- }
-
protected void setClose(boolean close)
{
this.closing = close;
@@ -190,31 +164,11 @@ public class Session extends SessionInvo
this.channel = channel;
}
- public void setSessionListener(SessionListener listener)
- {
- if (listener == null)
- {
- this.listener = new DefaultSessionListener();
- }
- else
- {
- this.listener = listener;
- }
- }
-
public SessionListener getSessionListener()
{
return listener;
}
- public void setAutoSync(boolean value)
- {
- synchronized (commandsLock)
- {
- this.autoSync = value;
- }
- }
-
protected void setState(State state)
{
synchronized (commandsLock)
@@ -229,11 +183,6 @@ public class Session extends SessionInvo
return this.state;
}
- void setFlowControl(boolean value)
- {
- flowControl = value;
- }
-
void addCredit(int value)
{
credit.release(value);
@@ -244,37 +193,6 @@ public class Session extends SessionInvo
credit.drainPermits();
}
- void acquireCredit()
- {
- if (flowControl)
- {
- try
- {
- long wait = blockedSendTimeout > blockedSendReportingPeriod ? blockedSendReportingPeriod :
- blockedSendTimeout;
- long totalWait = 1L;
- while(totalWait <= blockedSendTimeout && !credit.tryAcquire(wait, TimeUnit.MILLISECONDS))
- {
- totalWait+=wait;
- LOGGER.warn("Message send delayed by {}s due to broker enforced flow control", (totalWait) / 1000);
-
-
- }
- if(totalWait > blockedSendTimeout)
- {
- LOGGER.error("Message send failed due to timeout waiting on broker enforced flow control");
- throw new SessionException
- ("timed out waiting for message credit");
- }
- }
- catch (InterruptedException e)
- {
- throw new SessionException
- ("interrupted while waiting for credit", null, e);
- }
- }
- }
-
private void initReceiver()
{
synchronized (processedLock)
@@ -617,11 +535,6 @@ public class Session extends SessionInvo
{
if (m.getEncodedTrack() == Frame.L4)
{
- if (m.hasPayload())
- {
- acquireCredit();
- }
-
synchronized (commandsLock)
{
if (state == DETACHED && m.isUnreliable())
@@ -1134,71 +1047,11 @@ public class Session extends SessionInvo
return this.detachCode;
}
- public void awaitOpen()
- {
- switch (state)
- {
- case NEW:
- synchronized(stateLock)
- {
- Waiter w = new Waiter(stateLock, timeout);
- while (w.hasTime() && state == NEW)
- {
- checkFailoverRequired("Session opening was interrupted by failover.");
- w.await();
- }
- }
-
- if (state != OPEN)
- {
- throw new SessionException("Timed out waiting for Session to open");
- }
- break;
- case DETACHED:
- case CLOSING:
- case CLOSED:
- throw new SessionException("Session closed");
- default :
- break;
- }
- }
-
public Object getStateLock()
{
return stateLock;
}
- protected void notifyFailoverRequired()
- {
- //ensure any operations waiting are aborted to
- //prevent them waiting for timeout for 60 seconds
- //and possibly preventing failover proceeding
- _failoverRequired.set(true);
- synchronized (commandsLock)
- {
- commandsLock.notifyAll();
- }
- synchronized (results)
- {
- for (ResultFuture<?> result : results.values())
- {
- synchronized(result)
- {
- result.notifyAll();
- }
- }
- }
- }
-
- /**
- * An auxiliary method for test purposes only
- * @return true if flow is blocked
- */
- public boolean isFlowBlocked()
- {
- return flowControl && credit.availablePermits() == 0;
- }
-
protected void sendSessionAttached(final byte[] name, final Option... options)
{
super.sessionAttached(name, options);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SessionDelegate.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SessionDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SessionDelegate.java Mon Feb 13 23:15:52 2017
@@ -192,15 +192,8 @@ public class SessionDelegate
@Override public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm)
{
- if ("".equals(sfm.getDestination()) &&
- MessageFlowMode.CREDIT.equals(sfm.getFlowMode()))
- {
- ssn.setFlowControl(true);
- }
- else
- {
- super.messageSetFlowMode(ssn, sfm);
- }
+ // Method overridden in ServerSessionDelegate
+ throw new UnsupportedOperationException();
}
@Override public void messageFlow(Session ssn, MessageFlow flow)
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java Mon Feb 13 23:15:52 2017
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.server.configuration.ClientProperties;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.server.framing.HeartbeatBody;
import org.apache.qpid.server.framing.ProtocolInitiation;
import org.apache.qpid.server.framing.ProtocolVersion;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org