You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/03/27 11:32:38 UTC
svn commit: r1788900 -
/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
Author: orudyy
Date: Mon Mar 27 11:32:37 2017
New Revision: 1788900
URL: http://svn.apache.org/viewvc?rev=1788900&view=rev
Log:
QPID-7622: Tidy up 0-10 after separation
Removed:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java Mon Mar 27 11:32:37 2017
@@ -89,7 +89,6 @@ public class AMQPConnection_0_10Impl ext
_connection.setConnectionDelegate(connDelegate);
_connection.setRemoteAddress(network.getRemoteAddress());
- _connection.setLocalAddress(network.getLocalAddress());
_inputHandler = new ServerInputHandler(new ServerAssembler(_connection));
_connection.addFrameSizeObserver(_inputHandler);
@@ -327,7 +326,7 @@ public class AMQPConnection_0_10Impl ext
public String getRemoteContainerName()
{
- return _connection.getRemoteContainerName();
+ return getClientId();
}
public Collection<? extends Session_0_10> getSessionModels()
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Mar 27 11:32:37 2017
@@ -102,7 +102,6 @@ public class ServerConnection extends Co
private int channelMax = 1;
private String locale;
private SocketAddress _remoteAddress;
- private SocketAddress _localAddress;
public ServerConnection(final long connectionId,
Broker<?> broker,
@@ -409,9 +408,8 @@ public class ServerConnection extends Co
{
for (ServerSession ssn : getChannels())
{
- final ServerSession session = (ServerSession) ssn;
- ((ServerSession) ssn).setClose(true);
- session.closed();
+ ssn.setClose(true);
+ ssn.closed();
}
}
@@ -419,7 +417,7 @@ public class ServerConnection extends Co
{
for (ServerSession ssn : getChannels())
{
- ((ServerSession)ssn).receivedComplete();
+ ssn.receivedComplete();
}
}
@@ -439,12 +437,6 @@ public class ServerConnection extends Co
}
- public String getRemoteContainerName()
- {
- return getConnectionDelegate().getClientId();
- }
-
-
public long getSessionCountLimit()
{
return getChannelMax();
@@ -668,21 +660,11 @@ public class ServerConnection extends Co
return _remoteAddress;
}
- public SocketAddress getLocalAddress()
- {
- return _localAddress;
- }
-
protected void setRemoteAddress(SocketAddress remoteAddress)
{
_remoteAddress = remoteAddress;
}
- protected void setLocalAddress(SocketAddress localAddress)
- {
- _localAddress = localAddress;
- }
-
private void invokeSessionDetached(int channel, SessionDetachCode sessionDetachCode)
{
SessionDetached sessionDetached = new SessionDetached();
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Mon Mar 27 11:32:37 2017
@@ -101,21 +101,25 @@ public class ServerConnectionDelegate ex
_maximumFrameSize = Math.min(0xffff, broker.getNetworkBufferSize());
}
+ @Override
public void control(ServerConnection conn, Method method)
{
method.dispatch(conn, this);
}
+ @Override
public void command(ServerConnection conn, Method method)
{
method.dispatch(conn, this);
}
+ @Override
public void error(ServerConnection conn, ProtocolError error)
{
conn.exception(new ConnectionException(error.getMessage()));
}
+ @Override
public void handle(ServerConnection conn, Method method)
{
conn.dispatch(method);
@@ -148,13 +152,6 @@ public class ServerConnectionDelegate ex
}
}
- public void writerIdle(final ServerConnection connection)
- {
- connection.doHeartBeat();
- }
-
-
-
public final ConnectionState getState()
{
return _state;
@@ -172,9 +169,8 @@ public class ServerConnectionDelegate ex
}
@Override
- public void init(final ServerConnection conn, final ProtocolHeader hdr)
+ public void init(final ServerConnection serverConnection, final ProtocolHeader hdr)
{
- ServerConnection serverConnection = (ServerConnection) conn;
assertState(serverConnection, ConnectionState.INIT);
serverConnection.send(new ProtocolHeader(1, 0, 10));
serverConnection.sendConnectionStart(_clientProperties, _mechanisms, _locales);
@@ -216,9 +212,8 @@ public class ServerConnectionDelegate ex
}
@Override
- public void connectionSecureOk(final ServerConnection conn, final ConnectionSecureOk ok)
+ public void connectionSecureOk(final ServerConnection serverConnection, final ConnectionSecureOk ok)
{
- ServerConnection serverConnection = (ServerConnection) conn;
assertState(serverConnection, ConnectionState.AWAIT_SECURE_OK);
secure(serverConnection, ok.getResponse());
}
@@ -261,18 +256,16 @@ public class ServerConnectionDelegate ex
}
@Override
- public void connectionClose(ServerConnection conn, ConnectionClose close)
+ public void connectionClose(ServerConnection sconn, ConnectionClose close)
{
- final ServerConnection sconn = (ServerConnection) conn;
sconn.closeCode(close);
sconn.setState(CLOSE_RCVD);
- sendConnectionCloseOkAndCloseSender(conn);
+ sendConnectionCloseOkAndCloseSender(sconn);
}
-
- public void connectionOpen(ServerConnection conn, ConnectionOpen open)
+ @Override
+ public void connectionOpen(ServerConnection sconn, ConnectionOpen open)
{
- final ServerConnection sconn = (ServerConnection) conn;
assertState(sconn, ConnectionState.AWAIT_OPEN);
NamedAddressSpace addressSpace;
String vhostName;
@@ -285,11 +278,9 @@ public class ServerConnectionDelegate ex
vhostName = "";
}
- AmqpPort port = (AmqpPort) sconn.getPort();
+ AmqpPort port = sconn.getPort();
addressSpace = port.getAddressSpace(vhostName);
-
-
if(addressSpace != null)
{
if (!addressSpace.isActive())
@@ -339,9 +330,8 @@ public class ServerConnectionDelegate ex
}
@Override
- public void connectionTuneOk(final ServerConnection conn, final ConnectionTuneOk ok)
+ public void connectionTuneOk(final ServerConnection sconn, final ConnectionTuneOk ok)
{
- ServerConnection sconn = (ServerConnection) conn;
assertState(sconn, ConnectionState.AWAIT_TUNE_OK);
int okChannelMax = ok.getChannelMax();
int okMaxFrameSize = ok.getMaxFrameSize();
@@ -421,7 +411,7 @@ public class ServerConnectionDelegate ex
private void stopAllSubscriptions(ServerConnection conn, SessionDetach dtc)
{
- final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
+ final ServerSession ssn = conn.getSession(dtc.getChannel());
final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
for (ConsumerTarget_0_10 subscription_0_10 : subs)
{
@@ -431,21 +421,20 @@ public class ServerConnectionDelegate ex
@Override
- public void sessionAttach(final ServerConnection conn, final SessionAttach atc)
+ public void sessionAttach(final ServerConnection serverConnection, final SessionAttach atc)
{
- ServerConnection serverConnection = (ServerConnection) conn;
assertState(serverConnection, ConnectionState.OPEN);
ServerSessionDelegate serverSessionDelegate = new ServerSessionDelegate();
final ServerSession serverSession =
- new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
- final Session_0_10 session = new Session_0_10(((ServerConnection) conn).getAmqpConnection(), atc.getChannel(),
+ new ServerSession(serverConnection, serverSessionDelegate, new Binary(atc.getName()), 0);
+ final Session_0_10 session = new Session_0_10(serverConnection.getAmqpConnection(), atc.getChannel(),
serverSession);
session.create();
serverSession.setModelObject(session);
- if(isSessionNameUnique(atc.getName(), conn))
+ if(isSessionNameUnique(atc.getName(), serverConnection))
{
serverConnection.map(serverSession, atc.getChannel());
serverConnection.registerSession(serverSession);
@@ -461,12 +450,11 @@ public class ServerConnectionDelegate ex
private boolean isSessionNameUnique(final byte[] name, final ServerConnection conn)
{
- final ServerConnection sconn = (ServerConnection) conn;
- final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
+ final Principal authorizedPrincipal = conn.getAuthorizedPrincipal();
final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName();
final Iterator<? extends org.apache.qpid.server.model.Connection<?>> connections =
- ((ServerConnection)conn).getAddressSpace().getConnections().iterator();
+ conn.getAddressSpace().getConnections().iterator();
while(connections.hasNext())
{
final AMQPConnection<?> amqConnectionModel = (AMQPConnection<?>) connections.next();
@@ -483,9 +471,8 @@ public class ServerConnectionDelegate ex
}
@Override
- public void connectionStartOk(ServerConnection conn, ConnectionStartOk ok)
+ public void connectionStartOk(ServerConnection serverConnection, ConnectionStartOk ok)
{
- ServerConnection serverConnection = (ServerConnection)conn;
assertState(serverConnection, ConnectionState.AWAIT_START_OK);
_clientProperties = ok.getClientProperties();
if(_clientProperties != null)
@@ -533,30 +520,6 @@ public class ServerConnectionDelegate ex
return (_clientProperties == null || _clientProperties.get(name) == null) ? null : String.valueOf(_clientProperties.get(name));
}
- public Map<String,Object> getClientProperties()
- {
- return _clientProperties;
- }
-
- public String getClientId()
- {
- return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.CLIENT_ID_0_10);
- }
-
- public String getClientVersion()
- {
- return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.VERSION_0_10);
- }
-
- public String getClientProduct()
- {
- return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.PRODUCT);
- }
-
- public String getRemoteProcessPid()
- {
- return (_clientProperties == null || _clientProperties.get(ConnectionStartProperties.PID) == null) ? null : String.valueOf(_clientProperties.get(ConnectionStartProperties.PID));
- }
protected int getHeartbeatMax()
{
@@ -569,9 +532,8 @@ public class ServerConnectionDelegate ex
return _compressionSupported && _broker.isMessageCompressionEnabled();
}
- private void connectionAuthFailed(final ServerConnection conn, Exception e)
+ private void connectionAuthFailed(final ServerConnection serverConnection, Exception e)
{
- ServerConnection serverConnection = (ServerConnection)conn;
if (e != null)
{
serverConnection.exception(e);
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java Mon Mar 27 11:32:37 2017
@@ -24,7 +24,6 @@ import static org.apache.qpid.server.tra
import static org.apache.qpid.server.protocol.v0_10.ServerInputHandler.State.*;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Mar 27 11:32:37 2017
@@ -112,7 +112,6 @@ public class ServerSession extends Sessi
private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024);
private final Object commandsLock = new Object();
private final Object stateLock = new Object();
- private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
private Session_0_10 _modelObject;
private long _blockTime;
private long _blockingTimeout;
@@ -122,7 +121,6 @@ public class ServerSession extends Sessi
private boolean closing;
private int channel;
private ServerSessionDelegate delegate;
- private SessionListener listener = new DefaultSessionListener();
private boolean incomingInit;
// incoming command count
private int commandsIn;
@@ -172,9 +170,7 @@ public class ServerSession extends Sessi
initReceiver();
_transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
- ServerConnection serverConnection = (ServerConnection) connection;
-
- _blockingTimeout = serverConnection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
+ _blockingTimeout = connection.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
}
public Binary getName()
@@ -197,11 +193,6 @@ public class ServerSession extends Sessi
this.channel = channel;
}
- public SessionListener getSessionListener()
- {
- return listener;
- }
-
protected State getState()
{
return this.state;
@@ -235,8 +226,6 @@ public class ServerSession extends Sessi
void resume()
{
- _failoverRequired.set(false);
-
synchronized (commandsLock)
{
attach();
@@ -293,7 +282,6 @@ public class ServerSession extends Sessi
txSelect();
}
- listener.resumed(this);
resumer = null;
}
}
@@ -701,14 +689,6 @@ public class ServerSession extends Sessi
}
}
- private void checkFailoverRequired(String message)
- {
- if (_failoverRequired.get())
- {
- throw new SessionException(message);
- }
- }
-
protected boolean shouldIssueFlush(int next)
{
return (next % 65536) == 0;
@@ -834,7 +814,6 @@ public class ServerSession extends Sessi
if(state == CLOSED)
{
connection.removeSession(this);
- listener.closed(this);
}
}
@@ -1636,7 +1615,6 @@ public class ServerSession extends Sessi
state = CLOSED;
delegate.closed(this);
connection.removeSession(this);
- listener.closed(this);
break;
case CLOSED:
break;
@@ -1845,31 +1823,6 @@ public class ServerSession extends Sessi
return _modelObject.getMaxUncommittedInMemorySize();
}
- static class DefaultSessionListener implements SessionListener
- {
-
- @Override
- public void opened(ServerSession ssn) {}
-
- @Override
- public void resumed(ServerSession ssn) {}
-
- @Override
- public void message(ServerSession ssn, MessageTransfer xfr)
- {
- LOGGER.info("message: {}", xfr);
- }
-
- @Override
- public void exception(ServerSession ssn, SessionException exc)
- {
- LOGGER.error("session exception", exc);
- }
-
- @Override
- public void closed(ServerSession ssn) {}
- }
-
private class ResultFuture<T> implements Future<T>
{
@@ -1897,7 +1850,6 @@ public class ServerSession extends Sessi
Waiter w = new Waiter(this, timeout);
while (w.hasTime() && state != CLOSED && !isDone())
{
- checkFailoverRequired("Operation was interrupted by failover.");
LOGGER.debug("{} waiting for result: {}", ServerSession.this, this);
w.await();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1788900&r1=1788899&r2=1788900&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Mon Mar 27 11:32:37 2017
@@ -1907,7 +1907,7 @@ public class ServerSessionDelegate exten
@Override public void executionException(ServerSession ssn, ExecutionException exc)
{
ssn.setException(exc);
- ssn.getSessionListener().exception(ssn, new SessionException(exc));
+ LOGGER.error("session exception", exc);
ssn.closed();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org