You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/01 17:00:34 UTC

[GitHub] merlimat closed pull request #1154: Fixed use of static synchronized in LoadManagerShared

merlimat closed pull request #1154: Fixed use of static synchronized in LoadManagerShared
URL: https://github.com/apache/incubator-pulsar/pull/1154
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index e0af8ec0d..2658a95fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -28,19 +28,17 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-import org.apache.pulsar.broker.BrokerData;
-import java.util.concurrent.TimeoutException;
+
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.BrokerData;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
@@ -57,6 +55,9 @@
 import com.beust.jcommander.internal.Lists;
 import com.google.common.collect.Maps;
 
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.internal.PlatformDependent;
+
 /**
  * This class contains code which in shared between the two load manager implementations.
  */
@@ -67,11 +68,21 @@
     public static final int MIBI = 1024 * 1024;
 
     // Cache for primary brokers according to policies.
-    private static final Set<String> primariesCache = new HashSet<>();
+    private static final FastThreadLocal<Set<String>> localPrimariesCache = new FastThreadLocal<Set<String>>() {
+        @Override
+        protected Set<String> initialValue() throws Exception {
+            return new HashSet<>();
+        }
+    };
 
     // Cache for shard brokers according to policies.
-    private static final Set<String> secondaryCache = new HashSet<>();
-    
+    private static final FastThreadLocal<Set<String>> localSecondaryCache = new FastThreadLocal<Set<String>>() {
+        @Override
+        protected Set<String> initialValue() throws Exception {
+            return new HashSet<>();
+        }
+    };
+
     // update LoadReport at most every 5 seconds
     public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
 
