You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/09 19:19:11 UTC
[1/5] activemq-artemis git commit: This closes #1451
Repository: activemq-artemis
Updated Branches:
refs/heads/master 5909a24cd -> 8d6adac7d
This closes #1451
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8d6adac7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8d6adac7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8d6adac7
Branch: refs/heads/master
Commit: 8d6adac7d0cfe78d513267fe1cd1c08a8e5d28a2
Parents: 5909a24 fabc070
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Aug 9 15:18:54 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 9 15:18:54 2017 -0400
----------------------------------------------------------------------
.../core/server/ActiveMQScheduledComponent.java | 4 +-
.../core/client/impl/ClientSessionImpl.java | 3 -
.../core/ServerSessionPacketHandler.java | 15 +-
.../core/impl/ActiveMQPacketHandler.java | 5 +-
.../protocol/core/impl/CoreSessionCallback.java | 18 +
.../core/server/impl/ActiveMQServerImpl.java | 5 +-
.../core/server/impl/ServerSessionImpl.java | 5 +
.../spi/core/protocol/SessionCallback.java | 4 +
.../amqp/JMSMessageConsumerTest.java | 11 +-
.../integration/client/SendAckFailTest.java | 775 +++++++++++++++++++
.../jms/client/ReceiveNoWaitTest.java | 2 +-
11 files changed, 824 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
[2/5] activemq-artemis git commit: ARTEMIS-1333 SendACK listener
message loss (adding test)
Posted by cl...@apache.org.
ARTEMIS-1333 SendACK listener message loss (adding test)
next commit should have the fix.
this is to make it easy to confirm the fix by people looking.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/96c6268f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/96c6268f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/96c6268f
Branch: refs/heads/master
Commit: 96c6268f5a68a293589ac0061d561265d9e79972
Parents: 8bc15b1
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Aug 8 22:39:20 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 9 15:18:54 2017 -0400
----------------------------------------------------------------------
.../core/server/impl/ActiveMQServerImpl.java | 5 +-
.../amqp/JMSMessageConsumerTest.java | 11 +-
.../integration/client/SendAckFailTest.java | 775 +++++++++++++++++++
3 files changed, 782 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96c6268f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 8d9ce99..ce1f354 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.impl;
+import javax.management.MBeanServer;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
@@ -47,8 +48,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import javax.management.MBeanServer;
-
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -2098,7 +2097,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
/**
* This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
*/
- private StorageManager createStorageManager() {
+ protected StorageManager createStorageManager() {
if (configuration.isPersistenceEnabled()) {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96c6268f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
index c5372ac..68a9801 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
@@ -16,12 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -34,6 +28,11 @@ import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.Wait;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96c6268f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
new file mode 100644
index 0000000..292b5c4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
+import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
+import org.apache.activemq.artemis.core.persistence.GroupingInfo;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
+import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
+import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.RouteContextList;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
+import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.JournalLoader;
+import org.apache.activemq.artemis.core.transaction.ResourceManager;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SendAckFailTest extends ActiveMQTestBase {
+
+ @Before
+ @After
+ public void deleteDirectory() throws Exception {
+ deleteDirectory(new File("./target/send-ack"));
+ }
+
+ @Override
+ public String getJournalDir(final int index, final boolean backup) {
+ new Exception("trace").printStackTrace(System.out);
+ return "./target/send-ack/journal";
+ }
+
+ @Override
+ protected String getBindingsDir(final int index, final boolean backup) {
+ return "./target/send-ack/binding";
+ }
+
+ @Override
+ protected String getPageDir(final int index, final boolean backup) {
+ return "./target/send-ack/page";
+ }
+
+ @Override
+ protected String getLargeMessagesDir(final int index, final boolean backup) {
+ return "./target/send-ack/large-message";
+ }
+
+ @Test
+ public void testSend() throws Exception {
+ Process process = SpawnedVMSupport.spawnVM(SendAckFailTest.class.getName());
+ ActiveMQServer server = null;
+
+ try {
+
+ HashSet<Integer> listSent = new HashSet<>();
+
+ Thread t = null;
+ {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+ ServerLocator locator = factory.getServerLocator();
+
+ locator.setConfirmationWindowSize(0).setInitialConnectAttempts(100).setRetryInterval(100).setBlockOnDurableSend(false).setReconnectAttempts(0);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession();
+ session.createAddress(SimpleString.toSimpleString("T1"), RoutingType.ANYCAST, true);
+ session.createQueue(SimpleString.toSimpleString("T1"), RoutingType.ANYCAST, SimpleString.toSimpleString("T1"), true);
+
+ ClientProducer producer = session.createProducer("T1");
+
+ session.setSendAcknowledgementHandler(new SendAcknowledgementHandler() {
+ @Override
+ public void sendAcknowledged(Message message) {
+ listSent.add(message.getIntProperty("myid"));
+ }
+ });
+
+ t = new Thread() {
+ @Override
+ public void run() {
+ for (int i = 0; i < 5000; i++) {
+ try {
+ producer.send(session.createMessage(true).putIntProperty("myid", i));
+ } catch (Exception e) {
+ e.printStackTrace();
+ break;
+ }
+ }
+ }
+ };
+ t.start();
+ }
+
+ Wait.waitFor(() -> listSent.size() > 100, 5000, 10);
+
+ Assert.assertTrue(process.waitFor(1, TimeUnit.MINUTES));
+
+ server = startServer(false);
+
+ {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+ ServerLocator locator = factory.getServerLocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession();
+
+ ClientConsumer consumer = session.createConsumer("T1");
+
+ session.start();
+
+ for (int i = 0; i < listSent.size(); i++) {
+ ClientMessage message = consumer.receive(1000);
+ if (message == null) {
+ for (Integer msgi : listSent) {
+ System.out.println("Message " + msgi + " was lost");
+ }
+ fail("missed messages!");
+ }
+ message.acknowledge();
+
+ if (!listSent.remove(message.getIntProperty("myid"))) {
+ System.out.println("Message " + message + " with id " + message.getIntProperty("myid") + " received in duplicate");
+ fail("Message " + message + " with id " + message.getIntProperty("myid") + " received in duplicate");
+ }
+ }
+ }
+
+ } finally {
+ if (process != null) {
+ process.destroy();
+ }
+ if (server != null) {
+ server.stop();
+ }
+ }
+ }
+
+ public static void main(String[] arg) {
+ SendAckFailTest test = new SendAckFailTest();
+ test.startServer(true);
+ }
+
+ public ActiveMQServer startServer(boolean fail) {
+ try {
+ //ActiveMQServerImpl server = (ActiveMQServerImpl) createServer(true, true);
+
+ AtomicInteger count = new AtomicInteger(0);
+
+ ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
+
+ Configuration configuration = createDefaultConfig(true);
+
+ ActiveMQServer server = new ActiveMQServerImpl(configuration, ManagementFactory.getPlatformMBeanServer(), securityManager) {
+ @Override
+ public StorageManager createStorageManager() {
+ StorageManager original = super.createStorageManager();
+
+ return new StorageManagerDelegate(original) {
+ @Override
+ public void storeMessage(Message message) throws Exception {
+
+ if (fail) {
+ if (count.incrementAndGet() == 110) {
+ System.out.println("Failing " + message);
+ System.out.flush();
+ Thread.sleep(100);
+ Runtime.getRuntime().halt(-1);
+ }
+ }
+ super.storeMessage(message);
+
+ }
+ };
+
+ }
+ };
+
+
+
+ System.out.println("Location::" + server.getConfiguration().getJournalLocation().getAbsolutePath());
+ server.start();
+ return server;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+
+ private class StorageManagerDelegate implements StorageManager {
+
+ @Override
+ public void start() throws Exception {
+ manager.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ manager.stop();
+ }
+
+ @Override
+ public boolean isStarted() {
+ return manager.isStarted();
+ }
+
+ @Override
+ public long generateID() {
+ return manager.generateID();
+ }
+
+ @Override
+ public long getCurrentID() {
+ return manager.getCurrentID();
+ }
+
+ @Override
+ public void criticalError(Throwable error) {
+ manager.criticalError(error);
+ }
+
+ @Override
+ public OperationContext getContext() {
+ return manager.getContext();
+ }
+
+ @Override
+ public void lineUpContext() {
+ manager.lineUpContext();
+ }
+
+ @Override
+ public OperationContext newContext(Executor executor) {
+ return manager.newContext(executor);
+ }
+
+ @Override
+ public OperationContext newSingleThreadContext() {
+ return manager.newSingleThreadContext();
+ }
+
+ @Override
+ public void setContext(OperationContext context) {
+ manager.setContext(context);
+ }
+
+ @Override
+ public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
+ manager.stop(ioCriticalError, sendFailover);
+ }
+
+ @Override
+ public void pageClosed(SimpleString storeName, int pageNumber) {
+ manager.pageClosed(storeName, pageNumber);
+ }
+
+ @Override
+ public void pageDeleted(SimpleString storeName, int pageNumber) {
+ manager.pageDeleted(storeName, pageNumber);
+ }
+
+ @Override
+ public void pageWrite(PagedMessage message, int pageNumber) {
+ manager.pageWrite(message, pageNumber);
+ }
+
+ @Override
+ public void afterCompleteOperations(IOCallback run) {
+ manager.afterCompleteOperations(run);
+ }
+
+ @Override
+ public void afterStoreOperations(IOCallback run) {
+ manager.afterStoreOperations(run);
+ }
+
+ @Override
+ public boolean waitOnOperations(long timeout) throws Exception {
+ return manager.waitOnOperations(timeout);
+ }
+
+ @Override
+ public void waitOnOperations() throws Exception {
+ manager.waitOnOperations();
+ }
+
+ @Override
+ public void beforePageRead() throws Exception {
+ manager.beforePageRead();
+ }
+
+ @Override
+ public void afterPageRead() throws Exception {
+ manager.afterPageRead();
+ }
+
+ @Override
+ public ByteBuffer allocateDirectBuffer(int size) {
+ return manager.allocateDirectBuffer(size);
+ }
+
+ @Override
+ public void freeDirectBuffer(ByteBuffer buffer) {
+ manager.freeDirectBuffer(buffer);
+ }
+
+ @Override
+ public void clearContext() {
+ manager.clearContext();
+ }
+
+ @Override
+ public void confirmPendingLargeMessageTX(Transaction transaction,
+ long messageID,
+ long recordID) throws Exception {
+ manager.confirmPendingLargeMessageTX(transaction, messageID, recordID);
+ }
+
+ @Override
+ public void confirmPendingLargeMessage(long recordID) throws Exception {
+ manager.confirmPendingLargeMessage(recordID);
+ }
+
+ @Override
+ public void storeMessage(Message message) throws Exception {
+ manager.storeMessage(message);
+ }
+
+ @Override
+ public void storeReference(long queueID, long messageID, boolean last) throws Exception {
+ manager.storeReference(queueID, messageID, last);
+ }
+
+ @Override
+ public void deleteMessage(long messageID) throws Exception {
+ manager.deleteMessage(messageID);
+ }
+
+ @Override
+ public void storeAcknowledge(long queueID, long messageID) throws Exception {
+ manager.storeAcknowledge(queueID, messageID);
+ }
+
+ @Override
+ public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception {
+ manager.storeCursorAcknowledge(queueID, position);
+ }
+
+ @Override
+ public void updateDeliveryCount(MessageReference ref) throws Exception {
+ manager.updateDeliveryCount(ref);
+ }
+
+ @Override
+ public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
+ manager.updateScheduledDeliveryTime(ref);
+ }
+
+ @Override
+ public void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception {
+ manager.storeDuplicateID(address, duplID, recordID);
+ }
+
+ @Override
+ public void deleteDuplicateID(long recordID) throws Exception {
+ manager.deleteDuplicateID(recordID);
+ }
+
+ @Override
+ public void storeMessageTransactional(long txID, Message message) throws Exception {
+ manager.storeMessageTransactional(txID, message);
+ }
+
+ @Override
+ public void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception {
+ manager.storeReferenceTransactional(txID, queueID, messageID);
+ }
+
+ @Override
+ public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception {
+ manager.storeAcknowledgeTransactional(txID, queueID, messageID);
+ }
+
+ @Override
+ public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception {
+ manager.storeCursorAcknowledgeTransactional(txID, queueID, position);
+ }
+
+ @Override
+ public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception {
+ manager.deleteCursorAcknowledgeTransactional(txID, ackID);
+ }
+
+ @Override
+ public void deleteCursorAcknowledge(long ackID) throws Exception {
+ manager.deleteCursorAcknowledge(ackID);
+ }
+
+ @Override
+ public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception {
+ manager.storePageCompleteTransactional(txID, queueID, position);
+ }
+
+ @Override
+ public void deletePageComplete(long ackID) throws Exception {
+ manager.deletePageComplete(ackID);
+ }
+
+ @Override
+ public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception {
+ manager.updateScheduledDeliveryTimeTransactional(txID, ref);
+ }
+
+ @Override
+ public void storeDuplicateIDTransactional(long txID,
+ SimpleString address,
+ byte[] duplID,
+ long recordID) throws Exception {
+ manager.storeDuplicateIDTransactional(txID, address, duplID, recordID);
+ }
+
+ @Override
+ public void updateDuplicateIDTransactional(long txID,
+ SimpleString address,
+ byte[] duplID,
+ long recordID) throws Exception {
+ manager.updateDuplicateIDTransactional(txID, address, duplID, recordID);
+ }
+
+ @Override
+ public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception {
+ manager.deleteDuplicateIDTransactional(txID, recordID);
+ }
+
+ @Override
+ public LargeServerMessage createLargeMessage() {
+ return manager.createLargeMessage();
+ }
+
+ @Override
+ public LargeServerMessage createLargeMessage(long id, Message message) throws Exception {
+ return manager.createLargeMessage(id, message);
+ }
+
+ @Override
+ public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) {
+ return manager.createFileForLargeMessage(messageID, extension);
+ }
+
+ @Override
+ public void prepare(long txID, Xid xid) throws Exception {
+ manager.prepare(txID, xid);
+ }
+
+ @Override
+ public void commit(long txID) throws Exception {
+ manager.commit(txID);
+ }
+
+ @Override
+ public void commit(long txID, boolean lineUpContext) throws Exception {
+ manager.commit(txID, lineUpContext);
+ }
+
+ @Override
+ public void rollback(long txID) throws Exception {
+ manager.rollback(txID);
+ }
+
+ @Override
+ public void rollbackBindings(long txID) throws Exception {
+ manager.rollbackBindings(txID);
+ }
+
+ @Override
+ public void commitBindings(long txID) throws Exception {
+ manager.commitBindings(txID);
+ }
+
+ @Override
+ public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception {
+ manager.storePageTransaction(txID, pageTransaction);
+ }
+
+ @Override
+ public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception {
+ manager.updatePageTransaction(txID, pageTransaction, depage);
+ }
+
+ @Override
+ public void deletePageTransactional(long recordID) throws Exception {
+ manager.deletePageTransactional(recordID);
+ }
+
+ @Override
+ public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
+ PagingManager pagingManager,
+ ResourceManager resourceManager,
+ Map<Long, QueueBindingInfo> queueInfos,
+ Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+ Set<Pair<Long, Long>> pendingLargeMessages,
+ List<PageCountPending> pendingNonTXPageCounter,
+ JournalLoader journalLoader) throws Exception {
+ return manager.loadMessageJournal(postOffice, pagingManager, resourceManager, queueInfos, duplicateIDMap, pendingLargeMessages, pendingNonTXPageCounter, journalLoader);
+ }
+
+ @Override
+ public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception {
+ return manager.storeHeuristicCompletion(xid, isCommit);
+ }
+
+ @Override
+ public void deleteHeuristicCompletion(long id) throws Exception {
+ manager.deleteHeuristicCompletion(id);
+ }
+
+ @Override
+ public void addQueueBinding(long tx, Binding binding) throws Exception {
+ manager.addQueueBinding(tx, binding);
+ }
+
+ @Override
+ public void deleteQueueBinding(long tx, long queueBindingID) throws Exception {
+ manager.deleteQueueBinding(tx, queueBindingID);
+ }
+
+ @Override
+ public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
+ return manager.storeQueueStatus(queueID, status);
+ }
+
+ @Override
+ public void deleteQueueStatus(long recordID) throws Exception {
+ manager.deleteQueueStatus(recordID);
+ }
+
+ @Override
+ public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception {
+ manager.addAddressBinding(tx, addressInfo);
+ }
+
+ @Override
+ public void deleteAddressBinding(long tx, long addressBindingID) throws Exception {
+ manager.deleteAddressBinding(tx, addressBindingID);
+ }
+
+ @Override
+ public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
+ List<GroupingInfo> groupingInfos,
+ List<AddressBindingInfo> addressBindingInfos) throws Exception {
+ return manager.loadBindingJournal(queueBindingInfos, groupingInfos, addressBindingInfos);
+ }
+
+ @Override
+ public void addGrouping(GroupBinding groupBinding) throws Exception {
+ manager.addGrouping(groupBinding);
+ }
+
+ @Override
+ public void deleteGrouping(long tx, GroupBinding groupBinding) throws Exception {
+ manager.deleteGrouping(tx, groupBinding);
+ }
+
+ @Override
+ public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception {
+ manager.storeAddressSetting(addressSetting);
+ }
+
+ @Override
+ public void deleteAddressSetting(SimpleString addressMatch) throws Exception {
+ manager.deleteAddressSetting(addressMatch);
+ }
+
+ @Override
+ public List<PersistedAddressSetting> recoverAddressSettings() throws Exception {
+ return manager.recoverAddressSettings();
+ }
+
+ @Override
+ public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception {
+ manager.storeSecurityRoles(persistedRoles);
+ }
+
+ @Override
+ public void deleteSecurityRoles(SimpleString addressMatch) throws Exception {
+ manager.deleteSecurityRoles(addressMatch);
+ }
+
+ @Override
+ public List<PersistedRoles> recoverPersistedRoles() throws Exception {
+ return manager.recoverPersistedRoles();
+ }
+
+ @Override
+ public long storePageCounter(long txID, long queueID, long value) throws Exception {
+ return manager.storePageCounter(txID, queueID, value);
+ }
+
+ @Override
+ public long storePendingCounter(long queueID, long pageID, int inc) throws Exception {
+ return manager.storePendingCounter(queueID, pageID, inc);
+ }
+
+ @Override
+ public void deleteIncrementRecord(long txID, long recordID) throws Exception {
+ manager.deleteIncrementRecord(txID, recordID);
+ }
+
+ @Override
+ public void deletePageCounter(long txID, long recordID) throws Exception {
+ manager.deletePageCounter(txID, recordID);
+ }
+
+ @Override
+ public void deletePendingPageCounter(long txID, long recordID) throws Exception {
+ manager.deletePendingPageCounter(txID, recordID);
+ }
+
+ @Override
+ public long storePageCounterInc(long txID, long queueID, int add) throws Exception {
+ return manager.storePageCounterInc(txID, queueID, add);
+ }
+
+ @Override
+ public long storePageCounterInc(long queueID, int add) throws Exception {
+ return manager.storePageCounterInc(queueID, add);
+ }
+
+ @Override
+ public Journal getBindingsJournal() {
+ return manager.getBindingsJournal();
+ }
+
+ @Override
+ public Journal getMessageJournal() {
+ return manager.getMessageJournal();
+ }
+
+ @Override
+ public void startReplication(ReplicationManager replicationManager,
+ PagingManager pagingManager,
+ String nodeID,
+ boolean autoFailBack,
+ long initialReplicationSyncTimeout) throws Exception {
+ manager.startReplication(replicationManager, pagingManager, nodeID, autoFailBack, initialReplicationSyncTimeout);
+ }
+
+ @Override
+ public boolean addToPage(PagingStore store,
+ Message msg,
+ Transaction tx,
+ RouteContextList listCtx) throws Exception {
+ return manager.addToPage(store, msg, tx, listCtx);
+ }
+
+ @Override
+ public void stopReplication() {
+ manager.stopReplication();
+ }
+
+ @Override
+ public void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception {
+ manager.addBytesToLargeMessage(appendFile, messageID, bytes);
+ }
+
+ @Override
+ public void storeID(long journalID, long id) throws Exception {
+ manager.storeID(journalID, id);
+ }
+
+ @Override
+ public void deleteID(long journalD) throws Exception {
+ manager.deleteID(journalD);
+ }
+
+ @Override
+ public void readLock() {
+ manager.readLock();
+ }
+
+ @Override
+ public void readUnLock() {
+ manager.readUnLock();
+ }
+
+ @Override
+ public void persistIdGenerator() {
+ manager.persistIdGenerator();
+ }
+
+ @Override
+ public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+ manager.injectMonitor(monitor);
+ }
+
+ private final StorageManager manager;
+
+ StorageManagerDelegate(StorageManager manager) {
+ this.manager = manager;
+ }
+ }
+
+}
\ No newline at end of file
[4/5] activemq-artemis git commit: ARTEMIS-1333 SendACK listener fix
Posted by cl...@apache.org.
ARTEMIS-1333 SendACK listener fix
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fabc0701
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fabc0701
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fabc0701
Branch: refs/heads/master
Commit: fabc0701a38628ffa8f8d9959cc5ec64c6c3cb10
Parents: 96c6268
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Aug 8 22:48:25 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 9 15:18:54 2017 -0400
----------------------------------------------------------------------
.../protocol/core/ServerSessionPacketHandler.java | 15 ++++++++-------
.../protocol/core/impl/ActiveMQPacketHandler.java | 5 ++++-
.../protocol/core/impl/CoreSessionCallback.java | 18 ++++++++++++++++++
.../core/server/impl/ServerSessionImpl.java | 5 +++++
.../spi/core/protocol/SessionCallback.java | 4 ++++
.../integration/jms/client/ReceiveNoWaitTest.java | 2 +-
6 files changed, 40 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 88a6c2c..87ac615 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.core;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.List;
-import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -95,7 +94,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.SimpleFuture;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.actors.Actor;
-import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
@@ -150,7 +149,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private final Actor<Packet> packetActor;
- private final Executor callExecutor;
+ private final ArtemisExecutor callExecutor;
private final CoreProtocolManager manager;
@@ -214,19 +213,20 @@ public class ServerSessionPacketHandler implements ChannelHandler {
public void connectionFailed(final ActiveMQException exception, boolean failedOver) {
ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName());
+ flushExecutor();
+
try {
session.close(true);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorClosingSession(e);
}
- flushExecutor();
ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
}
- private void flushExecutor() {
+ public void flushExecutor() {
packetActor.flush();
- OrderedExecutorFactory.flushExecutor(callExecutor);
+ callExecutor.flush();
}
public void close() {
@@ -247,7 +247,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
@Override
public void handlePacket(final Packet packet) {
- channel.confirm(packet);
// This method will call onMessagePacket through an actor
packetActor.act(packet);
@@ -838,6 +837,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
final boolean flush,
final boolean closeChannel) {
if (confirmPacket != null) {
+ channel.confirm(confirmPacket);
+
if (flush) {
channel.flushConfirmations();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index c9cc926..cefd10c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -167,10 +167,13 @@ public class ActiveMQPacketHandler implements ChannelHandler {
routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST);
}
- ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap);
+ CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection);
+
+ ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, protocolManager, session, server.getStorageManager(), channel);
channel.setHandler(handler);
+ sessionCallback.setSessionHandler(handler);
// TODO - where is this removed?
protocolManager.addSessionHandler(request.getName(), handler);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 542d726..866130b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
@@ -44,6 +45,8 @@ public final class CoreSessionCallback implements SessionCallback {
private String name;
+ private ServerSessionPacketHandler handler;
+
public CoreSessionCallback(String name,
ProtocolManager protocolManager,
Channel channel,
@@ -54,6 +57,21 @@ public final class CoreSessionCallback implements SessionCallback {
this.connection = connection;
}
+ public CoreSessionCallback setSessionHandler(ServerSessionPacketHandler handler) {
+ this.handler = handler;
+ return this;
+ }
+
+ @Override
+ public void close(boolean failed) {
+ ServerSessionPacketHandler localHandler = handler;
+ if (failed && localHandler != null) {
+ // We wait any pending tasks before we make this as closed
+ localHandler.flushExecutor();
+ }
+ this.handler = null;
+ }
+
@Override
public boolean isWritable(ReadyListener callback, Object protocolContext) {
return connection.isWritable(callback);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index f3617c1..1661ab2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -345,6 +345,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
protected void doClose(final boolean failed) throws Exception {
+ callback.close(failed);
synchronized (this) {
if (!closed) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseSession(this, failed) : null);
@@ -1238,6 +1239,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
public void close(final boolean failed) {
if (closed)
return;
+
+ if (failed) {
+
+ }
context.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index edfb5dc..ae1612f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -89,4 +89,8 @@ public interface SessionCallback {
* Some protocols (Openwire) needs a special message with the browser is finished.
*/
void browserFinished(ServerConsumer consumer);
+
+ default void close(boolean failed) {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java
index a426948..6114f49 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java
@@ -52,7 +52,7 @@ public class ReceiveNoWaitTest extends JMSTestBase {
public void testReceiveNoWait() throws Exception {
assertNotNull(queue);
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 1000; i++) {
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
[5/5] activemq-artemis git commit: ARTEMIS-1334 Scheduled component
shouldn't be synchronized
Posted by cl...@apache.org.
ARTEMIS-1334 Scheduled component shouldn't be synchronized
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8bc15b11
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8bc15b11
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8bc15b11
Branch: refs/heads/master
Commit: 8bc15b1199e9aa82c1cb92586deaba46174473ad
Parents: 7b5d9f1
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Aug 8 16:00:01 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 9 15:18:54 2017 -0400
----------------------------------------------------------------------
.../activemq/artemis/core/server/ActiveMQScheduledComponent.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8bc15b11/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
index 87e8dc9..9524d89 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
@@ -41,7 +41,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
private long millisecondsPeriod;
private TimeUnit timeUnit;
private final Executor executor;
- private ScheduledFuture future;
+ private volatile ScheduledFuture future;
private final boolean onDemand;
long lastTime = 0;
@@ -144,7 +144,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
@Override
- public synchronized void stop() {
+ public void stop() {
if (future != null) {
future.cancel(false);
future = null;
[3/5] activemq-artemis git commit: ARTEMIS-1308 Removing
start/endcall around commit
Posted by cl...@apache.org.
ARTEMIS-1308 Removing start/endcall around commit
this issue many warning around the testsuite around MessageListeners
it's valid to call the commit from a different thread on this case.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7b5d9f12
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7b5d9f12
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7b5d9f12
Branch: refs/heads/master
Commit: 7b5d9f120ca74f3c22ec837dfb17ef662887721c
Parents: 5909a24
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Aug 8 20:30:39 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 9 15:18:54 2017 -0400
----------------------------------------------------------------------
.../activemq/artemis/core/client/impl/ClientSessionImpl.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b5d9f12/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index ef4e87c..41330a6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -787,7 +787,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
if (rollbackOnly) {
rollbackOnFailover(true);
}
- startCall();
try {
sessionContext.simpleCommit(block);
} catch (ActiveMQException e) {
@@ -800,8 +799,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} else {
throw e;
}
- } finally {
- endCall();
}
//oops, we have failed over during the commit and don't know what happened