Posted to dev@activemq.apache.org by clebertsuconic <gi...@git.apache.org> on 2017/03/17 20:05:24 UTC

[GitHub] activemq-artemis pull request #1102: Two Performance and TX changes

GitHub user clebertsuconic opened a pull request:

    https://github.com/apache/activemq-artemis/pull/1102

    Two Performance and TX changes

    These two commits are part of the work I have been doing on performance tests,
    and of the work I was doing with @tabish121 to ensure TX are not stalling.

    The two commits are related, and I don't know if 1046 would work without 1045,
    so I'm sending both commits in a single PR.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/clebertsuconic/activemq-artemis perf-improv

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/1102.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1102
    
----
commit 1977e909bcaad24648ea36fa2a27a9e61847cf7d
Author: Clebert Suconic <cl...@apache.org>
Date:   2017-03-17T15:14:18Z

    ARTEMIS-1045 Performance improvements on AMQP

commit 31b6f5fe827d2af8d653444179aa61fc2d0306b7
Author: Clebert Suconic <cl...@apache.org>
Date:   2017-03-17T19:59:34Z

    ARTEMIS-1046 Fixing TX eventually stalling with AMQP
    
    I have also reviewed the model in which we used transactions

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #1102: Two Performance and TX changes

Posted by clebertsuconic <gi...@git.apache.org>.
GitHub user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1102#discussion_r106741245
  
    --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java ---
    @@ -78,44 +82,68 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
                 Binary txID = sessionSPI.newTransaction();
                 Declared declared = new Declared();
                 declared.setTxnId(txID);
    -            delivery.disposition(declared);
    +            synchronized (connection.getLock()) {
    +               delivery.disposition(declared);
    +            }
              } else if (action instanceof Discharge) {
                 Discharge discharge = (Discharge) action;
     
                 Binary txID = discharge.getTxnId();
    -            sessionSPI.dischargeTx(txID);
    +            ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true);
    +            tx.discharge();
    +
                 if (discharge.getFail()) {
                    try {
    -                  sessionSPI.rollbackTX(txID, true);
    -                  delivery.disposition(new Accepted());
    +                  tx.rollback();
    +                  synchronized (connection.getLock()) {
    +                     delivery.disposition(new Accepted());
    +                  }
    +                  connection.flush();
                    } catch (Exception e) {
    +                  log.warn(e.getMessage(), e);
                       throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
                    }
                 } else {
                    try {
    -                  sessionSPI.commitTX(txID);
    -                  delivery.disposition(new Accepted());
    +                  tx.commit();
    +                  synchronized (connection.getLock()) {
    +                     delivery.disposition(new Accepted());
    +                  }
    +                  connection.flush();
                    } catch (ActiveMQAMQPException amqpE) {
    +                  log.warn(amqpE.getMessage(), amqpE);
                       throw amqpE;
                    } catch (Exception e) {
    +                  log.warn(e.getMessage(), e);
                       throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
                    }
                 }
     
                 // Replenish coordinator receiver credit on exhaustion so sender can continue
                 // transaction declare and discharge operations.
    -            if (receiver.getCredit() == 0) {
    -               receiver.flow(DEFAULT_COORDINATOR_CREDIT);
    +            synchronized (connection.getLock()) {
    --- End diff --
    
    I have checked and there are no locks being held...

    This kind of pattern is used everywhere else we use proton. We need to make sure nothing is changed by an incoming packet, so I need the lock here.

    I'm not sure if that's what you meant.
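
A minimal sketch of the locking pattern described above, assuming a connection object that exposes a single lock guarding its protocol state. The `Connection` class, its `disposition(String)` method, and the `discharge` helper are hypothetical stand-ins written for illustration; they are not the Proton or Artemis classes. The transaction work runs without the connection lock, the disposition is updated while holding it, and `flush()` is called after the lock is released.

```java
import java.util.ArrayList;
import java.util.List;

public class LockedDispositionSketch {

    // Hypothetical stand-in for the AMQP connection: it owns the lock
    // that guards all protocol state.
    static final class Connection {
        private final Object lock = new Object();
        private final List<String> pending = new ArrayList<>(); // guarded by lock
        private final List<String> wire = new ArrayList<>();    // guarded by lock

        Object getLock() {
            return lock;
        }

        // Caller must hold getLock(); mirrors the shape of delivery.disposition(...).
        void disposition(String state) {
            pending.add(state);
        }

        // Takes the lock itself, so callers invoke it after leaving their own block.
        void flush() {
            synchronized (lock) {
                wire.addAll(pending);
                pending.clear();
            }
        }

        List<String> sent() {
            synchronized (lock) {
                return new ArrayList<>(wire);
            }
        }
    }

    // Commit or rollback first, then touch protocol state only under the lock.
    static void discharge(Connection connection, Runnable txWork) {
        txWork.run();
        synchronized (connection.getLock()) {
            connection.disposition("accepted");
        }
        connection.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        Connection connection = new Connection();
        Runnable worker = () -> {
            for (int i = 0; i < 1000; i++) {
                discharge(connection, () -> { });
            }
        };
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(worker);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("dispositions flushed: " + connection.sent().size());
    }
}
```

Keeping the synchronized block limited to the disposition call means the lock is held only briefly, which is one way to limit contention with threads that handle incoming packets.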



[GitHub] activemq-artemis pull request #1102: Two Performance and TX changes

Posted by tabish121 <gi...@git.apache.org>.
GitHub user tabish121 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1102#discussion_r106739208
  
    --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java ---
    @@ -78,44 +82,68 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
                 Binary txID = sessionSPI.newTransaction();
                 Declared declared = new Declared();
                 declared.setTxnId(txID);
    -            delivery.disposition(declared);
    +            synchronized (connection.getLock()) {
    +               delivery.disposition(declared);
    +            }
              } else if (action instanceof Discharge) {
                 Discharge discharge = (Discharge) action;
     
                 Binary txID = discharge.getTxnId();
    -            sessionSPI.dischargeTx(txID);
    +            ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true);
    +            tx.discharge();
    +
                 if (discharge.getFail()) {
                    try {
    -                  sessionSPI.rollbackTX(txID, true);
    -                  delivery.disposition(new Accepted());
    +                  tx.rollback();
    +                  synchronized (connection.getLock()) {
    +                     delivery.disposition(new Accepted());
    +                  }
    +                  connection.flush();
                    } catch (Exception e) {
    +                  log.warn(e.getMessage(), e);
                       throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
                    }
                 } else {
                    try {
    -                  sessionSPI.commitTX(txID);
    -                  delivery.disposition(new Accepted());
    +                  tx.commit();
    +                  synchronized (connection.getLock()) {
    +                     delivery.disposition(new Accepted());
    +                  }
    +                  connection.flush();
                    } catch (ActiveMQAMQPException amqpE) {
    +                  log.warn(amqpE.getMessage(), amqpE);
                       throw amqpE;
                    } catch (Exception e) {
    +                  log.warn(e.getMessage(), e);
                       throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
                    }
                 }
     
                 // Replenish coordinator receiver credit on exhaustion so sender can continue
                 // transaction declare and discharge operations.
    -            if (receiver.getCredit() == 0) {
    -               receiver.flow(DEFAULT_COORDINATOR_CREDIT);
    +            synchronized (connection.getLock()) {
    --- End diff --
    
    I was going to submit a change here, but since you've modified things I'll just see if we can get it into this commit. I inadvertently put this check into the handler block for discharge, but really it should be done in either case. It also makes sense to move it up to the top, after the receiver reference is obtained, so that credit is granted before any work is done that might throw an exception and cause the check to be missed. It can possibly be done in the fix block where the connection lock is taken, to avoid undue lock contention.
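
The suggestion above can be sketched roughly as follows, assuming the credit check is moved ahead of the declare or discharge handling. The `Receiver` class here is a hypothetical stub that only imitates the `getCredit()` and `flow(int)` calls seen in the diff, the value of `DEFAULT_COORDINATOR_CREDIT` is made up, and the `onMessage` signature is simplified for illustration.

```java
public class CoordinatorCreditSketch {

    // Hypothetical value; the real constant is not shown in the diff.
    static final int DEFAULT_COORDINATOR_CREDIT = 100;

    // Hypothetical stub imitating the two receiver calls used in the diff.
    static final class Receiver {
        private int credit;

        Receiver(int credit) {
            this.credit = credit;
        }

        int getCredit() {
            return credit;
        }

        void flow(int extra) {
            credit += extra;
        }
    }

    // Replenish credit first, for both declare and discharge, so that an
    // exception thrown by the action cannot cause the check to be skipped.
    static void onMessage(Object connectionLock, Receiver receiver, Runnable action) {
        synchronized (connectionLock) {
            if (receiver.getCredit() == 0) {
                receiver.flow(DEFAULT_COORDINATOR_CREDIT);
            }
        }
        action.run(); // declare or discharge handling; may throw
    }

    public static void main(String[] args) {
        Receiver receiver = new Receiver(0);
        try {
            onMessage(new Object(), receiver, () -> {
                throw new IllegalStateException("simulated commit failure");
            });
        } catch (IllegalStateException e) {
            System.out.println("action failed: " + e.getMessage());
        }
        System.out.println("credit after failure: " + receiver.getCredit());
    }
}
```

With the check placed first, the sender still has credit for its next declare or discharge even when the current one fails.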



[GitHub] activemq-artemis pull request #1102: Two Performance and TX changes

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/activemq-artemis/pull/1102

