Posted to issues@activemq.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2019/01/07 19:54:00 UTC
[jira] [Commented] (ARTEMIS-2205) Make AMQP Processing Single Threaded and other AMQP perf improvements
[ https://issues.apache.org/jira/browse/ARTEMIS-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16736265#comment-16736265 ]
ASF GitHub Bot commented on ARTEMIS-2205:
-----------------------------------------
Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2467#discussion_r245777806
--- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java ---
@@ -359,59 +371,77 @@ public boolean flowControl(ReadyListener readyListener) {
@Override
public void onRemoteOpen(Connection connection) throws Exception {
- lock();
+ handler.requireHandler();
try {
- try {
- initInternal();
- } catch (Exception e) {
- log.error("Error init connection", e);
- }
- if (!validateConnection(connection)) {
- connection.close();
- } else {
- connection.setContext(AMQPConnectionContext.this);
- connection.setContainer(containerId);
- connection.setProperties(connectionProperties);
- connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
- connection.open();
- }
- } finally {
- unlock();
+ initInternal();
+ } catch (Exception e) {
+ log.error("Error init connection", e);
+ }
+ if (!validateConnection(connection)) {
+ connection.close();
+ } else {
+ connection.setContext(AMQPConnectionContext.this);
+ connection.setContainer(containerId);
+ connection.setProperties(connectionProperties);
+ connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+ connection.open();
}
initialise();
- /*
- * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
- * but its here in case we add support for outbound connections.
- * */
+ /*
+ * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
+ * but its here in case we add support for outbound connections.
+ * */
if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
long nextKeepAliveTime = handler.tick(true);
if (nextKeepAliveTime != 0 && scheduledPool != null) {
- scheduledPool.schedule(new Runnable() {
- @Override
- public void run() {
- Long rescheduleAt = handler.tick(false);
- if (rescheduleAt == null) {
- // this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
- scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS);
- } else if (rescheduleAt != 0) {
- scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
- }
- }
- }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
+ scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
}
}
}
+ class TickerRunnable implements Runnable {
+
+ final ScheduleRunnable scheduleRunnable;
+
+ TickerRunnable(ScheduleRunnable scheduleRunnable) {
+ this.scheduleRunnable = scheduleRunnable;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Long rescheduleAt = handler.tick(false);
+ if (rescheduleAt == null) {
+ // this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
+ scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS);
+ } else if (rescheduleAt != 0) {
+ scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
+ }
+ } catch (Exception e) {
+ log.warn(e.getMessage(), e);
+ }
--- End diff --
I'm removing the catch here. I don't think we need to use specific loggers on generic handlers like this, though. It was a generic handler, but it's being removed.
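
For readers following the diff, the rescheduling decision made inside TickerRunnable.run() can be looked at in isolation. The sketch below is not Artemis code: the class TickDelay, the method nextDelayMillis and the constant RETRY_MILLIS are hypothetical names introduced only for illustration. It assumes, as the diff's comment states, that a null result from tick means the lock could not be acquired, that 0 means no further tick is needed, and that any other value is an absolute deadline in milliseconds on the same clock as the "now" argument.

```java
import java.util.OptionalLong;

/** Hypothetical helper, not part of Artemis: isolates the reschedule decision from the diff. */
public class TickDelay {

    /** Retry interval used in the diff when tick could not acquire the lock. */
    public static final long RETRY_MILLIS = 10;

    /**
     * @param rescheduleAt value returned by tick: null = could not run, 0 = nothing to schedule,
     *                     otherwise an absolute deadline in milliseconds
     * @param nowMillis    current time in milliseconds on the same clock as the deadline
     * @return the delay to pass to the scheduler, or empty when no rescheduling is needed
     */
    public static OptionalLong nextDelayMillis(Long rescheduleAt, long nowMillis) {
        if (rescheduleAt == null) {
            // tick could not run; retry shortly
            return OptionalLong.of(RETRY_MILLIS);
        }
        if (rescheduleAt == 0) {
            // no keep-alive deadline pending
            return OptionalLong.empty();
        }
        // convert the absolute deadline into a relative delay, as the diff does
        return OptionalLong.of(rescheduleAt - nowMillis);
    }

    public static void main(String[] args) {
        long now = 1_000;
        System.out.println(nextDelayMillis(null, now));
        System.out.println(nextDelayMillis(0L, now));
        System.out.println(nextDelayMillis(1_250L, now));
    }
}
```

Like the diff, the sketch does not clamp a deadline that has already passed. ScheduledExecutorService treats zero and negative delays as requests for immediate execution, so a negative result simply runs the next tick right away.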
> Make AMQP Processing Single Threaded and other AMQP perf improvements
> ---------------------------------------------------------------------
>
> Key: ARTEMIS-2205
> URL: https://issues.apache.org/jira/browse/ARTEMIS-2205
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Components: AMQP
> Affects Versions: 2.6.3
> Reporter: clebert suconic
> Assignee: clebert suconic
> Priority: Major
> Fix For: 2.7.0
>
>
> This is using Netty Executor (NIOLoop) to process AMQP.
> Instead of using a lock to enter AMQP processing, which has issues scaling up with multiple connections per queue, it uses a single-threaded executor.
>
>
> This represents a major improvement in AMQP. I will take my time to write a blog post about this with multiple scenarios comparing 2.6.3, master (before this change) and after. (I will post a link here to this JIRA).
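
The idea in the issue description, replacing a lock with a single-threaded executor, can be sketched in plain Java. This is only an illustration under stated assumptions: it uses java.util.concurrent instead of the Netty event loop the change actually relies on, and the class SingleThreadedProcessing with its methods onFrame, processedFrames and demo are hypothetical names, not Artemis API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration; a plain executor stands in for the Netty event loop. */
public class SingleThreadedProcessing {

    // One thread owns all protocol state for this connection.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    // Only ever touched on the event-loop thread, so it needs no lock.
    private long processedFrames;

    /** Callers on any thread enqueue work instead of contending on a lock. */
    public void onFrame() {
        eventLoop.execute(() -> processedFrames++);
    }

    /** Reads the counter on the event-loop thread so the value is consistent. */
    public long processedFrames() throws Exception {
        return eventLoop.submit(() -> processedFrames).get(5, TimeUnit.SECONDS);
    }

    public void shutdown() {
        eventLoop.shutdown();
    }

    /** Starts several producer threads and returns how many frames were processed. */
    public static long demo(int producers, int framesPerProducer) throws Exception {
        SingleThreadedProcessing ctx = new SingleThreadedProcessing();
        CountDownLatch done = new CountDownLatch(producers);
        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (int i = 0; i < framesPerProducer; i++) {
                    ctx.onFrame();
                }
                done.countDown();
            }).start();
        }
        if (!done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("producers did not finish in time");
        }
        long total = ctx.processedFrames();
        ctx.shutdown();
        return total;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo(4, 1000)); // prints 4000
    }
}
```

Because every update runs on the same thread in submission order, no increment is lost even though four threads submit concurrently, and producers never block each other waiting for a lock.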