You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by clebertsuconic <gi...@git.apache.org> on 2018/01/18 17:45:20 UTC

[GitHub] activemq-artemis pull request #1742: ARTEMIS-1570 Flush appendExecutor befor...

Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1742#discussion_r162418130
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java ---
    @@ -0,0 +1,388 @@
    +package org.apache.activemq.artemis.tests.integration.replication;
    +
    +
    +import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
    +import org.apache.activemq.artemis.api.core.ActiveMQException;
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.RoutingType;
    +import org.apache.activemq.artemis.api.core.client.*;
    +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
    +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
    +import org.apache.activemq.artemis.core.config.Configuration;
    +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
    +import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
    +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
    +import org.apache.activemq.artemis.core.io.SequentialFileFactory;
    +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
    +import org.apache.activemq.artemis.core.journal.LoaderCallback;
    +import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
    +import org.apache.activemq.artemis.core.journal.RecordInfo;
    +import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
    +import org.apache.activemq.artemis.core.message.impl.CoreMessage;
    +import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
    +import org.apache.activemq.artemis.core.persistence.Persister;
    +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
    +import org.apache.activemq.artemis.core.server.ActiveMQServer;
    +import org.apache.activemq.artemis.core.server.ActiveMQServers;
    +import org.apache.activemq.artemis.core.server.JournalType;
    +import org.apache.activemq.artemis.junit.Wait;
    +import org.jboss.logging.Logger;
    +import org.junit.*;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +public class SharedNothingReplicationTest {
    +    private static final Logger logger = Logger.getLogger(SharedNothingReplicationTest.class);
    +
    +    @Rule
    +    public TemporaryFolder brokersFolder = new TemporaryFolder();
    +
    +    private SlowMessagePersister slowMessagePersister;
    +
    +    @Before
    +    public void setUp() throws Exception {
    +        slowMessagePersister = new SlowMessagePersister();
    +        CoreMessagePersister.theInstance = slowMessagePersister;
    +    }
    +
    +    @After
    +    public void tearDown() throws Exception {
    +        if (slowMessagePersister != null) {
    +            CoreMessagePersister.theInstance = slowMessagePersister.persister;
    +        }
    +    }
    +
    +    @Test
    +    public void testReplicateFromSlowLive() throws Exception {
    +        // start live
    +        Configuration liveConfiguration = createLiveConfiguration();
    +        ActiveMQServer liveServer = ActiveMQServers.newActiveMQServer(liveConfiguration);
    +        liveServer.start();
    +
    +        Wait.waitFor(() -> liveServer.isStarted());
    +
    +        CoreMessagePersister.theInstance = SlowMessagePersister._getInstance();
    +
    +        final CountDownLatch replicated = new CountDownLatch(1);
    +
    +        ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616");
    +        locator.setCallTimeout(60_000L);
    +        locator.setConnectionTTL(60_000L);
    +        locator.addClusterTopologyListener(new ClusterTopologyListener() {
    +            @Override
    +            public void nodeUP(TopologyMember member, boolean last) {
    +                logger.infof("nodeUP fired last=%s, live=%s, backup=%s", last, member.getLive(), member.getBackup());
    +                if (member.getBackup() != null) {
    +                    replicated.countDown();
    +                }
    +            }
    +
    +            @Override
    +            public void nodeDown(long eventUID, String nodeID) {
    +
    +            }
    +        });
    +
    +        final ClientSessionFactory csf = locator.createSessionFactory();
    +        ClientSession sess = csf.createSession();
    +        sess.createQueue("slow", RoutingType.ANYCAST, "slow", true);
    +        sess.close();
    +        Executor sendMessageExecutor = Executors.newCachedThreadPool();
    +
    +
    +        // let's write some messages
    +        int i = 0;
    +        final int j = 50;
    +        final CountDownLatch allMessageSent = new CountDownLatch(j);
    +        while (i < 5) {
    +            sendMessageExecutor.execute(() -> {
    +                try {
    +                    ClientSession session = csf.createSession(true, true);
    +                    ClientProducer producer = session.createProducer("slow");
    +                    ClientMessage message = session.createMessage(true);
    +                    // this will make journal's append executor busy
    +                    message.putLongProperty("delay", Long.getLong("message.property.delay",500L));
    --- End diff --
    
    Where? How does this property affect the semantics of the server? I couldn't find it used anywhere.


---