You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2019/02/14 15:24:00 UTC

[jira] [Work logged] (ARTEMIS-2205) Make AMQP Processing Single Threaded and other AMQP perf improvements

     [ https://issues.apache.org/jira/browse/ARTEMIS-2205?focusedWorklogId=198734&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-198734 ]

ASF GitHub Bot logged work on ARTEMIS-2205:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Feb/19 15:23
            Start Date: 14/Feb/19 15:23
    Worklog Time Spent: 10m 
      Work Description: jdanekrh commented on pull request #2467: ARTEMIS-2205 Performance improvements on AMQP and other parts
URL: https://github.com/apache/activemq-artemis/pull/2467#discussion_r256879260
 
 

 ##########
 File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
 ##########
 @@ -106,75 +106,51 @@ public ProtonHandler(Executor flushExecutor, boolean isServer) {
    }
 
    public Long tick(boolean firstTick) {
-      if (firstTick) {
-         // the first tick needs to guarantee a lock here
-         lock.lock();
-      } else {
-         if (!lock.tryLock()) {
-            log.debug("Cannot hold a lock on ProtonHandler for Tick, it will retry shortly");
-            // if we can't lock the scheduler will retry in a very short period of time instead of holding the lock here
-            return null;
-         }
-      }
-      try {
-         if (!firstTick) {
-            try {
-               if (connection.getLocalState() != EndpointState.CLOSED) {
-                  long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-                  if (transport.isClosed()) {
-                     throw new IllegalStateException("Channel was inactive for to long");
-                  }
-                  return rescheduleAt;
+      requireHandler();
+      if (!firstTick) {
+         try {
+            if (connection.getLocalState() != EndpointState.CLOSED) {
+               long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+               if (transport.isClosed()) {
+                  throw new IllegalStateException("Channel was inactive for to long");
                }
-            } catch (Exception e) {
-               log.warn(e.getMessage(), e);
-               transport.close();
-               connection.setCondition(new ErrorCondition());
+               return rescheduleAt;
             }
-            return 0L;
+         } catch (Exception e) {
+            log.warn(e.getMessage(), e);
+            transport.close();
+            connection.setCondition(new ErrorCondition());
+         } finally {
+            flush();
          }
-         return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-      } finally {
-         lock.unlock();
-         flushBytes();
+         return 0L;
       }
+      return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
    }
 
    /**
     * We cannot flush until the initial handshake was finished.
     * If this happens before the handshake, the connection response will happen without SASL
     * and the client will respond and fail with an invalid code.
-    * */
+    */
    public void scheduledFlush() {
       if (receivedFirstPacket) {
          flush();
       }
    }
 
    public int capacity() {
-      lock.lock();
-      try {
-         return transport.capacity();
-      } finally {
-         lock.unlock();
-      }
-   }
-
-   public void lock() {
-      lock.lock();
+      requireHandler();
+      return transport.capacity();
    }
 
-   public void unlock() {
-      lock.unlock();
-   }
-
-   public boolean tryLock(long time, TimeUnit timeUnit) {
-      try {
-         return lock.tryLock(time, timeUnit);
-      } catch (InterruptedException e) {
-
-         Thread.currentThread().interrupt();
-         return false;
+   public void requireHandler() {
+      if (!workerExecutor.inEventLoop()) {
+         new Exception("saco!!!").printStackTrace();
 
 Review comment:
   Leftover debug code here? (currently still on master)
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 198734)
    Time Spent: 40m  (was: 0.5h)

> Make AMQP Processing Single Threaded and other AMQP perf improvements
> ---------------------------------------------------------------------
>
>                 Key: ARTEMIS-2205
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2205
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>          Components: AMQP
>    Affects Versions: 2.6.3
>            Reporter: clebert suconic
>            Assignee: clebert suconic
>            Priority: Major
>             Fix For: 2.7.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> This is using Netty Executor (NIOLoop) to process AMQP.
> Instead of using a lock to enter the AMQP processing, what has issues on scaling up multiple connections per queue, it's using a single threaded executor.
>  
>  
> This represents a major improvement in AMQP. I will take my time to write a blog post about this with multiple scenarios comparing 2.6.3, master (before this change) and after. (I will post a link here to this JIRA).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)