You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/01 17:00:31 UTC
[incubator-pulsar] branch master updated: Fixed use of static synchronized in LoadManagerShared (#1154)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 69d2b66 Fixed use of static synchronized in LoadManagerShared (#1154)
69d2b66 is described below
commit 69d2b66c3c125216ba7bae64300b4224fec0c633
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Feb 1 09:00:29 2018 -0800
Fixed use of static synchronized in LoadManagerShared (#1154)
---
.../broker/loadbalance/impl/LoadManagerShared.java | 62 +++++++++++++---------
.../broker/loadbalance/impl/OverloadShedder.java | 8 ++-
.../pulsar/broker/service/ReplicatorTest.java | 12 ++---
3 files changed, 50 insertions(+), 32 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index e0af8ec..2658a95 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -28,19 +28,17 @@ import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.apache.pulsar.broker.BrokerData;
-import java.util.concurrent.TimeoutException;
+
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
@@ -57,6 +55,9 @@ import org.slf4j.LoggerFactory;
import com.beust.jcommander.internal.Lists;
import com.google.common.collect.Maps;
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.internal.PlatformDependent;
+
/**
* This class contains code which in shared between the two load manager implementations.
*/
@@ -67,11 +68,21 @@ public class LoadManagerShared {
public static final int MIBI = 1024 * 1024;
// Cache for primary brokers according to policies.
- private static final Set<String> primariesCache = new HashSet<>();
+ private static final FastThreadLocal<Set<String>> localPrimariesCache = new FastThreadLocal<Set<String>>() {
+ @Override
+ protected Set<String> initialValue() throws Exception {
+ return new HashSet<>();
+ }
+ };
// Cache for shard brokers according to policies.
- private static final Set<String> secondaryCache = new HashSet<>();
-
+ private static final FastThreadLocal<Set<String>> localSecondaryCache = new FastThreadLocal<Set<String>>() {
+ @Override
+ protected Set<String> initialValue() throws Exception {
+ return new HashSet<>();
+ }
+ };
+
// update LoadReport at most every 5 seconds
public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
@@ -83,12 +94,15 @@ public class LoadManagerShared {
// Determines the brokers available for the given service unit according to the given policies.
// The brokers are put into brokerCandidateCache.
- public static synchronized void applyNamespacePolicies(final ServiceUnitId serviceUnit,
+ public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache,
- final Set<String> availableBrokers,
- final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+ final Set<String> availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+ Set<String> primariesCache = localPrimariesCache.get();
primariesCache.clear();
+
+ Set<String> secondaryCache = localSecondaryCache.get();
secondaryCache.clear();
+
NamespaceName namespace = serviceUnit.getNamespaceObject();
boolean isIsolationPoliciesPresent = policies.areIsolationPoliciesPresent(namespace);
boolean isNonPersistentTopic = (serviceUnit instanceof NamespaceBundle)
@@ -141,7 +155,7 @@ public class LoadManagerShared {
}
} else if (!isNonPersistentTopic
&& !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
- // persistent topic can be assigned to only brokers that enabled for persistent-topic
+ // persistent topic can be assigned to only brokers that enabled for persistent-topic
if (log.isDebugEnabled()) {
log.debug("Filter broker- [{}] because broker only supports non-persistent namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
@@ -217,7 +231,7 @@ public class LoadManagerShared {
// Collect JVM direct memory
systemResourceUsage.directMemory.usage = (double) (JvmMetrics.getJvmDirectMemoryUsed() / MIBI);
- systemResourceUsage.directMemory.limit = (double) (sun.misc.VM.maxDirectMemory() / MIBI);
+ systemResourceUsage.directMemory.limit = (double) (PlatformDependent.maxDirectMemory() / MIBI);
return systemResourceUsage;
}
@@ -235,7 +249,7 @@ public class LoadManagerShared {
/**
* Removes the brokers which have more bundles assigned to them in the same namespace as the incoming bundle than at
* least one other available broker from consideration.
- *
+ *
* @param assignedBundleName
* Name of bundle to be assigned.
* @param candidates
@@ -280,7 +294,7 @@ public class LoadManagerShared {
.size() != finalLeastBundles);
}
}
-
+
/**
* It tries to filter out brokers which own namespace with same anti-affinity-group as given namespace. If all the
* domains own namespace with same anti-affinity group then it will try to keep brokers with domain that has least
@@ -293,17 +307,17 @@ public class LoadManagerShared {
* d1-3 b1-2,b2-1
* d2-3 b3-2,b4-1
* d3-4 b5-2,b6-2
- *
+ *
* After filtering: "candidates" brokers
* Domain-count Brokers-count
* ____________ ____________
* d1-3 b2-1
* d2-3 b4-1
- *
+ *
* "candidate" broker to own anti-affinity-namespace = b2 or b4
- *
+ *
* </pre>
- *
+ *
* @param pulsar
* @param assignedBundleName
* @param candidates
@@ -366,7 +380,7 @@ public class LoadManagerShared {
/**
* It computes least number of namespace owned by any of the domain and then it filters out all the domains that own
* namespaces more than this count.
- *
+ *
* @param brokerToAntiAffinityNamespaceCount
* @param candidates
* @param brokerToDomainMap
@@ -403,7 +417,7 @@ public class LoadManagerShared {
/**
* It returns map of broker and count of namespace that are belong to the same anti-affinity group as given
* {@param namespaceName}
- *
+ *
* @param pulsar
* @param namespaceName
* @param brokerToNamespaceToBundleRange
@@ -451,12 +465,12 @@ public class LoadManagerShared {
}
/**
- *
+ *
* It checks if given anti-affinity namespace should be unloaded by broker due to load-shedding. If all the brokers
* are owning same number of anti-affinity namespaces then unloading this namespace again ends up at the same broker
* from which it was unloaded. So, this util checks that given namespace should be unloaded only if it can be loaded
* by different broker.
- *
+ *
* @param namespace
* @param bundle
* @param currentBroker
@@ -512,7 +526,7 @@ public class LoadManagerShared {
* It filters out brokers which owns topic higher than configured threshold at
* {@link ServiceConfiguration.loadBalancerBrokerMaxTopics}. <br/>
* if all the brokers own topic higher than threshold then it resets the list with original broker candidates
- *
+ *
* @param brokerCandidateCache
* @param loadData
* @param loadBalancerBrokerMaxTopics
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index 341136d..ed71def 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -45,7 +45,7 @@ public class OverloadShedder implements LoadSheddingStrategy {
/**
* Create an OverloadShedder with the service configuration.
- *
+ *
* @param conf
* Service configuration to create from.
*/
@@ -55,7 +55,7 @@ public class OverloadShedder implements LoadSheddingStrategy {
/**
* Attempt to shed one bundle off every broker which is overloaded.
- *
+ *
* @param loadData
* The load data to used to make the unloading decision.
* @param conf
@@ -78,6 +78,10 @@ public class OverloadShedder implements LoadSheddingStrategy {
if (localData.getBundles().size() > 1) {
for (final String bundle : localData.getBundles()) {
final BundleData bundleData = loadData.getBundleData().get(bundle);
+ if (bundleData == null) {
+ continue;
+ }
+
// Consider short-term message rate to address system resource burden
final TimeAverageMessageData shortTermData = bundleData.getShortTermData();
final double messageRate = shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 25098a9..8397727 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -31,9 +31,9 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -43,12 +43,10 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
-import org.mockito.Mockito;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.OwnershipCache;
-import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
@@ -56,17 +54,16 @@ import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.PersistentTopicStats;
-import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
+import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -664,6 +661,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertNull(topic.getPersistentReplicator(replicatorClusterName));
return null;
});
+
+ producer1.close();
}
@Test(priority = 5, timeOut = 30000)
@@ -683,6 +682,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
field.setAccessible(true);
ProducerImpl producer = (ProducerImpl) field.get(replicator);
assertNull(producer);
+ producer1.close();
}
/**
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.