You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/06 13:53:31 UTC

[activemq-artemis] branch master updated: ARTEMIS-2327 Removing Bridge Test after fix

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new c4238e1  ARTEMIS-2327 Removing Bridge Test after fix
c4238e1 is described below

commit c4238e154fba199060e9870d0ddc9b667eda5179
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon May 6 09:52:07 2019 -0400

    ARTEMIS-2327 Removing Bridge Test after fix
    
    This test was playing with an ignore packet, which does not make any more sense
    after the last change.
    
    After a packet loss the bridge will reconnect, and this test makes no more sense.
---
 .../integration/cluster/bridge/BridgeTest.java     | 168 +--------------------
 1 file changed, 3 insertions(+), 165 deletions(-)

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index c46fd01..e9d8a21 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -62,25 +61,22 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
-import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
-import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
+import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
+import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.RandomUtil;
@@ -318,7 +314,6 @@ public class BridgeTest extends ActiveMQTestBase {
       ClientSession session1 = sf1.createSession(true, true, 0);
       ClientConsumer consumer1 = session1.createConsumer(queueName1);
 
-
       session1.start();
 
       final byte[] bytes = new byte[messageSize];
@@ -351,7 +346,7 @@ public class BridgeTest extends ActiveMQTestBase {
       }
       session1.commit();
 
-      BridgeImpl bridge = (BridgeImpl)server0.getClusterManager().getBridges().get("bridge1");
+      BridgeImpl bridge = (BridgeImpl) server0.getClusterManager().getBridges().get("bridge1");
 
       // stop in the middle. wait the bridge to block
       Wait.assertTrue("bridge is never blocked", bridge::isBlockedOnFlowControl);
@@ -379,7 +374,6 @@ public class BridgeTest extends ActiveMQTestBase {
          message.acknowledge();
       }
 
-
       Wait.assertEquals(0, server0.locateQueue(SimpleString.toSimpleString("queue0"))::getMessageCount);
 
       Assert.assertNull(consumer1.receiveImmediate());
@@ -514,160 +508,6 @@ public class BridgeTest extends ActiveMQTestBase {
       }
    }
 
-   @Test
-   public void testLostMessageSimpleMessage() throws Exception {
-      internalTestMessageLoss(false);
-   }
-
-   @Test
-   public void testLostMessageLargeMessage() throws Exception {
-      internalTestMessageLoss(true);
-   }
-
-   /**
-    * This test will ignore messages
-    * What will cause the bridge to fail with a timeout
-    * The bridge should still recover the failure and reconnect on that case
-    */
-   public void internalTestMessageLoss(final boolean largeMessage) throws Exception {
-      class MyInterceptor implements Interceptor {
-
-         public boolean ignoreSends = true;
-         public CountDownLatch latch;
-
-         MyInterceptor(int numberOfIgnores) {
-            latch = new CountDownLatch(numberOfIgnores);
-         }
-
-         @Override
-         public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
-            if (ignoreSends && packet instanceof SessionSendMessage ||
-               ignoreSends && packet instanceof SessionSendLargeMessage ||
-               ignoreSends && packet instanceof SessionSendContinuationMessage && !((SessionSendContinuationMessage) packet).isContinues()) {
-               IntegrationTestLogger.LOGGER.info("IGNORED: " + packet);
-               latch.countDown();
-               return false;
-            } else {
-               IntegrationTestLogger.LOGGER.info(packet);
-               return true;
-            }
-         }
-
-      }
-
-      MyInterceptor myInterceptor = new MyInterceptor(3);
-
-      Map<String, Object> server0Params = new HashMap<>();
-      server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
-
-      Map<String, Object> server1Params = new HashMap<>();
-      addTargetParameters(server1Params);
-      server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
-
-      final String testAddress = "testAddress";
-      final String queueName0 = "queue0";
-      final String forwardAddress = "forwardAddress";
-      final String queueName1 = "queue1";
-
-      TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
-      TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
-
-      HashMap<String, TransportConfiguration> connectors = new HashMap<>();
-      connectors.put(server1tc.getName(), server1tc);
-      server0.getConfiguration().setConnectorConfigurations(connectors);
-
-      final int messageSize = 1024;
-
-      final int numMessages = 1;
-
-      ArrayList<String> connectorConfig = new ArrayList<>();
-      connectorConfig.add(server1tc.getName());
-      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(100).setReconnectAttempts(-1).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig).setCallTimeout(500);
-
-      List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
-      bridgeConfigs.add(bridgeConfiguration);
-      server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
-
-      CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
-      List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
-      queueConfigs0.add(queueConfig0);
-      server0.getConfiguration().setQueueConfigurations(queueConfigs0);
-
-      CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
-      List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
-      queueConfigs1.add(queueConfig1);
-      server1.getConfiguration().setQueueConfigurations(queueConfigs1);
-
-      server1.start();
-
-      server1.getRemotingService().addIncomingInterceptor(myInterceptor);
-
-      server0.start();
-      locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
-      ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
-
-      ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
-
-      ClientSession session0 = sf0.createSession(false, true, true);
-
-      ClientSession session1 = sf1.createSession(false, true, true);
-
-      ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
-
-      ClientConsumer consumer1 = session1.createConsumer(queueName1);
-
-      session1.start();
-
-      final byte[] bytes = new byte[messageSize];
-
-      final SimpleString propKey = new SimpleString("testkey");
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session0.createMessage(true);
-
-         if (largeMessage) {
-            message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024));
-         }
-
-         message.putIntProperty(propKey, i);
-
-         message.getBodyBuffer().writeBytes(bytes);
-
-         producer0.send(message);
-      }
-
-      assertTrue("where is the countDown?", myInterceptor.latch.await(30, TimeUnit.SECONDS));
-      myInterceptor.ignoreSends = false;
-      server1.getRemotingService().removeIncomingInterceptor(myInterceptor);
-      IntegrationTestLogger.LOGGER.info("No longer ignoring packets.");
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer1.receive(30000);
-
-         Assert.assertNotNull(message);
-
-         Assert.assertEquals(i, message.getObjectProperty(propKey));
-
-         if (largeMessage) {
-            readLargeMessages(message, 10);
-         }
-
-         message.acknowledge();
-      }
-
-      Assert.assertNull(consumer1.receiveImmediate());
-
-      session0.close();
-
-      session1.close();
-
-      sf0.close();
-
-      sf1.close();
-      closeFields();
-      assertEquals("there should be no queues", 0, loadQueues(server0).size());
-   }
-
    /**
     * @param server1Params
     */
@@ -1216,7 +1056,6 @@ public class BridgeTest extends ActiveMQTestBase {
       final String propKey = "bridged";
       final String propValue = "true";
 
-
       TransformerConfiguration transformerConfiguration = new TransformerConfiguration(AddHeadersTransformer.class.getName());
       transformerConfiguration.getProperties().put(propKey, propValue);
 
@@ -1278,7 +1117,6 @@ public class BridgeTest extends ActiveMQTestBase {
 
       final int numMessages = 10;
 
-
       for (int i = 0; i < numMessages; i++) {
          ClientMessage message = session0.createMessage(true);