Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2021/07/01 16:03:55 UTC

[GitHub] [activemq-artemis] gemmellr commented on a change in pull request #3633: ARTEMIS-3243 Improve Mirror with dual mirror

gemmellr commented on a change in pull request #3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r662334845



##########
File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/IDSupplier.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import io.netty.util.collection.LongObjectHashMap;
+
+/** This interface is meant to encapsulate a HashMap(ListID, LongObjectHashMap(ElementType)) .
+ * (notice I am using parenthesis instead of &lt; and &gt; to make it easier to read on the source code)

Review comment:
       You can use Javadoc tags such as {@code} and {@literal} to avoid needing escapes or explanatory notes.
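
As a rough illustration of the suggestion above, the Javadoc could be written along these lines. This is only a sketch: the interface name `DocumentedSupplier` is hypothetical, and `java.util.Map` stands in for Netty's `LongObjectHashMap` so that the snippet compiles without extra dependencies.

```java
import java.util.Map;

/**
 * Encapsulates a {@code Map<Object, Map<Long, E>>} keyed by list ID.
 * <p>
 * Inside {@code} and {@literal} the characters {@literal <} and {@literal >}
 * are rendered as-is, so no HTML entities or parenthesis workaround is needed.
 *
 * @param <E> the element type (hypothetical stand-in for the PR's node type)
 */
interface DocumentedSupplier<E> {

   /** Returns the per-list map for the given list ID. */
   Map<Long, E> getList(Object listID);
}
```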

##########
File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/IDSupplier.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import io.netty.util.collection.LongObjectHashMap;
+
+/** This interface is meant to encapsulate a HashMap(ListID, LongObjectHashMap(ElementType)) .
+ * (notice I am using parenthesis instead of &lt; and &gt; to make it easier to read on the source code)
+ *
+ *   ListID should be translated as ServerID when in use by ActiveMQ Artemis. the ListID will probably be a server UUID.
+ *
+ *  The implementation should always provide the same instance of a list for a ListID. */
+public interface IDSupplier<E> {
+   LongObjectHashMap<LinkedListImpl.Node<E>> getList(Object ListID);

Review comment:
       Perhaps getMap? getNodes?
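
For context, the contract stated in the Javadoc above (the same instance must be returned for a given ListID) could be met with something like the following. It is a minimal sketch under assumptions, not the PR's implementation: the class name `PerListNodes` is hypothetical, the method uses the suggested name `getNodes`, and a plain `HashMap` replaces `LongObjectHashMap`. Unlike the code under review, which checks the result for null, this sketch always creates a map on first use.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder of one node map per list ID (e.g. per server UUID). */
final class PerListNodes<E> {

   private final Map<Object, Map<Long, E>> nodesByList = new HashMap<>();

   /** Returns the same map instance on every call with an equal list ID. */
   Map<Long, E> getNodes(Object listID) {
      return nodesByList.computeIfAbsent(listID, key -> new HashMap<>());
   }
}
```

A name such as `getNodes` arguably makes it clearer at the call site that the result is a lookup structure of nodes rather than a list.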

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -239,13 +225,14 @@ protected void actualDelivery(AMQPMessage message, Delivery delivery, Receiver r
                deleteQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName));
             } else if (eventType.equals(POST_ACK)) {
                String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS);
+               String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, BROKER_ID);
                String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, QUEUE);
                AmqpValue value = (AmqpValue) message.getBody();
                Long messageID = (Long) value.getValue();
                if (logger.isDebugEnabled()) {
                   logger.debug(server + " Post ack address=" + address + " queueName = " + queueName + " messageID=" + messageID + "(mirrorID=" + ByteUtil.getFirstByte(messageID) + ", messageID=" + ByteUtil.removeFirstByte(messageID) + ")");
                }
-               if (postAcknowledge(address, queueName, messageID, messageAckOperation)) {
+               if (postAcknowledge(address, queueName, nodeID, messageID, messageAckOperation)) {

Review comment:
       Various debug logging statements here and elsewhere split the long body / message-id value into components; these need to be updated or removed.

##########
File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
##########
@@ -92,9 +86,11 @@ public void setIDSupplier(ToLongFunction<E> supplier) {
    }
 
    private void putID(E value, Node<E> position) {
-      long id = idSupplier.applyAsLong(value);
-      if (id >= 0) {
-         nodeMap.put(id, position);
+      Object listID = idSupplier.getListID(value);
+      LongObjectHashMap<Node<E>> nodesForList = idSupplier.getList(listID);
+      long theID = idSupplier.getID(value);
+      if (nodesForList != null) {
+         nodesForList.put(theID, position);

Review comment:
       Pre-existing code, but maybe 'node' instead of 'position' would read a little better? It would also be consistent with its use in removeWithID.

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -387,7 +387,7 @@ public void testNoAddressWithAnnotations() throws Exception {
       server.start();
       server_2 = createServer(AMQP_PORT_2, false);
       server_2.setIdentity("server_2");
-      server_2.getConfiguration().setBrokerMirrorId(2);
+      server_2.getConfiguration();

Review comment:
       Can this line be deleted now? (And the others on lines 694, 702.)

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
##########
@@ -129,11 +129,91 @@ public void testSyncOnCreateQueues() throws Exception {
       server.stop();
    }
 
+
+   @Test
+   public void testSingleMessage() throws Exception {
+      server.setIdentity("Server1");
+      server.getConfiguration();
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+      server.start();
+
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("Server2");
+      server_2.getConfiguration();

Review comment:
       Can this line be deleted now? (And the others on lines 216, 226, 351, 355.)

##########
File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
##########
@@ -20,18 +20,17 @@
 import java.util.Comparator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.function.ToLongFunction;
 
 /**
  * A priority linked list implementation
  * <p>
  * It implements this by maintaining an individual LinkedBlockingDeque for each priority level.
  */
-public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {
+public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {

Review comment:
       The actual changes would have been easier to review if the type parameter had been renamed separately (not suggesting you change it back).

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -356,24 +343,29 @@ public boolean postAcknowledge(String address, String queue, long messageID, ACK
          logger.trace("Server " + server.getIdentity() + " with queue = " + queue + " being acked for " + messageID + " coming from " + messageID + " targetQueue = " + targetQueue);
       }
 
-      performAck(messageID, targetQueue, ackMessage, true);
+      performAck(nodeID, messageID, targetQueue, ackMessage, true);
       return true;
 
    }
 
-   private void performAck(long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) {
+   private void performAck(Object serverID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) {
       if (logger.isTraceEnabled()) {
          logger.trace("performAck " + messageID + "(messageID=" + ByteUtil.removeFirstByte(messageID) + "), targetQueue=" + targetQueue.getName());
       }
-      MessageReference reference = targetQueue.removeWithSuppliedID(messageID, referenceIDSupplier);
+      MessageReference reference = targetQueue.removeWithSuppliedID(serverID, messageID, referenceIDSupplier);
       if (reference == null && retry) {
 
+         targetQueue.forEach((r) -> {
+            System.out.println("reference = " + r);
+            System.out.println("nodeID = " + referenceIDSupplier.getListID(r) + " and remoteMessageID = " + referenceIDSupplier.getID(r));
+         });
+

Review comment:
       Leftover debug print.
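
If the diagnostic output is worth keeping, one option is to route it through a guarded logger rather than System.out. The sketch below is an assumption-laden illustration: it uses `java.util.logging` so that it is self-contained, whereas the class under review has its own `logger`, and the class and method names (`AckDiagnostics`, `traceReferences`) are hypothetical.

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper that logs queue references only when tracing is on. */
final class AckDiagnostics {

   private static final Logger LOGGER = Logger.getLogger(AckDiagnostics.class.getName());

   /** Logs each reference at FINEST and returns how many were logged. */
   static int traceReferences(List<String> references) {
      // Skip the iteration entirely unless the trace level is enabled.
      if (!LOGGER.isLoggable(Level.FINEST)) {
         return 0;
      }
      int logged = 0;
      for (String reference : references) {
         LOGGER.log(Level.FINEST, "reference = {0}", reference);
         logged++;
      }
      return logged;
   }
}
```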



