You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/03/27 11:32:38 UTC

svn commit: r1788900 - /qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/

Author: orudyy
Date: Mon Mar 27 11:32:37 2017
New Revision: 1788900

URL: http://svn.apache.org/viewvc?rev=1788900&view=rev
Log:
QPID-7622: Tidy up 0-10 after separation

Removed:
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java
Modified:
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java Mon Mar 27 11:32:37 2017
@@ -89,7 +89,6 @@ public class AMQPConnection_0_10Impl ext
 
         _connection.setConnectionDelegate(connDelegate);
         _connection.setRemoteAddress(network.getRemoteAddress());
-        _connection.setLocalAddress(network.getLocalAddress());
 
         _inputHandler = new ServerInputHandler(new ServerAssembler(_connection));
         _connection.addFrameSizeObserver(_inputHandler);
@@ -327,7 +326,7 @@ public class AMQPConnection_0_10Impl ext
 
     public String getRemoteContainerName()
     {
-        return _connection.getRemoteContainerName();
+        return getClientId();
     }
 
     public Collection<? extends Session_0_10> getSessionModels()

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Mar 27 11:32:37 2017
@@ -102,7 +102,6 @@ public class ServerConnection extends Co
     private int channelMax = 1;
     private String locale;
     private SocketAddress _remoteAddress;
-    private SocketAddress _localAddress;
 
     public ServerConnection(final long connectionId,
                             Broker<?> broker,
@@ -409,9 +408,8 @@ public class ServerConnection extends Co
     {
         for (ServerSession ssn :  getChannels())
         {
-            final ServerSession session = (ServerSession) ssn;
-            ((ServerSession) ssn).setClose(true);
-            session.closed();
+            ssn.setClose(true);
+            ssn.closed();
         }
     }
 
@@ -419,7 +417,7 @@ public class ServerConnection extends Co
     {
         for (ServerSession ssn : getChannels())
         {
-            ((ServerSession)ssn).receivedComplete();
+            ssn.receivedComplete();
         }
     }
 
@@ -439,12 +437,6 @@ public class ServerConnection extends Co
     }
 
 
-    public String getRemoteContainerName()
-    {
-        return getConnectionDelegate().getClientId();
-    }
-
-
     public long getSessionCountLimit()
     {
         return getChannelMax();
@@ -668,21 +660,11 @@ public class ServerConnection extends Co
         return _remoteAddress;
     }
 
-    public SocketAddress getLocalAddress()
-    {
-        return _localAddress;
-    }
-
     protected void setRemoteAddress(SocketAddress remoteAddress)
     {
         _remoteAddress = remoteAddress;
     }
 
-    protected void setLocalAddress(SocketAddress localAddress)
-    {
-        _localAddress = localAddress;
-    }
-
     private void invokeSessionDetached(int channel, SessionDetachCode sessionDetachCode)
     {
         SessionDetached sessionDetached = new SessionDetached();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Mon Mar 27 11:32:37 2017
@@ -101,21 +101,25 @@ public class ServerConnectionDelegate ex
         _maximumFrameSize = Math.min(0xffff, broker.getNetworkBufferSize());
     }
 
+    @Override
     public void control(ServerConnection conn, Method method)
     {
         method.dispatch(conn, this);
     }
 
+    @Override
     public void command(ServerConnection conn, Method method)
     {
         method.dispatch(conn, this);
     }
 
+    @Override
     public void error(ServerConnection conn, ProtocolError error)
     {
         conn.exception(new ConnectionException(error.getMessage()));
     }
 
+    @Override
     public void handle(ServerConnection conn, Method method)
     {
         conn.dispatch(method);
@@ -148,13 +152,6 @@ public class ServerConnectionDelegate ex
         }
     }
 
-    public void writerIdle(final ServerConnection connection)
-    {
-        connection.doHeartBeat();
-    }
-
-
-
     public final ConnectionState getState()
     {
         return _state;
@@ -172,9 +169,8 @@ public class ServerConnectionDelegate ex
     }
 
     @Override
-    public void init(final ServerConnection conn, final ProtocolHeader hdr)
+    public void init(final ServerConnection serverConnection, final ProtocolHeader hdr)
     {
-        ServerConnection serverConnection = (ServerConnection) conn;
         assertState(serverConnection, ConnectionState.INIT);
         serverConnection.send(new ProtocolHeader(1, 0, 10));
         serverConnection.sendConnectionStart(_clientProperties, _mechanisms, _locales);
@@ -216,9 +212,8 @@ public class ServerConnectionDelegate ex
     }
 
     @Override
-    public void connectionSecureOk(final ServerConnection conn, final ConnectionSecureOk ok)
+    public void connectionSecureOk(final ServerConnection serverConnection, final ConnectionSecureOk ok)
     {
-        ServerConnection serverConnection = (ServerConnection) conn;
         assertState(serverConnection, ConnectionState.AWAIT_SECURE_OK);
         secure(serverConnection, ok.getResponse());
     }
@@ -261,18 +256,16 @@ public class ServerConnectionDelegate ex
     }
 
     @Override
