You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2019/11/21 21:53:39 UTC

[GitHub] [accumulo] keith-turner commented on a change in pull request #1374: WIP: Give ThriftTransportPool higher concurrent granularity

keith-turner commented on a change in pull request #1374: WIP: Give ThriftTransportPool higher concurrent granularity
URL: https://github.com/apache/accumulo/pull/1374#discussion_r349299137
 
 

 ##########
 File path: core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
 ##########
 @@ -65,9 +68,167 @@ public CachedConnection reserveAny() {
       }
       return cachedConnection;
     }
+
+    private void removeExpiredConnections(final ArrayList<CachedConnection> expired, final long killTime) {
+      long currTime = System.currentTimeMillis();
+      while (isLastUnreservedExpired(currTime, killTime)) {
+        expired.add(unreserved.removeLast());
+      }
+    }
+
+    boolean isLastUnreservedExpired(final long currTime, final long killTime) {
+      return !unreserved.isEmpty() && (currTime - unreserved.peekLast().lastReturnTime) > killTime;
+    }
+
+    void checkReservedForStuckIO() {
+      reserved.values().forEach(c -> c.transport.checkForStuckIO(STUCK_THRESHOLD));
+    }
+
+    void closeAllTransports() {
+      closeTransports(unreserved);
+      closeTransports(reserved.values());
+    }
+
+    void closeTransports(final Iterable<CachedConnection> stream) {
+      stream.forEach((connection) -> {
+        try {
+          connection.transport.close();
+        } catch (Exception e) {
+          log.debug("Error closing transport during shutdown", e);
+        }
+      });
+    }
+
+    CachedConnection removeReserved(CachedTTransport transport) {
+      return reserved.remove(transport);
+    }
+  }
+
+  private static class ConnectionPool {
+    final ConcurrentHashMap<ThriftTransportKey,ReentrantLock> locks = new ConcurrentHashMap<>();
+    final ConcurrentHashMap<ThriftTransportKey,CachedConnections> connections = new ConcurrentHashMap<>();
+
+    Set<ThriftTransportKey> getThriftTransportKeys() {
+      return connections.keySet();
+    }
+
+    CachedConnection reserveAny(final ThriftTransportKey key) {
+      return executeWithinLock(key, () -> getOrCreateCachedConnections(key).reserveAny());
+    }
+
+    CachedConnection reserveAnyIfPresent(final ThriftTransportKey key) {
+      return executeWithinLock(key, () -> connections.containsKey(key)
+          ? connections.get(key).reserveAny()
+          : null);
+    }
+
+    void putReserved(final ThriftTransportKey key, final CachedConnection connection) {
+      executeWithinLock(key, () -> getOrCreateCachedConnections(key).reserved.put(
+          connection.transport, connection));
+    }
+
+    boolean returnTransport(final CachedTTransport transport, final List<CachedConnection> toBeClosed) {
+      return executeWithinLock(transport.getCacheKey(), () -> unreserveConnection(transport, toBeClosed));
+    }
+
+    void shutdown() {
+      for (Entry<ThriftTransportKey,CachedConnections> entry : connections.entrySet()) {
+        Lock lock = locks.get(entry.getKey());
+        // Explicitly do not release the lock afterwards in order to prevent other threads from
+        // obtaining it during shutdown. This pool instance is expected to be nullified after this
+        // method is called.
+        lock.lock();
+
+        // Close the transports.
+        entry.getValue().closeAllTransports();
+      }
+    }
+
+    <T> T executeWithinLock(final ThriftTransportKey key, Supplier<T> function) {
+      Lock lock = getLock(key);
+      try {
 
 Review comment:
   Seems like `executeWithinLock` should acquire the lock here (i.e. call `lock.lock()`) before entering the `try` block.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services