You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/09/26 18:48:49 UTC

[01/17] activemq-artemis git commit: [ARTEMIS-1431] Adapt transport configuration in ClientProtocolManagerFactory

Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x c85575f43 -> b56950c35


[ARTEMIS-1431] Adapt transport configuration in ClientProtocolManagerFactory

add the adaptTransportConfiguration() method to the
ClientProtocolManagerFactory so that transport configurations used by
the ClientProtocolManager have an opportunity to adapt their transport
configuration.

This allows the HornetQClientProtocolManagerFactory to adapt the
transport configuration received from a remote HornetQ broker, replacing the
HornetQ-based NettyConnectorFactory with the Artemis-based one.

JIRA: https://issues.apache.org/jira/browse/ARTEMIS-1431


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6febd5e3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6febd5e3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6febd5e3

Branch: refs/heads/1.x
Commit: 6febd5e31492d5d23975fbde852d1e65573523d7
Parents: c85575f
Author: Jeff Mesnil <jm...@gmail.com>
Authored: Fri Sep 22 16:35:36 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:04 2017 -0400

----------------------------------------------------------------------
 .../service/extensions/xa/recovery/XARecoveryConfig.java       | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6febd5e3/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java
----------------------------------------------------------------------
diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java
index 6336824..292395a 100644
--- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java
+++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java
@@ -65,7 +65,11 @@ public class XARecoveryConfig {
                            final ClientProtocolManagerFactory clientProtocolManager) {
       TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length];
       for (int i = 0; i < transportConfiguration.length; i++) {
-         newTransportConfiguration[i] = clientProtocolManager.adaptTransportConfiguration(transportConfiguration[i].newTransportConfig(""));
+         if (clientProtocolManager != null) {
+            newTransportConfiguration[i] = clientProtocolManager.adaptTransportConfiguration(transportConfiguration[i].newTransportConfig(""));
+         } else {
+            newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig("");
+         }
       }
 
       this.transportConfiguration = newTransportConfiguration;


[06/17] activemq-artemis git commit: NO JIRA Remove failing tests due to cherrypick

Posted by cl...@apache.org.
NO JIRA Remove failing tests due to cherrypick


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5db0c877
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5db0c877
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5db0c877

Branch: refs/heads/1.x
Commit: 5db0c8772ebbbac955cd0a9cf17262aeccd3aeb8
Parents: fef0256
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed Jul 26 15:35:15 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:06 2017 -0400

----------------------------------------------------------------------
 .../failover/FailoverTestWithDivert.java        | 47 ++------------------
 1 file changed, 3 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5db0c877/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
index 76efc22..5b42c3c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.failover;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -26,18 +28,10 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
-import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
-import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class FailoverTestWithDivert extends FailoverTestBase {
 
    private static final String DIVERT_ADDRESS = "jms.queue.testQueue";
@@ -60,41 +54,6 @@ public class FailoverTestWithDivert extends FailoverTestBase {
       return getNettyConnectorTransportConfiguration(live);
    }
 
-   @Override
-   protected void createConfigs() throws Exception {
-      createReplicatedConfigs();
-
-      liveConfig.setJournalFileSize(10240000);
-      backupConfig.setJournalFileSize(10240000);
-      addQueue(liveConfig, DIVERT_ADDRESS, DIVERT_ADDRESS);
-      addQueue(liveConfig, DIVERT_FORWARD_ADDRESS, DIVERT_FORWARD_ADDRESS);
-      addDivert(liveConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
-      addDivert(backupConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
-   }
-
-   private void addQueue(Configuration serverConfig, String address, String name) {
-
-      List<CoreAddressConfiguration> addrConfigs = serverConfig.getAddressConfigurations();
-      CoreAddressConfiguration addrCfg = new CoreAddressConfiguration();
-      addrCfg.setName(address);
-      addrCfg.addRoutingType(RoutingType.ANYCAST);
-      CoreQueueConfiguration qConfig = new CoreQueueConfiguration();
-      qConfig.setName(name);
-      qConfig.setAddress(address);
-      addrCfg.addQueueConfiguration(qConfig);
-      addrConfigs.add(addrCfg);
-   }
-
-   private void addDivert(Configuration serverConfig, String source, String target, boolean exclusive) {
-      List<DivertConfiguration> divertConfigs = serverConfig.getDivertConfigurations();
-      DivertConfiguration newDivert = new DivertConfiguration();
-      newDivert.setName("myDivert");
-      newDivert.setAddress(source);
-      newDivert.setForwardingAddress(target);
-      newDivert.setExclusive(exclusive);
-      divertConfigs.add(newDivert);
-   }
-
    @Test
    public void testUniqueIDsWithDivert() throws Exception {
       Map<String, Object> params = new HashMap<>();


[04/17] activemq-artemis git commit: ARTEMIS-1221 Duplicated ID causes LargeMessage lost at backup

Posted by cl...@apache.org.
ARTEMIS-1221 Duplicated ID causes LargeMessage lost at backup

When a large message is replicated to backup, a pendingID is generated
when the large message is finished. This pendingID is generated by a
BatchingIDGenerator at backup.

It is possible that a pendingID generated at backup may be a duplicate
of an ID generated at live server.

This can cause a problem when a large message with a messageID that is
the same as another large message's pendingID is replicated and stored
in the backup's journal, and then a deleteRecord for the pendingID
is appended. If backup becomes live and loads the journal, it will
drop the large message add record because there is a deleteRecord of
the same ID (even though it is a pendingID of another message).
As a result the expecting client will never get this large message.

So in summary, the root cause is that the pendingIDs for large
messages are generated at backup while backup is not alive.

The solution to this is that instead of the backup generating
the pendingID, we make them all be generated in advance
at live server and let them be replicated to backup wherever needed.
The ID generator at backup only works when backup becomes live
(when it is properly initialized from journal).

(cherry picked from commit d50f577cd50df37634f592db65200861fe3e13d3)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7b1a1b05
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7b1a1b05
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7b1a1b05

Branch: refs/heads/1.x
Commit: 7b1a1b058a5ec84267363ad1d12eacef5f3d8a1d
Parents: 5faf2cd
Author: Howard Gao <ho...@gmail.com>
Authored: Thu Jun 29 00:03:47 2017 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:05 2017 -0400

----------------------------------------------------------------------
 .../journal/AbstractJournalStorageManager.java  |   2 +-
 .../impl/journal/JournalStorageManager.java     |  19 ++-
 .../impl/journal/LargeServerMessageImpl.java    |   2 +-
 .../impl/journal/LargeServerMessageInSync.java  |  10 ++
 .../ReplicationLargeMessageEndMessage.java      |  19 ++-
 .../replication/ReplicatedLargeMessage.java     |   4 +
 .../core/replication/ReplicationEndpoint.java   |   1 +
 .../core/replication/ReplicationManager.java    |   7 +-
 .../artemis/core/server/LargeServerMessage.java |   6 +-
 .../failover/FailoverTestWithDivert.java        | 148 +++++++++++++++++++
 .../replication/ReplicationTest.java            |   4 +-
 11 files changed, 203 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index bf682ff..5ea104b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1546,7 +1546,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
       if (largeServerMessage.getPendingRecordID() >= 0) {
          try {
             confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
-            largeServerMessage.setPendingRecordID(-1);
+            largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID);
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index e79a9cb..ca1b805 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -272,7 +272,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
             ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeMsgId);
          }
          if (replicator != null) {
-            replicator.largeMessageDelete(largeMsgId);
+            replicator.largeMessageDelete(largeMsgId, JournalStorageManager.this);
          }
       }
       largeMessagesToDelete.clear();
@@ -375,10 +375,17 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
       journalFF.releaseBuffer(buffer);
    }
 
-   public long storePendingLargeMessage(final long messageID) throws Exception {
+   public long storePendingLargeMessage(final long messageID, long recordID) throws Exception {
       readLock();
       try {
-         long recordID = generateID();
+         if (recordID == LargeServerMessage.NO_PENDING_ID) {
+            recordID = generateID();
+         } else {
+            //this means the large message doesn't
+            //have a pendingRecordID, but one has been
+            //generated (coming from live server) for use.
+            recordID = -recordID;
+         }
 
          messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true));
 
@@ -396,7 +403,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
             // And the client won't be waiting for the actual file to be deleted.
             // We set a temporary record (short lived) on the journal
             // to avoid a situation where the server is restarted and pending large message stays on forever
-            largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID()));
+            largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID()));
          } catch (Exception e) {
             throw new ActiveMQInternalErrorException(e.getMessage(), e);
          }
