You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by clebertsuconic <gi...@git.apache.org> on 2018/01/18 17:45:20 UTC
[GitHub] activemq-artemis pull request #1742: ARTEMIS-1570 Flush appendExecutor befor...
Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1742#discussion_r162418130
--- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java ---
@@ -0,0 +1,388 @@
+package org.apache.activemq.artemis.tests.integration.replication;
+
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.*;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.junit.Wait;
+import org.jboss.logging.Logger;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SharedNothingReplicationTest {
+ private static final Logger logger = Logger.getLogger(SharedNothingReplicationTest.class);
+
+ @Rule
+ public TemporaryFolder brokersFolder = new TemporaryFolder();
+
+ private SlowMessagePersister slowMessagePersister;
+
+ @Before
+ public void setUp() throws Exception {
+ slowMessagePersister = new SlowMessagePersister();
+ CoreMessagePersister.theInstance = slowMessagePersister;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (slowMessagePersister != null) {
+ CoreMessagePersister.theInstance = slowMessagePersister.persister;
+ }
+ }
+
+ @Test
+ public void testReplicateFromSlowLive() throws Exception {
+ // start live
+ Configuration liveConfiguration = createLiveConfiguration();
+ ActiveMQServer liveServer = ActiveMQServers.newActiveMQServer(liveConfiguration);
+ liveServer.start();
+
+ Wait.waitFor(() -> liveServer.isStarted());
+
+ CoreMessagePersister.theInstance = SlowMessagePersister._getInstance();
+
+ final CountDownLatch replicated = new CountDownLatch(1);
+
+ ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616");
+ locator.setCallTimeout(60_000L);
+ locator.setConnectionTTL(60_000L);
+ locator.addClusterTopologyListener(new ClusterTopologyListener() {
+ @Override
+ public void nodeUP(TopologyMember member, boolean last) {
+ logger.infof("nodeUP fired last=%s, live=%s, backup=%s", last, member.getLive(), member.getBackup());
+ if (member.getBackup() != null) {
+ replicated.countDown();
+ }
+ }
+
+ @Override
+ public void nodeDown(long eventUID, String nodeID) {
+
+ }
+ });
+
+ final ClientSessionFactory csf = locator.createSessionFactory();
+ ClientSession sess = csf.createSession();
+ sess.createQueue("slow", RoutingType.ANYCAST, "slow", true);
+ sess.close();
+ Executor sendMessageExecutor = Executors.newCachedThreadPool();
+
+
+ // let's write some messages
+ int i = 0;
+ final int j = 50;
+ final CountDownLatch allMessageSent = new CountDownLatch(j);
+ while (i < 5) {
+ sendMessageExecutor.execute(() -> {
+ try {
+ ClientSession session = csf.createSession(true, true);
+ ClientProducer producer = session.createProducer("slow");
+ ClientMessage message = session.createMessage(true);
+ // this will make journal's append executor busy
+ message.putLongProperty("delay", Long.getLong("message.property.delay",500L));
--- End diff --
Where? How does this property affect the semantics of the server? I couldn't find it used anywhere.
---