You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2021/03/30 23:43:45 UTC

[qpid-broker-j] 02/02: QPID-8509 - java.util.NoSuchElementException in AMQPConnection_1_0Impl.next()

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit ea2a5ac2d5e6a5db805d41a277359dccfa06b0b9
Author: aw924 <da...@deutsche-boerse.com>
AuthorDate: Fri Mar 12 11:41:07 2021 +0100

    QPID-8509 - java.util.NoSuchElementException in AMQPConnection_1_0Impl.next()
    
    This closes #82
---
 .../protocol/v1_0/AMQPConnection_1_0Impl.java      | 129 ++++++++-------------
 1 file changed, 48 insertions(+), 81 deletions(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 8fcd9e0..d46810f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
@@ -45,6 +46,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -132,9 +134,9 @@ import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
 public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
         implements DescribedTypeConstructorRegistry.Source,
-                   ValueWriter.Registry.Source,
-                   SASLEndpoint,
-                   AMQPConnection_1_0<AMQPConnection_1_0Impl>
+        ValueWriter.Registry.Source,
+        SASLEndpoint,
+        AMQPConnection_1_0<AMQPConnection_1_0Impl>
 {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0Impl.class);
@@ -200,11 +202,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     private volatile ConnectionState _connectionState = ConnectionState.AWAIT_AMQP_OR_SASL_HEADER;
 
     private final AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
-                                                                                              .registerTransportLayer()
-                                                                                              .registerMessagingLayer()
-                                                                                              .registerTransactionLayer()
-                                                                                              .registerSecurityLayer()
-                                                                                              .registerExtensionSoleconnLayer();
+            .registerTransportLayer()
+            .registerMessagingLayer()
+            .registerTransactionLayer()
+            .registerSecurityLayer()
+            .registerExtensionSoleconnLayer();
 
     private final Map<Symbol, Object> _properties = new LinkedHashMap<>();
     private volatile boolean _saslComplete;
@@ -504,7 +506,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         if (channel > getChannelMax())
         {
             Error error = new Error(ConnectionError.FRAMING_ERROR,
-                                    String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax()));
+                    String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax()));
             handleError(error);
             return;
         }
@@ -534,7 +536,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
             case AWAIT_OPEN:
                 closeReceived();
                 closeConnection(ConnectionError.CONNECTION_FORCED,
-                                "Connection close sent before connection was opened");
+                        "Connection close sent before connection was opened");
                 break;
             case OPENED:
                 _connectionState = ConnectionState.CLOSE_RECEIVED;
@@ -545,7 +547,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                     ErrorCondition condition = error.getCondition();
                     Symbol errorCondition = condition == null ? null : condition.getValue();
                     LOGGER.info("{} : Connection closed with error : {} - {}", getLogSubject(),
-                                errorCondition, close.getError().getDescription());
+                            errorCondition, close.getError().getDescription());
                 }
                 sendClose(new Close());
                 _connectionState = ConnectionState.CLOSED;
@@ -596,15 +598,15 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     public boolean isClosed()
     {
         return _connectionState == ConnectionState.CLOSED
-               || _connectionState == ConnectionState.CLOSE_RECEIVED;
+                || _connectionState == ConnectionState.CLOSE_RECEIVED;
     }
 
     @Override
     public boolean isClosing()
     {
         return _connectionState == ConnectionState.CLOSED
-               || _connectionState == ConnectionState.CLOSE_RECEIVED
-               || _connectionState == ConnectionState.CLOSE_SENT;
+                || _connectionState == ConnectionState.CLOSE_RECEIVED
+                || _connectionState == ConnectionState.CLOSE_SENT;
     }
 
     @Override
