You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/07/10 22:05:02 UTC
[accumulo] branch 1.9 updated: Fix multiple concurrency bugs in
Master.gatherTableInformation() (#546)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.9 by this push:
new 42a3534 Fix multiple concurrency bugs in Master.gatherTableInformation() (#546)
42a3534 is described below
commit 42a3534ebcd1cd27df863f10487708d4648ab03a
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Tue Jul 10 18:04:59 2018 -0400
Fix multiple concurrency bugs in Master.gatherTableInformation() (#546)
Master.gatherTableInformation() had the following problems:
* If Property.MASTER_STATUS_THREAD_POOL_SIZE was set > 1, then multiple threads
would concurrently put into a TreeMap, which is not thread-safe.
* Created a thread pool and never called shutdownNow() on it.
* Returned a reference to a TreeMap that threads in the thread pool might still
be adding to.
This patch also attempts to address the issues brought up in #402 by switching
to a cached thread pool. This will allow the thread pool to expand so that
unresponsive tservers do not prevent gathering status from responsive tservers.
---
.../org/apache/accumulo/core/conf/Property.java | 5 +--
.../java/org/apache/accumulo/master/Master.java | 36 ++++++++++++++++------
2 files changed, 29 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 1eed867..6a2024e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -340,8 +340,9 @@ public enum Property {
MASTER_REPLICATION_COORDINATOR_THREADCHECK("master.replication.coordinator.threadcheck.time",
"5s", PropertyType.TIMEDURATION,
"The time between adjustments of the coordinator thread pool"),
- MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "1", PropertyType.COUNT,
- "The number of threads to use when fetching the tablet server status for balancing."),
+ MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "0", PropertyType.COUNT,
+ "The number of threads to use when fetching the tablet server status for balancing. Zero "
+ + "indicates an unlimited number of threads will be used."),
MASTER_METADATA_SUSPENDABLE("master.metadata.suspendable", "false", PropertyType.BOOLEAN,
"Allow tablets for the " + MetadataTable.NAME
+ " table to be suspended via table.suspend.duration."),
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9414b98..2f124e3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -163,6 +164,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Iterables;
/**
@@ -1065,7 +1067,7 @@ public class Master extends AccumuloServerContext
private long updateStatus()
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
- tserverStatus = Collections.synchronizedSortedMap(gatherTableInformation(currentServers));
+ tserverStatus = gatherTableInformation(currentServers);
checkForHeldServer(tserverStatus);
if (!badServers.isEmpty()) {
@@ -1146,12 +1148,20 @@ public class Master extends AccumuloServerContext
private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation(
Set<TServerInstance> currentServers) {
+ final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
+ int threads = getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE);
+ ExecutorService tp = threads == 0 ? Executors.newCachedThreadPool()
+ : Executors.newFixedThreadPool(threads);
long start = System.currentTimeMillis();
- int threads = Math.max(getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE), 1);
- ExecutorService tp = Executors.newFixedThreadPool(threads);
- final SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<>();
+ final SortedMap<TServerInstance,TabletServerStatus> result = new ConcurrentSkipListMap<>();
for (TServerInstance serverInstance : currentServers) {
final TServerInstance server = serverInstance;
+ if (threads == 0) {
+ // Since an unbounded thread pool is being used, rate limit how fast task are added to the
+ // executor. This prevents the threads from growing large unless there are lots of
+ // unresponsive tservers.
+ sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), TimeUnit.MILLISECONDS);
+ }
tp.submit(new Runnable() {
@Override
public void run() {
@@ -1191,18 +1201,24 @@ public class Master extends AccumuloServerContext
}
tp.shutdown();
try {
- tp.awaitTermination(getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT) * 2,
- TimeUnit.MILLISECONDS);
+ tp.awaitTermination(Math.max(10000, rpcTimeout / 3), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.debug("Interrupted while fetching status");
}
+
+ tp.shutdownNow();
+
+ // Threads may still modify map after shutdownNow is called, so create an immutable snapshot.
+ SortedMap<TServerInstance,TabletServerStatus> info = ImmutableSortedMap.copyOf(result);
+
synchronized (badServers) {
badServers.keySet().retainAll(currentServers);
- badServers.keySet().removeAll(result.keySet());
+ badServers.keySet().removeAll(info.keySet());
}
- log.debug(String.format("Finished gathering information from %d servers in %.2f seconds",
- result.size(), (System.currentTimeMillis() - start) / 1000.));
- return result;
+ log.debug(String.format("Finished gathering information from %d of %d servers in %.2f seconds",
+ info.size(), currentServers.size(), (System.currentTimeMillis() - start) / 1000.));
+
+ return info;
}
public void run() throws IOException, InterruptedException, KeeperException {