You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cm...@apache.org on 2021/03/31 00:05:22 UTC

[qpid-broker-j] branch main created (now ea2a5ac)

This is an automated email from the ASF dual-hosted git repository.

cml pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


      at ea2a5ac  QPID-8509 - java.util.NoSuchElementException in AMQPConnection_1_0Impl.next()

This branch includes the following new commits:

     new 9e4532c  QPID-8509: [Broker-J] Add unit test for AMQPConnection_1_0Impl#getOpenTransactions()
     new ea2a5ac  QPID-8509 - java.util.NoSuchElementException in AMQPConnection_1_0Impl.next()

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/02: QPID-8509: [Broker-J] Add unit test for AMQPConnection_1_0Impl#getOpenTransactions()

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cml pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 9e4532c2751a817bcd662a770062480b5cf1a3f1
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Mar 31 00:38:57 2021 +0100

    QPID-8509: [Broker-J] Add unit test for AMQPConnection_1_0Impl#getOpenTransactions()
---
 .../protocol/v1_0/AMQPConnection_1_0ImplTest.java  | 87 ++++++++++++++++++++++
 1 file changed, 87 insertions(+)

diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java
new file mode 100644
index 0000000..dfede56
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.CoreMatchers.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.SocketAddress;
+import java.util.Iterator;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class AMQPConnection_1_0ImplTest extends UnitTestBase
+{
+    private Broker<?> _broker;
+    private ServerNetworkConnection _network;
+    private AmqpPort _port;
+    private AggregateTicker _aggregateTicket;
+    private QueueManagingVirtualHost<?> _virtualHost;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        _broker = BrokerTestHelper.createBrokerMock();
+        final Model model = _broker.getModel();
+        final TaskExecutor taskExecutor = _broker.getTaskExecutor();
+        _network = mock(ServerNetworkConnection.class);
+        when(_network.getLocalAddress()).thenReturn(mock(SocketAddress.class));
+        _port = mock(AmqpPort.class);
+        when(_port.getModel()).thenReturn(model);
+        when(_port.getTaskExecutor()).thenReturn(taskExecutor);
+        when(_port.getChildExecutor()).thenReturn(taskExecutor);
+        _aggregateTicket = mock(AggregateTicker.class);
+        _virtualHost = BrokerTestHelper.createVirtualHost("test", _broker, true, this);
+    }
+
+    @Test
+    public void testGetOpenTransactions()
+    {
+        final AMQPConnection_1_0Impl connection = new AMQPConnection_1_0Impl(_broker, _network, _port, Transport.TCP, 0, _aggregateTicket);
+        connection.setAddressSpace(_virtualHost);
+        final IdentifiedTransaction tx1 = connection.createIdentifiedTransaction();
+        final IdentifiedTransaction tx2 = connection.createIdentifiedTransaction();
+
+        final Iterator<ServerTransaction> iterator = connection.getOpenTransactions();
+
+        assertThat(iterator.hasNext(), is(true));
+        assertThat(iterator.next(), is(equalTo(tx1.getServerTransaction())));
+        assertThat(iterator.hasNext(), is(true));
+        assertThat(iterator.next(), is(equalTo(tx2.getServerTransaction())));
+        assertThat(iterator.hasNext(), is(false));
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/02: QPID-8509 - java.util.NoSuchElementException in AMQPConnection_1_0Impl.next()

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cml pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit ea2a5ac2d5e6a5db805d41a277359dccfa06b0b9
Author: aw924 <da...@deutsche-boerse.com>
AuthorDate: Fri Mar 12 11:41:07 2021 +0100

    QPID-8509 - java.util.NoSuchElementException in AMQPConnection_1_0Impl.next()
    
    This closes #82
---
 .../protocol/v1_0/AMQPConnection_1_0Impl.java      | 129 ++++++++-------------
 1 file changed, 48 insertions(+), 81 deletions(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 8fcd9e0..d46810f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
@@ -45,6 +46,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -132,9 +134,9 @@ import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
 public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
         implements DescribedTypeConstructorRegistry.Source,
-                   ValueWriter.Registry.Source,
-                   SASLEndpoint,
-                   AMQPConnection_1_0<AMQPConnection_1_0Impl>
+        ValueWriter.Registry.Source,
+        SASLEndpoint,
+        AMQPConnection_1_0<AMQPConnection_1_0Impl>
 {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0Impl.class);
@@ -200,11 +202,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     private volatile ConnectionState _connectionState = ConnectionState.AWAIT_AMQP_OR_SASL_HEADER;
 
     private final AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
-                                                                                              .registerTransportLayer()
-                                                                                              .registerMessagingLayer()
-                                                                                              .registerTransactionLayer()
-                                                                                              .registerSecurityLayer()
-                                                                                              .registerExtensionSoleconnLayer();
+            .registerTransportLayer()
+            .registerMessagingLayer()
+            .registerTransactionLayer()
+            .registerSecurityLayer()
+            .registerExtensionSoleconnLayer();
 
     private final Map<Symbol, Object> _properties = new LinkedHashMap<>();
     private volatile boolean _saslComplete;
@@ -504,7 +506,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         if (channel > getChannelMax())
         {
             Error error = new Error(ConnectionError.FRAMING_ERROR,
-                                    String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax()));
+                    String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax()));
             handleError(error);
             return;
         }