@@ -83,12 +94,15 @@ private LoadManagerShared() {
 
     // Determines the brokers available for the given service unit according to the given policies.
     // The brokers are put into brokerCandidateCache.
-    public static synchronized void applyNamespacePolicies(final ServiceUnitId serviceUnit,
+    public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
             final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache,
-            final Set<String> availableBrokers,
-            final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+            final Set<String> availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+        Set<String> primariesCache = localPrimariesCache.get();
         primariesCache.clear();
+
+        Set<String> secondaryCache = localSecondaryCache.get();
         secondaryCache.clear();
+
         NamespaceName namespace = serviceUnit.getNamespaceObject();
         boolean isIsolationPoliciesPresent = policies.areIsolationPoliciesPresent(namespace);
         boolean isNonPersistentTopic = (serviceUnit instanceof NamespaceBundle)
@@ -141,7 +155,7 @@ public static synchronized void applyNamespacePolicies(final ServiceUnitId servi
                     }
                 } else if (!isNonPersistentTopic
                         && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
-                    // persistent topic can be assigned to only brokers that enabled for persistent-topic 
+                    // persistent topic can be assigned to only brokers that enabled for persistent-topic
                     if (log.isDebugEnabled()) {
                         log.debug("Filter broker- [{}] because broker only supports non-persistent namespace - [{}]",
                                 brokerUrl.getHost(), namespace.toString());
@@ -217,7 +231,7 @@ public static SystemResourceUsage getSystemResourceUsage(final BrokerHostUsage b
 
         // Collect JVM direct memory
         systemResourceUsage.directMemory.usage = (double) (JvmMetrics.getJvmDirectMemoryUsed() / MIBI);
-        systemResourceUsage.directMemory.limit = (double) (sun.misc.VM.maxDirectMemory() / MIBI);
+        systemResourceUsage.directMemory.limit = (double) (PlatformDependent.maxDirectMemory() / MIBI);
 
         return systemResourceUsage;
     }
@@ -235,7 +249,7 @@ public static boolean isLoadSheddingEnabled(final PulsarService pulsar) {
     /**
      * Removes the brokers which have more bundles assigned to them in the same namespace as the incoming bundle than at
      * least one other available broker from consideration.
-     * 
+     *
      * @param assignedBundleName
      *            Name of bundle to be assigned.
      * @param candidates
@@ -280,7 +294,7 @@ public static void removeMostServicingBrokersForNamespace(final String assignedB
                     .size() != finalLeastBundles);
         }
     }
-    
+
     /**
      * It tries to filter out brokers which own namespace with same anti-affinity-group as given namespace. If all the
      * domains own namespace with same anti-affinity group then it will try to keep brokers with domain that has least
@@ -293,17 +307,17 @@ public static void removeMostServicingBrokersForNamespace(final String assignedB
      * d1-3          b1-2,b2-1
      * d2-3          b3-2,b4-1
      * d3-4          b5-2,b6-2
-     * 
+     *
      * After filtering: "candidates" brokers
      * Domain-count  Brokers-count
      * ____________  ____________
      * d1-3          b2-1
      * d2-3          b4-1
-     * 
+     *
      * "candidate" broker to own anti-affinity-namespace = b2 or b4
-     * 
+     *
      * </pre>
-     * 
+     *
      * @param pulsar
      * @param assignedBundleName
      * @param candidates
@@ -366,7 +380,7 @@ public static void filterAntiAffinityGroupOwnedBrokers(final PulsarService pulsa
     /**
      * It computes least number of namespace owned by any of the domain and then it filters out all the domains that own
      * namespaces more than this count.
-     * 
+     *
      * @param brokerToAntiAffinityNamespaceCount
      * @param candidates
      * @param brokerToDomainMap
@@ -403,7 +417,7 @@ private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(
     /**
      * It returns map of broker and count of namespace that are belong to the same anti-affinity group as given
      * {@param namespaceName}
-     * 
+     *
      * @param pulsar
      * @param namespaceName
      * @param brokerToNamespaceToBundleRange
@@ -451,12 +465,12 @@ private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(
     }
 
     /**
-     * 
+     *
      * It checks if given anti-affinity namespace should be unloaded by broker due to load-shedding. If all the brokers
      * are owning same number of anti-affinity namespaces then unloading this namespace again ends up at the same broker
      * from which it was unloaded. So, this util checks that given namespace should be unloaded only if it can be loaded
      * by different broker.
-     * 
+     *
      * @param namespace
      * @param bundle
      * @param currentBroker
@@ -512,7 +526,7 @@ public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String
      * It filters out brokers which owns topic higher than configured threshold at
      * {@link ServiceConfiguration.loadBalancerBrokerMaxTopics}. <br/>
      * if all the brokers own topic higher than threshold then it resets the list with original broker candidates
-     * 
+     *
      * @param brokerCandidateCache
      * @param loadData
      * @param loadBalancerBrokerMaxTopics
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index 341136dc4..ed71defd4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -45,7 +45,7 @@
 
     /**
      * Create an OverloadShedder with the service configuration.
-     * 
+     *
      * @param conf
      *            Service configuration to create from.
      */
@@ -55,7 +55,7 @@ public OverloadShedder(final ServiceConfiguration conf) {
 
     /**
      * Attempt to shed one bundle off every broker which is overloaded.
-     * 
+     *
      * @param loadData
      *            The load data to used to make the unloading decision.
      * @param conf
@@ -78,6 +78,10 @@ public OverloadShedder(final ServiceConfiguration conf) {
                 if (localData.getBundles().size() > 1) {
                     for (final String bundle : localData.getBundles()) {
                         final BundleData bundleData = loadData.getBundleData().get(bundle);
+                        if (bundleData == null) {
+                            continue;
+                        }
+
                         // Consider short-term message rate to address system resource burden
                         final TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                         final double messageRate = shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 25098a94a..8397727c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -31,9 +31,9 @@
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -43,12 +43,10 @@
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
-import org.mockito.Mockito;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnedBundle;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
-import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
@@ -56,17 +54,16 @@
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.PersistentTopicStats;
-import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
+import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -664,6 +661,8 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
             assertNull(topic.getPersistentReplicator(replicatorClusterName));
             return null;
         });
+
+        producer1.close();
     }
 
     @Test(priority = 5, timeOut = 30000)
@@ -683,6 +682,7 @@ public void testReplicatorProducerClosing() throws Exception {
         field.setAccessible(true);
         ProducerImpl producer = (ProducerImpl) field.get(replicator);
         assertNull(producer);
+        producer1.close();
     }
 
     /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services