You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2021/03/30 23:43:45 UTC
[qpid-broker-j] 02/02: QPID-8509 - java.util.NoSuchElementException in AMQPConnection_1_0Impl.next()
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit ea2a5ac2d5e6a5db805d41a277359dccfa06b0b9
Author: aw924 <da...@deutsche-boerse.com>
AuthorDate: Fri Mar 12 11:41:07 2021 +0100
QPID-8509 - java.util.NoSuchElementException in AMQPConnection_1_0Impl.next()
This closes #82
---
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 129 ++++++++-------------
1 file changed, 48 insertions(+), 81 deletions(-)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 8fcd9e0..d46810f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
@@ -45,6 +46,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -132,9 +134,9 @@ import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
implements DescribedTypeConstructorRegistry.Source,
- ValueWriter.Registry.Source,
- SASLEndpoint,
- AMQPConnection_1_0<AMQPConnection_1_0Impl>
+ ValueWriter.Registry.Source,
+ SASLEndpoint,
+ AMQPConnection_1_0<AMQPConnection_1_0Impl>
{
private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0Impl.class);
@@ -200,11 +202,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
private volatile ConnectionState _connectionState = ConnectionState.AWAIT_AMQP_OR_SASL_HEADER;
private final AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer()
- .registerExtensionSoleconnLayer();
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer()
+ .registerExtensionSoleconnLayer();
private final Map<Symbol, Object> _properties = new LinkedHashMap<>();
private volatile boolean _saslComplete;
@@ -504,7 +506,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
if (channel > getChannelMax())
{
Error error = new Error(ConnectionError.FRAMING_ERROR,
- String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax()));
+ String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax()));
handleError(error);
return;
}
@@ -534,7 +536,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
case AWAIT_OPEN:
closeReceived();
closeConnection(ConnectionError.CONNECTION_FORCED,
- "Connection close sent before connection was opened");
+ "Connection close sent before connection was opened");
break;
case OPENED:
_connectionState = ConnectionState.CLOSE_RECEIVED;
@@ -545,7 +547,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
ErrorCondition condition = error.getCondition();
Symbol errorCondition = condition == null ? null : condition.getValue();
LOGGER.info("{} : Connection closed with error : {} - {}", getLogSubject(),
- errorCondition, close.getError().getDescription());
+ errorCondition, close.getError().getDescription());
}
sendClose(new Close());
_connectionState = ConnectionState.CLOSED;
@@ -596,15 +598,15 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
public boolean isClosed()
{
return _connectionState == ConnectionState.CLOSED
- || _connectionState == ConnectionState.CLOSE_RECEIVED;
+ || _connectionState == ConnectionState.CLOSE_RECEIVED;
}
@Override
public boolean isClosing()
{
return _connectionState == ConnectionState.CLOSED
- || _connectionState == ConnectionState.CLOSE_RECEIVED
- || _connectionState == ConnectionState.CLOSE_SENT;
+ || _connectionState == ConnectionState.CLOSE_RECEIVED
+ || _connectionState == ConnectionState.CLOSE_SENT;
}
@Override
@@ -702,7 +704,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
if (begin.getRemoteChannel() != null)
{
closeConnection(ConnectionError.FRAMING_ERROR,
- "BEGIN received on channel "
+ "BEGIN received on channel "
+ receivingChannelId
+ " with given remote-channel "
+ begin.getRemoteChannel()
@@ -718,17 +720,17 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
if (sendingChannelId == -1)
{
closeConnection(ConnectionError.FRAMING_ERROR,
- "BEGIN received on channel "
+ "BEGIN received on channel "
+ receivingChannelId
+ ". There are no free channels for the broker to respond on.");
}
else
{
Session_1_0 session = new Session_1_0(this,
- begin,
- sendingChannelId,
- receivingChannelId,
- getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
+ begin,
+ sendingChannelId,
+ receivingChannelId,
+ getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
session.create();
_receivingSessions[receivingChannelId] = session;
@@ -754,7 +756,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
else
{
closeConnection(ConnectionError.FRAMING_ERROR,
- "BEGIN received on channel " + receivingChannelId + " which is already in use.");
+ "BEGIN received on channel " + receivingChannelId + " which is already in use.");
}
}
@@ -821,15 +823,15 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
int channelMax = getPort().getSessionCountLimit() - 1;
_channelMax = open.getChannelMax() == null ? channelMax
: open.getChannelMax().intValue() < channelMax
- ? open.getChannelMax().intValue()
- : channelMax;
+ ? open.getChannelMax().intValue()
+ : channelMax;
if (_receivingSessions == null)
{
_receivingSessions = new Session_1_0[_channelMax + 1];
_sendingSessions = new Session_1_0[_channelMax + 1];
}
_maxFrameSize = open.getMaxFrameSize() == null
- || open.getMaxFrameSize().longValue() > getBroker().getNetworkBufferSize()
+ || open.getMaxFrameSize().longValue() > getBroker().getNetworkBufferSize()
? getBroker().getNetworkBufferSize()
: open.getMaxFrameSize().intValue();
_remoteContainerId = open.getContainerId();
@@ -884,7 +886,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
if (_outgoingIdleTimeout != 0L && _outgoingIdleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
{
closeConnection(ConnectionError.CONNECTION_FORCED,
- "Requested idle timeout of "
+ "Requested idle timeout of "
+ _outgoingIdleTimeout
+ " is too low. The minimum supported timeout is"
+ MINIMUM_SUPPORTED_IDLE_TIMEOUT);
@@ -929,14 +931,14 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
{
List<ListenableFuture<Void>> rescheduleFutures = new ArrayList<>();
for (AMQPConnection<?> existingConnection : StreamSupport.stream(existingConnections.spliterator(), false)
- .filter(con -> con instanceof AMQPConnection_1_0)
- .filter(con -> !con.isClosing())
- .filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
- .collect(Collectors.toList()))
+ .filter(con -> con instanceof AMQPConnection_1_0)
+ .filter(con -> !con.isClosing())
+ .filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
+ .collect(Collectors.toList()))
{
SoleConnectionEnforcementPolicy soleConnectionEnforcementPolicy = null;
if (((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy
- != null)
+ != null)
{
soleConnectionEnforcementPolicy =
((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy;
@@ -950,9 +952,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
{
_properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
Error error = new Error(AmqpError.INVALID_FIELD,
- String.format(
- "Connection closed due to sole-connection-enforcement-policy '%s'",
- soleConnectionEnforcementPolicy.toString()));
+ String.format(
+ "Connection closed due to sole-connection-enforcement-policy '%s'",
+ soleConnectionEnforcementPolicy.toString()));
error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id")));
newConnection.doOnIOThreadAsync(() -> ((AMQPConnection_1_0Impl) newConnection).closeConnection(error));
proceedWithRegistration = false;
@@ -961,9 +963,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
else if (SoleConnectionEnforcementPolicy.CLOSE_EXISTING.equals(soleConnectionEnforcementPolicy))
{
final Error error = new Error(AmqpError.RESOURCE_LOCKED,
- String.format(
- "Connection closed due to sole-connection-enforcement-policy '%s'",
- soleConnectionEnforcementPolicy.toString()));
+ String.format(
+ "Connection closed due to sole-connection-enforcement-policy '%s'",
+ soleConnectionEnforcementPolicy.toString()));
error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true));
rescheduleFutures.add(existingConnection.doOnIOThreadAsync(
() -> ((AMQPConnection_1_0Impl) existingConnection).closeConnection(error)));
@@ -1461,9 +1463,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
{
updateLastWriteTime();
FRAME_LOGGER.debug("SEND[{}|{}] : {}",
- getNetwork().getRemoteAddress(),
- amqFrame.getChannel(),
- amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
+ getNetwork().getRemoteAddress(),
+ amqFrame.getChannel(),
+ amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
int size = _frameWriter.send(amqFrame);
if (size > getMaxFrameSize())
@@ -1726,10 +1728,10 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
if (_remoteDesiredCapabilities != null
- && _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
+ && _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
{
_properties.put(SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY,
- SoleConnectionDetectionPolicy.STRONG);
+ SoleConnectionDetectionPolicy.STRONG);
}
if (_soleConnectionEnforcementPolicy == SoleConnectionEnforcementPolicy.CLOSE_EXISTING)
@@ -1844,46 +1846,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
@Override
public Iterator<ServerTransaction> getOpenTransactions()
{
- return new Iterator<ServerTransaction>()
- {
- int _index = 0;
-
- @Override
- public boolean hasNext()
- {
- for(int i = _index; i < _openTransactions.length; i++)
- {
- if(_openTransactions[i] != null)
- {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public ServerTransaction next()
- {
- IdentifiedTransaction txn;
- for( ; _index < _openTransactions.length; _index++)
- {
- if(_openTransactions[_index] != null)
- {
- txn = new IdentifiedTransaction(_index, _openTransactions[_index]);
- _index++;
- return txn.getServerTransaction();
- }
- }
-
- throw new NoSuchElementException();
- }
-
- @Override
- public void remove()
- {
- _openTransactions[_index] = null;
- }
- };
+ final AtomicInteger counter = new AtomicInteger(0);
+ return Arrays.stream(_openTransactions)
+ .filter(Objects::nonNull)
+ .map(transaction -> new IdentifiedTransaction(counter.getAndIncrement(), transaction).getServerTransaction())
+ .collect(Collectors.toList()).iterator();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org