@@ -534,7 +536,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
             case AWAIT_OPEN:
                 closeReceived();
                 closeConnection(ConnectionError.CONNECTION_FORCED,
-                                "Connection close sent before connection was opened");
+                        "Connection close sent before connection was opened");
                 break;
             case OPENED:
                 _connectionState = ConnectionState.CLOSE_RECEIVED;
@@ -545,7 +547,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                     ErrorCondition condition = error.getCondition();
                     Symbol errorCondition = condition == null ? null : condition.getValue();
                     LOGGER.info("{} : Connection closed with error : {} - {}", getLogSubject(),
-                                errorCondition, close.getError().getDescription());
+                            errorCondition, close.getError().getDescription());
                 }
                 sendClose(new Close());
                 _connectionState = ConnectionState.CLOSED;
@@ -596,15 +598,15 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     public boolean isClosed()
     {
         return _connectionState == ConnectionState.CLOSED
-               || _connectionState == ConnectionState.CLOSE_RECEIVED;
+                || _connectionState == ConnectionState.CLOSE_RECEIVED;
     }
 
     @Override
     public boolean isClosing()
     {
         return _connectionState == ConnectionState.CLOSED
-               || _connectionState == ConnectionState.CLOSE_RECEIVED
-               || _connectionState == ConnectionState.CLOSE_SENT;
+                || _connectionState == ConnectionState.CLOSE_RECEIVED
+                || _connectionState == ConnectionState.CLOSE_SENT;
     }
 
     @Override
