You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/02/13 23:15:53 UTC

svn commit: r1782895 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/configuration/ broker-core/src/main/java/org/apache/qpid/server/properties/ broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/main/jav...

Author: kwall
Date: Mon Feb 13 23:15:52 2017
New Revision: 1782895

URL: http://svn.apache.org/viewvc?rev=1782895&view=rev
Log:
QPID-7603: [Java Broker] Remove client transport, client threading, and configuration/properties

Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/Accessor.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/ClientProperties.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/Configured.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/QpidProperty.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/Validator.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/DefaultThreadFactory.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/LoggingUncaughtExceptionHandler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/QpidThreadExecutor.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/RealtimeThreadFactory.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/ThreadFactory.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/thread/Threading.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ClientDelegate.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ConnectionSettings.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerDelegate.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/StreamReturnCode.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/Assembler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/Disassembler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/InputHandler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/io/IdleTimeoutTicker.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/io/IoNetworkConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/io/IoNetworkTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/io/IoReceiver.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/io/IoSender.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/SSLStatus.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/SecurityLayer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/SecurityLayerFactory.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/sasl/SASLEncryptor.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/sasl/SASLReceiver.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/sasl/SASLSender.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/ssl/SSLReceiver.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/network/security/ssl/SSLSender.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/configuration/QpidPropertyTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/thread/ThreadFactoryTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/ConnectionSettingsTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/ConnectionTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/SessionTimeoutTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/network/ConnectionBinding.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/network/io/IdleTimeoutTickerTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/network/io/IoAcceptor.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/network/io/IoTransport.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/CommonProperties.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/properties/ConnectionStartProperties.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Connection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Session.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SessionDelegate.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/CommonProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/CommonProperties.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/CommonProperties.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/CommonProperties.java Mon Feb 13 23:15:52 2017
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
  * Qpid build specific information like  project name, version number, and source code repository revision number
  * are captured by this class and exposed via public static methods.
  *
