You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/07 03:39:57 UTC
[1/3] activemq-artemis git commit: ARTEMIS-1269 Fixing blocked
replication
Repository: activemq-artemis
Updated Branches:
refs/heads/master f3cc555ab -> 15c2f09bc
ARTEMIS-1269 Fixing blocked replication
If replication blocked anything on the journal
the processing from clients would be blocked
and nothing would work.
As part of this fix I am using an executor on ServerSessionPacketHandler,
which will also scale better as the reader from Netty would be fed immediately.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/89e84e13
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/89e84e13
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/89e84e13
Branch: refs/heads/master
Commit: 89e84e1320145c5b81996afa8251fa967f8881ff
Parents: f3cc555
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Jul 5 13:28:01 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jul 6 19:04:48 2017 -0400
----------------------------------------------------------------------
.../artemis/utils/OrderedExecutorFactory.java | 17 +++++
.../core/journal/impl/FileWrapperJournal.java | 74 ++++++++++++--------
.../core/ServerSessionPacketHandler.java | 42 ++++++++++-
.../core/impl/ActiveMQPacketHandler.java | 2 +-
.../core/replication/ReplicationEndpoint.java | 45 ++++--------
.../core/server/cluster/ClusterController.java | 3 +
.../core/server/impl/ActiveMQServerImpl.java | 4 +-
.../reattach/MultiThreadRandomReattachTest.java | 3 +
.../NettyMultiThreadRandomReattachTest.java | 3 +
9 files changed, 127 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
index 24fa5e7..65cb08f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
@@ -18,7 +18,9 @@ package org.apache.activemq.artemis.utils;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
@@ -33,6 +35,21 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
private final Executor parent;
+
+ public static boolean flushExecutor(Executor executor) {
+ return flushExecutor(executor, 30, TimeUnit.SECONDS);
+ }
+
+ public static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ executor.execute(latch::countDown);
+ try {
+ return latch.await(timeout, unit);
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
/**
* Construct a new instance delegating to the given parent executor.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 5ef240a..9dafd4b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -96,7 +96,7 @@ public final class FileWrapperJournal extends JournalBase {
IOCompletion callback) throws Exception {
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
- writeRecord(addRecord, sync, callback);
+ writeRecord(addRecord, false, -1, false, callback);
}
@Override
@@ -107,7 +107,9 @@ public final class FileWrapperJournal extends JournalBase {
* Write the record to the current file.
*/
private void writeRecord(JournalInternalRecord encoder,
- final boolean sync,
+ final boolean tx,
+ final long txID,
+ final boolean removeTX,
final IOCompletion callback) throws Exception {
lockAppend.lock();
@@ -115,30 +117,54 @@ public final class FileWrapperJournal extends JournalBase {
if (callback != null) {
callback.storeLineUp();
}
- currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize());
+ testSwitchFiles(encoder);
+ if (txID >= 0) {
+ if (tx) {
+ AtomicInteger value;
+ if (removeTX) {
+ value = transactions.remove(txID);
+ } else {
+ value = transactions.get(txID);
+ }
+ if (value != null) {
+ encoder.setNumberOfRecords(value.get());
+ }
+ } else {
+ count(txID);
+ }
+ }
encoder.setFileID(currentFile.getRecordID());
if (callback != null) {
- currentFile.getFile().write(encoder, sync, callback);
+ currentFile.getFile().write(encoder, false, callback);
} else {
- currentFile.getFile().write(encoder, sync);
+ currentFile.getFile().write(encoder, false);
}
} finally {
lockAppend.unlock();
}
}
+ private void testSwitchFiles(JournalInternalRecord encoder) throws Exception {
+ JournalFile oldFile = currentFile;
+ currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize());
+ if (oldFile != currentFile) {
+ for (AtomicInteger value : transactions.values()) {
+ value.set(0);
+ }
+ }
+ }
+
@Override
public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception {
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
- writeRecord(deleteRecord, sync, callback);
+ writeRecord(deleteRecord, false, -1, false, callback);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
- count(txID);
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
- writeRecord(deleteRecordTX, false, null);
+ writeRecord(deleteRecordTX, false, txID, false, null);
}
@Override
@@ -147,9 +173,8 @@ public final class FileWrapperJournal extends JournalBase {
byte recordType,
Persister persister,
Object record) throws Exception {
- count(txID);
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
- writeRecord(addRecord, false, null);
+ writeRecord(addRecord, false, txID, false, null);
}
@Override
@@ -160,7 +185,7 @@ public final class FileWrapperJournal extends JournalBase {
boolean sync,
IOCompletion callback) throws Exception {
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
- writeRecord(updateRecord, sync, callback);
+ writeRecord(updateRecord, false, -1, false, callback);
}
@Override
@@ -169,9 +194,8 @@ public final class FileWrapperJournal extends JournalBase {
byte recordType,
Persister persister,
Object record) throws Exception {
- count(txID);
JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, persister, record);
- writeRecord(updateRecordTX, false, null);
+ writeRecord(updateRecordTX, false, txID, false, null);
}
@Override
@@ -180,12 +204,8 @@ public final class FileWrapperJournal extends JournalBase {
IOCompletion callback,
boolean lineUpContext) throws Exception {
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
- AtomicInteger value = transactions.remove(txID);
- if (value != null) {
- commitRecord.setNumberOfRecords(value.get());
- }
- writeRecord(commitRecord, true, callback);
+ writeRecord(commitRecord, true, txID, true, callback);
}
@Override
@@ -194,20 +214,18 @@ public final class FileWrapperJournal extends JournalBase {
boolean sync,
IOCompletion callback) throws Exception {
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
- AtomicInteger value = transactions.get(txID);
- if (value != null) {
- prepareRecord.setNumberOfRecords(value.get());
- }
- writeRecord(prepareRecord, sync, callback);
+ writeRecord(prepareRecord, true, txID, false, callback);
}
private int count(long txID) throws ActiveMQException {
AtomicInteger defaultValue = new AtomicInteger(1);
AtomicInteger count = transactions.putIfAbsent(txID, defaultValue);
if (count != null) {
- return count.incrementAndGet();
+ count.incrementAndGet();
+ } else {
+ count = defaultValue;
}
- return defaultValue.get();
+ return count.intValue();
}
@Override
@@ -218,11 +236,7 @@ public final class FileWrapperJournal extends JournalBase {
@Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
- AtomicInteger value = transactions.remove(txID);
- if (value != null) {
- rollbackRecord.setNumberOfRecords(value.get());
- }
- writeRecord(rollbackRecord, sync, callback);
+ writeRecord(rollbackRecord, true, txID, true, callback);
}
// UNSUPPORTED STUFF
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 8e3c3ed..54d1d44 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.List;
+import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -90,6 +91,9 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+import org.apache.activemq.artemis.utils.SimpleFuture;
+import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
@@ -141,6 +145,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private volatile CoreRemotingConnection remotingConnection;
+ private final Executor callExecutor;
+
private final CoreProtocolManager manager;
// The current currentLargeMessage being processed
@@ -148,7 +154,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private final boolean direct;
- public ServerSessionPacketHandler(final CoreProtocolManager manager,
+ public ServerSessionPacketHandler(final Executor callExecutor,
+ final CoreProtocolManager manager,
final ServerSession session,
final StorageManager storageManager,
final Channel channel) {
@@ -166,6 +173,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
Connection conn = remotingConnection.getTransportConnection();
+ this.callExecutor = callExecutor;
+
if (conn instanceof NettyConnection) {
direct = ((NettyConnection) conn).isDirectDeliver();
} else {
@@ -199,11 +208,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorClosingSession(e);
}
+ flushExecutor();
ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
}
+ private void flushExecutor() {
+ OrderedExecutorFactory.flushExecutor(callExecutor);
+ }
+
public void close() {
+ flushExecutor();
+
channel.flushConfirmations();
try {
@@ -219,6 +235,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
@Override
public void handlePacket(final Packet packet) {
+ channel.confirm(packet);
+ callExecutor.execute(() -> internalHandlePacket(packet));
+ }
+
+ private void internalHandlePacket(final Packet packet) {
byte type = packet.getType();
storageManager.setContext(session.getSessionContext());
@@ -653,8 +674,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
final boolean flush,
final boolean closeChannel) {
if (confirmPacket != null) {
- channel.confirm(confirmPacket);
-
if (flush) {
channel.flushConfirmations();
}
@@ -678,9 +697,26 @@ public class ServerSessionPacketHandler implements ChannelHandler {
remotingConnection.removeFailureListener((FailureListener) closeListener);
}
}
+
+ flushExecutor();
}
public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) {
+
+ SimpleFuture<Integer> future = new SimpleFutureImpl<>();
+ callExecutor.execute(() -> {
+ int value = internaltransferConnection(newConnection, lastReceivedCommandID);
+ future.set(value);
+ });
+
+ try {
+ return future.get().intValue();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private int internaltransferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) {
// We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
// delivered
// after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 3cf2cea..765d6fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -169,7 +169,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap);
- ServerSessionPacketHandler handler = new ServerSessionPacketHandler(protocolManager, session, server.getStorageManager(), channel);
+ ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server.getExecutorFactory().getExecutor(), protocolManager, session, server.getStorageManager(), channel);
channel.setHandler(handler);
// TODO - where is this removed?
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index a68c3f9..6683fbe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -27,9 +27,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
@@ -81,6 +79,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.jboss.logging.Logger;
/**
@@ -204,9 +203,11 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
ActiveMQServerLogger.LOGGER.invalidPacketForReplication(packet);
}
} catch (ActiveMQException e) {
+ logger.warn(e.getMessage(), e);
ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
response = new ActiveMQExceptionMessage(e);
} catch (Exception e) {
+ logger.warn(e.getMessage(), e);
ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e));
}
@@ -278,6 +279,12 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
return;
}
+ logger.trace("Stopping endpoint");
+
+ started = false;
+
+ OrderedExecutorFactory.flushExecutor(executor);
+
// Channel may be null if there isn't a connection to a live server
if (channel != null) {
channel.close();
@@ -315,15 +322,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
pageManager.stop();
pageIndex.clear();
- final CountDownLatch latch = new CountDownLatch(1);
- executor.execute(new Runnable() {
-
- @Override
- public void run() {
- latch.countDown();
- }
- });
- latch.await(30, TimeUnit.SECONDS);
// Storage needs to be the last to stop
storageManager.stop();
@@ -471,28 +469,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
if (logger.isTraceEnabled()) {
logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
}
+ ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
+ if (!started)
+ return replicationResponseMessage;
if (packet.isSynchronizationFinished()) {
- executor.execute(() -> {
- try {
- // this is a long running process, we cannot block the reading thread from netty
- finishSynchronization(packet.getNodeID());
- if (logger.isTraceEnabled()) {
- logger.trace("returning completion on synchronization catchup");
- }
- channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true));
- } catch (Exception e) {
- logger.warn(e.getMessage());
- channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)));
- }
-
- });
- // the write will happen through an executor
- return null;
- }
-
- ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
- if (!started) {
+ finishSynchronization(packet.getNodeID());
+ replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
return replicationResponseMessage;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index e88b14d..b0d0232 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -326,6 +326,9 @@ public class ClusterController implements ActiveMQComponent {
@Override
public void handlePacket(Packet packet) {
if (!isStarted()) {
+ if (channelHandler != null) {
+ channelHandler.handlePacket(packet);
+ }
return;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index ba7bed0..be42ea6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1750,7 +1750,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
queue.deleteQueue(removeConsumers);
- if (autoDeleteAddress && postOffice != null && getAddressInfo(address).isAutoCreated()) {
+ AddressInfo addressInfo = getAddressInfo(address);
+
+ if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated()) {
try {
removeAddressInfo(address, session);
} catch (ActiveMQDeleteAddressException e) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
index 31fe17e..20ae956 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.reattach;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
/**
* A MultiThreadRandomReattachTest
@@ -28,6 +30,7 @@ public class MultiThreadRandomReattachTest extends MultiThreadRandomReattachTest
protected void start() throws Exception {
Configuration liveConf = createDefaultInVMConfig();
server = createServer(false, liveConf);
+ server.getConfiguration().getAddressConfigurations().add(new CoreAddressConfiguration().setName(ADDRESS.toString()).addRoutingType(RoutingType.MULTICAST));
server.start();
waitForServerToStart(server);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
index 984b69e..fc5d903 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.reattach;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
public class NettyMultiThreadRandomReattachTest extends MultiThreadRandomReattachTest {
@@ -25,6 +27,7 @@ public class NettyMultiThreadRandomReattachTest extends MultiThreadRandomReattac
protected void start() throws Exception {
Configuration liveConf = createDefaultNettyConfig();
server = createServer(false, liveConf);
+ server.getConfiguration().getAddressConfigurations().add(new CoreAddressConfiguration().setName(ADDRESS.toString()).addRoutingType(RoutingType.MULTICAST));
server.start();
waitForServerToStart(server);
}
[3/3] activemq-artemis git commit: This closes #1386
Posted by cl...@apache.org.
This closes #1386
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/15c2f09b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/15c2f09b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/15c2f09b
Branch: refs/heads/master
Commit: 15c2f09bc786f0cdd375d473da67502eca39bbe8
Parents: f3cc555 01a5a60
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jul 6 23:38:02 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jul 6 23:38:02 2017 -0400
----------------------------------------------------------------------
.../artemis/utils/OrderedExecutorFactory.java | 17 +++++
.../core/journal/impl/FileWrapperJournal.java | 74 ++++++++++++--------
.../core/ServerSessionPacketHandler.java | 42 ++++++++++-
.../core/impl/ActiveMQPacketHandler.java | 2 +-
.../core/replication/ReplicationEndpoint.java | 45 ++++--------
.../core/server/cluster/ClusterController.java | 3 +
.../core/server/impl/ActiveMQServerImpl.java | 4 +-
.../ExpireWhileLoadBalanceTest.java | 2 +-
.../cluster/failover/FailoverTest.java | 4 +-
.../reattach/MultiThreadRandomReattachTest.java | 3 +
.../NettyMultiThreadRandomReattachTest.java | 3 +
.../ReplicationFlowControlTest.java | 8 +--
.../core/journal/impl/JournalImplTestUnit.java | 4 --
13 files changed, 134 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
[2/3] activemq-artemis git commit: NO-JIRA: fixing test
Posted by cl...@apache.org.
NO-JIRA: fixing test
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/01a5a60b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/01a5a60b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/01a5a60b
Branch: refs/heads/master
Commit: 01a5a60b373590b75d32544c25b3a1c470f8dbc3
Parents: 89e84e1
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jul 6 09:18:28 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jul 6 23:37:01 2017 -0400
----------------------------------------------------------------------
.../cluster/distribution/ExpireWhileLoadBalanceTest.java | 2 +-
.../tests/integration/cluster/failover/FailoverTest.java | 4 ++--
.../smoke/replicationflow/ReplicationFlowControlTest.java | 8 ++++----
.../tests/unit/core/journal/impl/JournalImplTestUnit.java | 4 ----
4 files changed, 7 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/01a5a60b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java
index 0c921fd..059d776 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java
@@ -76,7 +76,7 @@ public class ExpireWhileLoadBalanceTest extends ClusterTestBase {
for (int i = 0; i <= 2; i++) {
createQueue(i, "queues.testaddress", "queue0", null, true);
getServer(i).createQueue(expiryQueue, RoutingType.ANYCAST, expiryQueue, null, true, false);
- getServer(i).getAddressSettingsRepository().addMatch("queues.*", as);
+ getServer(i).getAddressSettingsRepository().addMatch("#", as);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/01a5a60b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index 9b457a7..7912c96 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -370,7 +370,7 @@ public class FailoverTest extends FailoverTestBase {
}
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
- crash(false, session);
+ crash(true, session);
try {
session.commit(xid, false);
@@ -417,7 +417,7 @@ public class FailoverTest extends FailoverTestBase {
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
- crash(false, session);
+ crash(true, session);
try {
session.rollback(xid);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/01a5a60b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java
----------------------------------------------------------------------
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java
index 85dcd99..084c242 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java
@@ -49,9 +49,9 @@ public class ReplicationFlowControlTest extends SmokeTestBase {
private static Process server1;
- static final int NUM_MESSAGES = 300_000;
- static final int START_CONSUMERS = 100_000;
- static final int START_SERVER = 101_000;
+ static final int NUM_MESSAGES = 50_000;
+ static final int START_CONSUMERS = 10_000;
+ static final int START_SERVER = 15_000;
static final int NUMBER_OF_CONSUMERS = 10;
static final ReusableLatch latch = new ReusableLatch(NUM_MESSAGES);
@@ -88,7 +88,7 @@ public class ReplicationFlowControlTest extends SmokeTestBase {
private void internalTest(boolean failover) throws Exception {
- int KILL_SERVER = failover ? 150_000 : -1;
+ int KILL_SERVER = failover ? 50_000 : -1;
Connection connection = null;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/01a5a60b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
index be6eab4..3c50aa1 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
@@ -597,10 +597,6 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
files1 = fileFactory.listFiles(fileExtension);
Assert.assertTrue(files1.size() > 200);
- Assert.assertEquals(numberOfFiles, files1.size());
-
- System.out.println("we have " + files1.size() + " files now");
-
stopJournal();
}