@@ -427,7 +434,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
                readLock();
                try {
                   if (replicator != null) {
-                     replicator.largeMessageDelete(largeServerMessage.getMessageID());
+                     replicator.largeMessageDelete(largeServerMessage.getMessageID(), JournalStorageManager.this);
                   }
                   file.delete();
 
@@ -475,7 +482,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
 
          if (largeMessage.isDurable()) {
             // We store a marker on the journal that the large file is pending
-            long pendingRecordID = storePendingLargeMessage(id);
+            long pendingRecordID = storePendingLargeMessage(id, LargeServerMessage.NO_PENDING_ID);
 
             largeMessage.setPendingRecordID(pendingRecordID);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 22929e7..22cfa0b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -44,7 +44,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
 
    private final JournalStorageManager storageManager;
 
-   private long pendingRecordID = -1;
+   private long pendingRecordID = NO_PENDING_ID;
 
    private boolean paged;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
index 42126d4..66ccd8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
@@ -158,4 +158,14 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
       storageManager.addBytesToLargeMessage(appendFile, mainLM.getMessageID(), bytes);
    }
 
+   @Override
+   public void setPendingRecordID(long pendingRecordID) {
+      mainLM.setPendingRecordID(pendingRecordID);
+   }
+
+   @Override
+   public long getPendingRecordID() {
+      return mainLM.getPendingRecordID();
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
index 4a09cc0..a9be86a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
@@ -23,31 +23,40 @@ import org.apache.activemq.artemis.utils.DataConstants;
 public class ReplicationLargeMessageEndMessage extends PacketImpl {
 
    long messageId;
+   long pendingRecordId;
 
    public ReplicationLargeMessageEndMessage() {
       super(PacketImpl.REPLICATION_LARGE_MESSAGE_END);
    }
 
-   public ReplicationLargeMessageEndMessage(final long messageId) {
+   public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) {
       this();
       this.messageId = messageId;
+      //we use negative value to indicate that this id is pre-generated by live node
+      //so that it won't be generated at backup.
+      //see https://issues.apache.org/jira/browse/ARTEMIS-1221
+      this.pendingRecordId = -pendingRecordId;
    }
 
-
    @Override
    public int expectedEncodeSize() {
       return PACKET_HEADERS_SIZE +
-         DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
+         DataConstants.SIZE_LONG + // buffer.writeLong(messageId)
+         DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId);
    }
 
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeLong(messageId);
+      buffer.writeLong(pendingRecordId);
    }
 
    @Override
    public void decodeRest(final ActiveMQBuffer buffer) {
       messageId = buffer.readLong();
+      if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
+         pendingRecordId = buffer.readLong();
+      }
    }
 
    /**
@@ -85,4 +94,8 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
          return false;
       return true;
    }
+
+   public long getPendingRecordId() {
+      return pendingRecordId;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
index 3b6327a..b744805 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
@@ -52,4 +52,8 @@ public interface ReplicatedLargeMessage {
     */
    void addBytes(byte[] body) throws Exception;
 
