You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2020/01/17 00:33:01 UTC
[accumulo] branch master updated: Fixes #1449 join closer thread in transport pool (#1477)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 5169e78 Fixes #1449 join closer thread in transport pool (#1477)
5169e78 is described below
commit 5169e7872f32752c186b3c4dedd64825c6720a79
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jan 16 19:32:54 2020 -0500
Fixes #1449 join closer thread in transport pool (#1477)
---
.../core/clientImpl/ThriftTransportPool.java | 24 +++++++------
.../java/org/apache/accumulo/core/util/Once.java | 41 ----------------------
2 files changed, 13 insertions(+), 52 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
index 78ef155..8d51e85 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
@@ -41,7 +41,6 @@ import org.apache.accumulo.core.singletons.SingletonManager;
import org.apache.accumulo.core.singletons.SingletonService;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.Once;
import org.apache.accumulo.core.util.Pair;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -50,6 +49,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -346,8 +346,10 @@ public class ThriftTransportPool {
private Map<ThriftTransportKey,Long> errorTime = new HashMap<>();
private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<>();
- private Once checkStarter = new Once(() -> {
- new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
+ private Supplier<Daemon> checkThreadFactory = Suppliers.memoize(() -> {
+ var thread = new Daemon(new Closer(), "Thrift Connection Pool Checker");
+ thread.start();
+ return thread;
});
private static final Logger log = LoggerFactory.getLogger(ThriftTransportPool.class);
@@ -384,16 +386,11 @@ public class ThriftTransportPool {
private static final long serialVersionUID = 1L;
}
- private static class Closer implements Runnable {
- final ThriftTransportPool pool;
-
- public Closer(ThriftTransportPool pool) {
- this.pool = pool;
- }
+ private class Closer implements Runnable {
private void closeConnections() throws InterruptedException {
while (true) {
- pool.closeExpiredConnections();
+ closeExpiredConnections();
Thread.sleep(500);
}
}
@@ -857,7 +854,7 @@ public class ThriftTransportPool {
}
public void startCheckerThread() {
- checkStarter.run();
+ checkThreadFactory.get();
}
void closeExpiredConnections() {
@@ -884,6 +881,11 @@ public class ThriftTransportPool {
private void shutdown() {
connectionPool.shutdown();
+ try {
+ checkThreadFactory.get().join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
private ConnectionPool getConnectionPool() {
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Once.java b/core/src/main/java/org/apache/accumulo/core/util/Once.java
deleted file mode 100644
index ffba095..0000000
--- a/core/src/main/java/org/apache/accumulo/core/util/Once.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.util;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class Once implements Runnable {
-
- private final AtomicBoolean hasRun = new AtomicBoolean(false);
- private final Runnable action;
-
- public Once(Runnable action) {
- this.action = action;
- }
-
- @Override
- public void run() {
- if (hasRun.get())
- return;
-
- if (hasRun.compareAndSet(false, true)) {
- action.run();
- }
- }
-}