- * @see ClientProperties
  */
 public class CommonProperties
 {
@@ -52,11 +51,6 @@ public class CommonProperties
     public static final String IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME = "qpid.io_network_transport_timeout";
     public static final int IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT = 60000;
 
-    public static final String QPID_CLIENT_SECURITY_TLS_PROTOCOL_WHITE_LIST = "qpid.client.security.tls.protocolWhiteList";
-    public static final String QPID_CLIENT_SECURITY_TLS_PROTOCOL_BLACK_LIST = "qpid.client.security.tls.protocolBlackList";
-    public static final String QPID_CLIENT_SECURITY_TLS_CIPHER_SUITE_WHITE_LIST = "qpid.client.security.tls.cipherSuiteWhiteList";
-    public static final String QPID_CLIENT_SECURITY_TLS_CIPHER_SUITE_BLACK_LIST = "qpid.client.security.tls.cipherSuiteBlackList";
-
     public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST = "qpid.security.tls.protocolWhiteList";
     public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST_DEFAULT = "TLSv1\\.[0-9]+";
     public static final String QPID_SECURITY_TLS_PROTOCOL_BLACK_LIST = "qpid.security.tls.protocolBlackList";
@@ -67,9 +61,6 @@ public class CommonProperties
     public static final String QPID_SECURITY_TLS_CIPHER_SUITE_BLACK_LIST = "qpid.security.tls.cipherSuiteBlackList";
     public static final String QPID_SECURITY_TLS_CIPHER_SUITE_BLACK_LIST_DEFAULT = "";
 
-    public static final String QPID_SECURITY_OBJECT_MESSAGE_CLASS_HIERARCHY_WHITE_LIST = "qpid.security.objectMessage.classHierarchyWhiteList";
-    public static final String QPID_SECURITY_OBJECT_MESSAGE_CLASS_HIERARCHY_BLACK_LIST = "qpid.security.objectMessage.classHierarchyBlackList";
-
     /** The name of the version properties file to load from the class path. */
     public static final String VERSION_RESOURCE = "qpidbrokerversion.properties";
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/properties/ConnectionStartProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/properties/ConnectionStartProperties.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/properties/ConnectionStartProperties.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/properties/ConnectionStartProperties.java Mon Feb 13 23:15:52 2017
@@ -23,8 +23,6 @@ package org.apache.qpid.server.propertie
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.util.SystemUtils;
-
 /**
  * Constants for the various properties clients can
  * set values for during the ConnectionStartOk reply.
@@ -50,58 +48,11 @@ public class ConnectionStartProperties
     public static final String VERSION_0_8 = "version";
     public static final String VERSION_0_10 = "qpid.client_version";
 
-    public static final String PROCESS = "qpid.client_process";
-
     public static final String PID = "qpid.client_pid";
 
-    public static final String PLATFORM = "platform";
-
     public static final String PRODUCT ="product";
 
-    public static final String SESSION_FLOW = "qpid.session_flow";
-
     public static final String QPID_CONFIRMED_PUBLISH_SUPPORTED = "qpid.confirmed_publish_supported";
 
     public static final String QPID_QUEUE_LIFETIME_SUPPORTED = "qpid.queue_lifetime_supported";
-
-    public static final int _pid;
-
-    public static final String _platformInfo;
-
-    static
-    {
-
-        _pid = SystemUtils.getProcessPidAsInt();
-
-        if (_pid == -1)
-        {
-            LOGGER.warn("Unable to get the process's PID");
-        }
-
-        StringBuilder fullSystemInfo = new StringBuilder(System.getProperty("java.runtime.name"));
-        fullSystemInfo.append(", ");
-        fullSystemInfo.append(System.getProperty("java.runtime.version"));
-        fullSystemInfo.append(", ");
-        fullSystemInfo.append(System.getProperty("java.vendor"));
-        fullSystemInfo.append(", ");
-        fullSystemInfo.append(SystemUtils.getOSArch());
-        fullSystemInfo.append(", ");
-        fullSystemInfo.append(SystemUtils.getOSName());
-        fullSystemInfo.append(", ");
-        fullSystemInfo.append(SystemUtils.getOSVersion());
-        fullSystemInfo.append(", ");
-        fullSystemInfo.append(System.getProperty("sun.os.patch.level"));
-
-        _platformInfo = fullSystemInfo.toString();
-    }
-
-    public static int getPID()
-    {
-        return _pid;
-    }
-
-    public static String getPlatformInfo()
-    {
-        return _platformInfo;
-    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Connection.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Connection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Connection.java Mon Feb 13 23:15:52 2017
@@ -24,36 +24,21 @@ import static org.apache.qpid.server.tra
 import static org.apache.qpid.server.transport.Connection.State.CLOSING;
 import static org.apache.qpid.server.transport.Connection.State.NEW;
 import static org.apache.qpid.server.transport.Connection.State.OPEN;
-import static org.apache.qpid.server.transport.Connection.State.OPENING;
 
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslServer;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.properties.ConnectionStartProperties;
-import org.apache.qpid.server.transport.network.Assembler;
-import org.apache.qpid.server.transport.network.Disassembler;
-import org.apache.qpid.server.transport.network.InputHandler;
 import org.apache.qpid.server.transport.network.NetworkConnection;
-import org.apache.qpid.server.transport.network.TransportActivity;
-import org.apache.qpid.server.transport.network.io.IoNetworkTransport;
-import org.apache.qpid.server.transport.network.security.SecurityLayer;
-import org.apache.qpid.server.transport.network.security.SecurityLayerFactory;
 import org.apache.qpid.server.transport.util.Waiter;
-import org.apache.qpid.server.util.Strings;
 
 
 /**
@@ -71,38 +56,13 @@ public class Connection extends Connecti
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class);
 
-    //Usable channels are numbered 0 to <ChannelMax> - 1
-    public static final int MAX_CHANNEL_MAX = 0xFFFF;
-    public static final int MIN_USABLE_CHANNEL_NUM = 0;
     private long _lastSendTime;
     private long _lastReadTime;
     private NetworkConnection _networkConnection;
     private FrameSizeObserver _frameSizeObserver;
-    private boolean _messageCompressionSupported;
-    private final AtomicBoolean _redirecting = new AtomicBoolean();
-    private boolean _virtualHostPropertiesSupported;
-    private boolean _queueLifetimePolicySupported;
 
     public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
 
-    public static interface SessionFactory
-    {
-        Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay);
-    }
-
-    private static final class DefaultSessionFactory implements SessionFactory
-    {
-
-        public Session newSession(final Connection conn, final Binary name, final long expiry, final boolean isNoReplay)
-        {
-            return new Session(conn, name, expiry, isNoReplay);
-        }
-    }
-
-    private static final SessionFactory DEFAULT_SESSION_FACTORY = new DefaultSessionFactory();
-
-    private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY;
-
     private ConnectionDelegate delegate;
     private ProtocolEventSender sender;
 
@@ -111,19 +71,11 @@ public class Connection extends Connecti
 
     private State state = NEW;
     final private Object lock = new Object();
-    private long timeout = 60000;
-    private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>();
+    private long timeout = 60000;  // TODO server side close does not require this
     private ConnectionException error = null;
 
     private int channelMax = 1;
     private String locale;
-    private SaslServer saslServer;
-    private SaslClient saslClient;
-    private int idleTimeout = 0;
-    private Map<String,Object> _serverProperties;
-    private String userID;
-    private ConnectionSettings conSettings;
-    private SecurityLayer securityLayer;
 
     private final AtomicBoolean connectionLost = new AtomicBoolean(false);
 
@@ -137,16 +89,6 @@ public class Connection extends Connecti
         this.delegate = delegate;
     }
 
-    public void addConnectionListener(ConnectionListener listener)
-    {
-        listeners.add(listener);
-    }
-
-    public List<ConnectionListener> getListeners()
-    {
-        return Collections.unmodifiableList(listeners);
-    }
-
     public ProtocolEventSender getSender()
     {
         return sender;
@@ -176,193 +118,6 @@ public class Connection extends Connecti
         return locale;
     }
 
-    void setSaslServer(SaslServer saslServer)
-    {
-        this.saslServer = saslServer;
-    }
-
-    SaslServer getSaslServer()
-    {
-        return saslServer;
-    }
-
-    void setSaslClient(SaslClient saslClient)
-    {
-        this.saslClient = saslClient;
-    }
-
-    public SaslClient getSaslClient()
-    {
-        return saslClient;
-    }
-
-    public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs)
-    {
-        connect(host, port, vhost, username, password, ssl, saslMechs, null);
-    }
-
-    public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs, Map<String,Object> clientProps)
-    {
-        ConnectionSettings settings = new ConnectionSettings();
-        settings.setHost(host);
-        settings.setPort(port);
-        settings.setVhost(vhost);
-        settings.setUsername(username);
-        settings.setPassword(password);
-        settings.setUseSSL(ssl);
-        settings.setSaslMechs(saslMechs);
-        settings.setClientProperties(clientProps);
-        connect(settings);
-    }
-
-    public void connect(ConnectionSettings settings)
-    {
-
-        synchronized (lock)
-        {
-            conSettings = settings;
-            _redirecting.set(false);
-            state = OPENING;
-            userID = settings.getUsername();
-            connectionLost.set(false);
-
-            securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
-
-            IoNetworkTransport transport = new IoNetworkTransport();
-            final InputHandler inputHandler = new InputHandler(new Assembler(this), false);
-            addFrameSizeObserver(inputHandler);
-            ExceptionHandlingByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
-            if(secureReceiver instanceof ConnectionListener)
-            {
-                addConnectionListener((ConnectionListener)secureReceiver);
-            }
-
-            _networkConnection = transport.connect(settings, secureReceiver, new ConnectionActivity());
-
-
-            setRemoteAddress(_networkConnection.getRemoteAddress());
-            setLocalAddress(_networkConnection.getLocalAddress());
-
-            final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender());
-            if(secureSender instanceof ConnectionListener)
-            {
-                addConnectionListener((ConnectionListener)secureSender);
-            }
-            Disassembler disassembler = new Disassembler(secureSender, Constant.MIN_MAX_FRAME_SIZE);
-            sender = disassembler;
-            addFrameSizeObserver(disassembler);
-
-            send(new ProtocolHeader(1, 0, 10));
-
-            Waiter w = new Waiter(lock, timeout);
-            while (w.hasTime() && ((state == OPENING && error == null) || isRedirecting()))
-            {
-                w.await();
-            }
-
-            if (error != null)
-            {
-                ConnectionException t = error;
-                error = null;
-                try
-                {
-                    close();
-                }
-                catch (ConnectionException ce)
-                {
-                    if (!(t instanceof ProtocolVersionException))
-                    {
-                        throw ce;
-                    }
-                }
-                t.rethrow();
-            }
-
-            switch (state)
-            {
-            case OPENING:
-                close();
-                throw new ConnectionException("connect() timed out");
-            case OPEN:
-            case RESUMING:
-                connectionLost.set(false);
-                break;
-            case CLOSED:
-                throw new ConnectionException("connect() aborted");
-            default:
-                throw new IllegalStateException(String.valueOf(state));
-            }
-        }
-
-        for (ConnectionListener listener: listeners)
-        {
-            listener.opened(this);
-        }
-    }
-
-    public Session createSession()
-    {
-        return createSession(0);
-    }
-
-    public Session createSession(long expiry)
-    {
-        return createSession(expiry, false);
-    }
-
-    public Session createSession(long expiry, boolean isNoReplay)
-    {
-        return createSession(UUID.randomUUID().toString(), expiry, isNoReplay);
-    }
-
-    public Session createSession(String name)
-    {
-        return createSession(name, 0);
-    }
-
-    public Session createSession(String name, long expiry)
-    {
-        return createSession(Strings.toUTF8(name), expiry);
-    }
-
-    public Session createSession(String name, long expiry,boolean isNoReplay)
-    {
-        return createSession(new Binary(Strings.toUTF8(name)), expiry, isNoReplay);
-    }
-
-    public Session createSession(byte[] name, long expiry)
-    {
-        return createSession(new Binary(name), expiry);
-    }
-
-    public Session createSession(Binary name, long expiry)
-    {
-        return createSession(name, expiry, false);
-    }
-
-    public Session createSession(Binary name, long expiry, boolean isNoReplay)
-    {
-        synchronized (lock)
-        {
-            Waiter w = new Waiter(lock, timeout);
-            while (w.hasTime() && state != OPEN && error == null)
-            {
-                w.await();
-            }
-
-            if (state != OPEN)
-            {
-                throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
-            }
-
-            Session ssn = _sessionFactory.newSession(this, name, expiry, isNoReplay);
-            registerSession(ssn);
-            map(ssn);
-            ssn.attach();
-            return ssn;
-        }
-    }
-
     public void registerSession(Session ssn)
     {
         synchronized (lock)
@@ -379,13 +134,6 @@ public class Connection extends Connecti
         }
     }
 
-    public void setSessionFactory(SessionFactory sessionFactory)
-    {
-        assert sessionFactory != null;
-
-        _sessionFactory = sessionFactory;
-    }
-
     public ConnectionDelegate getConnectionDelegate()
     {
         return delegate;
@@ -542,12 +290,6 @@ public class Connection extends Connecti
                 return;
             }
         }
-
-        for (ConnectionListener listener: listeners)
-        {
-            listener.exception(this, e);
-        }
-
     }
 
     public void exception(Throwable t)
@@ -595,11 +337,6 @@ public class Connection extends Connecti
             sender = null;
             setState(CLOSED);
         }
-
-        for (ConnectionListener listener: listeners)
-        {
-            listener.closed(this);
-        }
     }
 
     public void close()
@@ -672,50 +409,12 @@ public class Connection extends Connecti
         }
     }
 
-    public String getUserID()
-    {
-        return userID;
-    }
-
-    public void setUserID(String id)
-    {
-        userID = id;
-    }
-
-    public void setServerProperties(final Map<String, Object> serverProperties)
-    {
-        _serverProperties = serverProperties == null ? Collections.<String, Object>emptyMap() : serverProperties;
-        _messageCompressionSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED)));
-        _virtualHostPropertiesSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED)));
-        _queueLifetimePolicySupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED)));
-
-    }
-
-    public Map<String, Object> getServerProperties()
-    {
-        return _serverProperties;
-    }
-
+    @Override
     public String toString()
     {
         return String.format("conn:%x", System.identityHashCode(this));
     }
 
-    public ConnectionSettings getConnectionSettings()
-    {
-        return conSettings;
-    }
-
-    public SecurityLayer getSecurityLayer()
-    {
-        return securityLayer;
-    }
-
-    public boolean isConnectionResuming()
-    {
-        return connectionLost.get();
-    }
-
     protected boolean isConnectionLost()
     {
         return connectionLost.get();
@@ -731,14 +430,6 @@ public class Connection extends Connecti
         return sessions.containsKey(new Binary(name));
     }
 
-    public void notifyFailoverRequired()
-    {
-        for (Session ssn : getChannels())
-        {
-            ssn.notifyFailoverRequired();
-        }
-    }
-
     public SocketAddress getRemoteSocketAddress()
     {
         return _remoteAddress;
@@ -774,35 +465,6 @@ public class Connection extends Connecti
         connectionHeartbeat();
     }
 
-    private class ConnectionActivity implements TransportActivity
-    {
-        @Override
-        public long getLastReadTime()
-        {
-            return _lastReadTime;
-        }
-
-        @Override
-        public long getLastWriteTime()
-        {
-            return _lastSendTime;
-        }
-
-        @Override
-        public void writerIdle()
-        {
-            getConnectionDelegate().writerIdle(Connection.this);
-        }
-
-        @Override
-        public void readerIdle()
-        {
-            LOGGER.error("Closing connection as no heartbeat or other activity detected within specified interval");
-            _networkConnection.close();
-        }
-    }
-
-
     public void setNetworkConnection(NetworkConnection network)
     {
         _networkConnection = network;
@@ -842,31 +504,6 @@ public class Connection extends Connecti
         }
     }
 
-    public boolean isMessageCompressionSupported()
-    {
-        return _messageCompressionSupported;
-    }
-
-    public boolean isVirtualHostPropertiesSupported()
-    {
-        return _virtualHostPropertiesSupported;
-    }
-
-    public boolean isQueueLifetimePolicySupported()
-    {
-        return _queueLifetimePolicySupported;
-    }
-
-    public boolean isRedirecting()
-    {
-        return _redirecting.get();
-    }
-
-    public void setRedirecting(final boolean redirecting)
-    {
-        _redirecting.set(redirecting);
-    }
-
     public boolean isClosing()
     {
         synchronized (lock)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Mon Feb 13 23:15:52 2017
@@ -30,17 +30,15 @@ import java.nio.channels.SocketChannel;
 import java.util.EnumSet;
 import java.util.Set;
 
-import org.apache.qpid.server.model.port.AmqpPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.transport.TransportException;
+import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.transport.network.TransportEncryption;
 
-import static org.apache.qpid.server.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
 public class NonBlockingNetworkTransport
 {
+    public static final String WILDCARD_ADDRESS = "*";
 
     private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Session.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Session.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/Session.java Mon Feb 13 23:15:52 2017
@@ -21,9 +21,6 @@
 package org.apache.qpid.server.transport;
 
 
-import org.apache.qpid.server.configuration.ClientProperties;
-import org.apache.qpid.server.transport.network.Frame;
-import org.apache.qpid.server.transport.util.Waiter;
 import static org.apache.qpid.server.transport.Option.COMPLETED;
 import static org.apache.qpid.server.transport.Option.SYNC;
 import static org.apache.qpid.server.transport.Option.TIMELY_REPLY;
@@ -44,12 +41,14 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.transport.network.Frame;
+import org.apache.qpid.server.transport.util.Waiter;
+
 /**
  * Session
  *
@@ -86,18 +85,11 @@ public class Session extends SessionInvo
 
     private Connection connection;
     private Binary name;
-    private long expiry;
     private boolean closing;
     private int channel;
     private SessionDelegate delegate;
     private SessionListener listener = new DefaultSessionListener();
-    private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
-                                        Long.getLong(LegacyClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
-                                                     ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
-    private final long blockedSendTimeout = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE,
-                                                         ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
-    private long blockedSendReportingPeriod = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
-                                                           ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
+    private final long timeout = 60000;  // TODO server side close does not require this
 
     private boolean autoSync = false;
 
@@ -122,8 +114,6 @@ public class Session extends SessionInvo
 
     private State state = NEW;
 
-    // transfer flow control
-    private volatile boolean flowControl = false;
     private Semaphore credit = new Semaphore(0);
 
     private Thread resumer = null;
@@ -134,16 +124,6 @@ public class Session extends SessionInvo
     private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
     private boolean _isNoReplay = false;
 
-    protected Session(Connection connection, Binary name, long expiry)
-    {
-        this(connection, new SessionDelegate(), name, expiry);
-    }
-
-    protected Session(Connection connection, Binary name, long expiry, boolean noReplay)
-    {
-        this(connection, new SessionDelegate(), name, expiry, noReplay);
-    }
-
     protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry)
     {
         this(connection, delegate, name, expiry,false);
@@ -154,7 +134,6 @@ public class Session extends SessionInvo
         this.connection = connection;
         this.delegate = delegate;
         this.name = name;
-        this.expiry = expiry;
         this.closing = false;
         this._isNoReplay = noReplay;
         initReceiver();
@@ -170,11 +149,6 @@ public class Session extends SessionInvo
         return name;
     }
 
-    void setExpiry(long expiry)
-    {
-        this.expiry = expiry;
-    }
-
     protected void setClose(boolean close)
     {
         this.closing = close;
@@ -190,31 +164,11 @@ public class Session extends SessionInvo
         this.channel = channel;
     }
 
-    public void setSessionListener(SessionListener listener)
-    {
-        if (listener == null)
-        {
-            this.listener = new DefaultSessionListener();
-        }
-        else
-        {
-            this.listener = listener;
-        }
-    }
-
     public SessionListener getSessionListener()
     {
         return listener;
     }
 
-    public void setAutoSync(boolean value)
-    {
-        synchronized (commandsLock)
-        {
-            this.autoSync = value;
-        }
-    }
-
     protected void setState(State state)
     {
         synchronized (commandsLock)
@@ -229,11 +183,6 @@ public class Session extends SessionInvo
         return this.state;
     }
 
-    void setFlowControl(boolean value)
-    {
-        flowControl = value;
-    }
-
     void addCredit(int value)
     {
         credit.release(value);
@@ -244,37 +193,6 @@ public class Session extends SessionInvo
         credit.drainPermits();
     }
 
-    void acquireCredit()
-    {
-        if (flowControl)
-        {
-            try
-            {
-                long wait = blockedSendTimeout > blockedSendReportingPeriod ? blockedSendReportingPeriod :
-                           blockedSendTimeout;
-                long totalWait = 1L;
-                while(totalWait <= blockedSendTimeout && !credit.tryAcquire(wait, TimeUnit.MILLISECONDS))
-                {
-                    totalWait+=wait;
-                    LOGGER.warn("Message send delayed by {}s due to broker enforced flow control", (totalWait) / 1000);
-
-
-                }
-                if(totalWait > blockedSendTimeout)
-                {
-                    LOGGER.error("Message send failed due to timeout waiting on broker enforced flow control");
-                    throw new SessionException
-                            ("timed out waiting for message credit");
-                }
-            }
-            catch (InterruptedException e)
-            {
-                throw new SessionException
-                    ("interrupted while waiting for credit", null, e);
-            }
-        }
-    }
-
     private void initReceiver()
     {
         synchronized (processedLock)
@@ -617,11 +535,6 @@ public class Session extends SessionInvo
     {
         if (m.getEncodedTrack() == Frame.L4)
         {
-            if (m.hasPayload())
-            {
-                acquireCredit();
-            }
-
             synchronized (commandsLock)
             {
                 if (state == DETACHED && m.isUnreliable())
@@ -1134,71 +1047,11 @@ public class Session extends SessionInvo
         return this.detachCode;
     }
 
-    public void awaitOpen()
-    {
-        switch (state)
-        {
-        case NEW:
-            synchronized(stateLock)
-            {
-                Waiter w = new Waiter(stateLock, timeout);
-                while (w.hasTime() && state == NEW)
-                {
-                    checkFailoverRequired("Session opening was interrupted by failover.");
-                    w.await();
-                }
-            }
-
-            if (state != OPEN)
-            {
-                throw new SessionException("Timed out waiting for Session to open");
-            }
-            break;
-        case DETACHED:
-        case CLOSING:
-        case CLOSED:
-            throw new SessionException("Session closed");
-        default :
-            break;
-        }
-    }
-
     public Object getStateLock()
     {
         return stateLock;
     }
 
-    protected void notifyFailoverRequired()
-    {
-        //ensure any operations waiting are aborted to
-        //prevent them waiting for timeout for 60 seconds
-        //and possibly preventing failover proceeding
-        _failoverRequired.set(true);
-        synchronized (commandsLock)
-        {
-            commandsLock.notifyAll();
-        }
-        synchronized (results)
-        {
-            for (ResultFuture<?> result : results.values())
-            {
-                synchronized(result)
-                {
-                    result.notifyAll();
-                }
-            }
-        }
-    }
-
-    /**
-     * An auxiliary method for test purposes only
-     * @return true if flow is blocked
-     */
-    public boolean isFlowBlocked()
-    {
-        return flowControl && credit.availablePermits() == 0;
-    }
-
     protected void sendSessionAttached(final byte[] name, final Option... options)
     {
         super.sessionAttached(name, options);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SessionDelegate.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SessionDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SessionDelegate.java Mon Feb 13 23:15:52 2017
@@ -192,15 +192,8 @@ public class SessionDelegate
 
     @Override public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm)
     {
-        if ("".equals(sfm.getDestination()) &&
-            MessageFlowMode.CREDIT.equals(sfm.getFlowMode()))
-        {
-            ssn.setFlowControl(true);
-        }
-        else
-        {
-            super.messageSetFlowMode(ssn, sfm);
-        }
+        // Method overridden in ServerSessionDelegate
+        throw new UnsupportedOperationException();
     }
 
     @Override public void messageFlow(Session ssn, MessageFlow flow)

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1782895&r1=1782894&r2=1782895&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java Mon Feb 13 23:15:52 2017
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.server.configuration.ClientProperties;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.server.framing.HeartbeatBody;
 import org.apache.qpid.server.framing.ProtocolInitiation;
 import org.apache.qpid.server.framing.ProtocolVersion;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org