+   void setPendingRecordID(long pendingRecordID);
+
+   long getPendingRecordID();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 5a01bf7..d6f807c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -519,6 +519,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       }
       final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
       if (message != null) {
+         message.setPendingRecordID(packet.getPendingRecordId());
          executor.execute(new Runnable() {
             @Override
             public void run() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index d8d70f0..e1027d4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
@@ -238,9 +239,11 @@ public final class ReplicationManager implements ActiveMQComponent {
       }
    }
 
-   public void largeMessageDelete(final Long messageId) {
+   //we pass in storageManager to generate ID only if enabled
+   public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) {
       if (enabled) {
-         sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId));
+         long pendingRecordID = storageManager.generateID();
+         sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID));
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index 2a16ed2..38f36ad 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -22,13 +22,11 @@ import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
 
 public interface LargeServerMessage extends ServerMessage, ReplicatedLargeMessage {
 
+   long NO_PENDING_ID = -1;
+
    @Override
    void addBytes(byte[] bytes) throws Exception;
 
-   void setPendingRecordID(long pendingRecordID);
-
-   long getPendingRecordID();
-
    /**
     * We have to copy the large message content in case of DLQ and paged messages
     * For that we need to pre-mark the LargeMessage with a flag when it is paged

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
new file mode 100644
index 0000000..76efc22
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FailoverTestWithDivert extends FailoverTestBase {
+
+   private static final String DIVERT_ADDRESS = "jms.queue.testQueue";
+   private static final String DIVERT_FORWARD_ADDRESS = "jms.queue.divertedQueue";
+   private ClientSessionFactoryInternal sf;
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
+      return getNettyAcceptorTransportConfiguration(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
+      return getNettyConnectorTransportConfiguration(live);
+   }
+
+   @Override
+   protected void createConfigs() throws Exception {
+      createReplicatedConfigs();
+
+      liveConfig.setJournalFileSize(10240000);
+      backupConfig.setJournalFileSize(10240000);
+      addQueue(liveConfig, DIVERT_ADDRESS, DIVERT_ADDRESS);
+      addQueue(liveConfig, DIVERT_FORWARD_ADDRESS, DIVERT_FORWARD_ADDRESS);
+      addDivert(liveConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
+      addDivert(backupConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
+   }
+
+   private void addQueue(Configuration serverConfig, String address, String name) {
+
+      List<CoreAddressConfiguration> addrConfigs = serverConfig.getAddressConfigurations();
+      CoreAddressConfiguration addrCfg = new CoreAddressConfiguration();
+      addrCfg.setName(address);
+      addrCfg.addRoutingType(RoutingType.ANYCAST);
+      CoreQueueConfiguration qConfig = new CoreQueueConfiguration();
+      qConfig.setName(name);
+      qConfig.setAddress(address);
+      addrCfg.addQueueConfiguration(qConfig);
+      addrConfigs.add(addrCfg);
+   }
+
+   private void addDivert(Configuration serverConfig, String source, String target, boolean exclusive) {
+      List<DivertConfiguration> divertConfigs = serverConfig.getDivertConfigurations();
+      DivertConfiguration newDivert = new DivertConfiguration();
+      newDivert.setName("myDivert");
+      newDivert.setAddress(source);
+      newDivert.setForwardingAddress(target);
+      newDivert.setExclusive(exclusive);
+      divertConfigs.add(newDivert);
+   }
+
+   @Test
+   public void testUniqueIDsWithDivert() throws Exception {
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+      TransportConfiguration tc = createTransportConfiguration(true, false, params);
+      ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(-1);
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      int minLarge = locator.getMinLargeMessageSize();
+
+      ClientSession session = sf.createSession(false, false);
+      addClientSession(session);
+      session.start();
+
+      final int num = 100;
+      ClientProducer producer = session.createProducer(DIVERT_ADDRESS);
+      for (int i = 0; i < num; i++) {
+         ClientMessage message = createLargeMessage(session, 2 * minLarge);
+         producer.send(message);
+      }
+      session.commit();
+
+      ClientConsumer consumer = session.createConsumer(DIVERT_ADDRESS);
+      for (int i = 0;  i < num; i++) {
+         ClientMessage receivedFromSourceQueue = consumer.receive(5000);
+         assertNotNull(receivedFromSourceQueue);
+         receivedFromSourceQueue.acknowledge();
+      }
+      session.commit();
+
+      crash(session);
+
+      ClientConsumer consumer1 = session.createConsumer(DIVERT_FORWARD_ADDRESS);
+      for (int i = 0; i < num; i++) {
+         ClientMessage receivedFromTargetQueue = consumer1.receive(5000);
+         assertNotNull(receivedFromTargetQueue);
+         receivedFromTargetQueue.acknowledge();
+      }
+      session.commit();
+   }
+
+   private ClientMessage createLargeMessage(ClientSession session, final int largeSize) {
+      ClientMessage message = session.createMessage(true);
+      ActiveMQBuffer bodyBuffer = message.getBodyBuffer();
+      final int propSize = 10240;
+      while (bodyBuffer.writerIndex() < largeSize) {
+         byte[] prop = new byte[propSize];
+         bodyBuffer.writeBytes(prop);
+      }
+      return message;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 1ae9527..398e895 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -204,7 +204,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
    public void testSendPackets() throws Exception {
       setupServer(true);
 
-      StorageManager storage = getStorage();
+      JournalStorageManager storage = getStorage();
 
       manager = liveServer.getReplicationManager();
       waitForComponent(manager);
@@ -270,7 +270,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
 
       manager.largeMessageWrite(500, new byte[1024]);
 
-      manager.largeMessageDelete(Long.valueOf(500));
+      manager.largeMessageDelete(Long.valueOf(500), storage);
 
       blockOnReplication(storage, manager);
 


[15/17] activemq-artemis git commit: NO-JIRA: Allow subclasses to more easily override BroadcastEndpointFactory used during connection factory creation.

Posted by cl...@apache.org.
NO-JIRA: Allow subclasses to more easily override BroadcastEndpointFactory used during connection factory creation.

(cherry picked from commit 4bf204c01225e058a5b723e301e53a79f3128f3a)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/313e8167
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/313e8167
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/313e8167

Branch: refs/heads/1.x
Commit: 313e8167074ece0f4b6cfc19c92ccad0b17fa172
Parents: ee4692d
Author: Paul Ferraro <pa...@redhat.com>
Authored: Wed Aug 23 19:51:30 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:08 2017 -0400

----------------------------------------------------------------------
 .../artemis/ra/ActiveMQResourceAdapter.java     | 87 ++++++++------------
 1 file changed, 35 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/313e8167/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
index a0dca4a..0ce1b68 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
@@ -1671,38 +1671,15 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
       ActiveMQConnectionFactory cf;
       List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
 
-      String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
-
       Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
 
-      String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
-
-      String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
-
-      String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
-
       if (ha == null) {
          ha = ActiveMQClient.DEFAULT_IS_HA;
       }
 
-      if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
-         BroadcastEndpointFactory endpointFactory = null;
-
-         if (jgroupsLocatorClassName != null) {
-            String jchannelRefName = raProperties.getJgroupsChannelRefName();
-            JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
-            endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
-         } else if (discoveryAddress != null) {
-            Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
-            if (discoveryPort == null) {
-               discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
-            }
+      BroadcastEndpointFactory endpointFactory = this.createBroadcastEndpointFactory(overrideProperties);
 
-            String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
-            endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
-         } else if (jgroupsFileName != null) {
-            endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
-         }
+      if (endpointFactory != null) {
          Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
          if (refreshTimeout == null) {
             refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
@@ -1769,34 +1746,10 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
       ActiveMQConnectionFactory cf;
       List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
 
-      String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
-
-      String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
-
-      String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
-
       if (connectorClassName == null) {
-         BroadcastEndpointFactory endpointFactory = null;
-         if (discoveryAddress != null) {
-            Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
-            if (discoveryPort == null) {
-               discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
-            }
-
-            String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
-            endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
-         } else if (jgroupsFileName != null) {
-            endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
-         } else {
-            String jgroupsLocatorClass = raProperties.getJgroupsChannelLocatorClass();
-            if (jgroupsLocatorClass != null) {
-               String jgroupsChannelRefName = raProperties.getJgroupsChannelRefName();
-               JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClass, jgroupsChannelRefName);
-               endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
-            }
-            if (endpointFactory == null) {
-               throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
-            }
+         BroadcastEndpointFactory endpointFactory = this.createBroadcastEndpointFactory(overrideProperties);
+         if (endpointFactory == null) {
+            throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
          }
 
          Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
@@ -1854,6 +1807,36 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
       return cf;
    }
 
+   protected BroadcastEndpointFactory createBroadcastEndpointFactory(final ConnectionFactoryProperties overrideProperties) {
+
+      String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
+      if (discoveryAddress != null) {
+         Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
+         if (discoveryPort == null) {
+            discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
+         }
+
+         String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
+         return new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
+      }
+
+      String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
+
+      String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
+      if (jgroupsLocatorClassName != null) {
+         String jchannelRefName = raProperties.getJgroupsChannelRefName();
+         JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
+         return new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
+      }
+
+      String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
+      if (jgroupsFileName != null) {
+         return new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
+      }
+
+      return null;
+   }
+
    public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams,
                                                            final Map<String, Object> overrideConnectionParams) {
       Map<String, Object> map = new HashMap<>();


[14/17] activemq-artemis git commit: ARTEMIS-1353 Initial replication of large messages out of executor

Posted by cl...@apache.org.
ARTEMIS-1353 Initial replication of large messages out of executor

This is based on the work @jbertram did in GitHub PR #1466 and the discussions we had there.

(cherry picked from commit ce6942a9aa9375efaa449424fe89de2db3f22e36)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/492b55e0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/492b55e0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/492b55e0

Branch: refs/heads/1.x
Commit: 492b55e09affb03a943c3516a5a3bf513024ca8b
Parents: 5db0c87
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 18 15:01:33 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:07 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/protocol/core/Packet.java      |  7 ++
 .../core/protocol/core/impl/PacketImpl.java     |  1 +
 .../wireformat/ReplicationSyncFileMessage.java  | 10 +++
 .../core/replication/ReplicationManager.java    | 81 ++++++++++++++------
 .../impl/SharedNothingLiveActivation.java       |  2 +-
 .../replication/ReplicationTest.java            |  2 +-
 6 files changed, 77 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index a86c5c1..efb9aa6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -93,4 +93,11 @@ public interface Packet {
     * @return true if confirmation is required
     */
    boolean isRequiresConfirmations();
+
+
+
+   /** The packet wasn't used because the stream is closed,
+    * this gives a chance to sub classes to cleanup anything that won't be used. */
+   default void release() {
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 99c052b..afbaf53 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -354,6 +354,7 @@ public class PacketImpl implements Packet {
       return result;
    }
 
+
    @Override
    public boolean equals(Object obj) {
       if (this == obj) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index 4d3c32f..b81782b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -159,6 +159,16 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
       if (dataSize > 0) {
          buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
       }
+
+      release();
+   }
+
+   @Override
+   public void release() {
+      if (byteBuffer != null) {
+         byteBuffer.release();
+         byteBuffer = null;
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index e1027d4..d298a24 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -26,6 +26,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
@@ -92,8 +93,7 @@ public final class ReplicationManager implements ActiveMQComponent {
          public boolean toBoolean() {
             return true;
          }
-      },
-      ADD {
+      }, ADD {
          @Override
          public boolean toBoolean() {
             return false;
@@ -129,6 +129,8 @@ public final class ReplicationManager implements ActiveMQComponent {
 
    private final long timeout;
 
+   private final long initialReplicationSyncTimeout;
+
    private volatile boolean inSync = true;
 
    private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
@@ -138,8 +140,10 @@ public final class ReplicationManager implements ActiveMQComponent {
     */
    public ReplicationManager(CoreRemotingConnection remotingConnection,
                              final long timeout,
+                             final long initialReplicationSyncTimeout,
                              final ExecutorFactory executorFactory) {
       this.executorFactory = executorFactory;
+      this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
       this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
       this.remotingConnection = remotingConnection;
       this.replicationStream = executorFactory.getExecutor();
@@ -178,7 +182,7 @@ public final class ReplicationManager implements ActiveMQComponent {
                                   boolean sync,
                                   final boolean lineUp) throws Exception {
       if (enabled) {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true);
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
       }
    }
 
@@ -339,10 +343,10 @@ public final class ReplicationManager implements ActiveMQComponent {
    }
 
    private OperationContext sendReplicatePacket(final Packet packet) {
-      return sendReplicatePacket(packet, true, true);
+      return sendReplicatePacket(packet, true);
    }
 
-   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) {
+   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
       if (!enabled)
          return null;
       boolean runItNow = false;
@@ -353,22 +357,17 @@ public final class ReplicationManager implements ActiveMQComponent {
       }
 
       if (enabled) {
-         if (useExecutor) {
-            replicationStream.execute(() -> {
-               if (enabled) {
-                  pendingTokens.add(repliToken);
-                  flowControl(packet.expectedEncodeSize());
-                  replicatingChannel.send(packet);
-               }
-            });
-         } else {
-            pendingTokens.add(repliToken);
-            flowControl(packet.expectedEncodeSize());
-            replicatingChannel.send(packet);
-         }
+         replicationStream.execute(() -> {
+            if (enabled) {
+               pendingTokens.add(repliToken);
+               flowControl(packet.expectedEncodeSize());
+               replicatingChannel.send(packet);
+            }
+         });
       } else {
          // Already replicating channel failed, so just play the action now
          runItNow = true;
+         packet.release();
       }
 
       // Execute outside lock
@@ -396,7 +395,6 @@ public final class ReplicationManager implements ActiveMQComponent {
          }
       }
 
-
       return flowWorked;
    }
 
@@ -511,6 +509,24 @@ public final class ReplicationManager implements ActiveMQComponent {
          sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
    }
 
+   private class FlushAction implements Runnable {
+
+      ReusableLatch latch = new ReusableLatch(1);
+
+      public void reset() {
+         latch.setCount(1);
+      }
+
+      public boolean await(long timeout, TimeUnit unit) throws Exception {
+         return latch.await(timeout, unit);
+      }
+
+      @Override
+      public void run() {
+         latch.countDown();
+      }
+   }
+
    /**
     * Sends large files in reasonably sized chunks to the backup during replication synchronization.
     *
@@ -532,15 +548,19 @@ public final class ReplicationManager implements ActiveMQComponent {
          file.open();
       }
       int size = 32 * 1024;
-      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
+
+      int flowControlSize = 10;
+
+      int packetsSent = 0;
+      FlushAction action = new FlushAction();
 
       try {
-         try (final FileInputStream fis = new FileInputStream(file.getJavaFile());
-              final FileChannel channel = fis.getChannel()) {
+         try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
             // We can afford having a single buffer here for this entire loop
             // because sendReplicatePacket will encode the packet as a NettyBuffer
             // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
             while (true) {
+               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
                buffer.clear();
                ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
                final int bytesRead = channel.read(byteBuffer);
@@ -558,18 +578,31 @@ public final class ReplicationManager implements ActiveMQComponent {
                // We cannot simply send everything of a file through the executor,
                // otherwise we would run out of memory.
                // so we don't use the executor here
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false);
+               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
+               packetsSent++;
+
+               if (packetsSent % flowControlSize == 0) {
+                  flushReplicationStream(action);
+               }
                if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
                   break;
             }
          }
+         flushReplicationStream(action);
       } finally {
-         buffer.release();
          if (file.isOpen())
             file.close();
       }
    }
 
+   private void flushReplicationStream(FlushAction action) throws Exception {
+      action.reset();
+      replicationStream.execute(action);
+      if (!action.await(this.timeout, TimeUnit.MILLISECONDS)) {
+         throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
+      }
+   }
+
    /**
     * Reserve the following fileIDs in the backup server.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index c984ae2..b532e57 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -157,7 +157,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
          ReplicationFailureListener listener = new ReplicationFailureListener();
          rc.addCloseListener(listener);
          rc.addFailureListener(listener);
-         replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory());
+         replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory());
          replicationManager.start();
          Thread t = new Thread(new Runnable() {
             @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 398e895..46cb085 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -189,7 +189,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
       setupServer(false);
       try {
          ClientSessionFactory sf = createSessionFactory(locator);
-         manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory);
+         manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory);
          addActiveMQComponent(manager);
          manager.start();
          Assert.fail("Exception was expected");


[09/17] activemq-artemis git commit: ARTEMIS-1305 Fix checkstyle and traces

Posted by cl...@apache.org.
ARTEMIS-1305 Fix checkstyle and traces

https://issues.jboss.org/browse/JBEAP-9235


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fef0256b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fef0256b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fef0256b

Branch: refs/heads/1.x
Commit: fef0256bfaeece5a23521eabd252ea583db4d540
Parents: 6f0beba
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Jul 25 17:46:22 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:06 2017 -0400

----------------------------------------------------------------------
 .../extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fef0256b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
index 1afd632..8b14e39 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
@@ -380,7 +380,8 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
                serverSessions.add(session);
             }
          }
-      } while (running.get() && serverSessions.size() != numberOfSessions && timeout > System.currentTimeMillis());
+      }
+      while (running.get() && serverSessions.size() != numberOfSessions && timeout > System.currentTimeMillis());
 
       System.err.println("Returning " + serverSessions.size() + " sessions");
       return serverSessions;


[12/17] activemq-artemis git commit: ARTEMIS-1368 Artemis gets to state when it doesn't respond to producer

Posted by cl...@apache.org.
ARTEMIS-1368 Artemis gets to state when it doesn't respond to producer

There is a leak of replication tokens at the moment when a backup is
shut down or killed and the ReplicationManager is stopped. If there
are some tasks (holding replication tokens) in the executor, these
tokens are simply ignored and the replicationDone method isn't called
on them. Because of this, some tasks in OperationContextImpl cannot
be finished.

(cherry picked from commit 88a018e17fd49097de1186c65e25cd0af578b6a9)
(cherry picked from commit d6cbc0aa885fa88beb7f1b2450cdfe4da9466947)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0f4a8c3c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0f4a8c3c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0f4a8c3c

Branch: refs/heads/1.x
Commit: 0f4a8c3c2d869527b4575e5b45b527d7640f9852
Parents: 492b55e
Author: Erich Duda <ed...@redhat.com>
Authored: Tue Aug 22 21:48:19 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:07 2017 -0400

----------------------------------------------------------------------
 .../core/replication/ReplicationManager.java    | 34 ++++++++------------
 1 file changed, 13 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f4a8c3c/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index d298a24..4241996 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -347,34 +347,26 @@ public final class ReplicationManager implements ActiveMQComponent {
    }
 
    private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
-      if (!enabled)
+      if (!enabled) {
+         packet.release();
          return null;
-      boolean runItNow = false;
+      }
 
       final OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
       if (lineUp) {
          repliToken.replicationLineUp();
       }
 
-      if (enabled) {
-         replicationStream.execute(() -> {
-            if (enabled) {
-               pendingTokens.add(repliToken);
-               flowControl(packet.expectedEncodeSize());
-               replicatingChannel.send(packet);
-            }
-         });
-      } else {
-         // Already replicating channel failed, so just play the action now
-         runItNow = true;
-         packet.release();
-      }
-
-      // Execute outside lock
-
-      if (runItNow) {
-         repliToken.replicationDone();
-      }
+      replicationStream.execute(() -> {
+         if (enabled) {
+            pendingTokens.add(repliToken);
+            flowControl(packet.expectedEncodeSize());
+            replicatingChannel.send(packet);
+         } else {
+            packet.release();
+            repliToken.replicationDone();
+         }
+      });
 
       return repliToken;
    }


[08/17] activemq-artemis git commit: ARTEMIS-1220 removing broken test

Posted by cl...@apache.org.
ARTEMIS-1220 removing broken test

Testsuite won't compile


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6f0bebaa
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6f0bebaa
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6f0bebaa

Branch: refs/heads/1.x
Commit: 6f0bebaad7cc3de7498941d406f9a009779c26ff
Parents: 08fdae3
Author: Howard Gao <ho...@gmail.com>
Authored: Mon Jun 26 16:19:09 2017 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:06 2017 -0400

----------------------------------------------------------------------
 .../LargeMessageOverReplicationTest.java        | 257 -------------------
 1 file changed, 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6f0bebaa/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java
deleted file mode 100644
index 48a6757..0000000
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.extras.byteman;
-
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
-import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
-import org.apache.activemq.artemis.utils.ReusableLatch;
-import org.jboss.byteman.contrib.bmunit.BMRule;
-import org.jboss.byteman.contrib.bmunit.BMRules;
-import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
-import org.jboss.logging.Logger;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-@RunWith(BMUnitRunner.class)
-public class LargeMessageOverReplicationTest extends ActiveMQTestBase {
-
-   public static int messageChunkCount = 0;
-
-   private static final ReusableLatch ruleFired = new ReusableLatch(1);
-   private static ActiveMQServer backupServer;
-   private static ActiveMQServer liveServer;
-
-   ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000");
-   ActiveMQConnection connection;
-   Session session;
-   Queue queue;
-   MessageProducer producer;
-
-   @Override
-   @Before
-   public void setUp() throws Exception {
-      super.setUp();
-      ruleFired.setCount(1);
-      messageChunkCount = 0;
-
-      TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
-      TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
-      TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
-      TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
-
-      Configuration backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true));
-
-      Configuration liveConfig = createDefaultInVMConfig();
-
-      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
-
-      liveServer = createServer(liveConfig);
-      liveServer.getConfiguration().addQueueConfiguration(new CoreQueueConfiguration().setName("jms.queue.Queue").setAddress("jms.queue.Queue"));
-      liveServer.start();
-
-      waitForServerToStart(liveServer);
-
-      backupServer = createServer(backupConfig);
-      backupServer.start();
-
-      waitForServerToStart(backupServer);
-
-      // Just to make sure the expression worked
-      Assert.assertEquals(10000, factory.getMinLargeMessageSize());
-      Assert.assertEquals(10000, factory.getProducerWindowSize());
-      Assert.assertEquals(100, factory.getRetryInterval());
-      Assert.assertEquals(-1, factory.getReconnectAttempts());
-      Assert.assertTrue(factory.isHA());
-
-      connection = (ActiveMQConnection) factory.createConnection();
-
-      waitForRemoteBackup(connection.getSessionFactory(), 30);
-
-      session = connection.createSession(true, Session.SESSION_TRANSACTED);
-      queue = session.createQueue("Queue");
-      producer = session.createProducer(queue);
-
-   }
-
-   @After
-   public void stopServers() throws Exception {
-      if (connection != null) {
-         try {
-            connection.close();
-         } catch (Exception e) {
-         }
-      }
-      if (backupServer != null) {
-         backupServer.stop();
-         backupServer = null;
-      }
-
-      if (liveServer != null) {
-         liveServer.stop();
-         liveServer = null;
-      }
-
-      backupServer = liveServer = null;
-   }
-
-   /*
-  * simple test to induce a potential race condition where the server's acceptors are active, but the server's
-  * state != STARTED
-  */
-   @Test
-   @BMRules(
-      rules = {@BMRule(
-         name = "InterruptSending",
-         targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext",
-         targetMethod = "sendLargeMessageChunk",
-         targetLocation = "ENTRY",
-         action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkSent();")})
-   public void testSendLargeMessage() throws Exception {
-
-      MapMessage message = createLargeMessage();
-
-      try {
-         producer.send(message);
-         Assert.fail("expected an exception");
-         //      session.commit();
-      } catch (JMSException expected) {
-      }
-
-      session.rollback();
-
-      producer.send(message);
-      session.commit();
-
-      MessageConsumer consumer = session.createConsumer(queue);
-      connection.start();
-
-      MapMessage messageRec = (MapMessage) consumer.receive(5000);
-      Assert.assertNotNull(messageRec);
-
-      for (int i = 0; i < 10; i++) {
-         Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length);
-      }
-   }
-
-   @Test
-   @BMRules(
-      rules = {@BMRule(
-         name = "InterruptReceive",
-         targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback",
-         targetMethod = "sendLargeMessageContinuation",
-         targetLocation = "ENTRY",
-         action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkReceived();")})
-   public void testReceiveLargeMessage() throws Exception {
-
-      MapMessage message = createLargeMessage();
-
-      producer.send(message);
-      session.commit();
-
-      MessageConsumer consumer = session.createConsumer(queue);
-      connection.start();
-
-      MapMessage messageRec = null;
-
-      try {
-         consumer.receive(5000);
-         Assert.fail("Expected a failure here");
-      } catch (JMSException expected) {
-      }
-
-      session.rollback();
-
-      messageRec = (MapMessage) consumer.receive(5000);
-      Assert.assertNotNull(messageRec);
-      session.commit();
-
-      for (int i = 0; i < 10; i++) {
-         Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length);
-      }
-   }
-
-   public static void messageChunkReceived() {
-      messageChunkCount++;
-
-      if (messageChunkCount == 100) {
-         final CountDownLatch latch = new CountDownLatch(1);
-         new Thread() {
-            @Override
-            public void run() {
-               try {
-                  latch.countDown();
-                  liveServer.stop();
-               } catch (Exception e) {
-                  e.printStackTrace();
-               }
-            }
-         }.start();
-         try {
-            // just to make sure it's about to be stopped
-            // avoiding bootstrapping the thread as a delay
-            latch.await(1, TimeUnit.MINUTES);
-         } catch (Throwable ignored) {
-         }
-      }
-   }
-
-   public static void messageChunkSent() {
-      messageChunkCount++;
-
-      try {
-         if (messageChunkCount == 10) {
-            liveServer.stop(true);
-
-            System.err.println("activating");
-            if (!backupServer.waitForActivation(1, TimeUnit.MINUTES)) {
-               Logger.getLogger(LargeMessageOverReplicationTest.class).warn("Can't failover server");
-            }
-         }
-      } catch (Exception e) {
-         e.printStackTrace();
-      }
-   }
-
-   private MapMessage createLargeMessage() throws JMSException {
-      MapMessage message = session.createMapMessage();
-
-      for (int i = 0; i < 10; i++) {
-         message.setBytes("test" + i, new byte[1024 * 1024]);
-      }
-      return message;
-   }
-
-}


[02/17] activemq-artemis git commit: NO-JIRA Remove artemis-feature dep from integration tests

Posted by cl...@apache.org.
NO-JIRA Remove artemis-feature dep from integration tests


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5faf2cd8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5faf2cd8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5faf2cd8

Branch: refs/heads/1.x
Commit: 5faf2cd829eea61577e9b52077b28ceac03210e7
Parents: 2c89287
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Jun 30 11:30:04 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:05 2017 -0400

----------------------------------------------------------------------
 tests/integration-tests/pom.xml | 6 ------
 1 file changed, 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5faf2cd8/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 2e90ac7..91cc6db 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -156,12 +156,6 @@
          <artifactId>artemis-hornetq-protocol</artifactId>
          <version>${project.version}</version>
       </dependency>
-      <dependency>
-         <groupId>org.apache.activemq</groupId>
-         <artifactId>artemis-features</artifactId>
-         <version>${project.version}</version>
-         <type>pom</type>
-      </dependency>
 
       <!-- MQTT Deps -->
       <dependency>


[10/17] activemq-artemis git commit: ARTEMIS-1417 Failback not working on NFSv4

Posted by cl...@apache.org.
ARTEMIS-1417 Failback not working on NFSv4

With NFSv4 it is now necessary to lock and unlock the byte of the server
lock file where the state information is written, so that the written
state is flushed and becomes visible to the other clients reading the file.

(cherry picked from commit 2ec173bc708daca163c9356cf12440432abc61c4)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f5102350
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f5102350
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f5102350

Branch: refs/heads/1.x
Commit: f51023506e20101cea87676df44337a019e24cc6
Parents: b4bbdff
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Sep 12 11:17:08 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:07 2017 -0400

----------------------------------------------------------------------
 .../core/server/impl/FileLockNodeManager.java   | 74 ++++++++++++++------
 1 file changed, 53 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f5102350/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java
index 694b112..92828bd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java
@@ -33,11 +33,13 @@ public class FileLockNodeManager extends NodeManager {
 
    private static final Logger logger = Logger.getLogger(FileLockNodeManager.class);
 
-   private static final int LIVE_LOCK_POS = 1;
+   private static final long STATE_LOCK_POS = 0;
 
-   private static final int BACKUP_LOCK_POS = 2;
+   private static final long LIVE_LOCK_POS = 1;
 
-   private static final int LOCK_LENGTH = 1;
+   private static final long BACKUP_LOCK_POS = 2;
+
+   private static final long LOCK_LENGTH = 1;
 
    private static final byte LIVE = 'L';
 
@@ -113,6 +115,7 @@ public class FileLockNodeManager extends NodeManager {
 
    @Override
    public void awaitLiveNode() throws Exception {
+      logger.debug("awaiting live node...");
       do {
          byte state = getState();
          while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
@@ -228,25 +231,52 @@ public class FileLockNodeManager extends NodeManager {
     * @param status
     * @throws IOException
     */
-   private void writeFileLockStatus(byte status) throws IOException {
+   private void writeFileLockStatus(byte status) throws Exception {
       if (replicatedBackup && channel == null)
          return;
+      logger.debug("writing status: " + status);
       ByteBuffer bb = ByteBuffer.allocateDirect(1);
       bb.put(status);
       bb.position(0);
-      channel.write(bb, 0);
-      channel.force(true);
+
+      if (!channel.isOpen()) {
+         setUpServerLockFile();
+      }
+      FileLock lock = null;
+      try {
+         lock = lock(STATE_LOCK_POS);
+         channel.write(bb, 0);
+         channel.force(true);
+      } finally {
+         if (lock != null) {
+            lock.release();
+         }
+      }
    }
 
    private byte getState() throws Exception {
+      byte result;
+      logger.debug("getting state...");
       ByteBuffer bb = ByteBuffer.allocateDirect(1);
       int read;
-      read = channel.read(bb, 0);
-      if (read <= 0) {
-         return FileLockNodeManager.NOT_STARTED;
-      } else {
-         return bb.get(0);
+      FileLock lock = null;
+      try {
+         lock = lock(STATE_LOCK_POS);
+         read = channel.read(bb, 0);
+         if (read <= 0) {
+            result = FileLockNodeManager.NOT_STARTED;
+         } else {
+            result = bb.get(0);
+         }
+      } finally {
+         if (lock != null) {
+            lock.release();
+         }
       }
+
+      logger.debug("state: " + result);
+
+      return result;
    }
 
    @Override
@@ -263,25 +293,27 @@ public class FileLockNodeManager extends NodeManager {
       return getNodeId();
    }
 
-   protected FileLock tryLock(final int lockPos) throws Exception {
+   protected FileLock tryLock(final long lockPos) throws IOException {
       try {
-         return channel.tryLock(lockPos, LOCK_LENGTH, false);
+         logger.debug("trying to lock position: " + lockPos);
+         FileLock lock = channel.tryLock(lockPos, LOCK_LENGTH, false);
+         if (lock != null) {
+            logger.debug("locked position: " + lockPos);
+         } else {
+            logger.debug("failed to lock position: " + lockPos);
+         }
+         return lock;
       } catch (java.nio.channels.OverlappingFileLockException ex) {
          // This just means that another object on the same JVM is holding the lock
          return null;
       }
    }
 
-   protected FileLock lock(final int liveLockPos) throws Exception {
+   protected FileLock lock(final long lockPosition) throws Exception {
       long start = System.currentTimeMillis();
 
       while (!interrupted) {
-         FileLock lock = null;
-         try {
-            lock = channel.tryLock(liveLockPos, 1, false);
-         } catch (java.nio.channels.OverlappingFileLockException ex) {
-            // This just means that another object on the same JVM is holding the lock
-         }
+         FileLock lock = tryLock(lockPosition);
 
          if (lock == null) {
             try {
@@ -302,7 +334,7 @@ public class FileLockNodeManager extends NodeManager {
       // need to investigate further and review
       FileLock lock;
       do {
-         lock = channel.tryLock(liveLockPos, 1, false);
+         lock = tryLock(lockPosition);
          if (lock == null) {
             try {
                Thread.sleep(500);


[11/17] activemq-artemis git commit: add licenses tag to pom.xml

Posted by cl...@apache.org.
add licenses tag to pom.xml


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b4bbdff4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b4bbdff4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b4bbdff4

Branch: refs/heads/1.x
Commit: b4bbdff456c519cb34df444fb15cee8b2ce0bb46
Parents: 0f4a8c3
Author: psakar <sa...@chare.eu>
Authored: Thu Aug 24 12:38:05 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:07 2017 -0400

----------------------------------------------------------------------
 pom.xml | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b4bbdff4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6698f16..0866c05 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,6 +167,14 @@
       <geronimo-annotation_1.2_spec.version>1.0</geronimo-annotation_1.2_spec.version>
    </properties>
 
+   <licenses>
+      <license>
+         <name>Apache License 2.0</name>
+         <url>http://repository.jboss.org/licenses/apache-2.0.txt</url>
+         <distribution>repo</distribution>
+      </license>
+   </licenses>
+
    <scm>
       <connection>scm:git:http://git-wip-us.apache.org/repos/asf/activemq-artemis.git</connection>
       <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/activemq-artemis.git</developerConnection>


[17/17] activemq-artemis git commit: This closes #1553

Posted by cl...@apache.org.
This closes #1553


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b56950c3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b56950c3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b56950c3

Branch: refs/heads/1.x
Commit: b56950c3511ffcea339c36b12ede8f00860e77a0
Parents: c85575f e03c41a
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Sep 26 14:48:08 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:48:08 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/protocol/core/Packet.java      |   7 +
 .../core/protocol/core/impl/PacketImpl.java     |   1 +
 .../jms/server/ActiveMQJMSServerLogger.java     |   4 +-
 .../artemis/core/io/AbstractSequentialFile.java |   7 +
 .../core/io/AbstractSequentialFileFactory.java  |   5 +
 .../artemis/core/io/aio/AIOSequentialFile.java  |  17 ++
 .../core/io/aio/AIOSequentialFileFactory.java   |  12 +-
 .../artemis/ra/ActiveMQResourceAdapter.java     |  87 +++----
 .../journal/AbstractJournalStorageManager.java  |   2 +-
 .../impl/journal/JournalStorageManager.java     |  24 +-
 .../impl/journal/LargeServerMessageImpl.java    |   2 +-
 .../impl/journal/LargeServerMessageInSync.java  |  10 +
 .../ReplicationLargeMessageEndMessage.java      |  19 +-
 .../wireformat/ReplicationSyncFileMessage.java  |  10 +
 .../replication/ReplicatedLargeMessage.java     |   4 +
 .../core/replication/ReplicationEndpoint.java   |   1 +
 .../core/replication/ReplicationManager.java    |  98 ++++---
 .../artemis/core/server/LargeServerMessage.java |   6 +-
 .../core/server/cluster/ClusterManager.java     |   2 +-
 .../core/server/cluster/impl/BridgeImpl.java    |  12 +-
 .../cluster/impl/ClusterConnectionBridge.java   |   6 +-
 .../cluster/impl/ClusterConnectionImpl.java     |   2 +-
 .../core/server/files/FileStoreMonitor.java     |   8 +-
 .../core/server/impl/ActiveMQServerImpl.java    |   2 +-
 .../core/server/impl/FileLockNodeManager.java   |  74 ++++--
 .../impl/SharedNothingLiveActivation.java       |   2 +-
 .../core/server/files/FileStoreMonitorTest.java |   4 +-
 .../xa/recovery/ActiveMQXARecoveryLogger.java   |   4 +-
 .../xa/recovery/XARecoveryConfig.java           |   6 +-
 pom.xml                                         |   8 +
 .../LargeMessageOverReplicationTest.java        | 257 -------------------
 ...MDBMultipleHandlersServerDisconnectTest.java |   3 +-
 tests/integration-tests/pom.xml                 |   6 -
 .../failover/FailoverTestWithDivert.java        | 107 ++++++++
 .../replication/ReplicationTest.java            |   6 +-
 .../unit/core/journal/impl/TimedBufferTest.java |   5 +-
 36 files changed, 418 insertions(+), 412 deletions(-)
----------------------------------------------------------------------



[05/17] activemq-artemis git commit: ARTEMIS-1180 Artemis is logging warnings during server shut down

Posted by cl...@apache.org.
ARTEMIS-1180 Artemis is logging warnings during server shut down

(cherry picked from commit 2443eaaa003ef913187c14dbc567544788224821)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/31fa7584
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/31fa7584
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/31fa7584

Branch: refs/heads/1.x
Commit: 31fa7584ab60d06b90b7ba38da633b0f0ed81991
Parents: 6febd5e
Author: Dmitrii Tikhomirov <dt...@redhat.com>
Authored: Wed May 24 15:18:16 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:05 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/server/cluster/ClusterManager.java     |  2 +-
 .../artemis/core/server/cluster/impl/BridgeImpl.java    | 12 +++++++++---
 .../server/cluster/impl/ClusterConnectionBridge.java    |  6 +++---
 .../core/server/cluster/impl/ClusterConnectionImpl.java |  2 +-
 4 files changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31fa7584/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index 96fad97..eddbda4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -472,7 +472,7 @@ public final class ClusterManager implements ActiveMQComponent {
 
       clusterLocators.add(serverLocator);
 
-      Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server.getStorageManager());
+      Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server);
 
       bridges.put(config.getName(), bridge);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31fa7584/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index d928fff..94e28e6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -47,7 +47,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.message.impl.MessageImpl;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -156,6 +156,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    private boolean keepConnecting = true;
 
+   private ActiveMQServer server;
+
    public BridgeImpl(final ServerLocatorInternal serverLocator,
                      final int initialConnectAttempts,
                      final int reconnectAttempts,
@@ -174,7 +176,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                      final boolean useDuplicateDetection,
                      final String user,
                      final String password,
-                     final StorageManager storageManager) {
+                     final ActiveMQServer server) {
 
       this.reconnectAttempts = reconnectAttempts;
 
@@ -211,6 +213,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       this.user = user;
 
       this.password = password;
+
+      this.server = server;
    }
 
    public static final byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID) {
@@ -603,7 +607,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    @Override
    public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
-      ActiveMQServerLogger.LOGGER.bridgeConnectionFailed(failedOver);
+      if (server.isStarted()) {
+         ActiveMQServerLogger.LOGGER.bridgeConnectionFailed(failedOver);
+      }
 
       synchronized (connectionGuard) {
          keepConnecting = true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31fa7584/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 3b35c14..96106d5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -37,8 +37,8 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.message.impl.MessageImpl;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerMessage;
@@ -100,13 +100,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
                                   final boolean useDuplicateDetection,
                                   final String user,
                                   final String password,
-                                  final StorageManager storageManager,
+                                  final ActiveMQServer server,
                                   final SimpleString managementAddress,
                                   final SimpleString managementNotificationAddress,
                                   final MessageFlowRecord flowRecord,
                                   final TransportConfiguration connector) {
       super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same
-            retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, storageManager);
+            retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server);
 
       this.discoveryLocator = discoveryLocator;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31fa7584/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 9e96053..fe72725 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -800,7 +800,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
       targetLocator.addIncomingInterceptor(new IncomingInterceptorLookingForExceptionMessage(manager, executorFactory.getExecutor()));
       MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue);
 
-      ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server.getStorageManager(), managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector());
+      ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server, managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector());
 
       targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");
 


[13/17] activemq-artemis git commit: ARTEMIS-1418 AIO Shutdown on IOError and logging

Posted by cl...@apache.org.
ARTEMIS-1418 AIO Shutdown on IOError and logging

(cherry picked from commit 520a40b1a1431fb0fdc1666c556342410a56e4eb)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ee4692d5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ee4692d5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ee4692d5

Branch: refs/heads/1.x
Commit: ee4692d5cad4b109f6ffa05274bee15d4df690ba
Parents: f510235
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Sep 8 15:00:35 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:07 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/io/AbstractSequentialFile.java    |  7 +++++++
 .../core/io/AbstractSequentialFileFactory.java     |  5 +++++
 .../artemis/core/io/aio/AIOSequentialFile.java     | 17 +++++++++++++++++
 .../core/io/aio/AIOSequentialFileFactory.java      | 12 +++++++++++-
 .../core/server/files/FileStoreMonitor.java        |  8 +++++++-
 .../core/server/impl/ActiveMQServerImpl.java       |  2 +-
 .../core/server/files/FileStoreMonitorTest.java    |  4 ++--
 7 files changed, 50 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ee4692d5/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
index f6cb9b0..32168fc 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
@@ -35,9 +35,12 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.jboss.logging.Logger;
 
 public abstract class AbstractSequentialFile implements SequentialFile {
 
+   private static final Logger logger = Logger.getLogger(AbstractSequentialFile.class);
+
    private File file;
 
    protected final File directory;
@@ -267,6 +270,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
 
       @Override
       public void onError(final int errorCode, final String errorMessage) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("onError" + " code: " + errorCode + " message: " + errorMessage);
+         }
+
          final int size = delegates.size();
          for (int i = 0; i < size; i++) {
             try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ee4692d5/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
index 4310e84..c6657df 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
@@ -33,12 +33,15 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.jboss.logging.Logger;
 
 /**
  * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories
  */
 public abstract class AbstractSequentialFileFactory implements SequentialFileFactory {
 
+   private static final Logger logger = Logger.getLogger(AbstractSequentialFileFactory.class);
+
    // Timeout used to wait executors to shutdown
    protected static final int EXECUTOR_TIMEOUT = 60;
 
@@ -161,6 +164,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
    public void onIOError(Exception exception, String message, SequentialFile file) {
       if (critialErrorListener != null) {
          critialErrorListener.onIOException(exception, message, file);
+      } else {
+         logger.warn("Critical IO Error Called.  No Critical IO Error Handler Registered");
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ee4692d5/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
index f641aec..fcad101 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
@@ -34,9 +34,12 @@ import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 import org.apache.activemq.artemis.jlibaio.LibaioFile;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.jboss.logging.Logger;
 
 public class AIOSequentialFile extends AbstractSequentialFile {
 
+   private static final Logger logger = Logger.getLogger(AIOSequentialFileFactory.class);
+
    private boolean opened = false;
 
    private LibaioFile aioFile;
@@ -114,6 +117,10 @@ public class AIOSequentialFile extends AbstractSequentialFile {
 
    @Override
    public synchronized void fill(final int size) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Filling file: " + getFileName());
+      }
+
       checkOpened();
       aioFile.fill(size);
 
@@ -129,9 +136,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
    public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException {
       opened = true;
 
+      if (logger.isTraceEnabled()) {
+         logger.trace("Opening file: " + getFileName());
+      }
+
       try {
          aioFile = aioFactory.libaioContext.openFile(getFile(), factory.isDatasync());
       } catch (IOException e) {
+         logger.error("Error opening file: " + getFileName());
          factory.onIOError(e, e.getMessage(), this);
          throw new ActiveMQNativeIOError(e.getMessage(), e);
       }
@@ -156,6 +168,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
          // Sending it through the callback would make it released
          aioFile.read(positionToRead, bytesToRead, bytes, getCallback(callback, null));
       } catch (IOException e) {
+         logger.error("IOError reading file: " + getFileName(), e);
          factory.onIOError(e, e.getMessage(), this);
          throw new ActiveMQNativeIOError(e.getMessage(), e);
       }
@@ -176,6 +189,10 @@ public class AIOSequentialFile extends AbstractSequentialFile {
 
    @Override
    public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Write Direct, Sync: " + sync + " File: " + getFileName());
+      }
+
       if (sync) {
          SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ee4692d5/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index 51d960a..df71c16 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.ArtemisConstants;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -77,6 +79,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
                                    final IOCriticalErrorListener listener) {
       super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener);
       callbackPool = new CallbackCache<>(maxIO);
+      if (logger.isTraceEnabled()) {
+         logger.trace("New AIO File Created");
+      }
    }
 
    public AIOSequentialCallback getCallback() {
@@ -304,7 +309,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
          try {
             libaioFile.write(position, bytes, buffer, this);
          } catch (IOException e) {
-            callback.onError(-1, e.getMessage());
+            callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
             onIOError(e, "Failed to write to file", sequentialFile);
          }
       }
@@ -337,6 +342,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
 
       @Override
       public void onError(int errno, String message) {
+         if (logger.isDebugEnabled()) {
+            logger.trace("AIO on error issued. Error(code: " + errno + " msg: " + message + ")");
+         }
          this.error = true;
          this.errorCode = errno;
          this.errorMessage = message;
@@ -357,6 +365,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
 
          if (error) {
             callback.onError(errorCode, errorMessage);
+            onIOError(new ActiveMQException(errorCode, errorMessage), errorMessage, null);
             errorMessage = null;
          } else {
             if (callback != null) {
@@ -385,6 +394,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
                libaioContext.poll();
             } catch (Throwable e) {
                ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
+               onIOError(new ActiveMQException("Error on libaio poll"), e.getMessage(), null);
             }
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ee4692d5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
index 0600687..8cd7fef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.jboss.logging.Logger;
 
@@ -45,14 +46,17 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
    private final Set<FileStore> stores = new HashSet<>();
    private double maxUsage;
    private final Object monitorLock = new Object();
+   private final IOCriticalErrorListener ioCriticalErrorListener;
 
    public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService,
                            Executor executor,
                            long checkPeriod,
                            TimeUnit timeUnit,
-                           double maxUsage) {
+                           double maxUsage,
+                           IOCriticalErrorListener ioCriticalErrorListener) {
       super(scheduledExecutorService, executor, checkPeriod, timeUnit, false);
       this.maxUsage = maxUsage;
+      this.ioCriticalErrorListener = ioCriticalErrorListener;
    }
 
    public FileStoreMonitor addCallback(Callback callback) {
@@ -99,6 +103,8 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
                if (over) {
                   break;
                }
+            } catch (IOException ioe) {
+               ioCriticalErrorListener.onIOException(ioe, "IO Error while calculating disk usage", null);
             } catch (Exception e) {
                logger.warn(e.getMessage(), e);
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ee4692d5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 97cb4aa..bb78608 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2127,7 +2127,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
 
       try {
-         injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f));
+         injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, shutdownOnCriticalIO));
       } catch (Exception e) {
          logger.warn(e.getMessage(), e);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ee4692d5/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
index bc4017c..b91d3de 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
@@ -96,7 +96,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
       };
 
       final AtomicBoolean fakeReturn = new AtomicBoolean(false);
-      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999) {
+      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999, null) {
          @Override
          protected double calculateUsage(FileStore store) throws IOException {
             if (fakeReturn.get()) {
@@ -127,7 +127,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
    @Test
    public void testScheduler() throws Exception {
 
-      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9);
+      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9, null);
 
       final ReusableLatch latch = new ReusableLatch(5);
       storeMonitor.addStore(getTestDirfile());


[03/17] activemq-artemis git commit: ARTEMIS-1224 - change the journal file size to nearest multiple

Posted by cl...@apache.org.
ARTEMIS-1224 - change the journal file size to nearest multiple

https://issues.apache.org/jira/browse/ARTEMIS-1224

(cherry picked from commit 30a6ac703efe3539d13152f9311bedd7fd68aa9d)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2c892870
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2c892870
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2c892870

Branch: refs/heads/1.x
Commit: 2c892870092e716fdae0ca4ec8eb6bc1ecea5826
Parents: 31fa758
Author: Andy Taylor <an...@gmail.com>
Authored: Mon Jun 12 16:20:43 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:05 2017 -0400

----------------------------------------------------------------------
 .../core/persistence/impl/journal/JournalStorageManager.java    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c892870/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index c286e67..e79a9cb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -145,8 +145,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
 
       int fileSize = config.getJournalFileSize();
       // we need to correct the file size if its not a multiple of the alignement
-      if (fileSize % journalFF.getAlignment() != 0) {
-         int difference = fileSize % journalFF.getAlignment();
+      int modulus = fileSize % journalFF.getAlignment();
+      if (modulus != 0) {
+         int difference = modulus;
          int low = config.getJournalFileSize() - difference;
          int high = low + journalFF.getAlignment();
          fileSize = difference < journalFF.getAlignment() / 2 ? low : high;


[16/17] activemq-artemis git commit: NO-JIRA fixing TimedbufferTest as fixed on master

Posted by cl...@apache.org.
NO-JIRA fixing TimedbufferTest as fixed on master


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e03c41aa
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e03c41aa
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e03c41aa

Branch: refs/heads/1.x
Commit: e03c41aabd69a0841ec2d4e59b333219b6f504be
Parents: 313e816
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Sep 26 14:43:56 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:43:56 2017 -0400

----------------------------------------------------------------------
 .../artemis/tests/unit/core/journal/impl/TimedBufferTest.java   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e03c41aa/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
index bddb7ea..165fd6e 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
@@ -204,7 +204,10 @@ public class TimedBufferTest extends ActiveMQTestBase {
          Assert.assertTrue(latchFlushed.await(5, TimeUnit.SECONDS));
 
          // The purpose of the timed buffer is to batch writes up to a millisecond.. or up to the size of the buffer.
-         Assert.assertTrue("Timed Buffer is not batching accordingly, it was expected to take at least 500 seconds batching multiple writes while it took " + (System.currentTimeMillis() - time) + " milliseconds", System.currentTimeMillis() - time >= 500);
+         Assert.assertTrue("Timed Buffer is not batching accordingly, it was expected to take at least 500 seconds batching multiple writes while it took " + (System.currentTimeMillis() - time) + " milliseconds", System.currentTimeMillis() - time >= 450);
+
+         // ^^ there are some discounts that can happen inside the timed buffer that are still considered valid (like discounting the time it took to perform the operation itself).
+         // For that reason the test has been failing (before this commit) at 499 or 480 milliseconds. So, I'm using a reasonable number close to 500 milliseconds that would still be valid for the test.
 
          // it should be in fact only writing once..
          // i will set for 3 just in case there's a GC or anything else happening on the test


[07/17] activemq-artemis git commit: ARTEMIS-950 Change log level from INFO to WARN for "Invalid "host" value "0.0.0.0" detected for..." when Artemis is bound to 0.0.0.0

Posted by cl...@apache.org.
ARTEMIS-950 Change log level from INFO to WARN for "Invalid "host" value "0.0.0.0" detected for..." when Artemis is bound to 0.0.0.0

(cherry picked from commit 93ebbfdeaaa3f79c76d6028703c3c7b23bb3783e)
(cherry picked from commit d402f67f4e6c14072213c2a2936edb032cee751b)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/08fdae35
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/08fdae35
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/08fdae35

Branch: refs/heads/1.x
Commit: 08fdae355c0b66a53d2800c0319957f3920901be
Parents: 7b1a1b0
Author: xstefank <mj...@gmail.com>
Authored: Wed Feb 8 16:41:37 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:06 2017 -0400

----------------------------------------------------------------------
 .../activemq/artemis/jms/server/ActiveMQJMSServerLogger.java     | 4 ++--
 .../service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/08fdae35/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/ActiveMQJMSServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/ActiveMQJMSServerLogger.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/ActiveMQJMSServerLogger.java
index 4a2f701..fdbc514 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/ActiveMQJMSServerLogger.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/ActiveMQJMSServerLogger.java
@@ -52,8 +52,8 @@ public interface ActiveMQJMSServerLogger extends BasicLogger {
       format = Message.Format.MESSAGE_FORMAT)
    void serverCachingCommand(Object runnable);
 
-   @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 121005, value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\". If this new address is incorrect please manually configure the connector to use the proper one.",
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122005, value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\". If this new address is incorrect please manually configure the connector to use the proper one.",
       format = Message.Format.MESSAGE_FORMAT)
    void invalidHostForConnector(String name, String newHost);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/08fdae35/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java
----------------------------------------------------------------------
diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java
index 02bf839..eb565a5 100644
--- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java
+++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXARecoveryLogger.java
@@ -56,8 +56,8 @@ public interface ActiveMQXARecoveryLogger extends BasicLogger {
       format = Message.Format.MESSAGE_FORMAT)
    void serverCachingCommand(Object runnable);
 
-   @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 121005, value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\". If this new address is incorrect please manually configure the connector to use the proper one.",
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122005, value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\". If this new address is incorrect please manually configure the connector to use the proper one.",
       format = Message.Format.MESSAGE_FORMAT)
    void invalidHostForConnector(String name, String newHost);