You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2019/07/15 22:52:24 UTC
[activemq-artemis] branch master updated: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file
This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 85b93f0 ARTEMIS-2336 Use zero copy to replicate journal/page/large message file
new 1eaf12b This closes #2666
85b93f0 is described below
commit 85b93f0883bc06a2dfe2de9d560805a59d626d38
Author: yang wei <wy...@gmail.com>
AuthorDate: Mon May 13 12:32:58 2019 +0800
ARTEMIS-2336 Use zero copy to replicate journal/page/large message file
---
.../artemis/core/protocol/core/Channel.java | 20 +++
.../core/protocol/core/impl/ChannelImpl.java | 118 +++++++++-----
.../core/protocol/core/impl/PacketImpl.java | 11 +-
.../core/remoting/impl/netty/NettyConnection.java | 41 +++++
.../impl/netty/NonClosingDefaultFileRegion.java | 38 +++++
.../artemis/spi/core/remoting/Connection.java | 4 +
.../core/protocol/core/impl/ChannelImplTest.java | 11 ++
.../wireformat/ReplicationSyncFileMessage.java | 170 ++++++++++++---------
.../core/remoting/impl/invm/InVMConnection.java | 24 +++
.../core/replication/ReplicationManager.java | 105 ++++++++-----
.../wireformat/ReplicationSyncFileMessageTest.java | 85 +++++++++++
.../integration/cluster/util/BackupSyncDelay.java | 7 +
.../remoting/impl/netty/NettyConnectionTest.java | 28 ++++
13 files changed, 513 insertions(+), 149 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index 56f8259..e541dad 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.core;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -67,6 +69,20 @@ public interface Channel {
boolean send(Packet packet);
/**
+ * Sends a packet and file on this channel.
+ *
+ * @param packet the packet to send
+ * @param raf the file to send
+ * @param fileChannel the file channel retrieved from raf
+ * @param offset the position of the raf
+ * @param dataSize the data size to send
+ * @param callback callback after send
+ * @return false if the packet was rejected by an outgoing interceptor; true if the send was
+ * successful
+ */
+ boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback);
+
+ /**
* Sends a packet on this channel.
*
* @param packet the packet to send
@@ -247,4 +263,8 @@ public interface Channel {
* @param transferring whether the channel is transferring
*/
void setTransferring(boolean transferring);
+
+ interface Callback {
+ void done(boolean success);
+ }
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index fe876ed..9f36d81 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -25,6 +27,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
@@ -274,67 +277,104 @@ public final class ChannelImpl implements Channel {
}
}
- // This must never called by more than one thread concurrently
- private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
- if (invokeInterceptors(packet, interceptors, connection) != null) {
- return false;
+ private ActiveMQBuffer beforeSend(final Packet packet, final int reconnectID) {
+ packet.setChannelID(id);
+
+ if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+ packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
}
- synchronized (sendLock) {
- packet.setChannelID(id);
+ if (logger.isTraceEnabled()) {
+ logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
+ }
- if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
- packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
+ ActiveMQBuffer buffer = packet.encode(connection);
+
+ lock.lock();
+
+ try {
+ if (failingOver) {
+ waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
}
- if (logger.isTraceEnabled()) {
- logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
+ // Sanity check
+ if (transferring) {
+ throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
}
- ActiveMQBuffer buffer = packet.encode(connection);
+ if (resendCache != null && packet.isRequiresConfirmations()) {
+ addResendPacket(packet);
+ }
- lock.lock();
+ } finally {
+ lock.unlock();
+ }
- try {
- if (failingOver) {
- waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
- }
+ if (logger.isTraceEnabled()) {
+ logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
+ }
- // Sanity check
- if (transferring) {
- throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
- }
+ checkReconnectID(reconnectID);
- if (resendCache != null && packet.isRequiresConfirmations()) {
- addResendPacket(packet);
+ //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
+ //As the send could block if the response cache cannot add, preventing responses to be handled.
+ if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+ while (!responseAsyncCache.add(packet)) {
+ try {
+ Thread.sleep(1);
+ } catch (Exception e) {
+ // Ignore
}
-
- } finally {
- lock.unlock();
}
+ }
- if (logger.isTraceEnabled()) {
- logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
- }
+ return buffer;
+ }
- checkReconnectID(reconnectID);
+ // This must never called by more than one thread concurrently
+ private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
+ if (invokeInterceptors(packet, interceptors, connection) != null) {
+ return false;
+ }
- //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
- //As the send could block if the response cache cannot add, preventing responses to be handled.
- if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
- while (!responseAsyncCache.add(packet)) {
- try {
- Thread.sleep(1);
- } catch (Exception e) {
- // Ignore
- }
+ synchronized (sendLock) {
+ ActiveMQBuffer buffer = beforeSend(packet, reconnectID);
+
+ // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
+ // buffer is full, preventing any incoming buffers being handled and blocking failover
+ try {
+ connection.getTransportConnection().write(buffer, flush, batch);
+ } catch (Throwable t) {
+ //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
+ //The client would get still know about this as the exception bubbles up the call stack instead.
+ if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+ responseAsyncCache.remove(packet.getCorrelationID());
}
+ throw t;
}
+ return true;
+ }
+ }
+
+ @Override
+ public boolean send(Packet packet,
+ RandomAccessFile raf,
+ FileChannel fileChannel,
+ long offset,
+ int dataSize,
+ Callback callback) {
+ if (invokeInterceptors(packet, interceptors, connection) != null) {
+ return false;
+ }
+
+ synchronized (sendLock) {
+ ActiveMQBuffer buffer = beforeSend(packet, -1);
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
try {
- connection.getTransportConnection().write(buffer, flush, batch);
+ connection.getTransportConnection().write(buffer);
+ connection.getTransportConnection().write(raf, fileChannel, offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess()));
} catch (Throwable t) {
//If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
//The client would get still know about this as the exception bubbles up the call stack instead.
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index f8f85e8..a7a3253 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -336,7 +336,11 @@ public class PacketImpl implements Packet {
}
protected void encodeSize(ActiveMQBuffer buffer) {
- size = buffer.writerIndex();
+ encodeSize(buffer, buffer.writerIndex());
+ }
+
+ protected void encodeSize(ActiveMQBuffer buffer, int size) {
+ this.size = size;
// The length doesn't include the actual length byte
int len = size - DataConstants.SIZE_INT;
@@ -345,9 +349,10 @@ public class PacketImpl implements Packet {
}
protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) {
+ return createPacket(connection, expectedEncodeSize());
+ }
- int size = expectedEncodeSize();
-
+ protected ActiveMQBuffer createPacket(CoreRemotingConnection connection, int size) {
if (connection == null) {
return new ChannelBufferWrapper(Unpooled.buffer(size));
} else {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 51330c7..497448e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -16,7 +16,10 @@
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.net.SocketAddress;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -29,6 +32,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -350,6 +355,18 @@ public class NettyConnection implements Connection {
return canWrite;
}
+ private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) {
+ if (channel.pipeline().get(SslHandler.class) == null) {
+ return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
+ } else {
+ try {
+ return new ChunkedFile(raf, offset, dataSize, 8192);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
@Override
public final void write(ActiveMQBuffer buffer,
final boolean flush,
@@ -390,6 +407,30 @@ public class NettyConnection implements Connection {
}
}
+ @Override
+ public void write(RandomAccessFile raf,
+ FileChannel fileChannel,
+ long offset,
+ int dataSize,
+ final ChannelFutureListener futureListener) {
+ final int readableBytes = dataSize;
+ if (logger.isDebugEnabled()) {
+ final int remainingBytes = this.writeBufferHighWaterMark - readableBytes;
+ if (remainingBytes < 0) {
+ logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes");
+ }
+ }
+
+ //no need to lock because the Netty's channel is thread-safe
+ //and the order of write is ensured by the order of the write calls
+ final Channel channel = this.channel;
+ assert readableBytes >= 0;
+ ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(raf, fileChannel, offset, dataSize));
+ if (futureListener != null) {
+ channelFuture.addListener(futureListener);
+ }
+ }
+
private static void flushAndWait(final Channel channel, final ChannelPromise promise) {
if (!channel.eventLoop().inEventLoop()) {
waitFor(promise, DEFAULT_WAIT_MILLIS);
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java
new file mode 100644
index 0000000..4fc367f
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.remoting.impl.netty;
+
+import java.io.File;
+import java.nio.channels.FileChannel;
+
+import io.netty.channel.DefaultFileRegion;
+
+public class NonClosingDefaultFileRegion extends DefaultFileRegion {
+
+ public NonClosingDefaultFileRegion(FileChannel file, long position, long count) {
+ super(file, position, count);
+ }
+
+ public NonClosingDefaultFileRegion(File f, long position, long count) {
+ super(f, position, count);
+ }
+
+ @Override
+ protected void deallocate() {
+ // Overridden to avoid closing the file
+ }
+}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index ebde456..fe5d395 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.spi.core.remoting;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelFutureListener;
@@ -101,6 +103,8 @@ public interface Connection {
*/
void write(ActiveMQBuffer buffer);
+ void write(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, ChannelFutureListener futureListener);
+
/**
* This should close the internal channel without calling any listeners.
* This is to avoid a situation where the broker is busy writing on an internal thread.
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
index 9908d97..e9181f8 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -17,6 +17,8 @@
package org.apache.activemq.artemis.core.protocol.core.impl;
import javax.security.auth.Subject;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -387,6 +389,15 @@ public class ChannelImplTest {
}
@Override
+ public void write(RandomAccessFile raf,
+ FileChannel fileChannel,
+ long offset,
+ int dataSize,
+ ChannelFutureListener channelFutureListener) {
+
+ }
+
+ @Override
public void forceClose() {
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index b81782b..5a30c64 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -16,22 +16,30 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.EnumSet;
+import java.util.Objects;
import java.util.Set;
-import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.utils.DataConstants;
+import org.jboss.logging.Logger;
/**
* Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls
* which extra information is sent.
*/
public final class ReplicationSyncFileMessage extends PacketImpl {
+ private static final Logger logger = Logger.getLogger(ReplicationSyncFileMessage.class);
/**
* The JournalType or {@code null} if sync'ing large-messages.
@@ -43,10 +51,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
*/
private long fileId;
private int dataSize;
- private ByteBuf byteBuffer;
private byte[] byteArray;
private SimpleString pageStoreName;
private FileType fileType;
+ private RandomAccessFile raf;
+ private FileChannel fileChannel;
+ private long offset;
public enum FileType {
JOURNAL(0), PAGE(1), LARGE_MESSAGE(2);
@@ -78,14 +88,18 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
public ReplicationSyncFileMessage(AbstractJournalStorageManager.JournalContent content,
SimpleString storeName,
long id,
- int size,
- ByteBuf buffer) {
+ RandomAccessFile raf,
+ FileChannel fileChannel,
+ long offset,
+ int size) {
this();
- this.byteBuffer = buffer;
this.pageStoreName = storeName;
this.dataSize = size;
this.fileId = id;
+ this.raf = raf;
+ this.fileChannel = fileChannel;
this.journalType = content;
+ this.offset = offset;
determineType();
}
@@ -99,10 +113,30 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
}
}
+ public long getFileId() {
+ return fileId;
+ }
+
+ public int getDataSize() {
+ return dataSize;
+ }
+
+ public RandomAccessFile getRaf() {
+ return raf;
+ }
+
+ public FileChannel getFileChannel() {
+ return fileChannel;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
@Override
public int expectedEncodeSize() {
int size = PACKET_HEADERS_SIZE +
- DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
+ DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
if (fileId == -1)
return size;
@@ -125,7 +159,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize);
if (dataSize > 0) {
- size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
+ size += dataSize;
}
return size;
@@ -150,30 +184,55 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
default:
// no-op
}
-
buffer.writeInt(dataSize);
- /*
- * sending -1 will close the file in case of a journal, but not in case of a largeMessage
- * (which might receive appends)
- */
- if (dataSize > 0) {
- buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
- }
+ }
- release();
+ @Override
+ public ActiveMQBuffer encode(CoreRemotingConnection connection) {
+ if (fileId != -1 && dataSize > 0) {
+ ActiveMQBuffer buffer;
+ int bufferSize = expectedEncodeSize();
+ int encodedSize = bufferSize;
+ boolean isNetty = false;
+ if (connection != null && connection.getTransportConnection() instanceof NettyConnection) {
+ bufferSize -= dataSize;
+ isNetty = true;
+ }
+ buffer = createPacket(connection, bufferSize);
+ encodeHeader(buffer);
+ encodeRest(buffer, connection);
+ if (!isNetty) {
+ ByteBuffer byteBuffer;
+ if (buffer.byteBuf() != null && buffer.byteBuf().nioBufferCount() == 1) {
+ byteBuffer = buffer.byteBuf().internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
+ } else {
+ byteBuffer = buffer.toByteBuffer(buffer.writerIndex(), buffer.writableBytes());
+ }
+ readFile(byteBuffer);
+ buffer.writerIndex(buffer.capacity());
+ }
+ encodeSize(buffer, encodedSize);
+ return buffer;
+ } else {
+ return super.encode(connection);
+ }
}
@Override
public void release() {
- if (byteBuffer != null) {
- byteBuffer.release();
- byteBuffer = null;
+ if (raf != null) {
+ try {
+ raf.close();
+ } catch (IOException e) {
+ logger.error("Close file " + this + " failed", e);
+ }
}
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
fileId = buffer.readLong();
+ if (fileId == -1) return;
switch (FileType.getFileType(buffer.readByte())) {
case JOURNAL: {
journalType = AbstractJournalStorageManager.JournalContent.getType(buffer.readByte());
@@ -197,6 +256,14 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
}
}
+ private void readFile(ByteBuffer buffer) {
+ try {
+ fileChannel.read(buffer, offset);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public long getId() {
return fileId;
}
@@ -218,61 +285,22 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
}
@Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + Arrays.hashCode(byteArray);
- result = prime * result + ((byteBuffer == null) ? 0 : byteBuffer.hashCode());
- result = prime * result + dataSize;
- result = prime * result + (int) (fileId ^ (fileId >>> 32));
- result = prime * result + ((fileType == null) ? 0 : fileType.hashCode());
- result = prime * result + ((journalType == null) ? 0 : journalType.hashCode());
- result = prime * result + ((pageStoreName == null) ? 0 : pageStoreName.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
+ public boolean equals(Object o) {
+ if (this == o)
return true;
- }
- if (!super.equals(obj)) {
- return false;
- }
- if (!(obj instanceof ReplicationSyncFileMessage)) {
- return false;
- }
- ReplicationSyncFileMessage other = (ReplicationSyncFileMessage) obj;
- if (!Arrays.equals(byteArray, other.byteArray)) {
- return false;
- }
- if (byteBuffer == null) {
- if (other.byteBuffer != null) {
- return false;
- }
- } else if (!byteBuffer.equals(other.byteBuffer)) {
+ if (o == null || getClass() != o.getClass())
return false;
- }
- if (dataSize != other.dataSize) {
- return false;
- }
- if (fileId != other.fileId) {
+ if (!super.equals(o))
return false;
- }
- if (fileType != other.fileType) {
- return false;
- }
- if (journalType != other.journalType) {
- return false;
- }
- if (pageStoreName == null) {
- if (other.pageStoreName != null) {
- return false;
- }
- } else if (!pageStoreName.equals(other.pageStoreName)) {
- return false;
- }
- return true;
+ ReplicationSyncFileMessage that = (ReplicationSyncFileMessage) o;
+ return fileId == that.fileId && dataSize == that.dataSize && offset == that.offset && journalType == that.journalType && Arrays.equals(byteArray, that.byteArray) && Objects.equals(pageStoreName, that.pageStoreName) && fileType == that.fileType && Objects.equals(raf, that.raf) && Objects.equals(fileChannel, that.fileChannel);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(super.hashCode(), journalType, fileId, dataSize, pageStoreName, fileType, raf, fileChannel, offset);
+ result = 31 * result + Arrays.hashCode(byteArray);
+ return result;
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index b2fc576..02f1c84 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.remoting.impl.invm;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -243,6 +245,28 @@ public class InVMConnection implements Connection {
}
@Override
+ public void write(RandomAccessFile raf,
+ FileChannel fileChannel,
+ long offset,
+ int dataSize,
+ final ChannelFutureListener futureListener) {
+ if (futureListener == null) {
+ return;
+ }
+ try {
+ executor.execute(() -> {
+ try {
+ futureListener.operationComplete(null);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ } catch (RejectedExecutionException e) {
+
+ }
+ }
+
+ @Override
public String getRemoteAddress() {
return "invm:" + serverID;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 1d1217d..d48a5a0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -16,8 +16,7 @@
*/
package org.apache.activemq.artemis.core.replication;
-import java.io.FileInputStream;
-import java.nio.ByteBuffer;
+import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.LinkedHashSet;
@@ -28,8 +27,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -392,6 +389,39 @@ public final class ReplicationManager implements ActiveMQComponent {
return repliToken;
}
+ private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage syncFileMessage, boolean lastChunk) {
+ if (!enabled) {
+ syncFileMessage.release();
+ return null;
+ }
+
+ final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory);
+ repliToken.replicationLineUp();
+
+ replicationStream.execute(() -> {
+ if (enabled) {
+ try {
+ pendingTokens.add(repliToken);
+ flowControl(syncFileMessage.expectedEncodeSize());
+ if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) {
+ replicatingChannel.send(syncFileMessage, syncFileMessage.getRaf(), syncFileMessage.getFileChannel(),
+ syncFileMessage.getOffset(), syncFileMessage.getDataSize(),
+ lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null);
+ } else {
+ replicatingChannel.send(syncFileMessage);
+ }
+ } catch (Exception e) {
+ syncFileMessage.release();
+ }
+ } else {
+ syncFileMessage.release();
+ repliToken.replicationDone();
+ }
+ });
+
+ return repliToken;
+ }
+
/**
* This was written as a refactoring of sendReplicatePacket.
* In case you refactor this in any way, this method must hold a lock on replication lock. .
@@ -560,49 +590,52 @@ public final class ReplicationManager implements ActiveMQComponent {
if (!file.isOpen()) {
file.open();
}
- int size = 32 * 1024;
+ final int size = 1024 * 1024;
+ long fileSize = file.size();
int flowControlSize = 10;
int packetsSent = 0;
FlushAction action = new FlushAction();
+ long offset = 0;
+ RandomAccessFile raf = null;
+ FileChannel fileChannel = null;
try {
- try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
-
- // We can afford having a single buffer here for this entire loop
- // because sendReplicatePacket will encode the packet as a NettyBuffer
- // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
- while (true) {
- final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
- buffer.clear();
- ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
- final int bytesRead = channel.read(byteBuffer);
- int toSend = bytesRead;
- if (bytesRead > 0) {
- if (bytesRead >= maxBytesToSend) {
- toSend = (int) maxBytesToSend;
- maxBytesToSend = 0;
- } else {
- maxBytesToSend = maxBytesToSend - bytesRead;
- }
- }
- logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
- // sending -1 or 0 bytes will close the file at the backup
- // We cannot simply send everything of a file through the executor,
- // otherwise we would run out of memory.
- // so we don't use the executor here
- sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
- packetsSent++;
-
- if (packetsSent % flowControlSize == 0) {
- flushReplicationStream(action);
+ raf = new RandomAccessFile(file.getJavaFile(), "r");
+ fileChannel = raf.getChannel();
+ while (true) {
+ long chunkSize = Math.min(size, fileSize - offset);
+ int toSend = (int) chunkSize;
+ if (chunkSize > 0) {
+ if (chunkSize >= maxBytesToSend) {
+ toSend = (int) maxBytesToSend;
+ maxBytesToSend = 0;
+ } else {
+ maxBytesToSend = maxBytesToSend - chunkSize;
}
- if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
- break;
}
+ logger.debug("sending " + toSend + " bytes on file " + file.getFileName());
+ // sending -1 or 0 bytes will close the file at the backup
+ // We cannot simply send everything of a file through the executor,
+ // otherwise we would run out of memory.
+ // so we don't use the executor here
+ sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, raf, fileChannel, offset, toSend), offset + toSend == fileSize);
+ packetsSent++;
+ offset += toSend;
+
+ if (packetsSent % flowControlSize == 0) {
+ flushReplicationStream(action);
+ }
+ if (toSend == 0 || maxBytesToSend == 0)
+ break;
}
flushReplicationStream(action);
+
+ } catch (Exception e) {
+ if (raf != null)
+ raf.close();
+ throw e;
} finally {
if (file.isOpen())
file.close();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
new file mode 100644
index 0000000..812de2c
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent.MESSAGES;
+
+public class ReplicationSyncFileMessageTest extends ActiveMQTestBase {
+ @Test
+ public void testNettyConnectionEncodeMessage() throws Exception {
+ int dataSize = 10;
+ NettyConnection conn = new NettyConnection(new HashMap<>(), new EmbeddedChannel(), null, false, false);
+
+ SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
+ SequentialFile file = factory.createSequentialFile("file1.bin");
+ file.open();
+ RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
+ FileChannel fileChannel = raf.getChannel();
+ ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES,
+ null, 10, raf, fileChannel, 0, dataSize);
+ RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null);
+ ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection);
+ Assert.assertEquals(buffer.getInt(0), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
+ Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize() - dataSize);
+ file.close();
+ }
+
+
+ @Test
+ public void testInVMConnectionEncodeMessage() throws Exception {
+ int fileId = 10;
+ InVMConnection conn = new InVMConnection(0, null, null, null);
+
+ SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
+ SequentialFile file = factory.createSequentialFile("file1.bin");
+ file.open();
+ RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
+ FileChannel fileChannel = raf.getChannel();
+ ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES,
+ null, fileId, raf, fileChannel, 0, 0);
+ RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null);
+ ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection);
+ Assert.assertEquals(buffer.readInt(), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
+ Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize());
+
+ Assert.assertEquals(buffer.readByte(), PacketImpl.REPLICATION_SYNC_FILE);
+
+ ReplicationSyncFileMessage decodedReplicationSyncFileMessage = new ReplicationSyncFileMessage();
+ decodedReplicationSyncFileMessage.decode(buffer);
+ Assert.assertEquals(decodedReplicationSyncFileMessage.getJournalContent(), MESSAGES);
+ Assert.assertNull(decodedReplicationSyncFileMessage.getData());
+ file.close();
+ }
+}
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index c7ed869..c55764a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.util;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -212,6 +214,11 @@ public class BackupSyncDelay implements Interceptor {
}
@Override
+ public boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback) {
+ return true;
+ }
+
+ @Override
public boolean sendBatched(Packet packet) {
throw new UnsupportedOperationException();
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
index c9c975c..23ae5f9 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
@@ -16,7 +16,9 @@
*/
package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -29,6 +31,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -77,6 +82,29 @@ public class NettyConnectionTest extends ActiveMQTestBase {
}
+ @Test
+ public void testWritePacketAndFile() throws Exception {
+ EmbeddedChannel channel = createChannel();
+ NettyConnection conn = new NettyConnection(emptyMap, channel, new MyListener(), false, false);
+
+ final int size = 1234;
+
+ ActiveMQBuffer buff = conn.createTransportBuffer(size);
+ buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization.
+ SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
+ SequentialFile file = factory.createSequentialFile("file1.bin");
+ file.open();
+ RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
+ FileChannel fileChannel = raf.getChannel();
+
+ conn.write(buff);
+ conn.write(raf, fileChannel, 0, size, future -> raf.close());
+ channel.runPendingTasks();
+ Assert.assertEquals(2, channel.outboundMessages().size());
+ Assert.assertFalse(fileChannel.isOpen());
+ file.close();
+ }
+
@Test(expected = IllegalStateException.class)
public void throwsExceptionOnBlockUntilWritableIfClosed() {
EmbeddedChannel channel = createChannel();