@@ -702,7 +704,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         if (begin.getRemoteChannel() != null)
         {
             closeConnection(ConnectionError.FRAMING_ERROR,
-                            "BEGIN received on channel "
+                    "BEGIN received on channel "
                             + receivingChannelId
                             + " with given remote-channel "
                             + begin.getRemoteChannel()
@@ -718,17 +720,17 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                 if (sendingChannelId == -1)
                 {
                     closeConnection(ConnectionError.FRAMING_ERROR,
-                                    "BEGIN received on channel "
+                            "BEGIN received on channel "
                                     + receivingChannelId
                                     + ". There are no free channels for the broker to respond on.");
                 }
                 else
                 {
                     Session_1_0 session = new Session_1_0(this,
-                                                          begin,
-                                                          sendingChannelId,
-                                                          receivingChannelId,
-                                                          getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
+                            begin,
+                            sendingChannelId,
+                            receivingChannelId,
+                            getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
                     session.create();
 
                     _receivingSessions[receivingChannelId] = session;
@@ -754,7 +756,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
             else
             {
                 closeConnection(ConnectionError.FRAMING_ERROR,
-                                "BEGIN received on channel " + receivingChannelId + " which is already in use.");
+                        "BEGIN received on channel " + receivingChannelId + " which is already in use.");
             }
 
         }
@@ -821,15 +823,15 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         int channelMax = getPort().getSessionCountLimit() - 1;
         _channelMax = open.getChannelMax() == null ? channelMax
                 : open.getChannelMax().intValue() < channelMax
-                        ? open.getChannelMax().intValue()
-                        : channelMax;
+                ? open.getChannelMax().intValue()
+                : channelMax;
         if (_receivingSessions == null)
         {
             _receivingSessions = new Session_1_0[_channelMax + 1];
             _sendingSessions = new Session_1_0[_channelMax + 1];
         }
         _maxFrameSize = open.getMaxFrameSize() == null
-                        || open.getMaxFrameSize().longValue() > getBroker().getNetworkBufferSize()
+                || open.getMaxFrameSize().longValue() > getBroker().getNetworkBufferSize()
                 ? getBroker().getNetworkBufferSize()
                 : open.getMaxFrameSize().intValue();
         _remoteContainerId = open.getContainerId();
@@ -884,7 +886,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         if (_outgoingIdleTimeout != 0L && _outgoingIdleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
         {
             closeConnection(ConnectionError.CONNECTION_FORCED,
-                            "Requested idle timeout of "
+                    "Requested idle timeout of "
                             + _outgoingIdleTimeout
                             + " is too low. The minimum supported timeout is"
                             + MINIMUM_SUPPORTED_IDLE_TIMEOUT);
@@ -929,14 +931,14 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                         {
                             List<ListenableFuture<Void>> rescheduleFutures = new ArrayList<>();
                             for (AMQPConnection<?> existingConnection : StreamSupport.stream(existingConnections.spliterator(), false)
-                                                                                     .filter(con -> con instanceof AMQPConnection_1_0)
-                                                                                     .filter(con -> !con.isClosing())
-                                                                                     .filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
-                                                                                     .collect(Collectors.toList()))
+                                    .filter(con -> con instanceof AMQPConnection_1_0)
+                                    .filter(con -> !con.isClosing())
+                                    .filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
+                                    .collect(Collectors.toList()))
                             {
                                 SoleConnectionEnforcementPolicy soleConnectionEnforcementPolicy = null;
                                 if (((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy
-                                    != null)
+                                        != null)
                                 {
                                     soleConnectionEnforcementPolicy =
                                             ((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy;
@@ -950,9 +952,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                                 {
                                     _properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
                                     Error error = new Error(AmqpError.INVALID_FIELD,
-                                                            String.format(
-                                                                    "Connection closed due to sole-connection-enforcement-policy '%s'",
-                                                                    soleConnectionEnforcementPolicy.toString()));
+                                            String.format(
+                                                    "Connection closed due to sole-connection-enforcement-policy '%s'",
+                                                    soleConnectionEnforcementPolicy.toString()));
                                     error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id")));
                                     newConnection.doOnIOThreadAsync(() -> ((AMQPConnection_1_0Impl) newConnection).closeConnection(error));
                                     proceedWithRegistration = false;
@@ -961,9 +963,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                                 else if (SoleConnectionEnforcementPolicy.CLOSE_EXISTING.equals(soleConnectionEnforcementPolicy))
                                 {
                                     final Error error = new Error(AmqpError.RESOURCE_LOCKED,
-                                                                  String.format(
-                                                                          "Connection closed due to sole-connection-enforcement-policy '%s'",
-                                                                          soleConnectionEnforcementPolicy.toString()));
+                                            String.format(
+                                                    "Connection closed due to sole-connection-enforcement-policy '%s'",
+                                                    soleConnectionEnforcementPolicy.toString()));
                                     error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true));
                                     rescheduleFutures.add(existingConnection.doOnIOThreadAsync(
                                             () -> ((AMQPConnection_1_0Impl) existingConnection).closeConnection(error)));
@@ -1461,9 +1463,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     {
         updateLastWriteTime();
         FRAME_LOGGER.debug("SEND[{}|{}] : {}",
-                           getNetwork().getRemoteAddress(),
-                           amqFrame.getChannel(),
-                           amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
+                getNetwork().getRemoteAddress(),
+                amqFrame.getChannel(),
+                amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
 
         int size = _frameWriter.send(amqFrame);
         if (size > getMaxFrameSize())
@@ -1726,10 +1728,10 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         }
 
         if (_remoteDesiredCapabilities != null
-            && _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
+                && _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
         {
             _properties.put(SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY,
-                            SoleConnectionDetectionPolicy.STRONG);
+                    SoleConnectionDetectionPolicy.STRONG);
         }
 
         if (_soleConnectionEnforcementPolicy == SoleConnectionEnforcementPolicy.CLOSE_EXISTING)
@@ -1844,46 +1846,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     @Override
     public Iterator<ServerTransaction> getOpenTransactions()
     {
-        return new Iterator<ServerTransaction>()
-        {
-            int _index = 0;
-
-            @Override
-            public boolean hasNext()
-            {
-                for(int i = _index; i < _openTransactions.length; i++)
-                {
-                    if(_openTransactions[i] != null)
-                    {
-                        return true;
-                    }
-                }
-                return false;
-            }
-
-            @Override
-            public ServerTransaction next()
-            {
-                IdentifiedTransaction txn;
-                for( ; _index < _openTransactions.length; _index++)
-                {
-                    if(_openTransactions[_index] != null)
-                    {
-                        txn = new IdentifiedTransaction(_index, _openTransactions[_index]);
-                        _index++;
-                        return txn.getServerTransaction();
-                    }
-                }
-
-                throw new NoSuchElementException();
-            }
-
-            @Override
-            public void remove()
-            {
-                _openTransactions[_index] = null;
-            }
-        };
+        final AtomicInteger counter = new AtomicInteger(0);
+        return Arrays.stream(_openTransactions)
+                .filter(Objects::nonNull)
+                .map(transaction -> new IdentifiedTransaction(counter.getAndIncrement(), transaction).getServerTransaction())
+                .collect(Collectors.toList()).iterator();
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org