You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/05 16:50:04 UTC
[03/17] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index f374979..5e9a95a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -77,7 +77,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
Assert.assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, conf.getJournalBufferSize_AIO());
Assert.assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, conf.getJournalBufferSize_NIO());
Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate(), conf.isLogJournalWriteRate());
- Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(), conf.getJournalPerfBlastPages());
Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled(), conf.isMessageCounterEnabled());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageCounterMaxDayHistory(), conf.getMessageCounterMaxDayHistory());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageCounterSamplePeriod(), conf.getMessageCounterSamplePeriod());
@@ -232,10 +231,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
conf.setLogJournalWriteRate(b);
Assert.assertEquals(b, conf.isLogJournalWriteRate());
- i = RandomUtil.randomInt();
- conf.setJournalPerfBlastPages(i);
- Assert.assertEquals(i, conf.getJournalPerfBlastPages());
-
l = RandomUtil.randomLong();
conf.setServerDumpInterval(l);
Assert.assertEquals(l, conf.getServerDumpInterval());
@@ -434,10 +429,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
conf.setLogJournalWriteRate(b);
Assert.assertEquals(b, conf.isLogJournalWriteRate());
- i = RandomUtil.randomInt();
- conf.setJournalPerfBlastPages(i);
- Assert.assertEquals(i, conf.getJournalPerfBlastPages());
-
l = RandomUtil.randomLong();
conf.setServerDumpInterval(l);
Assert.assertEquals(l, conf.getServerDumpInterval());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
index d73accd..1eb749b 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
@@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.filter.impl;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.tests.util.SilentTestCase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
@@ -35,13 +35,13 @@ public class FilterTest extends SilentTestCase {
private Filter filter;
- private ServerMessage message;
+ private Message message;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
- message = new ServerMessageImpl(1, 1000);
+ message = new CoreMessage().initBuffer(1024).setMessageID(1);
}
@Test
@@ -59,7 +59,7 @@ public class FilterTest extends SilentTestCase {
message.putStringProperty(new SimpleString("color"), new SimpleString("RED"));
Assert.assertTrue(filter.match(message));
- message = new ServerMessageImpl();
+ message = new CoreMessage();
Assert.assertFalse(filter.match(message));
}
@@ -94,7 +94,7 @@ public class FilterTest extends SilentTestCase {
filter = FilterImpl.createFilter(new SimpleString("AMQDurable='NON_DURABLE'"));
- message = new ServerMessageImpl();
+ message = new CoreMessage();
message.setDurable(true);
Assert.assertFalse(filter.match(message));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
index 0e9a3f2..2f18c21 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
@@ -23,6 +23,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@@ -43,8 +46,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -329,7 +330,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase {
}
@Override
- public ServerMessage handleMessage(ServerMessage message) throws Exception {
+ public ICoreMessage handleMessage(Message message) throws Exception {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 622f042..2bd8cb2 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.impl;
-import java.io.InputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
@@ -31,27 +30,26 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReferenceCounter;
-import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.UUID;
import org.junit.Assert;
import org.junit.Test;
@@ -283,214 +281,164 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
}
- class FakeMessage implements ServerMessage {
-
- final long id;
-
- FakeMessage(final long id) {
- this.id = id;
- }
-
- @Override
- public FakeMessage setMessageID(long id) {
- return this;
- }
-
- @Override
- public long getMessageID() {
- return id;
- }
+ class FakeMessage extends RefCountMessage {
@Override
- public MessageReference createReference(Queue queue) {
+ public RoutingType getRouteType() {
return null;
}
@Override
- public void forceAddress(SimpleString address) {
-
- }
-
- @Override
- public int incrementRefCount() throws Exception {
- return 0;
- }
-
- @Override
- public int decrementRefCount() throws Exception {
- return 0;
+ public SimpleString getReplyTo() {
+ return null;
}
@Override
- public int incrementDurableRefCount() {
- return 0;
+ public Message setReplyTo(SimpleString address) {
+ return null;
}
@Override
- public int decrementDurableRefCount() {
- return 0;
+ public boolean containsDeliveryAnnotationProperty(SimpleString property) {
+ return false;
}
@Override
- public ServerMessage copy(long newID) {
+ public Object removeDeliveryAnnoationProperty(SimpleString key) {
return null;
}
@Override
- public ServerMessage copy() {
+ public Object getDeliveryAnnotationProperty(SimpleString key) {
return null;
}
@Override
- public int getMemoryEstimate() {
- return 0;
- }
+ public void persist(ActiveMQBuffer targetRecord) {
- @Override
- public int getRefCount() {
- return 0;
}
@Override
- public ServerMessage makeCopyForExpiryOrDLA(long newID,
- MessageReference originalReference,
- boolean expiry,
- boolean copyOriginalHeaders) throws Exception {
+ public Long getScheduledDeliveryTime() {
return null;
}
@Override
- public void setOriginalHeaders(ServerMessage other, MessageReference originalReference, boolean expiry) {
-
- }
-
- @Override
- public void setPagingStore(PagingStore store) {
+ public void reloadPersistence(ActiveMQBuffer record) {
}
@Override
- public PagingStore getPagingStore() {
+ public Persister<Message> getPersister() {
return null;
}
@Override
- public boolean hasInternalProperties() {
- return false;
- }
-
- @Override
- public boolean storeIsPaging() {
- return false;
+ public int getPersistSize() {
+ return 0;
}
+ final long id;
@Override
- public void encodeMessageIDToBuffer() {
-
+ public CoreMessage toCore() {
+ return null;
}
- @Override
- public byte[] getDuplicateIDBytes() {
- return new byte[0];
+ FakeMessage(final long id) {
+ this.id = id;
}
@Override
- public Object getDuplicateProperty() {
- return null;
+ public FakeMessage setMessageID(long id) {
+ return this;
}
@Override
- public void encode(ActiveMQBuffer buffer) {
-
+ public long getMessageID() {
+ return id;
}
@Override
- public void decode(ActiveMQBuffer buffer) {
-
+ public int incrementRefCount() throws Exception {
+ return 0;
}
@Override
- public void decodeFromBuffer(ActiveMQBuffer buffer) {
-
+ public int decrementRefCount() throws Exception {
+ return 0;
}
@Override
- public int getEndOfMessagePosition() {
+ public int incrementDurableRefCount() {
return 0;
}
@Override
- public int getEndOfBodyPosition() {
+ public int decrementDurableRefCount() {
return 0;
}
@Override
- public void bodyChanged() {
-
+ public Message copy(long newID) {
+ return null;
}
@Override
- public boolean isServerMessage() {
- return false;
+ public Message copy() {
+ return null;
}
@Override
- public ActiveMQBuffer getEncodedBuffer() {
- return null;
+ public int getMemoryEstimate() {
+ return 0;
}
@Override
- public int getHeadersAndPropertiesEncodeSize() {
+ public int getRefCount() {
return 0;
}
@Override
- public ActiveMQBuffer getWholeBuffer() {
- return null;
+ public byte[] getDuplicateIDBytes() {
+ return new byte[0];
}
@Override
- public void encodeHeadersAndProperties(ActiveMQBuffer buffer) {
-
+ public Object getDuplicateProperty() {
+ return null;
}
@Override
- public void decodeHeadersAndProperties(ActiveMQBuffer buffer) {
+ public void messageChanged() {
}
@Override
- public BodyEncoder getBodyEncoder() throws ActiveMQException {
+ public UUID getUserID() {
return null;
}
@Override
- public InputStream getBodyInputStream() {
+ public String getAddress() {
return null;
}
@Override
- public void setAddressTransient(SimpleString address) {
-
- }
-
- @Override
- public TypedProperties getTypedProperties() {
+ public SimpleString getAddressSimpleString() {
return null;
}
@Override
- public UUID getUserID() {
+ public Message setBuffer(ByteBuf buffer) {
return null;
}
@Override
- public FakeMessage setUserID(UUID userID) {
- return this;
+ public ByteBuf getBuffer() {
+ return null;
}
-
@Override
- public SimpleString getAddress() {
+ public Message setAddress(String address) {
return null;
}
@@ -500,11 +448,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public byte getType() {
- return 0;
- }
-
- @Override
public boolean isDurable() {
return false;
}
@@ -560,16 +503,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public ActiveMQBuffer getBodyBuffer() {
- return null;
- }
-
- @Override
- public ActiveMQBuffer getBodyBufferDuplicate() {
- return null;
- }
-
- @Override
public Message putBooleanProperty(SimpleString key, boolean value) {
return null;
}
@@ -825,13 +758,23 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public FakeMessage writeBodyBufferBytes(byte[] bytes) {
- return this;
+ public Message setUserID(Object userID) {
+ return null;
}
@Override
- public FakeMessage writeBodyBufferString(String string) {
- return this;
+ public void copyHeadersAndProperties(Message msg) {
+
+ }
+
+ @Override
+ public void receiveBuffer(ByteBuf buffer) {
+
+ }
+
+ @Override
+ public void sendBuffer(ByteBuf buffer, int count) {
+
}
}
@@ -1221,7 +1164,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public boolean hasMatchingConsumer(ServerMessage message) {
+ public boolean hasMatchingConsumer(Message message) {
return false;
}
@@ -1338,12 +1281,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void route(ServerMessage message, RoutingContext context) throws Exception {
+ public void route(Message message, RoutingContext context) throws Exception {
}
@Override
- public void routeWithAck(ServerMessage message, RoutingContext context) {
+ public void routeWithAck(Message message, RoutingContext context) {
}
@@ -1366,5 +1309,9 @@ public class ScheduledDeliveryHandlerTest extends Assert {
public void decDelivering(int size) {
}
+
+
+
+
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index ee80054..b1ea206 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -26,13 +26,13 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
@@ -53,7 +53,6 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RouteContextList;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -323,7 +322,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
- public void storeMessage(ServerMessage message) throws Exception {
+ public void storeMessage(Message message) throws Exception {
}
@@ -368,7 +367,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
- public void storeMessageTransactional(long txID, ServerMessage message) throws Exception {
+ public void storeMessageTransactional(long txID, Message message) throws Exception {
}
@@ -439,7 +438,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
- public LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception {
+ public LargeServerMessage createLargeMessage(long id, Message message) throws Exception {
return null;
}
@@ -489,11 +488,6 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
- public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception {
-
- }
-
- @Override
public void deletePageTransactional(long recordID) throws Exception {
}
@@ -643,7 +637,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
@Override
public boolean addToPage(PagingStore store,
- ServerMessage msg,
+ Message msg,
Transaction tx,
RouteContextList listCtx) throws Exception {
return false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 2f12b05..0bb177d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -94,6 +95,7 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -117,14 +119,12 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -855,7 +855,7 @@ public abstract class ActiveMQTestBase extends Assert {
return testDir1 + "/journal";
}
- protected String getJournalDir(final int index, final boolean backup) {
+ public String getJournalDir(final int index, final boolean backup) {
return getJournalDir(getTestDir(), index, backup);
}
@@ -2079,8 +2079,8 @@ public abstract class ActiveMQTestBase extends Assert {
}
}
- protected ServerMessage generateMessage(final long id) {
- ServerMessage message = new ServerMessageImpl(id, 1000);
+ protected Message generateMessage(final long id) {
+ ICoreMessage message = new CoreMessage(id, 1000);
message.setMessageID(id);
@@ -2092,9 +2092,9 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected MessageReference generateReference(final Queue queue, final long id) {
- ServerMessage message = generateMessage(id);
+ Message message = generateMessage(id);
- return message.createReference(queue);
+ return MessageReference.Factory.createReference(message, queue);
}
protected int calculateRecordSize(final int size, final int alignment) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 9ed5584..0691e95 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -231,8 +231,6 @@
<journal-compact-percentage>33</journal-compact-percentage>
<journal-compact-min-files>123</journal-compact-min-files>
<journal-max-io>56546</journal-max-io>
- <perf-blast-pages>5</perf-blast-pages>
- <run-sync-speed-test>true</run-sync-speed-test>
<server-dump-interval>5000</server-dump-interval>
<memory-warning-threshold>95</memory-warning-threshold>
<memory-measure-interval>54321</memory-measure-interval>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/artemis-tools/src/test/resources/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index fcb7a20..090f968 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -673,22 +673,6 @@
</xsd:annotation>
</xsd:element>
- <xsd:element name="perf-blast-pages" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
- <xsd:annotation>
- <xsd:documentation>
- XXX Only meant to be used by project developers
- </xsd:documentation>
- </xsd:annotation>
- </xsd:element>
-
- <xsd:element name="run-sync-speed-test" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
- <xsd:annotation>
- <xsd:documentation>
- XXX Only meant to be used by project developers
- </xsd:documentation>
- </xsd:annotation>
- </xsd:element>
-
<xsd:element name="server-dump-interval" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java
----------------------------------------------------------------------
diff --git a/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java b/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java
index d7a7bdc..df9d79e 100644
--- a/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java
+++ b/examples/features/standard/bridge/src/main/java/org/apache/activemq/artemis/jms/example/HatColourChangeTransformer.java
@@ -16,14 +16,14 @@
*/
package org.apache.activemq.artemis.jms.example;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
public class HatColourChangeTransformer implements Transformer {
@Override
- public ServerMessage transform(final ServerMessage message) {
+ public Message transform(final Message message) {
SimpleString propName = new SimpleString("hat");
SimpleString oldProp = message.getSimpleStringProperty(propName);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java
----------------------------------------------------------------------
diff --git a/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java b/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java
index 22272d0..2f75d4c 100644
--- a/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java
+++ b/examples/features/standard/divert/src/main/java/org/apache/activemq/artemis/jms/example/AddForwardingTimeTransformer.java
@@ -16,16 +16,13 @@
*/
package org.apache.activemq.artemis.jms.example;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
public class AddForwardingTimeTransformer implements Transformer {
@Override
- public ServerMessage transform(final ServerMessage message) {
- message.putLongProperty(new SimpleString("time_of_forward"), System.currentTimeMillis());
-
+ public Message transform(final Message message) {
return message;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5b056ad..e9a5bad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,11 +82,11 @@
<jetty.version>9.4.0.M1</jetty.version>
<jgroups.version>3.6.9.Final</jgroups.version>
<maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
- <netty.version>4.1.5.Final</netty.version>
- <proton.version>0.16.0</proton.version>
+ <netty.version>4.1.8.Final</netty.version>
+ <proton.version>0.17.0</proton.version>
<resteasy.version>3.0.19.Final</resteasy.version>
<slf4j.version>1.7.21</slf4j.version>
- <qpid.jms.version>0.11.0</qpid.jms.version>
+ <qpid.jms.version>0.20.0</qpid.jms.version>
<johnzon.version>0.9.5</johnzon.version>
<json-p.spec.version>1.0-alpha-1</json-p.spec.version>
<javax.inject.version>1</javax.inject.version>
@@ -1006,6 +1006,7 @@
<arg>-Xep:StaticAccessedFromInstance:ERROR</arg>
<arg>-Xep:SynchronizeOnNonFinalField:ERROR</arg>
<arg>-Xep:WaitNotInLoop:ERROR</arg>
+ <arg>-Xdiags:verbose</arg>
</compilerArgs>
</configuration>
<dependencies>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
index 12f5568..17f601a 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
@@ -37,6 +37,11 @@ public class PartialPooledByteBufAllocator implements ByteBufAllocator {
}
@Override
+ public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
+ return POOLED.calculateNewCapacity(minNewCapacity, maxCapacity);
+ }
+
+ @Override
public ByteBuf buffer() {
return UNPOOLED.heapBuffer();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
index dea8602..d9bddcb 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
@@ -51,6 +51,12 @@ public class UnmodifiableDelivery implements Delivery {
}
}
+ /* waiting Pull Request sent
+ @Override
+ public int getDataLength() {
+ return delivery.getDataLength();
+ } */
+
@Override
public DeliveryState getLocalState() {
return delivery.getLocalState();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
index 833302d..162a512 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
@@ -61,7 +62,7 @@ public class EncodersBench {
this.byteBuffer.order(ByteOrder.nativeOrder());
this.addJournalRecordEncoder = new AddJournalRecordEncoder();
- this.record = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance);
+ this.record = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance);
this.record.setFileID(1);
this.record.setCompactCount((short) 1);
this.outBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(this.record.getEncodeSize(), this.record.getEncodeSize()).order(ByteOrder.nativeOrder()));
@@ -86,7 +87,7 @@ public class EncodersBench {
@Benchmark
public int encodeUnalignedWithGarbage() {
outBuffer.clear();
- final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance);
+ final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance);
addRecord.setFileID(1);
addRecord.setCompactCount((short) 1);
addRecord.encode(outBuffer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
index ef71e89..03e2ddc 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/JMSBridgeReconnectionTest.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.tests.extras.byteman;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
@@ -111,7 +111,7 @@ public class JMSBridgeReconnectionTest extends BridgeTestBase {
static int count = 20;
static CountDownLatch stopLatch = new CountDownLatch(1);
- public static void pause2(MessageInternal msgI, boolean sendBlocking, final ClientProducerCredits theCredits) {
+ public static void pause2(Message msgI, boolean sendBlocking, final ClientProducerCredits theCredits) {
if (msgI.containsProperty("__AMQ_CID")) {
count--;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java
deleted file mode 100644
index 1ff58cd..0000000
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageCopyTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.extras.byteman;
-
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.utils.RandomUtil;
-import org.jboss.byteman.contrib.bmunit.BMRule;
-import org.jboss.byteman.contrib.bmunit.BMRules;
-import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-@RunWith(BMUnitRunner.class)
-public class MessageCopyTest {
-
- @Test
- @BMRules(
-
- rules = {@BMRule(
- name = "message-copy0",
- targetClass = "org.apache.activemq.artemis.core.server.impl.ServerMessageImpl",
- targetMethod = "copy()",
- targetLocation = "ENTRY",
- action = "System.out.println(\"copy\"), waitFor(\"encode-done\")"), @BMRule(
- name = "message-copy-done",
- targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage",
- targetMethod = "encode(org.apache.activemq.artemis.spi.core.protocol.RemotingConnection)",
- targetLocation = "EXIT",
- action = "System.out.println(\"encodeDone\"), signalWake(\"encode-done\", true)"), @BMRule(
- name = "message-copy1",
- targetClass = "org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper",
- targetMethod = "copy(int, int)",
- condition = "Thread.currentThread().getName().equals(\"T1\")",
- targetLocation = "EXIT",
- action = "System.out.println(\"setIndex at \" + Thread.currentThread().getName()), waitFor(\"finish-read\")"), @BMRule(
- name = "JMSServer.stop wait-init",
- targetClass = "org.apache.activemq.artemis.tests.extras.byteman.MessageCopyTest",
- targetMethod = "simulateRead",
- targetLocation = "EXIT",
- action = "signalWake(\"finish-read\", true)")})
- public void testMessageCopyIssue() throws Exception {
- final long RUNS = 1;
- final ServerMessageImpl msg = new ServerMessageImpl(123, 18);
-
- msg.setMessageID(RandomUtil.randomLong());
- msg.encodeMessageIDToBuffer();
- msg.setAddress(new SimpleString("Batatantkashf aksjfh aksfjh askfdjh askjfh "));
-
- final AtomicInteger errors = new AtomicInteger(0);
-
- int T1_number = 1;
- int T2_number = 1;
-
- final CountDownLatch latchAlign = new CountDownLatch(T1_number + T2_number);
- final CountDownLatch latchReady = new CountDownLatch(1);
- class T1 extends Thread {
-
- T1() {
- super("T1");
- }
-
- @Override
- public void run() {
- latchAlign.countDown();
- try {
- latchReady.await();
- } catch (Exception ignored) {
- }
-
- for (int i = 0; i < RUNS; i++) {
- try {
- ServerMessageImpl newMsg = (ServerMessageImpl) msg.copy();
- } catch (Throwable e) {
- e.printStackTrace();
- errors.incrementAndGet();
- }
- }
- }
- }
-
- class T2 extends Thread {
-
- T2() {
- super("T2");
- }
-
- @Override
- public void run() {
- latchAlign.countDown();
- try {
- latchReady.await();
- } catch (Exception ignored) {
- }
-
- for (int i = 0; i < RUNS; i++) {
- try {
- SessionSendMessage ssm = new SessionSendMessage(msg);
- ActiveMQBuffer buf = ssm.encode(null);
- System.out.println("reading at buf = " + buf);
- simulateRead(buf);
- } catch (Throwable e) {
- e.printStackTrace();
- errors.incrementAndGet();
- }
- }
- }
- }
-
- ArrayList<Thread> threads = new ArrayList<>();
-
- for (int i = 0; i < T1_number; i++) {
- T1 t = new T1();
- threads.add(t);
- t.start();
- }
-
- for (int i = 0; i < T2_number; i++) {
- T2 t2 = new T2();
- threads.add(t2);
- t2.start();
- }
-
- latchAlign.await();
-
- latchReady.countDown();
-
- for (Thread t : threads) {
- t.join();
- }
-
- Assert.assertEquals(0, errors.get());
- }
-
- private void simulateRead(ActiveMQBuffer buf) {
- buf.setIndex(buf.capacity() / 2, buf.capacity() / 2);
-
- // ok this is not actually happening during the read process, but changing this shouldn't affect the buffer on copy
- buf.writeBytes(new byte[1024]);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
index 8456765..0860e97 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
@@ -191,13 +191,13 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
Assert.assertNull(message2);
message = createMessage(session, 3);
- message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData());
+ message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, dupID.getData());
producer.send(message);
message2 = consumer.receive(1000);
Assert.assertEquals(3, message2.getObjectProperty(propKey));
message = createMessage(session, 4);
- message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData());
+ message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, dupID.getData());
producer.send(message);
message2 = consumer.receiveImmediate();
Assert.assertNull(message2);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
index bbb9c26..138f3cc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
@@ -24,6 +24,7 @@ import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
@@ -35,6 +36,7 @@ import org.apache.activemq.transport.amqp.client.AmqpNoLocalFilter;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Test;
/**
@@ -119,7 +121,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount());
// Receive and resend with OpenWire JMS client
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
Connection jmsConnection = factory.createConnection();
try {
Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -129,7 +131,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
Message received = jmsConsumer.receive(5000);
assertNotNull(received);
- assertTrue(received instanceof BytesMessage);
+ assertTrue(received instanceof ObjectMessage);
MessageProducer jmsProducer = jmsSession.createProducer(destination);
jmsProducer.send(received);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 0f006bc..70ff658 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -243,27 +243,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
}
- @Test
- public void testAmbiguousMessageRouting() throws Exception {
- final String addressA = "addressA";
- final String queueA = "queueA";
- final String queueB = "queueB";
- final String queueC = "queueC";
- final String queueD = "queueD";
-
- ActiveMQServerControl serverControl = server.getActiveMQServerControl();
- serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
- serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
- serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
- serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
- serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString());
-
- sendMessages(addressA, 1);
-
- assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
- assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
- }
-
@Test(timeout = 60000)
public void testMessageDurableFalse() throws Exception {
sendMessages(getTestName(), 1, false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 7962005..39daee4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -16,28 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -61,7 +39,24 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
@@ -71,14 +66,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -105,6 +99,11 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
+
@RunWith(Parameterized.class)
public class ProtonTest extends ProtonTestBase {
@@ -224,6 +223,7 @@ public class ProtonTest extends ProtonTestBase {
TextMessage message = session.createTextMessage("test-message");
producer.send(message);
+
producer.close();
connection.start();
@@ -378,7 +378,7 @@ public class ProtonTest extends ProtonTestBase {
receiver.flow(1);
// Shouldn't get this since we delayed the message.
- assertNull(receiver.receive(5, TimeUnit.SECONDS));
+ assertNull(receiver.receive(1, TimeUnit.SECONDS));
} finally {
connection.close();
}
@@ -827,12 +827,7 @@ public class ProtonTest extends ProtonTestBase {
AmqpReceiver receiver = session.createReceiver(coreAddress);
server.destroyQueue(new SimpleString(coreAddress), null, false, true);
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisfied() throws Exception {
- return receiver.isClosed();
- }
- });
+ Wait.waitFor(receiver::isClosed);
assertTrue(receiver.isClosed());
} finally {
amqpConnection.close();
@@ -851,12 +846,7 @@ public class ProtonTest extends ProtonTestBase {
connection.disconnect(true);
}
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisfied() throws Exception {
- return amqpConnection.isClosed();
- }
- });
+ Wait.waitFor(amqpConnection::isClosed);
assertTrue(amqpConnection.isClosed());
assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition());
@@ -937,7 +927,7 @@ public class ProtonTest extends ProtonTestBase {
request.setText("[]");
sender.send(request);
- AmqpMessage response = receiver.receive(50, TimeUnit.SECONDS);
+ AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
assertNotNull(response);
Object section = response.getWrappedMessage().getBody();
@@ -1001,12 +991,7 @@ public class ProtonTest extends ProtonTestBase {
final ActiveMQServer remote = createAMQPServer(5673);
remote.start();
try {
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisfied() throws Exception {
- return remote.isActive();
- }
- });
+ Wait.waitFor(remote::isActive);
} catch (Exception e) {
remote.stop();
throw e;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java
index beac414..847b69e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AckBatchSizeTest.java
@@ -63,7 +63,8 @@ public class AckBatchSizeTest extends ActiveMQTestBase {
ActiveMQServer server = createServer(false);
server.start();
int numMessages = 100;
- ServerLocator locator = createInVMNonHALocator().setAckBatchSize(numMessages * getMessageEncodeSize(addressA)).setBlockOnAcknowledge(true);
+ int originalSize = getMessageEncodeSize(addressA);
+ ServerLocator locator = createInVMNonHALocator().setAckBatchSize(numMessages * originalSize).setBlockOnAcknowledge(true);
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession sendSession = cf.createSession(false, true, true);
@@ -71,20 +72,25 @@ public class AckBatchSizeTest extends ActiveMQTestBase {
session.createQueue(addressA, queueA, false);
ClientProducer cp = sendSession.createProducer(addressA);
for (int i = 0; i < numMessages; i++) {
- cp.send(sendSession.createMessage(false));
+ ClientMessage message = (ClientMessage)sendSession.createMessage(false).setAddress(addressA);
+ Assert.assertEquals(originalSize, message.getEncodeSize());
+ cp.send(message);
+ Assert.assertEquals(originalSize, message.getEncodeSize());
}
ClientConsumer consumer = session.createConsumer(queueA);
session.start();
for (int i = 0; i < numMessages - 1; i++) {
+ System.out.println("Receive ");
ClientMessage m = consumer.receive(5000);
-
+ Assert.assertEquals(0, m.getPropertyNames().size());
+ Assert.assertEquals("expected to be " + originalSize, originalSize, m.getEncodeSize());
m.acknowledge();
}
ClientMessage m = consumer.receive(5000);
Queue q = (Queue) server.getPostOffice().getBinding(queueA).getBindable();
- Assert.assertEquals(100, q.getDeliveringCount());
+ Assert.assertEquals(numMessages, q.getDeliveringCount());
m.acknowledge();
Assert.assertEquals(0, q.getDeliveringCount());
sendSession.close();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 0597dd5..042effd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -21,10 +21,14 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -34,6 +38,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
@@ -332,10 +337,138 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
}
- class FakeMessageWithID implements Message {
+ class FakeMessageWithID extends RefCountMessage {
final long id;
+ @Override
+ public RoutingType getRouteType() {
+ return null;
+ }
+
+ @Override
+ public SimpleString getReplyTo() {
+ return null;
+ }
+
+ @Override
+ public Message setReplyTo(SimpleString address) {
+ return null;
+ }
+
+ @Override
+ public boolean containsDeliveryAnnotationProperty(SimpleString property) {
+ return false;
+ }
+
+ @Override
+ public Object removeDeliveryAnnoationProperty(SimpleString key) {
+ return null;
+ }
+
+ @Override
+ public Object getDeliveryAnnotationProperty(SimpleString key) {
+ return null;
+ }
+
+ @Override
+ public int getPersistSize() {
+ return 0;
+ }
+
+ @Override
+ public void persist(ActiveMQBuffer targetRecord) {
+ }
+
+ @Override
+ public Persister<Message> getPersister() {
+ return null;
+ }
+
+ @Override
+ public void reloadPersistence(ActiveMQBuffer record) {
+
+ }
+
+ @Override
+ public Long getScheduledDeliveryTime() {
+ return null;
+ }
+
+ @Override
+ public ICoreMessage toCore() {
+ return null;
+ }
+
+ @Override
+ public void receiveBuffer(ByteBuf buffer) {
+
+ }
+
+ @Override
+ public void sendBuffer(ByteBuf buffer, int count) {
+
+ }
+ @Override
+ public Message setUserID(Object userID) {
+ return null;
+ }
+
+ @Override
+ public void copyHeadersAndProperties(Message msg) {
+
+ }
+
+ @Override
+ public void messageChanged() {
+
+ }
+
+ @Override
+ public Message copy() {
+ return null;
+ }
+
+ @Override
+ public Message copy(long newID) {
+ return null;
+ }
+
+ @Override
+ public Message setMessageID(long id) {
+ return null;
+ }
+
+ @Override
+ public int getRefCount() {
+ return 0;
+ }
+
+ @Override
+ public int incrementRefCount() throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int decrementRefCount() throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int incrementDurableRefCount() {
+ return 0;
+ }
+
+ @Override
+ public int decrementDurableRefCount() {
+ return 0;
+ }
+
+ @Override
+ public int getMemoryEstimate() {
+ return 0;
+ }
+
FakeMessageWithID(final long id) {
this.id = id;
}
@@ -351,23 +484,33 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
@Override
- public FakeMessageWithID setUserID(UUID userID) {
- return this;
+ public String getAddress() {
+ return null;
}
@Override
- public SimpleString getAddress() {
+ public SimpleString getAddressSimpleString() {
return null;
}
@Override
- public Message setAddress(SimpleString address) {
+ public Message setBuffer(ByteBuf buffer) {
return null;
}
@Override
- public byte getType() {
- return 0;
+ public ByteBuf getBuffer() {
+ return null;
+ }
+
+ @Override
+ public Message setAddress(String address) {
+ return null;
+ }
+
+ @Override
+ public Message setAddress(SimpleString address) {
+ return null;
}
@Override
@@ -426,16 +569,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
@Override
- public ActiveMQBuffer getBodyBuffer() {
- return null;
- }
-
- @Override
- public ActiveMQBuffer getBodyBufferDuplicate() {
- return null;
- }
-
- @Override
public Message putBooleanProperty(SimpleString key, boolean value) {
return null;
}
@@ -689,15 +822,5 @@ public class AcknowledgeTest extends ActiveMQTestBase {
public Map<String, Object> toPropertyMap() {
return null;
}
-
- @Override
- public FakeMessageWithID writeBodyBufferBytes(byte[] bytes) {
- return this;
- }
-
- @Override
- public FakeMessageWithID writeBodyBufferString(String string) {
- return this;
- }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 8f00b2a..b957291 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -16,6 +16,18 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
@@ -27,6 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -41,10 +54,13 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -54,15 +70,17 @@ import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class ConsumerTest extends ActiveMQTestBase {
- @Parameterized.Parameters(name = "isNetty={0}")
+ @Parameterized.Parameters(name = "isNetty={0}, persistent={1}")
public static Collection getParameters() {
- return Arrays.asList(new Object[][]{{true}, {false}});
+ return Arrays.asList(new Object[][]{{true, true}, {false, false}, {false, true}, {true, false}});
}
- public ConsumerTest(boolean netty) {
+ public ConsumerTest(boolean netty, boolean durable) {
this.netty = netty;
+ this.durable = durable;
}
+ private final boolean durable;
private final boolean netty;
private ActiveMQServer server;
@@ -79,13 +97,31 @@ public class ConsumerTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
- server = createServer(false, isNetty());
+ server = createServer(durable, isNetty());
server.start();
locator = createFactory(isNetty());
}
+ @Before
+ public void createQueue() throws Exception {
+
+ ServerLocator locator = createFactory(isNetty());
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, true, true, true);
+
+ server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
+
+ session.close();
+
+ sf.close();
+
+ locator.close();
+ }
+
@Test
public void testStressConnection() throws Exception {
@@ -113,34 +149,220 @@ public class ConsumerTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, true, true, false);
- session.createQueue(QUEUE, QUEUE, null, false);
-
- ClientConsumer consumer = session.createConsumer(QUEUE);
-
ClientProducer producer = session.createProducer(QUEUE);
ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4);
message.getBodyBuffer().writeString("hi");
message.putStringProperty("hello", "elo");
producer.send(message);
+ session.commit();
+
+ session.close();
+ if (durable) {
+ server.stop();
+ server.start();
+ }
+ sf = createSessionFactory(locator);
+ session = sf.createSession(false, true, true, false);
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+
session.start();
if (cancelOnce) {
- final ClientConsumerInternal consumerInternal = (ClientConsumerInternal)consumer;
+ final ClientConsumerInternal consumerInternal = (ClientConsumerInternal) consumer;
Wait.waitFor(() -> consumerInternal.getBufferSize() > 0);
consumer.close();
consumer = session.createConsumer(QUEUE);
}
ClientMessage message2 = consumer.receive(1000);
+ Assert.assertNotNull(message2);
+
System.out.println("Id::" + message2.getMessageID());
System.out.println("Received " + message2);
+ System.out.println("Clie:" + ByteUtil.bytesToHex(message2.getBuffer().array(), 4));
+
+ System.out.println("String::" + message2.getReadOnlyBodyBuffer().readString());
+
+ Assert.assertEquals("elo", message2.getStringProperty("hello"));
+
+ Assert.assertEquals("hi", message2.getReadOnlyBodyBuffer().readString());
+
session.close();
}
+ @Test
+ public void testSendReceiveAMQP() throws Throwable {
+
+ if (!isNetty()) {
+ // no need to run the test, there's no AMQP support
+ return;
+ }
+
+ internalSend(true, true);
+ }
+
+ @Test
+ public void testSendReceiveCore() throws Throwable {
+
+ if (!isNetty()) {
+ // no need to run the test, there's no AMQP support
+ return;
+ }
+
+ internalSend(false, false);
+ }
+
+ @Test
+ public void testSendAMQPReceiveCore() throws Throwable {
+
+ if (!isNetty()) {
+ // no need to run the test, there's no AMQP support
+ return;
+ }
+
+ internalSend(true, false);
+ }
+
+ @Test
+ public void testSendCoreReceiveAMQP() throws Throwable {
+
+ if (!isNetty()) {
+ // no need to run the test, there's no AMQP support
+ return;
+ }
+
+ internalSend(false, true);
+ }
+
+
+
+ public static class MyTest implements Serializable {
+ int i;
+
+ public int getI() {
+ return i;
+ }
+
+ public MyTest setI(int i) {
+ this.i = i;
+ return this;
+ }
+ }
+
+
+ public void internalSend(boolean amqpSender, boolean amqpConsumer) throws Throwable {
+
+ ConnectionFactory factoryAMQP = new JmsConnectionFactory("amqp://localhost:61616");
+ ConnectionFactory factoryCore = new ActiveMQConnectionFactory();
+
+
+ Connection connection = (amqpSender ? factoryAMQP : factoryCore).createConnection();
+
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue queue = session.createQueue(QUEUE.toString());
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ long time = System.currentTimeMillis();
+ int NUMBER_OF_MESSAGES = 100;
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage msg = session.createTextMessage("hello " + i);
+ msg.setIntProperty("mycount", i);
+ producer.send(msg);
+
+ ObjectMessage objectMessage = session.createObjectMessage(new MyTest().setI(i));
+ producer.send(objectMessage);
+
+ MapMessage mapMessage = session.createMapMessage();
+ mapMessage.setInt("intOne", i);
+ mapMessage.setString("stringOne", Integer.toString(i));
+ producer.send(mapMessage);
+
+ StreamMessage stream = session.createStreamMessage();
+ stream.writeBoolean(true);
+ stream.writeInt(i);
+ producer.send(stream);
+
+ BytesMessage bytes = session.createBytesMessage();
+ bytes.writeUTF("string " + i);
+ producer.send(bytes);
+ }
+ long end = System.currentTimeMillis();
+
+ System.out.println("Time = " + (end - time));
+
+ {
+ TextMessage dummyMessage = session.createTextMessage();
+ dummyMessage.setJMSType("car");
+ dummyMessage.setStringProperty("color", "red");
+ dummyMessage.setLongProperty("weight", 3000);
+ dummyMessage.setText("testSelectorExampleFromSpecs:1");
+ producer.send(dummyMessage);
+
+ dummyMessage = session.createTextMessage();
+ dummyMessage.setJMSType("car");
+ dummyMessage.setStringProperty("color", "blue");
+ dummyMessage.setLongProperty("weight", 3000);
+ dummyMessage.setText("testSelectorExampleFromSpecs:2");
+ producer.send(dummyMessage);
+ }
+
+
+
+
+ connection.close();
+
+ if (this.durable) {
+ server.stop();
+ server.start();
+ }
+
+ connection = (amqpConsumer ? factoryAMQP : factoryCore).createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(QUEUE.toString());
+
+ connection.start();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(i, message.getIntProperty("mycount"));
+ Assert.assertEquals("hello " + i, message.getText());
+
+ ObjectMessage objectMessage = (ObjectMessage)consumer.receive(5000);
+ Assert.assertNotNull(objectMessage);
+ Assert.assertEquals(i, ((MyTest)objectMessage.getObject()).getI());
+
+ MapMessage mapMessage = (MapMessage) consumer.receive(1000);
+ Assert.assertNotNull(mapMessage);
+ Assert.assertEquals(i, mapMessage.getInt("intOne"));
+ Assert.assertEquals(Integer.toString(i), mapMessage.getString("stringOne"));
+
+ StreamMessage stream = (StreamMessage)consumer.receive(5000);
+ Assert.assertTrue(stream.readBoolean());
+ Assert.assertEquals(i, stream.readInt());
+
+ BytesMessage bytes = (BytesMessage) consumer.receive(5000);
+ Assert.assertEquals("string " + i, bytes.readUTF());
+ }
+
+ consumer.close();
+
+ consumer = session.createConsumer(queue, "JMSType = 'car' AND color = 'blue' AND weight > 2500");
+
+ TextMessage msg = (TextMessage) consumer.receive(1000);
+ Assert.assertEquals("testSelectorExampleFromSpecs:2", msg.getText());
+ } finally {
+ connection.close();
+ }
+ }
@Test
public void testConsumerAckImmediateAutoCommitTrue() throws Exception {
@@ -148,8 +370,6 @@ public class ConsumerTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, true, true, true);
- session.createQueue(QUEUE, QUEUE, null, false);
-
ClientProducer producer = session.createProducer(QUEUE);
final int numMessages = 100;
@@ -180,8 +400,6 @@ public class ConsumerTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, true, false, true);
- session.createQueue(QUEUE, QUEUE, null, false);
-
ClientProducer producer = session.createProducer(QUEUE);
final int numMessages = 100;
@@ -212,8 +430,6 @@ public class ConsumerTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, true, true, true);
- session.createQueue(QUEUE, QUEUE, null, false);
-
ClientProducer producer = session.createProducer(QUEUE);
final int numMessages = 100;
@@ -247,8 +463,6 @@ public class ConsumerTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, true, true, true);
- session.createQueue(QUEUE, QUEUE, null, false);
-
ClientProducer producer = session.createProducer(QUEUE);
final int numMessages = 100;
@@ -284,11 +498,9 @@ public class ConsumerTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, true, true);
- session.createQueue(QUEUE, QUEUE, null, false);
-
ClientProducer producer = session.createProducer(QUEUE);
- final int numMessages = 10000;
+ final int numMessages = 100;
for (int i = 0; i < numMessages; i++) {
ClientMessage message = createTextMessage(session, "m" + i);
@@ -338,8 +550,6 @@ public class ConsumerTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, true, true);
- session.createQueue(QUEUE, QUEUE, null, false);
-
session.start();
ClientProducer producer = session.createProducer(QUEUE);
@@ -372,8 +582,6 @@ public class ConsumerTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, true, true);
session.start();
- session.createQueue(QUEUE, QUEUE, null, false);
-
ClientConsumer consumer = session.createConsumer(QUEUE);
consumer.setMessageHandler(new MessageHandler() {
@@ -394,8 +602,6 @@ public class ConsumerTest extends ActiveMQTestBase {
ClientSession session = sf.createSession(false, true, true);
- session.createQueue(QUEUE, QUEUE, null, false);
-
ClientConsumer consumer = session.createConsumer(QUEUE);
consumer.setMessageHandler(new MessageHandler() {
@@ -436,7 +642,7 @@ public class ConsumerTest extends ActiveMQTestBase {
sessions.add(session);
- session.createQueue(QUEUE, QUEUE.concat("" + i), null, false);
+ session.createQueue(QUEUE, QUEUE.concat("" + i), null, true);
if (i == 0) {
session.createQueue(QUEUE_RESPONSE, QUEUE_RESPONSE);
@@ -550,8 +756,6 @@ public class ConsumerTest extends ActiveMQTestBase {
ClientSession session = sf.createTransactedSession();
- session.createQueue(QUEUE, QUEUE, null, false);
-
ClientProducer producer = session.createProducer(QUEUE);
final int numMessages = 100;
@@ -598,7 +802,6 @@ public class ConsumerTest extends ActiveMQTestBase {
ServerLocator locator = addServerLocator(ServerLocatorImpl.newLocator("vm:/1"));
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession();
- session.createQueue(QUEUE, QUEUE);
ClientProducer producer = session.createProducer(QUEUE);
producer.send(session.createMessage(true));
@@ -620,8 +823,6 @@ public class ConsumerTest extends ActiveMQTestBase {
ClientSession session = sf.createTransactedSession();
- session.createQueue(QUEUE, QUEUE, null, false);
-
ClientProducer producer = session.createProducer(QUEUE);
final int numMessages = 100;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4894015d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 81e0ca4..201a96b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
+
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -56,7 +58,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -519,7 +520,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendJmsMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
*/
@Override
- public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+ public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
inCall.countDown();
try {
callbackSemaphore.acquire();
@@ -541,7 +542,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
*/
@Override
public int sendLargeMessage(MessageReference reference,
- ServerMessage message,
+ Message message,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {