You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/04/03 11:59:20 UTC
[3/3] activemq-artemis git commit: ARTEMIS-1089 Fixing Replication catchup slow
ARTEMIS-1089 Fixing Replication catchup slow
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7929fff8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7929fff8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7929fff8
Branch: refs/heads/1.x
Commit: 7929fff893e1510390ae8e6a7924aa0e4f3864c0
Parents: b819026
Author: Clebert Suconic <cl...@apache.org>
Authored: Sun Apr 2 19:20:42 2017 -0400
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Apr 3 12:58:13 2017 +0100
----------------------------------------------------------------------
.../artemis/api/core/ActiveMQBuffer.java | 13 +++
.../core/buffers/impl/ChannelBufferWrapper.java | 5 +
.../impl/ResetLimitWrappedActiveMQBuffer.java | 8 ++
.../CompressedLargeMessageControllerImpl.java | 6 ++
.../client/impl/LargeMessageControllerImpl.java | 15 +++
.../amqp/converter/TestConversions.java | 5 +
.../cursor/impl/PageCursorProviderImpl.java | 1 +
.../core/paging/impl/PagingStoreImpl.java | 31 ++++---
.../impl/journal/JournalStorageManager.java | 2 +-
.../wireformat/ReplicationSyncFileMessage.java | 13 ++-
.../core/replication/ReplicationEndpoint.java | 2 +-
.../core/replication/ReplicationManager.java | 97 ++++++++++++--------
.../core/server/ActiveMQServerLogger.java | 5 +-
13 files changed, 139 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
index f30ef35..d753b8f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
@@ -1065,6 +1065,19 @@ public interface ActiveMQBuffer extends DataInput {
*/
void writeBytes(ByteBuffer src);
+
+ /**
+ * Transfers the specified source buffer's data to this buffer starting at
+ * the current {@code writerIndex} until the source buffer's position
+ * reaches its limit, and increases the {@code writerIndex} by the
+ * number of the transferred bytes.
+ *
+ * @param src The source buffer
+ * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than
+ * {@code this.writableBytes}
+ */
+ void writeBytes(ByteBuf src, int srcIndex, int length);
+
/**
* Returns a copy of this buffer's readable bytes. Modifying the content
* of the returned buffer or this buffer does not affect each other at all.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
index c75be21..496c146 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
@@ -576,6 +576,11 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
}
@Override
+ public void writeBytes(ByteBuf src, int srcIndex, int length) {
+ buffer.writeBytes(src, srcIndex, length);
+ }
+
+ @Override
public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
buffer.writeBytes(src.byteBuf(), srcIndex, length);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
index ec6cf09..d6cba00 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
@@ -263,6 +263,14 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
super.writeBytes(src);
}
+
+ @Override
+ public void writeBytes(final ByteBuf src, final int srcIndex, final int length) {
+ changed();
+
+ super.writeBytes(src, srcIndex, length);
+ }
+
@Override
public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
changed();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 55f9129..ce652d2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -513,6 +513,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
}
@Override
+ public void writeBytes(ByteBuf src, int srcIndex, int length) {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+
+ @Override
public ByteBuffer toByteBuffer() {
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 951aea2..0bb5690 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -863,6 +863,21 @@ public class LargeMessageControllerImpl implements LargeMessageController {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
+ /**
+ * Transfers the specified source buffer's data to this buffer starting at
+ * the current {@code writerIndex} until the source buffer's position
+ * reaches its limit, and increases the {@code writerIndex} by the
+ * number of the transferred bytes.
+ *
+ * @param src The source buffer
+ * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than
+ * {@code this.writableBytes}
+ */
+ @Override
+ public void writeBytes(ByteBuf src, int srcIndex, int length) {
+ throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
+ }
+
public int writeBytes(final InputStream in, final int length) throws IOException {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index cbe2699..e154cd2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -719,6 +719,11 @@ public class TestConversions extends Assert {
}
@Override
+ public void writeBytes(ByteBuf src, int srcIndex, int length) {
+
+ }
+
+ @Override
public void readFully(byte[] b) throws IOException {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 76ad26b..701f86c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -332,6 +332,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
@Override
public void resumeCleanup() {
this.cleanupEnabled = true;
+ scheduleCleanup();
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 4e57c85..8cba9fe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -1093,28 +1093,29 @@ public class PagingStoreImpl implements PagingStore {
@Override
public Collection<Integer> getCurrentIds() throws Exception {
- List<Integer> ids = new ArrayList<>();
- if (fileFactory != null) {
- for (String fileName : fileFactory.listFiles("page")) {
- ids.add(getPageIdFromFileName(fileName));
+ lock.writeLock().lock();
+ try {
+ List<Integer> ids = new ArrayList<>();
+ if (fileFactory != null) {
+ for (String fileName : fileFactory.listFiles("page")) {
+ ids.add(getPageIdFromFileName(fileName));
+ }
}
+ return ids;
+ } finally {
+ lock.writeLock().unlock();
}
- return ids;
}
@Override
public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
- lock.writeLock().lock();
- try {
- for (Integer id : pageIds) {
- SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id));
- if (!sFile.exists()) {
- continue;
- }
- replicator.syncPages(sFile, id, getAddress());
+ for (Integer id : pageIds) {
+ SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id));
+ if (!sFile.exists()) {
+ continue;
}
- } finally {
- lock.writeLock().unlock();
+ ActiveMQServerLogger.LOGGER.replicaSyncFile(sFile, sFile.size());
+ replicator.syncPages(sFile, id, getAddress());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 7c0a651..9c122b3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -587,10 +587,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
stopReplication();
throw e;
} finally {
- pagingManager.resumeCleanup();
// Re-enable compact and reclaim of journal files
originalBindingsJournal.replicationSyncFinished();
originalMessageJournal.replicationSyncFinished();
+ pagingManager.resumeCleanup();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index de7f73e..90d2ca0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -16,11 +16,11 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
@@ -42,7 +42,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
*/
private long fileId;
private int dataSize;
- private ByteBuffer byteBuffer;
+ private ByteBuf byteBuffer;
private byte[] byteArray;
private SimpleString pageStoreName;
private FileType fileType;
@@ -78,7 +78,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
SimpleString storeName,
long id,
int size,
- ByteBuffer buffer) {
+ ByteBuf buffer) {
this();
this.byteBuffer = buffer;
this.pageStoreName = storeName;
@@ -124,7 +124,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
* (which might receive appends)
*/
if (dataSize > 0) {
- buffer.writeBytes(byteBuffer);
+ buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
+ }
+
+ if (byteBuffer != null) {
+ byteBuffer.release();
+ byteBuffer = null;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 1a07adc..e1879da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -410,7 +410,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
if (!channel1.isOpen()) {
channel1.open();
}
- channel1.writeDirect(ByteBuffer.wrap(data), true);
+ channel1.writeDirect(ByteBuffer.wrap(data), false);
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index d0468d1..7e0881c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -25,8 +25,11 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -121,6 +124,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
private final ExecutorFactory executorFactory;
+ private final Executor replicationStream;
+
private SessionFailureListener failureListener;
private CoreRemotingConnection remotingConnection;
@@ -140,6 +145,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
this.executorFactory = executorFactory;
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
this.remotingConnection = remotingConnection;
+ this.replicationStream = executorFactory.getExecutor();
this.timeout = timeout;
}
@@ -175,7 +181,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
boolean sync,
final boolean lineUp) throws Exception {
if (enabled) {
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true);
}
}
@@ -340,15 +346,15 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
}
private OperationContext sendReplicatePacket(final Packet packet) {
- return sendReplicatePacket(packet, true);
+ return sendReplicatePacket(packet, true, true);
}
- private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
+ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) {
if (!enabled)
return null;
boolean runItNow = false;
- OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
+ final OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
if (lineUp) {
repliToken.replicationLineUp();
}
@@ -356,10 +362,17 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
synchronized (replicationLock) {
if (enabled) {
pendingTokens.add(repliToken);
- if (!flowControl()) {
- return repliToken;
+ if (useExecutor) {
+ replicationStream.execute(() -> {
+ if (enabled) {
+ flowControl();
+ replicatingChannel.send(packet);
+ }
+ });
+ } else {
+ flowControl();
+ replicatingChannel.send(packet);
}
- replicatingChannel.send(packet);
} else {
// Already replicating channel failed, so just play the action now
runItNow = true;
@@ -380,33 +393,35 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
* In case you refactor this in any way, this method must hold a lock on replication lock. .
*/
private boolean flowControl() {
- // synchronized (replicationLock) { -- I'm not adding this because the caller already has it
- // future maintainers of this code please be aware that the intention here is hold the lock on replication lock
- if (!replicatingChannel.getConnection().isWritable(this)) {
- try {
- logger.trace("flowControl waiting on writable");
- writable.set(false);
- //don't wait for ever as this may hang tests etc, we've probably been closed anyway
- long now = System.currentTimeMillis();
- long deadline = now + timeout;
- while (!writable.get() && now < deadline) {
- replicationLock.wait(deadline - now);
- now = System.currentTimeMillis();
- }
- logger.trace("flow control done");
-
- if (!writable.get()) {
- ActiveMQServerLogger.LOGGER.slowReplicationResponse();
- logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now);
- try {
- stop();
- } catch (Exception e) {
- logger.warn(e.getMessage(), e);
+ synchronized (replicationLock) {
+ // synchronized (replicationLock) { -- I'm not adding this because the caller already has it
+ // future maintainers of this code please be aware that the intention here is hold the lock on replication lock
+ if (!replicatingChannel.getConnection().isWritable(this)) {
+ try {
+ logger.trace("flowControl waiting on writable replication");
+ writable.set(false);
+ //don't wait for ever as this may hang tests etc, we've probably been closed anyway
+ long now = System.currentTimeMillis();
+ long deadline = now + timeout;
+ while (!writable.get() && now < deadline) {
+ replicationLock.wait(deadline - now);
+ now = System.currentTimeMillis();
+ }
+ logger.trace("flow control done on replication");
+
+ if (!writable.get()) {
+ ActiveMQServerLogger.LOGGER.slowReplicationResponse();
+ logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now);
+ try {
+ stop();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ return false;
}
- return false;
+ } catch (InterruptedException e) {
+ throw new ActiveMQInterruptedException(e);
}
- } catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
}
}
return true;
@@ -512,7 +527,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
}
SequentialFile file = jf.getFile().cloneFile();
try {
- ActiveMQServerLogger.LOGGER.journalSynch(jf, file.size(), file);
+ ActiveMQServerLogger.LOGGER.replicaSyncFile(file, file.size());
sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
} finally {
if (file.isOpen())
@@ -557,10 +572,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
// We can afford having a single buffer here for this entire loop
// because sendReplicatePacket will encode the packet as a NettyBuffer
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
- final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17 == 131072 == 128 * 1024
+ int size = 1 << 17;
while (true) {
- buffer.clear();
- final int bytesRead = channel.read(buffer);
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
+ ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
+ final int bytesRead = channel.read(byteBuffer);
int toSend = bytesRead;
if (bytesRead > 0) {
if (bytesRead >= maxBytesToSend) {
@@ -569,12 +585,13 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
} else {
maxBytesToSend = maxBytesToSend - bytesRead;
}
- buffer.limit(toSend);
}
- buffer.rewind();
-
+ logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
// sending -1 or 0 bytes will close the file at the backup
- sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer));
+ // We cannot simply send everything of a file through the executor,
+ // otherwise we would run out of memory.
+ // so we don't use the executor here
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false);
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index cf904e1..a25a7f6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
-import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -189,8 +188,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void backupServerSynched(ActiveMQServerImpl server);
@LogMessage(level = Logger.Level.INFO)
- @Message(id = 221025, value = "Replication: sending {0} (size={1}) to backup. {2}", format = Message.Format.MESSAGE_FORMAT)
- void journalSynch(JournalFile jf, Long size, SequentialFile file);
+ @Message(id = 221025, value = "Replication: sending {0} (size={1}) to replica.", format = Message.Format.MESSAGE_FORMAT)
+ void replicaSyncFile(SequentialFile jf, Long size);
@LogMessage(level = Logger.Level.INFO)
@Message(