You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ga...@apache.org on 2020/03/03 11:42:02 UTC
[activemq-artemis] 02/03: ARTEMIS-2716 Refactoring
This is an automated email from the ASF dual-hosted git repository.
gaohoward pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit cb8da541107355f16eb26b21b6563a06876b741f
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Tue Mar 3 18:48:18 2020 +0800
ARTEMIS-2716 Refactoring
---
.../artemis/api/config/ServerLocatorConfig.java | 86 ++++++
.../artemis/api/core/client/ServerLocator.java | 5 +
.../core/client/impl/ClientSessionFactoryImpl.java | 27 +-
.../core/client/impl/ServerLocatorImpl.java | 315 ++++++---------------
.../extensions/xa/recovery/XARecoveryConfig.java | 112 +-------
5 files changed, 202 insertions(+), 343 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
new file mode 100644
index 0000000..395277a
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.config;
+
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+
+public class ServerLocatorConfig {
+ public long clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+ public long connectionTTL = ActiveMQClient.DEFAULT_CONNECTION_TTL;
+ public long callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT;
+ public long callFailoverTimeout = ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT;
+ public int minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+ public int consumerWindowSize = ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+ public int consumerMaxRate = ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE;
+ public int confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+ public int producerWindowSize = ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
+ public int producerMaxRate = ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE;
+ public boolean blockOnAcknowledge = ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+ public boolean blockOnDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
+ public boolean blockOnNonDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
+ public boolean autoGroup = ActiveMQClient.DEFAULT_AUTO_GROUP;
+ public boolean preAcknowledge = ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE;
+ public int ackBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE;
+ public String connectionLoadBalancingPolicyClassName = ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+ public boolean useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS;
+ public int threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
+ public int scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+ public long retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
+ public double retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+ public long maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL;
+ public int reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS;
+ public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
+ public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+ public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+ public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
+ public boolean useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;
+
+ public ServerLocatorConfig() {
+ }
+
+ public ServerLocatorConfig(final ServerLocatorConfig locator) {
+ compressLargeMessage = locator.compressLargeMessage;
+ cacheLargeMessagesClient = locator.cacheLargeMessagesClient;
+ clientFailureCheckPeriod = locator.clientFailureCheckPeriod;
+ connectionTTL = locator.connectionTTL;
+ callTimeout = locator.callTimeout;
+ callFailoverTimeout = locator.callFailoverTimeout;
+ minLargeMessageSize = locator.minLargeMessageSize;
+ consumerWindowSize = locator.consumerWindowSize;
+ consumerMaxRate = locator.consumerMaxRate;
+ confirmationWindowSize = locator.confirmationWindowSize;
+ producerWindowSize = locator.producerWindowSize;
+ producerMaxRate = locator.producerMaxRate;
+ blockOnAcknowledge = locator.blockOnAcknowledge;
+ blockOnDurableSend = locator.blockOnDurableSend;
+ blockOnNonDurableSend = locator.blockOnNonDurableSend;
+ autoGroup = locator.autoGroup;
+ preAcknowledge = locator.preAcknowledge;
+ connectionLoadBalancingPolicyClassName = locator.connectionLoadBalancingPolicyClassName;
+ ackBatchSize = locator.ackBatchSize;
+ useGlobalPools = locator.useGlobalPools;
+ scheduledThreadPoolMaxSize = locator.scheduledThreadPoolMaxSize;
+ threadPoolMaxSize = locator.threadPoolMaxSize;
+ retryInterval = locator.retryInterval;
+ retryIntervalMultiplier = locator.retryIntervalMultiplier;
+ maxRetryInterval = locator.maxRetryInterval;
+ reconnectAttempts = locator.reconnectAttempts;
+ initialConnectAttempts = locator.initialConnectAttempts;
+ initialMessagePacketSize = locator.initialMessagePacketSize;
+ useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
+ }
+}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
index 2dbe0ba..937cadf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.api.core.client;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.activemq.artemis.api.config.ServerLocatorConfig;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.Interceptor;
@@ -822,4 +823,8 @@ public interface ServerLocator extends AutoCloseable {
/** This will only instantiate internal objects such as the topology */
void initialize() throws ActiveMQException;
+
+ ServerLocatorConfig getLocatorConfig();
+
+ void setLocatorConfig(ServerLocatorConfig serverLocatorConfig);
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 4b0394f..583d8fb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.activemq.artemis.api.config.ServerLocatorConfig;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
@@ -156,13 +157,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final TransportConfiguration connectorConfig,
- final long callTimeout,
- final long callFailoverTimeout,
- final long clientFailureCheckPeriod,
- final long connectionTTL,
- final long retryInterval,
- final double retryIntervalMultiplier,
- final long maxRetryInterval,
+ final ServerLocatorConfig locatorConfig,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
@@ -182,27 +177,27 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
checkTransportKeys(connectorFactory, connectorConfig);
- this.callTimeout = callTimeout;
+ this.callTimeout = locatorConfig.callTimeout;
- this.callFailoverTimeout = callFailoverTimeout;
+ this.callFailoverTimeout = locatorConfig.callFailoverTimeout;
// HORNETQ-1314 - if this in an in-vm connection then disable connection monitoring
if (connectorFactory.isReliable() &&
- clientFailureCheckPeriod == ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD &&
- connectionTTL == ActiveMQClient.DEFAULT_CONNECTION_TTL) {
+ locatorConfig.clientFailureCheckPeriod == ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD &&
+ locatorConfig.connectionTTL == ActiveMQClient.DEFAULT_CONNECTION_TTL) {
this.clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD_INVM;
this.connectionTTL = ActiveMQClient.DEFAULT_CONNECTION_TTL_INVM;
} else {
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ this.clientFailureCheckPeriod = locatorConfig.clientFailureCheckPeriod;
- this.connectionTTL = connectionTTL;
+ this.connectionTTL = locatorConfig.connectionTTL;
}
- this.retryInterval = retryInterval;
+ this.retryInterval = locatorConfig.retryInterval;
- this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.retryIntervalMultiplier = locatorConfig.retryIntervalMultiplier;
- this.maxRetryInterval = maxRetryInterval;
+ this.maxRetryInterval = locatorConfig.maxRetryInterval;
this.reconnectAttempts = reconnectAttempts;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 7043149..3ac249f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.config.ServerLocatorConfig;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -122,7 +123,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private volatile boolean receivedTopology;
- private boolean compressLargeMessage;
/** This specifies serverLocator.connect was used,
* which means it's a cluster connection.
@@ -144,59 +144,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// Settable attributes:
- private boolean cacheLargeMessagesClient;
-
- private long clientFailureCheckPeriod;
-
- private long connectionTTL;
-
- private long callTimeout;
-
- private long callFailoverTimeout;
-
- private int minLargeMessageSize;
-
- private int consumerWindowSize;
-
- private int consumerMaxRate;
-
- private int confirmationWindowSize;
-
- private int producerWindowSize;
-
- private int producerMaxRate;
-
- private boolean blockOnAcknowledge;
-
- private boolean blockOnDurableSend;
-
- private boolean blockOnNonDurableSend;
-
- private boolean autoGroup;
-
- private boolean preAcknowledge;
-
- private String connectionLoadBalancingPolicyClassName;
-
- private int ackBatchSize;
-
- private boolean useGlobalPools;
-
- private int scheduledThreadPoolMaxSize;
-
- private int threadPoolMaxSize;
-
- private long retryInterval;
-
- private double retryIntervalMultiplier;
-
- private long maxRetryInterval;
-
- private int reconnectAttempts;
-
- private int initialConnectAttempts;
-
- private int initialMessagePacketSize;
private final Object stateGuard = new Object();
private transient STATE state;
@@ -227,6 +174,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private final Exception traceException = new Exception();
+ private ServerLocatorConfig config = new ServerLocatorConfig();
+
public static synchronized void clearThreadPools() {
ActiveMQClient.clearThreadPools();
}
@@ -234,7 +183,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private synchronized void setThreadPools() {
if (threadPool != null) {
return;
- } else if (useGlobalPools) {
+ } else if (config.useGlobalPools) {
threadPool = ActiveMQClient.getGlobalThreadPool();
scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool();
@@ -248,10 +197,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
});
- if (threadPoolMaxSize == -1) {
+ if (config.threadPoolMaxSize == -1) {
threadPool = Executors.newCachedThreadPool(factory);
} else {
- threadPool = new ActiveMQThreadPoolExecutor(0, threadPoolMaxSize, 60L, TimeUnit.SECONDS, factory);
+ threadPool = new ActiveMQThreadPoolExecutor(0, config.threadPoolMaxSize, 60L, TimeUnit.SECONDS, factory);
}
factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@@ -261,7 +210,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
});
- scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
+ scheduledThreadPool = Executors.newScheduledThreadPool(config.scheduledThreadPoolMaxSize, factory);
}
this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray);
}
@@ -273,7 +222,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return false;
if (this.threadPool == null && this.scheduledThreadPool == null) {
- useGlobalPools = false;
+ config.useGlobalPools = false;
shutdownPool = false;
this.threadPool = threadPool;
this.scheduledThreadPool = scheduledThreadPool;
@@ -284,13 +233,13 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
private void instantiateLoadBalancingPolicy() {
- if (connectionLoadBalancingPolicyClassName == null) {
+ if (config.connectionLoadBalancingPolicyClassName == null) {
throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
}
AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy) ClassloadingUtil.newInstanceFromClassLoader(ServerLocatorImpl.class, connectionLoadBalancingPolicyClassName);
+ loadBalancingPolicy = (ConnectionLoadBalancingPolicy) ClassloadingUtil.newInstanceFromClassLoader(ServerLocatorImpl.class, config.connectionLoadBalancingPolicyClassName);
return null;
}
});
@@ -336,6 +285,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
+ @Override
+ public ServerLocatorConfig getLocatorConfig() {
+ return config;
+ }
+
+ @Override
+ public void setLocatorConfig(ServerLocatorConfig config) {
+ this.config = config;
+ }
+
private static DiscoveryGroup createDiscoveryGroup(String nodeID,
DiscoveryGroupConfiguration config) throws Exception {
return new DiscoveryGroup(nodeID, config.getName(), config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null);
@@ -357,67 +316,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
- clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-
- connectionTTL = ActiveMQClient.DEFAULT_CONNECTION_TTL;
-
- callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT;
-
- callFailoverTimeout = ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT;
-
- minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-
- consumerWindowSize = ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
-
- consumerMaxRate = ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE;
-
- confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-
- producerWindowSize = ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
-
- producerMaxRate = ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE;
-
- blockOnAcknowledge = ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-
- blockOnDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
-
- blockOnNonDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
-
- autoGroup = ActiveMQClient.DEFAULT_AUTO_GROUP;
-
- preAcknowledge = ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE;
-
- ackBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE;
-
- connectionLoadBalancingPolicyClassName = ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-
- useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS;
-
- threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
-
- scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-
- retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
-
- retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-
- maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL;
-
- reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS;
-
- initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
-
- cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
-
- cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
-
clusterConnection = false;
-
- useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;
}
public static ServerLocator newLocator(String uri) {
@@ -515,40 +414,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
topology = locator.topology;
topologyArray = locator.topologyArray;
receivedTopology = locator.receivedTopology;
- compressLargeMessage = locator.compressLargeMessage;
- cacheLargeMessagesClient = locator.cacheLargeMessagesClient;
- clientFailureCheckPeriod = locator.clientFailureCheckPeriod;
- connectionTTL = locator.connectionTTL;
- callTimeout = locator.callTimeout;
- callFailoverTimeout = locator.callFailoverTimeout;
- minLargeMessageSize = locator.minLargeMessageSize;
- consumerWindowSize = locator.consumerWindowSize;
- consumerMaxRate = locator.consumerMaxRate;
- confirmationWindowSize = locator.confirmationWindowSize;
- producerWindowSize = locator.producerWindowSize;
- producerMaxRate = locator.producerMaxRate;
- blockOnAcknowledge = locator.blockOnAcknowledge;
- blockOnDurableSend = locator.blockOnDurableSend;
- blockOnNonDurableSend = locator.blockOnNonDurableSend;
- autoGroup = locator.autoGroup;
- preAcknowledge = locator.preAcknowledge;
- connectionLoadBalancingPolicyClassName = locator.connectionLoadBalancingPolicyClassName;
- ackBatchSize = locator.ackBatchSize;
- useGlobalPools = locator.useGlobalPools;
- scheduledThreadPoolMaxSize = locator.scheduledThreadPoolMaxSize;
- threadPoolMaxSize = locator.threadPoolMaxSize;
- retryInterval = locator.retryInterval;
- retryIntervalMultiplier = locator.retryIntervalMultiplier;
- maxRetryInterval = locator.maxRetryInterval;
- reconnectAttempts = locator.reconnectAttempts;
- initialConnectAttempts = locator.initialConnectAttempts;
- initialMessagePacketSize = locator.initialMessagePacketSize;
+ config = new ServerLocatorConfig(locator.config);
startExecutor = locator.startExecutor;
afterConnectListener = locator.afterConnectListener;
groupID = locator.groupID;
nodeID = locator.nodeID;
clusterTransportConfiguration = locator.clusterTransportConfiguration;
- useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
}
private TransportConfiguration selectConnector() {
@@ -561,7 +432,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
synchronized (this) {
- if (usedTopology != null && useTopologyForLoadBalancing) {
+ if (usedTopology != null && config.useTopologyForLoadBalancing) {
if (logger.isTraceEnabled()) {
logger.trace("Selecting connector from topology.");
}
@@ -591,7 +462,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
synchronized (this) {
- if (usedTopology != null && useTopologyForLoadBalancing) {
+ if (usedTopology != null && config.useTopologyForLoadBalancing) {
return usedTopology.length;
} else {
return initialConnectors.length;
@@ -721,7 +592,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception {
- return createSessionFactory(transportConfiguration, reconnectAttempts);
+ return createSessionFactory(transportConfiguration, config.reconnectAttempts);
}
@Override
@@ -731,7 +602,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
initialize();
- ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, config, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
addToConnecting(factory);
try {
@@ -798,7 +669,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// try each factory in the list until we find one which works
try {
- factory = new ClientSessionFactoryImpl(this, tc, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
+ factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
try {
addToConnecting(factory);
// We always try to connect here with only one attempt,
@@ -813,12 +684,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
attempts++;
int connectorsSize = getConnectorsSize();
- int maxAttempts = initialConnectAttempts == 0 ? 1 : initialConnectAttempts;
+ int maxAttempts = config.initialConnectAttempts == 0 ? 1 : config.initialConnectAttempts;
- if (initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) {
+ if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
}
- if (factory.waitForRetry(retryInterval)) {
+ if (factory.waitForRetry(config.retryInterval)) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
}
retry = true;
@@ -836,7 +707,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// ATM topology is never != null. Checking here just to be consistent with
// how the sendSubscription happens.
// in case this ever changes.
- if (topology != null && !factory.waitForTopology(callTimeout, TimeUnit.MILLISECONDS)) {
+ if (topology != null && !factory.waitForTopology(config.callTimeout, TimeUnit.MILLISECONDS)) {
factory.cleanup();
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
}
@@ -855,12 +726,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
discoveryOK = checkOnDiscovery();
- retryDiscovery = (initialConnectAttempts > 0 && tryNumber++ < initialConnectAttempts) && !disableDiscoveryRetries;
+ retryDiscovery = (config.initialConnectAttempts > 0 && tryNumber++ < config.initialConnectAttempts) && !disableDiscoveryRetries;
if (!discoveryOK) {
if (retryDiscovery) {
- ActiveMQClientLogger.LOGGER.broadcastTimeout(tryNumber, initialConnectAttempts);
+ ActiveMQClientLogger.LOGGER.broadcastTimeout(tryNumber, config.initialConnectAttempts);
} else {
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast();
}
@@ -960,301 +831,301 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public boolean isCacheLargeMessagesClient() {
- return cacheLargeMessagesClient;
+ return config.cacheLargeMessagesClient;
}
@Override
public ServerLocatorImpl setCacheLargeMessagesClient(final boolean cached) {
- cacheLargeMessagesClient = cached;
+ config.cacheLargeMessagesClient = cached;
return this;
}
@Override
public long getClientFailureCheckPeriod() {
- return clientFailureCheckPeriod;
+ return config.clientFailureCheckPeriod;
}
@Override
public ServerLocatorImpl setClientFailureCheckPeriod(final long clientFailureCheckPeriod) {
checkWrite();
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ this.config.clientFailureCheckPeriod = clientFailureCheckPeriod;
return this;
}
@Override
public long getConnectionTTL() {
- return connectionTTL;
+ return config.connectionTTL;
}
@Override
public ServerLocatorImpl setConnectionTTL(final long connectionTTL) {
checkWrite();
- this.connectionTTL = connectionTTL;
+ this.config.connectionTTL = connectionTTL;
return this;
}
@Override
public long getCallTimeout() {
- return callTimeout;
+ return config.callTimeout;
}
@Override
public ServerLocatorImpl setCallTimeout(final long callTimeout) {
checkWrite();
- this.callTimeout = callTimeout;
+ this.config.callTimeout = callTimeout;
return this;
}
@Override
public long getCallFailoverTimeout() {
- return callFailoverTimeout;
+ return config.callFailoverTimeout;
}
@Override
public ServerLocatorImpl setCallFailoverTimeout(long callFailoverTimeout) {
checkWrite();
- this.callFailoverTimeout = callFailoverTimeout;
+ this.config.callFailoverTimeout = callFailoverTimeout;
return this;
}
@Override
public int getMinLargeMessageSize() {
- return minLargeMessageSize;
+ return config.minLargeMessageSize;
}
@Override
public ServerLocatorImpl setMinLargeMessageSize(final int minLargeMessageSize) {
checkWrite();
- this.minLargeMessageSize = minLargeMessageSize;
+ this.config.minLargeMessageSize = minLargeMessageSize;
return this;
}
@Override
public int getConsumerWindowSize() {
- return consumerWindowSize;
+ return config.consumerWindowSize;
}
@Override
public ServerLocatorImpl setConsumerWindowSize(final int consumerWindowSize) {
checkWrite();
- this.consumerWindowSize = consumerWindowSize;
+ this.config.consumerWindowSize = consumerWindowSize;
return this;
}
@Override
public int getConsumerMaxRate() {
- return consumerMaxRate;
+ return config.consumerMaxRate;
}
@Override
public ServerLocatorImpl setConsumerMaxRate(final int consumerMaxRate) {
checkWrite();
- this.consumerMaxRate = consumerMaxRate;
+ this.config.consumerMaxRate = consumerMaxRate;
return this;
}
@Override
public int getConfirmationWindowSize() {
- return confirmationWindowSize;
+ return config.confirmationWindowSize;
}
@Override
public ServerLocatorImpl setConfirmationWindowSize(final int confirmationWindowSize) {
checkWrite();
- this.confirmationWindowSize = confirmationWindowSize;
+ this.config.confirmationWindowSize = confirmationWindowSize;
return this;
}
@Override
public int getProducerWindowSize() {
- return producerWindowSize;
+ return config.producerWindowSize;
}
@Override
public ServerLocatorImpl setProducerWindowSize(final int producerWindowSize) {
checkWrite();
- this.producerWindowSize = producerWindowSize;
+ this.config.producerWindowSize = producerWindowSize;
return this;
}
@Override
public int getProducerMaxRate() {
- return producerMaxRate;
+ return config.producerMaxRate;
}
@Override
public ServerLocatorImpl setProducerMaxRate(final int producerMaxRate) {
checkWrite();
- this.producerMaxRate = producerMaxRate;
+ this.config.producerMaxRate = producerMaxRate;
return this;
}
@Override
public boolean isBlockOnAcknowledge() {
- return blockOnAcknowledge;
+ return config.blockOnAcknowledge;
}
@Override
public ServerLocatorImpl setBlockOnAcknowledge(final boolean blockOnAcknowledge) {
checkWrite();
- this.blockOnAcknowledge = blockOnAcknowledge;
+ this.config.blockOnAcknowledge = blockOnAcknowledge;
return this;
}
@Override
public boolean isBlockOnDurableSend() {
- return blockOnDurableSend;
+ return config.blockOnDurableSend;
}
@Override
public ServerLocatorImpl setBlockOnDurableSend(final boolean blockOnDurableSend) {
checkWrite();
- this.blockOnDurableSend = blockOnDurableSend;
+ this.config.blockOnDurableSend = blockOnDurableSend;
return this;
}
@Override
public boolean isBlockOnNonDurableSend() {
- return blockOnNonDurableSend;
+ return config.blockOnNonDurableSend;
}
@Override
public ServerLocatorImpl setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) {
checkWrite();
- this.blockOnNonDurableSend = blockOnNonDurableSend;
+ this.config.blockOnNonDurableSend = blockOnNonDurableSend;
return this;
}
@Override
public boolean isAutoGroup() {
- return autoGroup;
+ return config.autoGroup;
}
@Override
public ServerLocatorImpl setAutoGroup(final boolean autoGroup) {
checkWrite();
- this.autoGroup = autoGroup;
+ this.config.autoGroup = autoGroup;
return this;
}
@Override
public boolean isPreAcknowledge() {
- return preAcknowledge;
+ return config.preAcknowledge;
}
@Override
public ServerLocatorImpl setPreAcknowledge(final boolean preAcknowledge) {
checkWrite();
- this.preAcknowledge = preAcknowledge;
+ this.config.preAcknowledge = preAcknowledge;
return this;
}
@Override
public int getAckBatchSize() {
- return ackBatchSize;
+ return config.ackBatchSize;
}
@Override
public ServerLocatorImpl setAckBatchSize(final int ackBatchSize) {
checkWrite();
- this.ackBatchSize = ackBatchSize;
+ this.config.ackBatchSize = ackBatchSize;
return this;
}
@Override
public boolean isUseGlobalPools() {
- return useGlobalPools;
+ return config.useGlobalPools;
}
@Override
public ServerLocatorImpl setUseGlobalPools(final boolean useGlobalPools) {
checkWrite();
- this.useGlobalPools = useGlobalPools;
+ this.config.useGlobalPools = useGlobalPools;
return this;
}
@Override
public int getScheduledThreadPoolMaxSize() {
- return scheduledThreadPoolMaxSize;
+ return config.scheduledThreadPoolMaxSize;
}
@Override
public ServerLocatorImpl setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) {
checkWrite();
- this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
+ this.config.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
return this;
}
@Override
public int getThreadPoolMaxSize() {
- return threadPoolMaxSize;
+ return config.threadPoolMaxSize;
}
@Override
public ServerLocatorImpl setThreadPoolMaxSize(final int threadPoolMaxSize) {
checkWrite();
- this.threadPoolMaxSize = threadPoolMaxSize;
+ this.config.threadPoolMaxSize = threadPoolMaxSize;
return this;
}
@Override
public long getRetryInterval() {
- return retryInterval;
+ return config.retryInterval;
}
@Override
public ServerLocatorImpl setRetryInterval(final long retryInterval) {
checkWrite();
- this.retryInterval = retryInterval;
+ this.config.retryInterval = retryInterval;
return this;
}
@Override
public long getMaxRetryInterval() {
- return maxRetryInterval;
+ return config.maxRetryInterval;
}
@Override
public ServerLocatorImpl setMaxRetryInterval(final long retryInterval) {
checkWrite();
- maxRetryInterval = retryInterval;
+ this.config.maxRetryInterval = retryInterval;
return this;
}
@Override
public double getRetryIntervalMultiplier() {
- return retryIntervalMultiplier;
+ return config.retryIntervalMultiplier;
}
@Override
public ServerLocatorImpl setRetryIntervalMultiplier(final double retryIntervalMultiplier) {
checkWrite();
- this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.config.retryIntervalMultiplier = retryIntervalMultiplier;
return this;
}
@Override
public int getReconnectAttempts() {
- return reconnectAttempts;
+ return config.reconnectAttempts;
}
@Override
public ServerLocatorImpl setReconnectAttempts(final int reconnectAttempts) {
checkWrite();
- this.reconnectAttempts = reconnectAttempts;
+ this.config.reconnectAttempts = reconnectAttempts;
return this;
}
@Override
public ServerLocatorImpl setInitialConnectAttempts(int initialConnectAttempts) {
checkWrite();
- this.initialConnectAttempts = initialConnectAttempts;
+ this.config.initialConnectAttempts = initialConnectAttempts;
return this;
}
@Override
public int getInitialConnectAttempts() {
- return initialConnectAttempts;
+ return config.initialConnectAttempts;
}
@Deprecated
@@ -1271,13 +1142,13 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public String getConnectionLoadBalancingPolicyClassName() {
- return connectionLoadBalancingPolicyClassName;
+ return config.connectionLoadBalancingPolicyClassName;
}
@Override
public ServerLocatorImpl setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName) {
checkWrite();
- connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
+ config.connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
return this;
}
@@ -1317,13 +1188,13 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public int getInitialMessagePacketSize() {
- return initialMessagePacketSize;
+ return config.initialMessagePacketSize;
}
@Override
public ServerLocatorImpl setInitialMessagePacketSize(final int size) {
checkWrite();
- initialMessagePacketSize = size;
+ config.initialMessagePacketSize = size;
return this;
}
@@ -1341,12 +1212,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public boolean isCompressLargeMessage() {
- return compressLargeMessage;
+ return config.compressLargeMessage;
}
@Override
public ServerLocatorImpl setCompressLargeMessage(boolean avoid) {
- this.compressLargeMessage = avoid;
+ this.config.compressLargeMessage = avoid;
return this;
}
@@ -1692,13 +1563,13 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public ServerLocator setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing) {
- this.useTopologyForLoadBalancing = useTopologyForLoadBalancing;
+ this.config.useTopologyForLoadBalancing = useTopologyForLoadBalancing;
return this;
}
@Override
public boolean getUseTopologyForLoadBalancing() {
- return useTopologyForLoadBalancing;
+ return config.useTopologyForLoadBalancing;
}
@Override
@@ -1820,11 +1691,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
- if (initialConnectAttempts >= 0 && retryNumber > initialConnectAttempts) {
+ if (config.initialConnectAttempts >= 0 && retryNumber > config.initialConnectAttempts) {
break;
}
- if (latch.await(retryInterval, TimeUnit.MILLISECONDS))
+ if (latch.await(config.retryInterval, TimeUnit.MILLISECONDS))
return null;
}
@@ -1859,7 +1730,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
connectors = new ArrayList<>();
if (initialConnectors != null) {
for (TransportConfiguration initialConnector : initialConnectors) {
- ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
factory.disableFinalizeCheck();
diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java
index f9826e0..3da77de 100644
--- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java
+++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java
@@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.activemq.artemis.api.config.ServerLocatorConfig;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -44,36 +45,7 @@ public class XARecoveryConfig {
private final String password;
private final Map<String, String> properties;
private final ClientProtocolManagerFactory clientProtocolManager;
-
- // ServerLocator properties
- private Long callFailoverTimeout;
- private Long callTimeout;
- private Long clientFailureCheckPeriod;
- private Integer confirmationWindowSize;
- private String connectionLoadBalancingPolicyClassName;
- private Long connectionTTL;
- private Integer consumerMaxRate;
- private Integer consumerWindowSize;
- private Integer initialConnectAttempts;
- private Integer producerMaxRate;
- private Integer producerWindowSize;
- private Integer minLargeMessageSize;
- private Long retryInterval;
- private Double retryIntervalMultiplier;
- private Long maxRetryInterval;
- private Integer reconnectAttempts;
- private Integer initialMessagePacketSize;
- private Integer scheduledThreadPoolMaxSize;
- private Integer threadPoolMaxSize;
- private boolean autoGroup;
- private boolean blockOnAcknowledge;
- private boolean blockOnNonDurableSend;
- private boolean blockOnDurableSend;
- private boolean preAcknowledge;
- private boolean useGlobalPools;
- private boolean cacheLargeMessagesClient;
- private boolean compressLargeMessage;
- private boolean failoverOnInitialConnection;
+ private ServerLocatorConfig locatorConfig;
public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory,
String userName,
@@ -102,7 +74,7 @@ public class XARecoveryConfig {
this.username = username;
this.password = password;
this.ha = ha;
- this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
+ this.properties = properties == null ? new HashMap<>() : properties;
this.clientProtocolManager = clientProtocolManager;
}
@@ -164,8 +136,7 @@ public class XARecoveryConfig {
this.ha = serverLocator.isHA();
this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
this.clientProtocolManager = clientProtocolManager;
-
- readLocatorProperties(serverLocator);
+ this.locatorConfig = serverLocator.getLocatorConfig();
}
public boolean isHA() {
@@ -209,82 +180,13 @@ public class XARecoveryConfig {
serverLocator = ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager);
}
- writeLocatorProperties(serverLocator);
+ if (this.locatorConfig != null) {
+ serverLocator.setLocatorConfig(new ServerLocatorConfig(this.locatorConfig));
+ }
return serverLocator;
}
- private void writeLocatorProperties(ServerLocator serverLocator) {
- serverLocator.setAutoGroup(this.autoGroup);
- serverLocator.setBlockOnAcknowledge(this.blockOnAcknowledge);
- serverLocator.setBlockOnNonDurableSend(this.blockOnNonDurableSend);
- serverLocator.setBlockOnDurableSend(this.blockOnDurableSend);
- serverLocator.setPreAcknowledge(this.preAcknowledge);
- serverLocator.setUseGlobalPools(this.useGlobalPools);
- serverLocator.setCacheLargeMessagesClient(this.cacheLargeMessagesClient);
- serverLocator.setCompressLargeMessage(this.compressLargeMessage);
- serverLocator.setFailoverOnInitialConnection(this.failoverOnInitialConnection);
-
- serverLocator.setConsumerMaxRate(this.consumerMaxRate);
- serverLocator.setConsumerWindowSize(this.consumerWindowSize);
- serverLocator.setMinLargeMessageSize(this.minLargeMessageSize);
- serverLocator.setProducerMaxRate(this.producerMaxRate);
- serverLocator.setProducerWindowSize(this.producerWindowSize);
- serverLocator.setConfirmationWindowSize(this.confirmationWindowSize);
- serverLocator.setReconnectAttempts(this.reconnectAttempts);
- serverLocator.setThreadPoolMaxSize(this.threadPoolMaxSize);
- serverLocator.setScheduledThreadPoolMaxSize(this.scheduledThreadPoolMaxSize);
- serverLocator.setInitialConnectAttempts(this.initialConnectAttempts);
- serverLocator.setInitialMessagePacketSize(this.initialMessagePacketSize);
-
- serverLocator.setClientFailureCheckPeriod(this.clientFailureCheckPeriod);
- serverLocator.setCallTimeout(this.callTimeout);
- serverLocator.setCallFailoverTimeout(this.callFailoverTimeout);
- serverLocator.setConnectionTTL(this.connectionTTL);
- serverLocator.setRetryInterval(this.retryInterval);
- serverLocator.setMaxRetryInterval(this.maxRetryInterval);
-
- serverLocator.setRetryIntervalMultiplier(this.retryIntervalMultiplier);
-
- serverLocator.setConnectionLoadBalancingPolicyClassName(this.connectionLoadBalancingPolicyClassName);
-}
-
- private void readLocatorProperties(ServerLocator locator) {
-
- this.autoGroup = locator.isAutoGroup();
- this.blockOnAcknowledge = locator.isBlockOnAcknowledge();
- this.blockOnNonDurableSend = locator.isBlockOnNonDurableSend();
- this.blockOnDurableSend = locator.isBlockOnDurableSend();
- this.preAcknowledge = locator.isPreAcknowledge();
- this.useGlobalPools = locator.isUseGlobalPools();
- this.cacheLargeMessagesClient = locator.isCacheLargeMessagesClient();
- this.compressLargeMessage = locator.isCompressLargeMessage();
- this.failoverOnInitialConnection = locator.isFailoverOnInitialConnection();
-
- this.consumerMaxRate = locator.getConsumerMaxRate();
- this.consumerWindowSize = locator.getConsumerWindowSize();
- this.minLargeMessageSize = locator.getMinLargeMessageSize();
- this.producerMaxRate = locator.getProducerMaxRate();
- this.producerWindowSize = locator.getProducerWindowSize();
- this.confirmationWindowSize = locator.getConfirmationWindowSize();
- this.reconnectAttempts = locator.getReconnectAttempts();
- this.threadPoolMaxSize = locator.getThreadPoolMaxSize();
- this.scheduledThreadPoolMaxSize = locator.getScheduledThreadPoolMaxSize();
- this.initialConnectAttempts = locator.getInitialConnectAttempts();
- this.initialMessagePacketSize = locator.getInitialMessagePacketSize();
-
- this.clientFailureCheckPeriod = locator.getClientFailureCheckPeriod();
- this.callTimeout = locator.getCallTimeout();
- this.callFailoverTimeout = locator.getCallFailoverTimeout();
- this.connectionTTL = locator.getConnectionTTL();
- this.retryInterval = locator.getRetryInterval();
- this.maxRetryInterval = locator.getMaxRetryInterval();
-
- this.retryIntervalMultiplier = locator.getRetryIntervalMultiplier();
-
- this.connectionLoadBalancingPolicyClassName = locator.getConnectionLoadBalancingPolicyClassName();
- }
-
@Override
public int hashCode() {
final int prime = 31;