-    public void connectionClose(ServerConnection conn, ConnectionClose close)
+    public void connectionClose(ServerConnection sconn, ConnectionClose close)
     {
-        final ServerConnection sconn = (ServerConnection) conn;
         sconn.closeCode(close);
         sconn.setState(CLOSE_RCVD);
-        sendConnectionCloseOkAndCloseSender(conn);
+        sendConnectionCloseOkAndCloseSender(sconn);
     }
 
-
-    public void connectionOpen(ServerConnection conn, ConnectionOpen open)
+    @Override
+    public void connectionOpen(ServerConnection sconn, ConnectionOpen open)
     {
-        final ServerConnection sconn = (ServerConnection) conn;
         assertState(sconn, ConnectionState.AWAIT_OPEN);
         NamedAddressSpace addressSpace;
         String vhostName;
@@ -285,11 +278,9 @@ public class ServerConnectionDelegate ex
             vhostName = "";
         }
 
-        AmqpPort port = (AmqpPort) sconn.getPort();
+        AmqpPort port = sconn.getPort();
         addressSpace = port.getAddressSpace(vhostName);
 
-
-
         if(addressSpace != null)
         {
             if (!addressSpace.isActive())
@@ -339,9 +330,8 @@ public class ServerConnectionDelegate ex
     }
 
     @Override
-    public void connectionTuneOk(final ServerConnection conn, final ConnectionTuneOk ok)
+    public void connectionTuneOk(final ServerConnection sconn, final ConnectionTuneOk ok)
     {
-        ServerConnection sconn = (ServerConnection) conn;
         assertState(sconn, ConnectionState.AWAIT_TUNE_OK);
         int okChannelMax = ok.getChannelMax();
         int okMaxFrameSize = ok.getMaxFrameSize();
@@ -421,7 +411,7 @@ public class ServerConnectionDelegate ex
 
     private void stopAllSubscriptions(ServerConnection conn, SessionDetach dtc)
     {
-        final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
+        final ServerSession ssn = conn.getSession(dtc.getChannel());
         final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
         for (ConsumerTarget_0_10 subscription_0_10 : subs)
         {
@@ -431,21 +421,20 @@ public class ServerConnectionDelegate ex
 
 
     @Override
-    public void sessionAttach(final ServerConnection conn, final SessionAttach atc)
+    public void sessionAttach(final ServerConnection serverConnection, final SessionAttach atc)
     {
-        ServerConnection serverConnection = (ServerConnection) conn;
         assertState(serverConnection, ConnectionState.OPEN);
 
         ServerSessionDelegate serverSessionDelegate = new ServerSessionDelegate();
 
         final ServerSession serverSession =
-                new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
-        final Session_0_10 session = new Session_0_10(((ServerConnection) conn).getAmqpConnection(), atc.getChannel(),
+                new ServerSession(serverConnection, serverSessionDelegate, new Binary(atc.getName()), 0);
+        final Session_0_10 session = new Session_0_10(serverConnection.getAmqpConnection(), atc.getChannel(),
                                                       serverSession);
         session.create();
         serverSession.setModelObject(session);
 
-        if(isSessionNameUnique(atc.getName(), conn))
+        if(isSessionNameUnique(atc.getName(), serverConnection))
         {
             serverConnection.map(serverSession, atc.getChannel());
             serverConnection.registerSession(serverSession);
@@ -461,12 +450,11 @@ public class ServerConnectionDelegate ex
 
     private boolean isSessionNameUnique(final byte[] name, final ServerConnection conn)
     {
-        final ServerConnection sconn = (ServerConnection) conn;
-        final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
+        final Principal authorizedPrincipal = conn.getAuthorizedPrincipal();
         final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName();
 
         final Iterator<? extends org.apache.qpid.server.model.Connection<?>> connections =
-                        ((ServerConnection)conn).getAddressSpace().getConnections().iterator();
+                        conn.getAddressSpace().getConnections().iterator();
         while(connections.hasNext())
         {
             final AMQPConnection<?> amqConnectionModel = (AMQPConnection<?>) connections.next();
@@ -483,9 +471,8 @@ public class ServerConnectionDelegate ex
     }
 
     @Override
-    public void connectionStartOk(ServerConnection conn, ConnectionStartOk ok)
+    public void connectionStartOk(ServerConnection serverConnection, ConnectionStartOk ok)
     {
-        ServerConnection serverConnection = (ServerConnection)conn;
         assertState(serverConnection, ConnectionState.AWAIT_START_OK);
         _clientProperties = ok.getClientProperties();
         if(_clientProperties != null)
@@ -533,30 +520,6 @@ public class ServerConnectionDelegate ex
         return (_clientProperties == null || _clientProperties.get(name) == null) ? null : String.valueOf(_clientProperties.get(name));
     }
 
-    public Map<String,Object> getClientProperties()
-    {
-        return _clientProperties;
-    }
-
-    public String getClientId()
-    {
-        return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.CLIENT_ID_0_10);
-    }
-
-    public String getClientVersion()
-    {
-        return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.VERSION_0_10);
-    }
-
-    public String getClientProduct()
-    {
-        return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.PRODUCT);
-    }
-
-    public String getRemoteProcessPid()
-    {
-        return (_clientProperties == null || _clientProperties.get(ConnectionStartProperties.PID) == null) ? null : String.valueOf(_clientProperties.get(ConnectionStartProperties.PID));
-    }
 
     protected int getHeartbeatMax()
     {
@@ -569,9 +532,8 @@ public class ServerConnectionDelegate ex
         return _compressionSupported && _broker.isMessageCompressionEnabled();
     }
 
-    private void connectionAuthFailed(final ServerConnection conn, Exception e)
+    private void connectionAuthFailed(final ServerConnection serverConnection, Exception e)
     {
-        ServerConnection serverConnection = (ServerConnection)conn;
         if (e != null)
         {
             serverConnection.exception(e);

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java Mon Mar 27 11:32:37 2017
@@ -24,7 +24,6 @@ import static org.apache.qpid.server.tra
 import static org.apache.qpid.server.protocol.v0_10.ServerInputHandler.State.*;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.slf4j.Logger;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Mar 27 11:32:37 2017
@@ -112,7 +112,6 @@ public class ServerSession extends Sessi
     private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024);
     private final Object commandsLock = new Object();
     private final Object stateLock = new Object();
-    private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
     private Session_0_10 _modelObject;
     private long _blockTime;
     private long _blockingTimeout;
@@ -122,7 +121,6 @@ public class ServerSession extends Sessi
     private boolean closing;
     private int channel;
     private ServerSessionDelegate delegate;
-    private SessionListener listener = new DefaultSessionListener();
     private boolean incomingInit;
     // incoming command count
     private int commandsIn;
@@ -172,9 +170,7 @@ public class ServerSession extends Sessi
         initReceiver();
         _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
 
-        ServerConnection serverConnection = (ServerConnection) connection;
-
-        _blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
+        _blockingTimeout = connection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
     }
 
     public Binary getName()
@@ -197,11 +193,6 @@ public class ServerSession extends Sessi
         this.channel = channel;
     }
 
-    public SessionListener getSessionListener()
-    {
-        return listener;
-    }
-
     protected State getState()
     {
         return this.state;
@@ -235,8 +226,6 @@ public class ServerSession extends Sessi
 
     void resume()
     {
-        _failoverRequired.set(false);
-
         synchronized (commandsLock)
         {
             attach();
@@ -293,7 +282,6 @@ public class ServerSession extends Sessi
                 txSelect();
             }
 
-            listener.resumed(this);
             resumer = null;
         }
     }
@@ -701,14 +689,6 @@ public class ServerSession extends Sessi
         }
     }
 
-    private void checkFailoverRequired(String message)
-    {
-        if (_failoverRequired.get())
-        {
-            throw new SessionException(message);
-        }
-    }
-
     protected boolean shouldIssueFlush(int next)
     {
         return (next % 65536) == 0;
@@ -834,7 +814,6 @@ public class ServerSession extends Sessi
         if(state == CLOSED)
         {
             connection.removeSession(this);
-            listener.closed(this);
         }
     }
 
@@ -1636,7 +1615,6 @@ public class ServerSession extends Sessi
                     state = CLOSED;
                     delegate.closed(this);
                     connection.removeSession(this);
-                    listener.closed(this);
                     break;
                 case CLOSED:
                     break;
@@ -1845,31 +1823,6 @@ public class ServerSession extends Sessi
         return _modelObject.getMaxUncommittedInMemorySize();
     }
 
