You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2020/01/17 00:33:01 UTC

[accumulo] branch master updated: Fixes #1449 join closer thread in transport pool (#1477)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 5169e78  Fixes #1449 join closer thread in transport pool (#1477)
5169e78 is described below

commit 5169e7872f32752c186b3c4dedd64825c6720a79
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jan 16 19:32:54 2020 -0500

    Fixes #1449 join closer thread in transport pool (#1477)
---
 .../core/clientImpl/ThriftTransportPool.java       | 24 +++++++------
 .../java/org/apache/accumulo/core/util/Once.java   | 41 ----------------------
 2 files changed, 13 insertions(+), 52 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
index 78ef155..8d51e85 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
@@ -41,7 +41,6 @@ import org.apache.accumulo.core.singletons.SingletonManager;
 import org.apache.accumulo.core.singletons.SingletonService;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.Once;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -50,6 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
@@ -346,8 +346,10 @@ public class ThriftTransportPool {
   private Map<ThriftTransportKey,Long> errorTime = new HashMap<>();
   private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<>();
 
-  private Once checkStarter = new Once(() -> {
-    new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
+  private Supplier<Daemon> checkThreadFactory = Suppliers.memoize(() -> {
+    var thread = new Daemon(new Closer(), "Thrift Connection Pool Checker");
+    thread.start();
+    return thread;
   });
 
   private static final Logger log = LoggerFactory.getLogger(ThriftTransportPool.class);
@@ -384,16 +386,11 @@ public class ThriftTransportPool {
     private static final long serialVersionUID = 1L;
   }
 
-  private static class Closer implements Runnable {
-    final ThriftTransportPool pool;
-
-    public Closer(ThriftTransportPool pool) {
-      this.pool = pool;
-    }
+  private class Closer implements Runnable {
 
     private void closeConnections() throws InterruptedException {
       while (true) {
-        pool.closeExpiredConnections();
+        closeExpiredConnections();
         Thread.sleep(500);
       }
     }
@@ -857,7 +854,7 @@ public class ThriftTransportPool {
   }
 
   public void startCheckerThread() {
-    checkStarter.run();
+    checkThreadFactory.get();
   }
 
   void closeExpiredConnections() {
@@ -884,6 +881,11 @@ public class ThriftTransportPool {
 
   private void shutdown() {
     connectionPool.shutdown();
+    try {
+      checkThreadFactory.get().join();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   private ConnectionPool getConnectionPool() {
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Once.java b/core/src/main/java/org/apache/accumulo/core/util/Once.java
deleted file mode 100644
index ffba095..0000000
--- a/core/src/main/java/org/apache/accumulo/core/util/Once.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.util;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class Once implements Runnable {
-
-  private final AtomicBoolean hasRun = new AtomicBoolean(false);
-  private final Runnable action;
-
-  public Once(Runnable action) {
-    this.action = action;
-  }
-
-  @Override
-  public void run() {
-    if (hasRun.get())
-      return;
-
-    if (hasRun.compareAndSet(false, true)) {
-      action.run();
-    }
-  }
-}