You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/04/01 04:20:18 UTC
[2/4] activemq-artemis git commit: major refactoring on Transactions
and AMQ objects
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 77705fa..f3dd306 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -74,7 +75,6 @@ import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.Transaction.State;
-import org.apache.activemq.artemis.core.transaction.TransactionFactory;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
@@ -97,6 +97,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
// Attributes ----------------------------------------------------------------------------
+ private boolean securityEnabled = true;
+
protected final String username;
protected final String password;
@@ -169,32 +171,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
// concurrently.
private volatile boolean closed = false;
- private final TransactionFactory transactionFactory;
-
- public ServerSessionImpl(final String name,
- final String username,
- final String password,
- final int minLargeMessageSize,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final boolean strictUpdateDeliveryCount,
- final boolean xa,
- final RemotingConnection remotingConnection,
- final StorageManager storageManager,
- final PostOffice postOffice,
- final ResourceManager resourceManager,
- final SecurityStore securityStore,
- final ManagementService managementService,
- final ActiveMQServer server,
- final SimpleString managementAddress,
- final SimpleString defaultAddress,
- final SessionCallback callback,
- final OperationContext context,
- final QueueCreator queueCreator) throws Exception {
- this(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, strictUpdateDeliveryCount, xa, remotingConnection, storageManager, postOffice, resourceManager, securityStore, managementService, server, managementAddress, defaultAddress, callback, context, null, queueCreator);
- }
-
public ServerSessionImpl(final String name,
final String username,
final String password,
@@ -215,7 +191,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final SimpleString defaultAddress,
final SessionCallback callback,
final OperationContext context,
- TransactionFactory transactionFactory,
final QueueCreator queueCreator) throws Exception {
this.username = username;
@@ -261,13 +236,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
this.queueCreator = queueCreator;
- if (transactionFactory == null) {
- this.transactionFactory = new DefaultTransactionFactory();
- }
- else {
- this.transactionFactory = transactionFactory;
- }
-
if (!xa) {
tx = newTransaction();
}
@@ -275,6 +243,19 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
// ServerSession implementation ----------------------------------------------------------------------------
+ @Override
+ public void enableSecurity() {
+ this.securityEnabled = true;
+ }
+
+ @Override
+ public void disableSecurity() {
+ this.securityEnabled = false;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
/**
* @return the sessionContext
*/
@@ -386,7 +367,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
remotingConnection.removeFailureListener(this);
- callback.closed();
+ if (callback != null) {
+ callback.closed();
+ }
closed = true;
}
@@ -397,6 +380,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return queueCreator;
}
+ protected void securityCheck(SimpleString address, CheckType checkType, SecurityAuth auth) throws Exception {
+ if (securityEnabled) {
+ securityStore.check(address, checkType, auth);
+ }
+ }
+
@Override
public ServerConsumer createConsumer(final long consumerID,
final SimpleString queueName,
@@ -417,11 +406,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
}
- securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
+ securityCheck(binding.getAddress(), CheckType.CONSUME, this);
Filter filter = FilterImpl.createFilter(filterString);
- ServerConsumer consumer = newConsumer(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits);
+ ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding)binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
consumers.put(consumer.getID(), consumer);
if (!browseOnly) {
@@ -465,20 +454,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return consumer;
}
- protected ServerConsumer newConsumer(long consumerID,
- ServerSessionImpl serverSessionImpl,
- QueueBinding binding,
- Filter filter,
- boolean started2,
- boolean browseOnly,
- StorageManager storageManager2,
- SessionCallback callback2,
- boolean preAcknowledge2,
- boolean strictUpdateDeliveryCount2,
- ManagementService managementService2,
- boolean supportLargeMessage,
- Integer credits) throws Exception {
- return new ServerConsumerImpl(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
+ /** Some protocols may choose to hold their transactions outside of the ServerSession.
+ * This can be used to replace the transaction.
+ * Notice that autoCommitAcks and autoCommitSends are set to true if the transaction is null, and to false otherwise. */
+ public void resetTX(Transaction transaction) {
+ this.tx = transaction;
+ this.autoCommitAcks = transaction == null;
+ this.autoCommitSends = transaction == null;
}
@Override
@@ -489,10 +471,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean durable) throws Exception {
if (durable) {
// make sure the user has privileges to create this queue
- securityStore.check(address, CheckType.CREATE_DURABLE_QUEUE, this);
+ securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this);
}
else {
- securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
+ securityCheck(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
}
server.checkQueueCreationLimit(getUsername());
@@ -537,7 +519,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final SimpleString name,
boolean durable,
final SimpleString filterString) throws Exception {
- securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
+ securityCheck(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
server.checkQueueCreationLimit(getUsername());
@@ -632,7 +614,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void forceConsumerDelivery(final long consumerID, final long sequence) throws Exception {
- ServerConsumer consumer = consumers.get(consumerID);
+ ServerConsumer consumer = locateConsumer(consumerID);
// this would be possible if the server consumer was closed by pings/pongs.. etc
if (consumer != null) {
@@ -640,15 +622,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
}
- public void promptDelivery(long consumerID) {
- ServerConsumer consumer = consumers.get(consumerID);
-
- // this would be possible if the server consumer was closed by pings/pongs.. etc
- if (consumer != null) {
- consumer.promptDelivery();
- }
- }
-
@Override
public void acknowledge(final long consumerID, final long messageID) throws Exception {
ServerConsumer consumer = findConsumer(consumerID);
@@ -674,8 +647,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
}
+ public ServerConsumer locateConsumer(long consumerID) {
+ return consumers.get(consumerID);
+ }
+
private ServerConsumer findConsumer(long consumerID) throws Exception {
- ServerConsumer consumer = consumers.get(consumerID);
+ ServerConsumer consumer = locateConsumer(consumerID);
if (consumer == null) {
Transaction currentTX = tx;
@@ -710,7 +687,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void individualCancel(final long consumerID, final long messageID, boolean failed) throws Exception {
- ServerConsumer consumer = consumers.get(consumerID);
+ ServerConsumer consumer = locateConsumer(consumerID);
if (consumer != null) {
consumer.individualCancel(messageID, failed);
@@ -720,7 +697,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void expire(final long consumerID, final long messageID) throws Exception {
- MessageReference ref = consumers.get(consumerID).removeReferenceByID(messageID);
+ MessageReference ref = locateConsumer(consumerID).removeReferenceByID(messageID);
if (ref != null) {
ref.getQueue().expire(ref);
@@ -778,8 +755,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
/**
* @return
*/
- protected Transaction newTransaction() {
- return transactionFactory.newTransaction(null, storageManager, timeoutSeconds);
+ public Transaction newTransaction() {
+ return new TransactionImpl(null, storageManager, timeoutSeconds);
}
/**
@@ -787,7 +764,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
* @return
*/
private Transaction newTransaction(final Xid xid) {
- return transactionFactory.newTransaction(xid, storageManager, timeoutSeconds);
+ return new TransactionImpl(xid, storageManager, timeoutSeconds);
}
@Override
@@ -1189,7 +1166,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void closeConsumer(final long consumerID) throws Exception {
- final ServerConsumer consumer = consumers.get(consumerID);
+ final ServerConsumer consumer = locateConsumer(consumerID);
if (consumer != null) {
consumer.close(false);
@@ -1201,7 +1178,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void receiveConsumerCredits(final long consumerID, final int credits) throws Exception {
- ServerConsumer consumer = consumers.get(consumerID);
+ ServerConsumer consumer = locateConsumer(consumerID);
if (consumer == null) {
ActiveMQServerLogger.LOGGER.debug("There is no consumer with id " + consumerID);
@@ -1214,9 +1191,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public Transaction getCurrentTransaction() {
- if (tx == null) {
- tx = newTransaction();
- }
return tx;
}
@@ -1489,7 +1463,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private void handleManagementMessage(final ServerMessage message, final boolean direct) throws Exception {
try {
- securityStore.check(message.getAddress(), CheckType.MANAGE, this);
+ securityCheck(message.getAddress(), CheckType.MANAGE, this);
}
catch (ActiveMQException e) {
if (!autoCommitSends) {
@@ -1564,7 +1538,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected void doSend(final ServerMessage msg, final boolean direct) throws Exception {
// check the user has write access to this address.
try {
- securityStore.check(msg.getAddress(), CheckType.SEND, this);
+ securityCheck(msg.getAddress(), CheckType.SEND, this);
}
catch (ActiveMQException e) {
if (!autoCommitSends && tx != null) {
@@ -1613,12 +1587,4 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return Collections.emptyList();
}
}
-
- private static class DefaultTransactionFactory implements TransactionFactory {
-
- @Override
- public Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds) {
- return new TransactionImpl(xid, storageManager, timeoutSeconds);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
index eb1ab3c..9b54851 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
@@ -22,6 +22,7 @@ import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
/**
@@ -33,6 +34,12 @@ public interface Transaction {
ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
}
+ Object getProtocolData();
+
+ /** Protocol managers can use this field to store any object needed.
+ * An example would be the Session used by the transaction on openwire */
+ void setProtocolData(Object data);
+
boolean isEffective();
void prepare() throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionFactory.java
deleted file mode 100644
index 5e97826..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionFactory.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.transaction;
-
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-
-import javax.transaction.xa.Xid;
-
-public interface TransactionFactory {
-
- Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds);
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index db55aa8..5f08e61 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -59,6 +59,18 @@ public class TransactionImpl implements Transaction {
private int timeoutSeconds = -1;
+ private Object protocolData;
+
+ @Override
+ public Object getProtocolData() {
+ return protocolData;
+ }
+
+ @Override
+ public void setProtocolData(Object protocolData) {
+ this.protocolData = protocolData;
+ }
+
public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
this.storageManager = storageManager;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index a9eb0f2..cf0ec69 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.spi.core.protocol;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -36,9 +37,15 @@ public interface SessionCallback {
void sendProducerCreditsFailMessage(int credits, SimpleString address);
- int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount);
+ // Note: don't be tempted to remove the parameter message.
+ // Even though ref will contain the message, in certain cases
+ // (such as paging) the message could be a SoftReference or WeakReference,
+ // and I wanted to avoid re-fetching paged data in case of GCs in this specific case.
+ //
+ // Future developments may change this, but be aware of why I have chosen to keep the parameter separate here.
+ int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumerID, int deliveryCount);
- int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount);
+ int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount);
int sendLargeMessageContinuation(ServerConsumer consumerID,
byte[] body,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 22ec438..53edb79 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1106,6 +1106,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
+ }
+
+ @Override
public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception {
return false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index 8403ee3..45f8b30 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -567,12 +567,12 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
connection = cf.createConnection();
connection.start();
connections.add(connection);
- final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ final Session consumerSession1 = connection.createSession(true, Session.SESSION_TRANSACTED);
connection = cf.createConnection();
connection.start();
connections.add(connection);
- final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ final Session consumerSession2 = connection.createSession(true, Session.SESSION_TRANSACTED);
final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
@@ -583,7 +583,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
final Vector<Message> receivedMessages = new Vector<>();
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
- new Thread() {
+ Thread t = new Thread("doTestFailoverConsumerAckLost(" + pauseSeconds + ")") {
public void run() {
LOG.info("doing async commit after consume...");
try {
@@ -630,10 +630,12 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
e.printStackTrace();
}
}
- }.start();
+ };
+ t.start();
// will be stopped by the plugin
brokerStopLatch.await(60, TimeUnit.SECONDS);
+ t.join();
broker = createBroker();
broker.start();
doByteman.set(false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 54ae6c8..9702624 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
@@ -507,7 +508,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
*/
@Override
- public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+ public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
inCall.countDown();
try {
callbackSemaphore.acquire();
@@ -518,7 +519,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
try {
- return targetCallback.sendMessage(message, consumer, deliveryCount);
+ return targetCallback.sendMessage(ref, message, consumer, deliveryCount);
}
finally {
callbackSemaphore.release();
@@ -530,8 +531,8 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
*/
@Override
- public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
- return targetCallback.sendLargeMessage(message, consumer, bodySize, deliveryCount);
+ public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+ return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount);
}
/* (non-Javadoc)
@@ -581,7 +582,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
String defaultAddress,
SessionCallback callback,
OperationContext context,
- ServerSessionFactory sessionFactory,
boolean autoCreateQueue) throws Exception {
return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
index a1a5e38..14cfee0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
@@ -26,6 +26,7 @@ import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -118,7 +119,7 @@ public class BasicSecurityTest extends BasicOpenWireTest {
}
@Test
- public void testSendnReceiveAuthorization() throws Exception {
+ public void testSendnReceiveAuthorization() throws Exception {
Connection sendingConn = null;
Connection receivingConn = null;
@@ -152,16 +153,18 @@ public class BasicSecurityTest extends BasicOpenWireTest {
producer = sendingSession.createProducer(dest);
producer.send(message);
- MessageConsumer consumer = null;
+ MessageConsumer consumer;
try {
consumer = sendingSession.createConsumer(dest);
+ Assert.fail("exception expected");
}
catch (JMSSecurityException e) {
+ e.printStackTrace();
//expected
}
consumer = receivingSession.createConsumer(dest);
- TextMessage received = (TextMessage) consumer.receive();
+ TextMessage received = (TextMessage) consumer.receive(5000);
assertNotNull(received);
assertEquals("Hello World", received.getText());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
index 825b8b5..69d9784 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import static org.junit.Assert.assertEquals;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.junit.Test;
public class OpenWireUtilTest {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index b87fc7d..c4aea03 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -27,24 +26,27 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
public class SimpleOpenWireTest extends BasicOpenWireTest {
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
@Override
@Before
public void setUp() throws Exception {
@@ -53,6 +55,158 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
@Test
+ public void testSimple() throws Exception {
+ Connection connection = factory.createConnection();
+
+ Collection<Session> sessions = new LinkedList<>();
+
+ for (int i = 0; i < 10; i++) {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ sessions.add(session);
+ }
+
+ connection.close();
+ }
+
+ @Test
+ public void testTransactionalSimple() throws Exception {
+ try (Connection connection = factory.createConnection()) {
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ System.out.println("Queue:" + queue);
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ producer.send(session.createTextMessage("test"));
+ session.commit();
+
+ Assert.assertNull(consumer.receive(100));
+ connection.start();
+
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertEquals("test", message.getText());
+
+ Assert.assertNotNull(message);
+
+ message.acknowledge();
+ }
+ }
+
+ @Test
+ public void testXASimple() throws Exception {
+ XAConnection connection = xaFactory.createXAConnection();
+
+ Collection<Session> sessions = new LinkedList<>();
+
+ for (int i = 0; i < 10; i++) {
+ XASession session = connection.createXASession();
+ session.getXAResource().start(newXID(), XAResource.TMNOFLAGS);
+ sessions.add(session);
+ }
+
+ connection.close();
+
+ }
+
+ @Test
+ public void testClientACK() throws Exception {
+ try {
+
+ Connection connection = factory.createConnection();
+
+ Collection<Session> sessions = new LinkedList<>();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+ System.out.println("Queue:" + queue);
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ producer.send(session.createTextMessage("test"));
+
+ Assert.assertNull(consumer.receive(100));
+ connection.start();
+
+ TextMessage message = (TextMessage) consumer.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ message.acknowledge();
+
+ connection.close();
+
+ System.err.println("Done!!!");
+ }
+ catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testRollback() throws Exception {
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ System.out.println("Queue:" + queue);
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ producer.send(session.createTextMessage("test"));
+ producer.send(session.createTextMessage("test2"));
+ connection.start();
+ Assert.assertNull(consumer.receiveNoWait());
+ session.rollback();
+ producer.send(session.createTextMessage("test2"));
+ Assert.assertNull(consumer.receiveNoWait());
+ session.commit();
+ TextMessage msg = (TextMessage) consumer.receive(1000);
+
+ Assert.assertNotNull(msg);
+ Assert.assertEquals("test2", msg.getText());
+ }
+ }
+
+ @Test
+ public void testAutoAck() throws Exception {
+ Connection connection = factory.createConnection();
+
+ Collection<Session> sessions = new LinkedList<>();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+ System.out.println("Queue:" + queue);
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage msg = session.createTextMessage("test");
+ msg.setStringProperty("abc", "testAutoACK");
+ producer.send(msg);
+
+ Assert.assertNull(consumer.receive(100));
+ connection.start();
+
+ TextMessage message = (TextMessage) consumer.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ connection.close();
+
+ System.err.println("Done!!!");
+ }
+
+ @Test
+ public void testProducerFlowControl() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+
+ factory.setProducerWindowSize(1024 * 64);
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("test"));
+
+ connection.close();
+ }
+
+ @Test
public void testSimpleQueue() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -88,12 +242,11 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
session.close();
}
-
- @Test
+ // @Test -- ignored for now
public void testKeepAlive() throws Exception {
connection.start();
- Thread.sleep(125000);
+ Thread.sleep(30000);
connection.createSession(false, 1);
}
@@ -237,9 +390,11 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("foo");
- thrown.expect(InvalidDestinationException.class);
- thrown.expect(JMSException.class);
- session.createProducer(queue);
+ try {
+ session.createProducer(queue);
+ }
+ catch (JMSException expected) {
+ }
session.close();
}
@@ -390,7 +545,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
-
/**
* This is the example shipped with the distribution
*
@@ -473,7 +627,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
-
// simple test sending openwire, consuming core
@Test
public void testMixedOpenWireExample2() throws Exception {
@@ -513,5 +666,396 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
conn2.close();
}
+ @Test
+ public void testXAConsumer() throws Exception {
+ Queue queue;
+ try (Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
+ queue = session.createQueue(queueName);
+ System.out.println("Queue:" + queue);
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < 10; i++) {
+ TextMessage msg = session.createTextMessage("test" + i);
+ msg.setStringProperty("myobj", "test" + i);
+ producer.send(msg);
+ }
+ session.close();
+ }
+
+ try (XAConnection xaconnection = xaFactory.createXAConnection()) {
+ Xid xid = newXID();
+
+ XASession session = xaconnection.createXASession();
+ session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+ MessageConsumer consumer = session.createConsumer(queue);
+ xaconnection.start();
+ for (int i = 0; i < 5; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("test" + i, message.getText());
+ }
+ session.getXAResource().end(xid, XAResource.TMSUCCESS);
+ session.getXAResource().rollback(xid);
+ consumer.close();
+ xaconnection.close();
+ }
+
+ try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+ connection.start();
+ MessageConsumer consumer = session.createConsumer(queue);
+ for (int i = 0; i < 10; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ // Assert.assertEquals("test" + i, message.getText());
+ System.out.println("Message " + message.getText());
+ }
+ checkDuplicate(consumer);
+ System.out.println("Queue:" + queue);
+ session.close();
+ }
+
+ System.err.println("Done!!!");
+ }
+
+ @Test
+ public void testXASameConsumerRollback() throws Exception {
+ Queue queue;
+ try (Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
+ queue = session.createQueue(queueName);
+ System.out.println("Queue:" + queue);
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < 10; i++) {
+ TextMessage msg = session.createTextMessage("test" + i);
+ msg.setStringProperty("myobj", "test" + i);
+ producer.send(msg);
+ }
+ session.close();
+ }
+
+ try (XAConnection xaconnection = xaFactory.createXAConnection()) {
+ Xid xid = newXID();
+
+ XASession session = xaconnection.createXASession();
+ session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+ MessageConsumer consumer = session.createConsumer(queue);
+ xaconnection.start();
+ for (int i = 0; i < 5; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("test" + i, message.getText());
+ }
+ session.getXAResource().end(xid, XAResource.TMSUCCESS);
+ session.getXAResource().rollback(xid);
+
+ xid = newXID();
+ session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+
+ for (int i = 0; i < 10; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("test" + i, message.getText());
+ }
+
+ checkDuplicate(consumer);
+
+ session.getXAResource().end(xid, XAResource.TMSUCCESS);
+ session.getXAResource().commit(xid, true);
+ }
+ }
+
+ @Test
+ public void testXAPrepare() throws Exception {
+ try {
+
+ XAConnection connection = xaFactory.createXAConnection();
+
+ XASession xasession = connection.createXASession();
+
+ Xid xid = newXID();
+ xasession.getXAResource().start(xid, XAResource.TMNOFLAGS);
+ Queue queue = xasession.createQueue(queueName);
+ MessageProducer producer = xasession.createProducer(queue);
+ producer.send(xasession.createTextMessage("hello"));
+ producer.send(xasession.createTextMessage("hello"));
+ xasession.getXAResource().end(xid, XAResource.TMSUCCESS);
+
+ xasession.getXAResource().prepare(xid);
+
+ connection.close();
+
+ System.err.println("Done!!!");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testAutoSend() throws Exception {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(queueName);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < 10; i++) {
+ producer.send(session.createTextMessage("testXX" + i));
+ }
+ connection.start();
+
+ for (int i = 0; i < 10; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(5000);
+
+ Assert.assertEquals("testXX" + i, txt.getText());
+ }
+ }
+
+ @Test
+ public void testCommitCloseConsumerBefore() throws Exception {
+ testCommitCloseConsumer(true);
+ }
+
+ @Test
+ public void testCommitCloseConsumerAfter() throws Exception {
+ testCommitCloseConsumer(false);
+ }
+
+ private void testCommitCloseConsumer(boolean closeBefore) throws Exception {
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ Queue queue = session.createQueue(queueName);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < 10; i++) {
+ TextMessage msg = session.createTextMessage("testXX" + i);
+ msg.setStringProperty("count", "str " + i);
+ producer.send(msg);
+ }
+ session.commit();
+ connection.start();
+
+ for (int i = 0; i < 5; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(5000);
+ Assert.assertEquals("testXX" + i, txt.getText());
+ }
+ if (closeBefore) {
+ consumer.close();
+ }
+
+ session.commit();
+
+ // we're testing two scenarios.
+ // closing the consumer before commit or after commit
+ if (!closeBefore) {
+ consumer.close();
+ }
+
+ consumer = session.createConsumer(queue);
+ // Assert.assertNull(consumer.receiveNoWait());
+ for (int i = 5; i < 10; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(5000);
+ Assert.assertEquals("testXX" + i, txt.getText());
+ }
+
+ Assert.assertNull(consumer.receiveNoWait());
+
+ }
+
+ @Test
+ public void testRollbackWithAcked() throws Exception {
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ Queue queue = session.createQueue(queueName);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < 10; i++) {
+ TextMessage msg = session.createTextMessage("testXX" + i);
+ msg.setStringProperty("count", "str " + i);
+ producer.send(msg);
+ }
+ session.commit();
+ connection.start();
+
+ for (int i = 0; i < 5; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(5000);
+ Assert.assertEquals("testXX" + i, txt.getText());
+ }
+
+ session.rollback();
+
+ consumer.close();
+
+ consumer = session.createConsumer(queue);
+ // Assert.assertNull(consumer.receiveNoWait());
+ for (int i = 0; i < 10; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(5000);
+ // System.out.println("TXT::" + txt);
+ Assert.assertNotNull(txt);
+ System.out.println("TXT " + txt.getText());
+ // Assert.assertEquals("testXX" + i, txt.getText());
+ }
+ session.commit();
+
+ checkDuplicate(consumer);
+
+ }
+
+ @Test
+ public void testRollbackLocal() throws Exception {
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ Queue queue = session.createQueue(queueName);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < 10; i++) {
+ TextMessage msg = session.createTextMessage("testXX" + i);
+ msg.setStringProperty("count", "str " + i);
+ producer.send(msg);
+ }
+ session.commit();
+ connection.start();
+
+ for (int i = 0; i < 5; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(500);
+ Assert.assertEquals("testXX" + i, txt.getText());
+ }
+
+ session.rollback();
+
+ for (int i = 0; i < 10; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(txt);
+ System.out.println("TXT " + txt.getText());
+ Assert.assertEquals("testXX" + i, txt.getText());
+ }
+
+ checkDuplicate(consumer);
+
+ session.commit();
+
+ }
+
+ private void checkDuplicate(MessageConsumer consumer) throws JMSException {
+ boolean duplicatedMessages = false;
+ while (true) {
+ TextMessage txt = (TextMessage) consumer.receiveNoWait();
+ if (txt == null) {
+ break;
+ }
+ else {
+ duplicatedMessages = true;
+ System.out.println("received in duplicate:" + txt.getText());
+ }
+ }
+
+ Assert.assertFalse("received messages in duplicate", duplicatedMessages);
+ }
+
+ @Test
+ public void testIndividualAck() throws Exception {
+ connection.start();
+ Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(queueName);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < 10; i++) {
+ TextMessage msg = session.createTextMessage("testXX" + i);
+ msg.setStringProperty("count", "str " + i);
+ producer.send(msg);
+ }
+ connection.start();
+
+ for (int i = 0; i < 5; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(5000);
+ if (i == 4) {
+ txt.acknowledge();
+ }
+ Assert.assertEquals("testXX" + i, txt.getText());
+ }
+
+ consumer.close();
+
+ consumer = session.createConsumer(queue);
+ // Assert.assertNull(consumer.receiveNoWait());
+ for (int i = 0; i < 4; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(5000);
+ txt.acknowledge();
+ Assert.assertEquals("testXX" + i, txt.getText());
+ }
+
+ for (int i = 5; i < 10; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(5000);
+ txt.acknowledge();
+ Assert.assertEquals("testXX" + i, txt.getText());
+ }
+
+ checkDuplicate(consumer);
+
+ Assert.assertNull(consumer.receiveNoWait());
+
+ }
+
+ @Test
+ public void testCommitCloseConsumeXA() throws Exception {
+
+ Queue queue;
+ {
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ queue = session.createQueue(queueName);
+
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < 10; i++) {
+ TextMessage msg = session.createTextMessage("testXX" + i);
+ msg.setStringProperty("count", "str " + i);
+ producer.send(msg);
+ }
+ session.commit();
+ }
+
+ try (XAConnection xaconnection = xaFactory.createXAConnection()) {
+ xaconnection.start();
+
+ XASession xasession = xaconnection.createXASession();
+ Xid xid = newXID();
+ xasession.getXAResource().start(xid, XAResource.TMNOFLAGS);
+ MessageConsumer consumer = xasession.createConsumer(queue);
+
+ for (int i = 0; i < 5; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(5000);
+ Assert.assertEquals("testXX" + i, txt.getText());
+ }
+
+ consumer.close();
+
+ xasession.getXAResource().end(xid, XAResource.TMSUCCESS);
+ xasession.getXAResource().prepare(xid);
+ xasession.getXAResource().commit(xid, false);
+
+ xaconnection.close();
+ }
+
+ {
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ try (MessageConsumer consumer = session.createConsumer(queue)) {
+ for (int i = 5; i < 10; i++) {
+ TextMessage txt = (TextMessage) consumer.receive(5000);
+ Assert.assertEquals("testXX" + i, txt.getText());
+ }
+ }
+
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
deleted file mode 100644
index da4ecb3..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.tests.integration.openwire.investigations;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.XAConnection;
-import javax.jms.XASession;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.Collection;
-import java.util.LinkedList;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class InvestigationOpenwireTest extends BasicOpenWireTest {
-
- @Test
- public void testSimple() throws Exception {
- try {
-
- Connection connection = factory.createConnection();
- // Thread.sleep(5000);
-
- Collection<Session> sessions = new LinkedList<>();
-
- for (int i = 0; i < 10; i++) {
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- sessions.add(session);
- }
-
- connection.close();
-
- System.err.println("Done!!!");
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void testProducerFlowControl() throws Exception {
- try {
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
-
- factory.setProducerWindowSize(1024 * 64);
-
- Connection connection = factory.createConnection();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue(queueName);
- MessageProducer producer = session.createProducer(queue);
- producer.send(session.createTextMessage("test"));
-
-
- connection.close();
-
- System.err.println("Done!!!");
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void testAutoAck() throws Exception {
- try {
-
- Connection connection = factory.createConnection();
- // Thread.sleep(5000);
-
- Collection<Session> sessions = new LinkedList<>();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(queueName);
- System.out.println("Queue:" + queue);
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createConsumer(queue);
- producer.send(session.createTextMessage("test"));
-
- Assert.assertNull(consumer.receive(100));
- connection.start();
-
- TextMessage message = (TextMessage)consumer.receive(5000);
-
- Assert.assertNotNull(message);
-
-
- connection.close();
-
- System.err.println("Done!!!");
- }
- catch (Throwable e) {
- e.printStackTrace();
- }
- }
-
-
-
- @Test
- public void testRollback() throws Exception {
- try {
-
- Connection connection = factory.createConnection();
- // Thread.sleep(5000);
-
- Collection<Session> sessions = new LinkedList<>();
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue(queueName);
- System.out.println("Queue:" + queue);
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createConsumer(queue);
- producer.send(session.createTextMessage("test"));
- producer.send(session.createTextMessage("test2"));
- connection.start();
- Assert.assertNull(consumer.receive(1000));
- session.rollback();
- producer.send(session.createTextMessage("test2"));
- Assert.assertNull(consumer.receive(1000));
- session.commit();
- TextMessage msg = (TextMessage)consumer.receive(1000);
-
-
- Assert.assertNotNull(msg);
- Assert.assertEquals("test2", msg.getText());
-
- connection.close();
-
- System.err.println("Done!!!");
- }
- catch (Throwable e) {
- e.printStackTrace();
- }
- }
-
-
-
- @Test
- public void testClientACK() throws Exception {
- try {
-
- Connection connection = factory.createConnection();
- // Thread.sleep(5000);
-
- Collection<Session> sessions = new LinkedList<>();
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = session.createQueue(queueName);
- System.out.println("Queue:" + queue);
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createConsumer(queue);
- producer.send(session.createTextMessage("test"));
-
- Assert.assertNull(consumer.receive(100));
- connection.start();
-
- TextMessage message = (TextMessage)consumer.receive(5000);
-
- Assert.assertNotNull(message);
-
- message.acknowledge();
-
-
- connection.close();
-
- System.err.println("Done!!!");
- }
- catch (Throwable e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void testXASimple() throws Exception {
- try {
-
- XAConnection connection = xaFactory.createXAConnection();
- // Thread.sleep(5000);
-
- Collection<Session> sessions = new LinkedList<>();
-
- for (int i = 0; i < 10; i++) {
- XASession session = connection.createXASession();
- session.getXAResource().start(newXID(), XAResource.TMNOFLAGS);
- sessions.add(session);
- }
-
- connection.close();
-
- System.err.println("Done!!!");
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void testXAPrepare() throws Exception {
- try {
-
- XAConnection connection = xaFactory.createXAConnection();
- // Thread.sleep(5000);
-
-
- XASession session = connection.createXASession();
-
- Xid xid = newXID();
- session.getXAResource().start(xid, XAResource.TMNOFLAGS);
- Queue queue = session.createQueue(queueName);
- MessageProducer producer = session.createProducer(queue);
- producer.send(session.createTextMessage("hello"));
- session.getXAResource().end(xid, XAResource.TMSUCCESS);
- session.getXAResource().prepare(xid);
-
- connection.close();
-
- System.err.println("Done!!!");
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index a644718..805a6f5 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -105,6 +105,16 @@ public class BindingsImplTest extends ActiveMQTestBase {
private final class FakeTransaction implements Transaction {
@Override
+ public Object getProtocolData() {
+ return null;
+ }
+
+ @Override
+ public void setProtocolData(Object data) {
+
+ }
+
+ @Override
public void addOperation(final TransactionOperation sync) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 99d01e6..78659d2 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -42,6 +42,11 @@ public class FakeQueue implements Queue {
}
@Override
+ public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
+
+ }
+
+ @Override
public void deleteQueue(boolean removeConsumers) throws Exception {
}