You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/24 13:51:17 UTC

[2/3] activemq-artemis git commit: ARTEMIS-1276 delete MessageEvictionTest (feature not implemented)

ARTEMIS-1276 delete MessageEvictionTest (feature not implemented)

The test is broken. It can be fixed by doing

```java
FakeTransportConnector(URI uri) {
   setServer(new TransportServer {
     @Override
     public URI getConnectURI() {
        return uri;
     }
but then the test would fail because message
 eviction is not supported by Artemis.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d732262f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d732262f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d732262f

Branch: refs/heads/master
Commit: d732262f8c59030dffc5e42c598b9adf687305fe
Parents: 7e9c3fd
Author: Jiri Danek <jd...@redhat.com>
Authored: Tue Jul 18 13:55:07 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jul 24 09:50:40 2017 -0400

----------------------------------------------------------------------
 .../apache/activemq/MessageEvictionTest.java    | 290 -------------------
 1 file changed, 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d732262f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java
deleted file mode 100644
index e50c588..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
-import org.apache.activemq.broker.region.policy.FilePendingSubscriberMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class MessageEvictionTest {
-
-   static final Logger LOG = LoggerFactory.getLogger(MessageEvictionTest.class);
-   private BrokerService broker;
-   private ConnectionFactory connectionFactory;
-   Connection connection;
-   private Session session;
-   private Topic destination;
-   private final String destinationName = "verifyEvection";
-   protected int numMessages = 2000;
-   protected String payload = new String(new byte[1024 * 2]);
-
-   public void setUp(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws Exception {
-      broker = createBroker(pendingSubscriberPolicy);
-      broker.start();
-      connectionFactory = createConnectionFactory();
-      connection = connectionFactory.createConnection();
-      connection.start();
-      session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      destination = session.createTopic(destinationName);
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      if (connection != null) {
-         connection.stop();
-      }
-      if (broker != null) {
-         broker.stop();
-      }
-   }
-
-   @Test
-   public void testMessageEvictionMemoryUsageFileCursor() throws Exception {
-      setUp(new FilePendingSubscriberMessageStoragePolicy());
-      doTestMessageEvictionMemoryUsage();
-   }
-
-   @Test
-   public void testMessageEvictionMemoryUsageVmCursor() throws Exception {
-      setUp(new VMPendingSubscriberMessageStoragePolicy());
-      doTestMessageEvictionMemoryUsage();
-   }
-
-   @Test
-   public void testMessageEvictionDiscardedAdvisory() throws Exception {
-      setUp(new VMPendingSubscriberMessageStoragePolicy());
-
-      ExecutorService executor = Executors.newSingleThreadExecutor();
-      final CountDownLatch consumerRegistered = new CountDownLatch(1);
-      final CountDownLatch gotAdvisory = new CountDownLatch(1);
-      final CountDownLatch advisoryIsGood = new CountDownLatch(1);
-
-      executor.execute(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               ActiveMQTopic discardedAdvisoryDestination = AdvisorySupport.getMessageDiscardedAdvisoryTopic(destination);
-               // use separate session rather than asyncDispatch on consumer session
-               // as we want consumer session to block
-               Session advisorySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-               final MessageConsumer consumer = advisorySession.createConsumer(discardedAdvisoryDestination);
-               consumer.setMessageListener(new MessageListener() {
-                  int advisoriesReceived = 0;
-
-                  @Override
-                  public void onMessage(Message message) {
-                     try {
-                        LOG.info("advisory:" + message);
-                        ActiveMQMessage activeMQMessage = (ActiveMQMessage) message;
-                        assertNotNull(activeMQMessage.getStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID));
-                        assertEquals(++advisoriesReceived, activeMQMessage.getIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT));
-                        message.acknowledge();
-                        advisoryIsGood.countDown();
-                     } catch (JMSException e) {
-                        e.printStackTrace();
-                        fail(e.toString());
-                     } finally {
-                        gotAdvisory.countDown();
-                     }
-                  }
-               });
-               consumerRegistered.countDown();
-               gotAdvisory.await(120, TimeUnit.SECONDS);
-               consumer.close();
-               advisorySession.close();
-            } catch (Exception e) {
-               e.printStackTrace();
-               fail(e.toString());
-            }
-         }
-      });
-      assertTrue("we have an advisory consumer", consumerRegistered.await(60, TimeUnit.SECONDS));
-      doTestMessageEvictionMemoryUsage();
-      assertTrue("got an advisory for discarded", gotAdvisory.await(0, TimeUnit.SECONDS));
-      assertTrue("advisory is good", advisoryIsGood.await(0, TimeUnit.SECONDS));
-   }
-
-   public void doTestMessageEvictionMemoryUsage() throws Exception {
-
-      ExecutorService executor = Executors.newCachedThreadPool();
-      final CountDownLatch doAck = new CountDownLatch(1);
-      final CountDownLatch ackDone = new CountDownLatch(1);
-      final CountDownLatch consumerRegistered = new CountDownLatch(1);
-      executor.execute(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               final MessageConsumer consumer = session.createConsumer(destination);
-               consumer.setMessageListener(new MessageListener() {
-                  @Override
-                  public void onMessage(Message message) {
-                     try {
-                        // very slow, only ack once
-                        doAck.await(60, TimeUnit.SECONDS);
-                        LOG.info("acking: " + message.getJMSMessageID());
-                        message.acknowledge();
-                        ackDone.countDown();
-                     } catch (Exception e) {
-                        e.printStackTrace();
-                        fail(e.toString());
-                     } finally {
-                        consumerRegistered.countDown();
-                        ackDone.countDown();
-                     }
-                  }
-               });
-               consumerRegistered.countDown();
-               ackDone.await(60, TimeUnit.SECONDS);
-               consumer.close();
-            } catch (Exception e) {
-               e.printStackTrace();
-               fail(e.toString());
-            }
-         }
-      });
-
-      assertTrue("we have a consumer", consumerRegistered.await(10, TimeUnit.SECONDS));
-
-      final AtomicInteger sent = new AtomicInteger(0);
-      final CountDownLatch sendDone = new CountDownLatch(1);
-      executor.execute(new Runnable() {
-         @Override
-         public void run() {
-            MessageProducer producer;
-            try {
-               producer = session.createProducer(destination);
-               for (int i = 0; i < numMessages; i++) {
-                  producer.send(session.createTextMessage(payload));
-                  sent.incrementAndGet();
-                  TimeUnit.MILLISECONDS.sleep(10);
-               }
-               producer.close();
-               sendDone.countDown();
-            } catch (Exception e) {
-               sendDone.countDown();
-               e.printStackTrace();
-               fail(e.toString());
-            }
-         }
-      });
-
-      assertTrue("messages sending done", sendDone.await(180, TimeUnit.SECONDS));
-      assertEquals("all message were sent", numMessages, sent.get());
-
-      doAck.countDown();
-      executor.shutdown();
-      executor.awaitTermination(30, TimeUnit.SECONDS);
-
-      assertTrue("usage goes to 0 once consumer goes away", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return 0 == TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage();
-         }
-      }));
-   }
-
-   BrokerService createBroker(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws Exception {
-      BrokerService brokerService = new BrokerService();
-      brokerService.addConnector("tcp://localhost:0");
-      brokerService.setUseJmx(false);
-      brokerService.setDeleteAllMessagesOnStartup(true);
-
-      // spooling to disk early so topic memory limit is not reached
-      brokerService.getSystemUsage().getMemoryUsage().setLimit(500 * 1024);
-
-      final List<PolicyEntry> policyEntries = new ArrayList<>();
-      final PolicyEntry entry = new PolicyEntry();
-      entry.setTopic(">");
-
-      entry.setAdvisoryForDiscardingMessages(true);
-
-      // so consumer does not get over run while blocked limit the prefetch
-      entry.setTopicPrefetch(50);
-
-      entry.setPendingSubscriberPolicy(pendingSubscriberPolicy);
-
-      // limit the number of outstanding messages, large enough to use the file store
-      // or small enough not to blow memory limit
-      int pendingMessageLimit = 50;
-      if (pendingSubscriberPolicy instanceof FilePendingSubscriberMessageStoragePolicy) {
-         pendingMessageLimit = 500;
-      }
-      ConstantPendingMessageLimitStrategy pendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy();
-      pendingMessageLimitStrategy.setLimit(pendingMessageLimit);
-      entry.setPendingMessageLimitStrategy(pendingMessageLimitStrategy);
-
-      // to keep the limit in check and up to date rather than just the first few, evict some
-      OldestMessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
-      // whether to check expiry before eviction, default limit 1000 is fine as no ttl set in this test
-      //messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1000);
-      entry.setMessageEvictionStrategy(messageEvictionStrategy);
-
-      // let evicted messaged disappear
-      entry.setDeadLetterStrategy(null);
-      policyEntries.add(entry);
-
-      final PolicyMap policyMap = new PolicyMap();
-      policyMap.setPolicyEntries(policyEntries);
-      brokerService.setDestinationPolicy(policyMap);
-
-      return brokerService;
-   }
-
-   ConnectionFactory createConnectionFactory() throws Exception {
-      String url = broker.getTransportConnectors().get(0).getServer().getConnectURI().toString();
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
-      factory.setWatchTopicAdvisories(false);
-      return factory;
-   }
-
-}