You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2016/04/19 16:01:06 UTC

[2/3] activemq-artemis git commit: ARTEMIS-490 Fixing LargeMessage copy through replication

ARTEMIS-490 Fixing LargeMessage copy through replication

this will fix cases like DLQ and Diverts


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e81fa5c3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e81fa5c3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e81fa5c3

Branch: refs/heads/master
Commit: e81fa5c359f8c6e9cd424cd28b2d7ffcb32de922
Parents: dcf6513
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Apr 18 17:55:49 2016 -0400
Committer: Andy Taylor <an...@gmail.com>
Committed: Tue Apr 19 15:00:09 2016 +0100

----------------------------------------------------------------------
 .../artemis/core/io/AbstractSequentialFile.java | 10 +--
 .../impl/journal/LargeServerMessageImpl.java    | 78 +++++++++++---------
 .../artemis/core/server/ServerMessage.java      |  2 -
 .../core/server/cluster/impl/Redistributor.java |  2 -
 .../artemis/core/server/impl/DivertImpl.java    |  1 -
 .../artemis/core/server/impl/QueueImpl.java     |  1 -
 .../core/server/impl/ServerMessageImpl.java     |  5 --
 .../impl/ScheduledDeliveryHandlerTest.java      |  5 --
 8 files changed, 47 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e81fa5c3/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
index 4314267..487d8a5 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
+import org.apache.activemq.artemis.core.io.util.FileIOUtil;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
@@ -113,14 +114,7 @@ public abstract class AbstractSequentialFile implements SequentialFile {
 
          ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
 
-         for (;;) {
-            buffer.rewind();
-            int size = this.read(buffer);
-            newFileName.writeDirect(buffer, false);
-            if (size < 10 * 1024) {
-               break;
-            }
-         }
+         FileIOUtil.copyData(this, newFileName, buffer);
          newFileName.close();
          this.close();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e81fa5c3/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index c24924a..3f6f4d7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -264,51 +264,63 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
    }
 
    @Override
-   public synchronized ServerMessage copy() {
+   public ServerMessage copy() {
       SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable);
 
       ServerMessage newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID);
       return newMessage;
    }
 
-   public void copyFrom(final SequentialFile fileSource) throws Exception {
-      this.bodySize = -1;
-      this.pendingCopy = fileSource;
-   }
-
    @Override
-   public void finishCopy() throws Exception {
-      if (pendingCopy != null) {
-         SequentialFile copyTo = createFile();
-         try {
-            this.pendingRecordID = storageManager.storePendingLargeMessage(this.messageID);
-            copyTo.open();
-            pendingCopy.open();
-            pendingCopy.copyTo(copyTo);
-         }
-         finally {
-            copyTo.close();
-            pendingCopy.close();
-            pendingCopy = null;
+   public ServerMessage copy(final long newID) {
+      try {
+         LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
+
+
+         byte[] bufferBytes = new byte[100 * 1024];
+
+         ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
+
+         long oldPosition = file.position();
+
+         boolean originallyOpen = file.isOpen();
+         file.open();
+         file.position(0);
+
+         for (;;) {
+            // The buffer is reused...
+            // We need to make sure we clear the limits and the buffer before reusing it
+            buffer.clear();
+            int bytesRead = file.read(buffer);
+
+            byte[] bufferToWrite;
+            if (bytesRead == 0) {
+               break;
+            }
+            else if (bytesRead == bufferBytes.length) {
+               bufferToWrite = bufferBytes;
+            }
+            else {
+               bufferToWrite = new byte[bytesRead];
+               System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
+            }
+
+            newMessage.addBytes(bufferToWrite);
+
+            if (bytesRead < bufferBytes.length) {
+               break;
+            }
          }
 
-         closeFile();
-         bodySize = -1;
-         file = null;
-      }
-   }
+         file.position(oldPosition);
 
-   /**
-    * The copy of the file itself will be done later by {@link LargeServerMessageImpl#finishCopy()}
-    */
-   @Override
-   public synchronized ServerMessage copy(final long newID) {
-      try {
-         SequentialFile newfile = storageManager.createFileForLargeMessage(newID, durable);
+         if (!originallyOpen) {
+            file.close();
+         }
 
-         LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newfile, newID);
-         newMessage.copyFrom(createFile());
          return newMessage;
+
+
       }
       catch (Exception e) {
          ActiveMQServerLogger.LOGGER.lareMessageErrorCopying(e, this);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e81fa5c3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
index 73a4df5..40dc50f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java
@@ -48,8 +48,6 @@ public interface ServerMessage extends MessageInternal, EncodingSupport {
 
    ServerMessage copy(long newID);
 
-   void finishCopy() throws Exception;
-
    ServerMessage copy();
 
    int getMemoryEstimate();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e81fa5c3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 7d24dc6..339293b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -148,7 +148,6 @@ public class Redistributor implements Consumer {
       }
 
       if (!reference.getMessage().isLargeMessage()) {
-         routingInfo.getB().finishCopy();
 
          postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
 
@@ -160,7 +159,6 @@ public class Redistributor implements Consumer {
             @Override
             public void run() {
                try {
-                  routingInfo.getB().finishCopy();
 
                   postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e81fa5c3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index c2f8b90..b90db75 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -88,7 +88,6 @@ public class DivertImpl implements Divert {
       // Shouldn't copy if it's not routed anywhere else
       if (!forwardAddress.equals(message.getAddress())) {
          copy = message.copy(id);
-         copy.finishCopy();
 
          // This will set the original MessageId, and the original address
          copy.setOriginalHeaders(message, null, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e81fa5c3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 449704b..6f91a0a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2222,7 +2222,6 @@ public class QueueImpl implements Queue {
          }
       }
 
-      copyMessage.finishCopy();
       postOffice.processRoute(copyMessage, routingContext, false);
 
       ref.handled();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e81fa5c3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
index e7a7e67..5e5aebe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
@@ -186,10 +186,6 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage {
    }
 
    @Override
-   public void finishCopy() throws Exception {
-   }
-
-   @Override
    public ServerMessage copy() {
       // This is a simple copy, used only to avoid changing original properties
       return new ServerMessageImpl(this);
@@ -216,7 +212,6 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage {
       */
 
       ServerMessage copy = copy(newID);
-      copy.finishCopy();
 
       if (copyOriginalHeaders) {
          copy.setOriginalHeaders(this, originalReference, expiry);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e81fa5c3/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 53edb79..04a587d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -337,11 +337,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void finishCopy() throws Exception {
-
-      }
-
-      @Override
       public ServerMessage copy() {
          return null;
       }