@@ -702,7 +704,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         if (begin.getRemoteChannel() != null)
         {
             closeConnection(ConnectionError.FRAMING_ERROR,
-                            "BEGIN received on channel "
+                    "BEGIN received on channel "
                             + receivingChannelId
                             + " with given remote-channel "
                             + begin.getRemoteChannel()
@@ -718,17 +720,17 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                 if (sendingChannelId == -1)
                 {
                     closeConnection(ConnectionError.FRAMING_ERROR,
-                                    "BEGIN received on channel "
+                            "BEGIN received on channel "
                                     + receivingChannelId
                                     + ". There are no free channels for the broker to respond on.");
                 }
                 else
                 {
                     Session_1_0 session = new Session_1_0(this,
-                                                          begin,
-                                                          sendingChannelId,
-                                                          receivingChannelId,
-                                                          getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
+                            begin,
+                            sendingChannelId,
+                            receivingChannelId,
+                            getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
                     session.create();
 
                     _receivingSessions[receivingChannelId] = session;
@@ -754,7 +756,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
             else
             {
                 closeConnection(ConnectionError.FRAMING_ERROR,
-                                "BEGIN received on channel " + receivingChannelId + " which is already in use.");
+                        "BEGIN received on channel " + receivingChannelId + " which is already in use.");
             }
 
         }
@@ -821,15 +823,15 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         int channelMax = getPort().getSessionCountLimit() - 1;
         _channelMax = open.getChannelMax() == null ? channelMax
                 : open.getChannelMax().intValue() < channelMax
-                        ? open.getChannelMax().intValue()
-                        : channelMax;
+                ? open.getChannelMax().intValue()
+                : channelMax;
         if (_receivingSessions == null)
         {
             _receivingSessions = new Session_1_0[_channelMax + 1];
             _sendingSessions = new Session_1_0[_channelMax + 1];
         }
         _maxFrameSize = open.getMaxFrameSize() == null
-                        || open.getMaxFrameSize().longValue() > getBroker().getNetworkBufferSize()
+                || open.getMaxFrameSize().longValue() > getBroker().getNetworkBufferSize()
                 ? getBroker().getNetworkBufferSize()
                 : open.getMaxFrameSize().intValue();
         _remoteContainerId = open.getContainerId();
@@ -884,7 +886,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         if (_outgoingIdleTimeout != 0L && _outgoingIdleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
         {
             closeConnection(ConnectionError.CONNECTION_FORCED,
-                            "Requested idle timeout of "
+                    "Requested idle timeout of "
                             + _outgoingIdleTimeout
                             + " is too low. The minimum supported timeout is"
                             + MINIMUM_SUPPORTED_IDLE_TIMEOUT);
@@ -929,14 +931,14 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                         {
                             List<ListenableFuture<Void>> rescheduleFutures = new ArrayList<>();
                             for (AMQPConnection<?> existingConnection : StreamSupport.stream(existingConnections.spliterator(), false)
-                                                                                     .filter(con -> con instanceof AMQPConnection_1_0)
-                                                                                     .filter(con -> !con.isClosing())
-                                                                                     .filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
-                                                                                     .collect(Collectors.toList()))
+                                    .filter(con -> con instanceof AMQPConnection_1_0)
+                                    .filter(con -> !con.isClosing())
+                                    .filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
+                                    .collect(Collectors.toList()))
                             {
                                 SoleConnectionEnforcementPolicy soleConnectionEnforcementPolicy = null;
                                 if (((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy
-                                    != null)
+                                        != null)
                                 {
                                     soleConnectionEnforcementPolicy =
                                             ((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy;
@@ -950,9 +952,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                                 {
                                     _properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
                                     Error error = new Error(AmqpError.INVALID_FIELD,
-                                                            String.format(
-                                                                    "Connection closed due to sole-connection-enforcement-policy '%s'",
-                                                                    soleConnectionEnforcementPolicy.toString()));
+                                            String.format(
+                                                    "Connection closed due to sole-connection-enforcement-policy '%s'",
+                                                    soleConnectionEnforcementPolicy.toString()));
                                     error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id")));
                                     newConnection.doOnIOThreadAsync(() -> ((AMQPConnection_1_0Impl) newConnection).closeConnection(error));
                                     proceedWithRegistration = false;
@@ -961,9 +963,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                                 else if (SoleConnectionEnforcementPolicy.CLOSE_EXISTING.equals(soleConnectionEnforcementPolicy))
                                 {
                                     final Error error = new Error(AmqpError.RESOURCE_LOCKED,
-                                                                  String.format(
-                                                                          "Connection closed due to sole-connection-enforcement-policy '%s'",
-                                                                          soleConnectionEnforcementPolicy.toString()));
+                                            String.format(
+                                                    "Connection closed due to sole-connection-enforcement-policy '%s'",
+                                                    soleConnectionEnforcementPolicy.toString()));
                                     error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true));
                                     rescheduleFutures.add(existingConnection.doOnIOThreadAsync(
                                             () -> ((AMQPConnection_1_0Impl) existingConnection).closeConnection(error)));
@@ -1461,9 +1463,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     {
         updateLastWriteTime();
         FRAME_LOGGER.debug("SEND[{}|{}] : {}",
-                           getNetwork().getRemoteAddress(),
-                           amqFrame.getChannel(),
-                           amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
+                getNetwork().getRemoteAddress(),
+                amqFrame.getChannel(),
+                amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
 
         int size = _frameWriter.send(amqFrame);
         if (size > getMaxFrameSize())
@@ -1726,10 +1728,10 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         }
 
         if (_remoteDesiredCapabilities != null
-            && _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
+                && _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
         {
             _properties.put(SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY,
-                            SoleConnectionDetectionPolicy.STRONG);
+                    SoleConnectionDetectionPolicy.STRONG);
         }
 
         if (_soleConnectionEnforcementPolicy == SoleConnectionEnforcementPolicy.CLOSE_EXISTING)
@@ -1844,46 +1846,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     @Override
     public Iterator<ServerTransaction> getOpenTransactions()
     {
-        return new Iterator<ServerTransaction>()
-        {
-            int _index = 0;
-
-            @Override
-            public boolean hasNext()
-            {
-                for(int i = _index; i < _openTransactions.length; i++)
-                {
-                    if(_openTransactions[i] != null)
-                    {
-                        return true;
-                    }
-                }
-                return false;
-            }
-
-            @Override
-            public ServerTransaction next()
-            {
-                IdentifiedTransaction txn;
-                for( ; _index < _openTransactions.length; _index++)
-                {
-                    if(_openTransactions[_index] != null)
-                    {
-                        txn = new IdentifiedTransaction(_index, _openTransactions[_index]);
-                        _index++;
-                        return txn.getServerTransaction();
-                    }
-                }
-
-                throw new NoSuchElementException();
-            }
-
-            @Override
-            public void remove()
-            {
-                _openTransactions[_index] = null;
-            }
-        };
+        final AtomicInteger counter = new AtomicInteger(0);
+        return Arrays.stream(_openTransactions)
+                .filter(Objects::nonNull)
+                .map(transaction -> new IdentifiedTransaction(counter.getAndIncrement(), transaction).getServerTransaction())
+                .collect(Collectors.toList()).iterator();
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org