Posted to issues@activemq.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2019/01/07 19:54:00 UTC

[jira] [Commented] (ARTEMIS-2205) Make AMQP Processing Single Threaded and other AMQP perf improvements

    [ https://issues.apache.org/jira/browse/ARTEMIS-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16736265#comment-16736265 ] 

ASF GitHub Bot commented on ARTEMIS-2205:
-----------------------------------------

Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2467#discussion_r245777806
  
    --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java ---
    @@ -359,59 +371,77 @@ public boolean flowControl(ReadyListener readyListener) {
     
        @Override
        public void onRemoteOpen(Connection connection) throws Exception {
    -      lock();
    +      handler.requireHandler();
           try {
    -         try {
    -            initInternal();
    -         } catch (Exception e) {
    -            log.error("Error init connection", e);
    -         }
    -         if (!validateConnection(connection)) {
    -            connection.close();
    -         } else {
    -            connection.setContext(AMQPConnectionContext.this);
    -            connection.setContainer(containerId);
    -            connection.setProperties(connectionProperties);
    -            connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
    -            connection.open();
    -         }
    -      } finally {
    -         unlock();
    +         initInternal();
    +      } catch (Exception e) {
    +         log.error("Error init connection", e);
    +      }
    +      if (!validateConnection(connection)) {
    +         connection.close();
    +      } else {
    +         connection.setContext(AMQPConnectionContext.this);
    +         connection.setContainer(containerId);
    +         connection.setProperties(connectionProperties);
    +         connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
    +         connection.open();
           }
           initialise();
     
    -         /*
    -         * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
    -         * but its here in case we add support for outbound connections.
    -         * */
    +      /*
    +      * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
    +      * but its here in case we add support for outbound connections.
    +      * */
           if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
              long nextKeepAliveTime = handler.tick(true);
              if (nextKeepAliveTime != 0 && scheduledPool != null) {
    -            scheduledPool.schedule(new Runnable() {
    -               @Override
    -               public void run() {
    -                  Long rescheduleAt = handler.tick(false);
    -                  if (rescheduleAt == null) {
    -                     // this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
    -                     scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS);
    -                  } else if (rescheduleAt != 0) {
    -                     scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
    -                  }
    -               }
    -            }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
    +            scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
              }
           }
        }
     
    +   class TickerRunnable implements Runnable {
    +
    +      final ScheduleRunnable scheduleRunnable;
    +
    +      TickerRunnable(ScheduleRunnable scheduleRunnable) {
    +         this.scheduleRunnable = scheduleRunnable;
    +      }
    +
    +      @Override
    +      public void run() {
    +         try {
    +            Long rescheduleAt = handler.tick(false);
    +            if (rescheduleAt == null) {
    +               // this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
    +               scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS);
    +            } else if (rescheduleAt != 0) {
    +               scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
    +            }
    +         } catch (Exception e) {
    +            log.warn(e.getMessage(), e);
    +         }
    --- End diff --
    
    I'm removing the catch here. That said, I don't think we need specific loggers on generic handlers like this one. It was a generic handler, but it's being removed anyway.


> Make AMQP Processing Single Threaded and other AMQP perf improvements
> ---------------------------------------------------------------------
>
>                 Key: ARTEMIS-2205
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2205
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>          Components: AMQP
>    Affects Versions: 2.6.3
>            Reporter: clebert suconic
>            Assignee: clebert suconic
>            Priority: Major
>             Fix For: 2.7.0
>
>
> This uses the Netty Executor (NIOLoop) to process AMQP.
> Instead of taking a lock to enter AMQP processing, which scales poorly with multiple connections per queue, it uses a single-threaded executor.
>  
>  
> This represents a major improvement in AMQP. I will take the time to write a blog post about this, with multiple scenarios comparing 2.6.3, master before this change, and master after it. (I will post a link to it here on this JIRA.)
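
For readers unfamiliar with the pattern the description refers to, here is a minimal sketch of replacing a lock with a single-threaded executor. It is not the Artemis code: ConnectionState, onFrame and countAfterDrain are hypothetical names, and a plain java.util.concurrent executor stands in for the Netty event loop. The idea is that callers never lock; they hand work to one thread, so the state it guards needs no synchronization.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadedProcessing {

   // Hypothetical stand-in for per-connection protocol state (not an Artemis class).
   static final class ConnectionState {
      // Assumption: a plain single-threaded executor plays the role of the Netty event loop.
      private final ExecutorService handlerThread = Executors.newSingleThreadExecutor();

      // Plain field, no lock: it is only ever read or written on handlerThread.
      private int framesProcessed;

      // Any thread may call this; the mutation itself runs on the handler thread.
      void onFrame() {
         handlerThread.execute(() -> framesProcessed++);
      }

      // Reads the counter on the handler thread, after all previously queued work.
      int countAfterDrain() {
         try {
            return handlerThread.submit(() -> framesProcessed).get();
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
         } catch (ExecutionException e) {
            throw new IllegalStateException(e);
         } finally {
            handlerThread.shutdown();
         }
      }
   }

   // Starts several producer threads that all feed one ConnectionState.
   static int run(int producers, int framesPerProducer) {
      ConnectionState state = new ConnectionState();
      Thread[] threads = new Thread[producers];
      for (int i = 0; i < producers; i++) {
         threads[i] = new Thread(() -> {
            for (int j = 0; j < framesPerProducer; j++) {
               state.onFrame();
            }
         });
         threads[i].start();
      }
      for (Thread t : threads) {
         try {
            t.join();
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
         }
      }
      return state.countAfterDrain();
   }

   public static void main(String[] args) {
      System.out.println(run(4, 1000));
   }
}
```

The producers contend only on the executor's queue rather than on a lock held for the duration of protocol processing, which is the trade-off the description points at.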



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)