-    static class DefaultSessionListener implements SessionListener
-    {
-
-        @Override
-        public void opened(ServerSession ssn) {}
-
-        @Override
-        public void resumed(ServerSession ssn) {}
-
-        @Override
-        public void message(ServerSession ssn, MessageTransfer xfr)
-        {
-            LOGGER.info("message: {}", xfr);
-        }
-
-        @Override
-        public void exception(ServerSession ssn, SessionException exc)
-        {
-            LOGGER.error("session exception", exc);
-        }
-
-        @Override
-        public void closed(ServerSession ssn) {}
-    }
-
     private class ResultFuture<T> implements Future<T>
     {
 
@@ -1897,7 +1850,6 @@ public class ServerSession extends Sessi
                 Waiter w = new Waiter(this, timeout);
                 while (w.hasTime() && state != CLOSED && !isDone())
                 {
-                    checkFailoverRequired("Operation was interrupted by failover.");
                     LOGGER.debug("{} waiting for result: {}", ServerSession.this, this);
                     w.await();
                 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Mon Mar 27 11:32:37 2017
@@ -1907,7 +1907,7 @@ public class ServerSessionDelegate exten
     @Override public void executionException(ServerSession ssn, ExecutionException exc)
     {
         ssn.setException(exc);
-        ssn.getSessionListener().exception(ssn, new SessionException(exc));
+        LOGGER.error("session exception", exc);
         ssn.closed();
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org