You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2019/02/26 23:16:45 UTC

[hadoop] branch trunk updated: HADOOP-16127. In ipc.Client, put a new connection could happen after stop.

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9192f71  HADOOP-16127. In ipc.Client, put a new connection could happen after stop.
9192f71 is described below

commit 9192f71e21847ad86bc9ff23847d8957dfe8ae58
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Feb 26 15:14:21 2019 -0800

    HADOOP-16127. In ipc.Client, put a new connection could happen after stop.
---
 .../main/java/org/apache/hadoop/ipc/Client.java    | 122 ++++++++++++---------
 .../java/org/apache/hadoop/ipc/ClientCache.java    |   7 +-
 2 files changed, 73 insertions(+), 56 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 2219dec..0121967 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -71,6 +71,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
 import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
@@ -84,9 +85,7 @@ import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
 @Public
 @InterfaceStability.Evolving
 public class Client implements AutoCloseable {
-  
   public static final Logger LOG = LoggerFactory.getLogger(Client.class);
-  private static final int STOP_SLEEP_TIME_MS = 10;
 
   /** A counter for generating call IDs. */
   private static final AtomicInteger callIdCounter = new AtomicInteger();
@@ -124,15 +123,17 @@ public class Client implements AutoCloseable {
     EXTERNAL_CALL_HANDLER.set(externalHandler);
   }
 
-  private ConcurrentMap<ConnectionId, Connection> connections =
+  private final ConcurrentMap<ConnectionId, Connection> connections =
       new ConcurrentHashMap<>();
+  private final Object putLock = new Object();
+  private final Object emptyCondition = new Object();
+  private final AtomicBoolean running = new AtomicBoolean(true);
 
   private Class<? extends Writable> valueClass;   // class of call values
-  private AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final private Configuration conf;
 
   private SocketFactory socketFactory;           // how to create sockets
-  private int refCount = 1;
+  private final AtomicInteger refCount = new AtomicInteger(1);
 
   private final int connectionTimeout;
 
@@ -207,7 +208,7 @@ public class Client implements AutoCloseable {
       
       return clientExecutor;
     }
-  };
+  }
   
   /**
    * set the ping interval value in configuration
@@ -281,29 +282,19 @@ public class Client implements AutoCloseable {
   public static final ExecutorService getClientExecutor() {
     return Client.clientExcecutorFactory.clientExecutor;
   }
+
   /**
    * Increment this client's reference count
-   *
    */
-  synchronized void incCount() {
-    refCount++;
+  void incCount() {
+    refCount.incrementAndGet();
   }
   
   /**
    * Decrement this client's reference count
-   *
    */
-  synchronized void decCount() {
-    refCount--;
-  }
-  
-  /**
-   * Return if this client has no reference
-   * 
-   * @return true if this client has no reference; false otherwise
-   */
-  synchronized boolean isZeroReference() {
-    return refCount==0;
+  int decAndGetCount() {
+    return refCount.decrementAndGet();
   }
 
   /** Check the rpc response header. */
@@ -452,17 +443,13 @@ public class Client implements AutoCloseable {
     private final Object sendRpcRequestLock = new Object();
 
     private AtomicReference<Thread> connectingThread = new AtomicReference<>();
+    private final Consumer<Connection> removeMethod;
 
-    public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
+    Connection(ConnectionId remoteId, int serviceClass,
+        Consumer<Connection> removeMethod) {
       this.remoteId = remoteId;
       this.server = remoteId.getAddress();
-      if (server.isUnresolved()) {
-        throw NetUtils.wrapException(server.getHostName(),
-            server.getPort(),
-            null,
-            0,
-            new UnknownHostException());
-      }
+
       this.maxResponseLength = remoteId.conf.getInt(
           CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
           CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
@@ -481,7 +468,12 @@ public class Client implements AutoCloseable {
             .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
                 OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
                 RpcConstants.INVALID_RETRY_COUNT, clientId);
-        pingHeader.writeDelimitedTo(buf);
+        try {
+          pingHeader.writeDelimitedTo(buf);
+        } catch (IOException e) {
+          throw new IllegalStateException("Failed to write to buf for "
+              + remoteId + " in " + Client.this + " due to " + e, e);
+        }
         pingRequest = buf.toByteArray();
       }
       this.pingInterval = remoteId.getPingInterval();
@@ -494,6 +486,8 @@ public class Client implements AutoCloseable {
         this.soTimeout = pingInterval;
       }
       this.serviceClass = serviceClass;
+      this.removeMethod = removeMethod;
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("The ping interval is " + this.pingInterval + " ms.");
       }
@@ -1253,7 +1247,7 @@ public class Client implements AutoCloseable {
       // We have marked this connection as closed. Other thread could have
       // already known it and replace this closedConnection with a new one.
       // We should only remove this closedConnection.
-      connections.remove(remoteId, this);
+      removeMethod.accept(this);
 
       // close the streams and therefore the socket
       IOUtils.closeStream(ipcStreams);
@@ -1325,7 +1319,13 @@ public class Client implements AutoCloseable {
   public Client(Class<? extends Writable> valueClass, Configuration conf) {
     this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
   }
- 
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "-"
+        + StringUtils.byteToHexString(clientId);
+  }
+
   /** Return the socket factory of this client
    *
    * @return this client's socket factory
@@ -1340,11 +1340,12 @@ public class Client implements AutoCloseable {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Stopping client");
     }
-
-    if (!running.compareAndSet(true, false)) {
-      return;
+    synchronized (putLock) { // synchronized to avoid put after stop
+      if (!running.compareAndSet(true, false)) {
+        return;
+      }
     }
-    
+
     // wake up all connections
     for (Connection conn : connections.values()) {
       conn.interrupt();
@@ -1352,13 +1353,15 @@ public class Client implements AutoCloseable {
     }
     
     // wait until all connections are closed
-    while (!connections.isEmpty()) {
-      try {
-        Thread.sleep(STOP_SLEEP_TIME_MS);
-      } catch (InterruptedException e) {
+    synchronized (emptyCondition) {
+      // synchronize the loop so that notify cannot be missed before wait.
+      while (!connections.isEmpty()) {
+        try {
+          emptyCondition.wait();
+        } catch (InterruptedException e) {
+        }
       }
     }
-    
     clientExcecutorFactory.unrefAndCleanup();
   }
 
@@ -1569,24 +1572,37 @@ public class Client implements AutoCloseable {
   private Connection getConnection(ConnectionId remoteId,
       Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
       throws IOException {
-    if (!running.get()) {
-      // the client is stopped
-      throw new IOException("The client is stopped");
-    }
+    final InetSocketAddress address = remoteId.getAddress();
+    if (address.isUnresolved()) {
+      throw NetUtils.wrapException(address.getHostName(),
+          address.getPort(),
+          null,
+          0,
+          new UnknownHostException());
+    }
+
+    final Consumer<Connection> removeMethod = c -> {
+      final boolean removed = connections.remove(remoteId, c);
+      if (removed && connections.isEmpty()) {
+        synchronized (emptyCondition) {
+          emptyCondition.notify();
+        }
+      }
+    };
+
     Connection connection;
     /* we could avoid this allocation for each RPC by having a  
      * connectionsId object and with set() method. We need to manage the
      * refs for keys in HashMap properly. For now its ok.
      */
     while (true) {
-      // These lines below can be shorten with computeIfAbsent in Java8
-      connection = connections.get(remoteId);
-      if (connection == null) {
-        connection = new Connection(remoteId, serviceClass);
-        Connection existing = connections.putIfAbsent(remoteId, connection);
-        if (existing != null) {
-          connection = existing;
+      synchronized (putLock) { // synchronized to avoid put after stop
+        if (!running.get()) {
+          throw new IOException("Failed to get connection for " + remoteId
+              + ", " + call + ": " + this + " is already stopped");
         }
+        connection = connections.computeIfAbsent(remoteId,
+            id -> new Connection(id, serviceClass, removeMethod));
       }
 
       if (connection.addCall(call)) {
@@ -1596,7 +1612,7 @@ public class Client implements AutoCloseable {
         // have already known this closedConnection, and replace it with a new
         // connection. So we should call conditional remove to make sure we only
         // remove this closedConnection.
-        connections.remove(remoteId, connection);
+        removeMethod.accept(connection);
       }
     }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java
index 00d9a79..a0720d4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java
@@ -96,16 +96,17 @@ public class ClientCache {
     if (Client.LOG.isDebugEnabled()) {
       Client.LOG.debug("stopping client from cache: " + client);
     }
+    final int count;
     synchronized (this) {
-      client.decCount();
-      if (client.isZeroReference()) {
+      count = client.decAndGetCount();
+      if (count == 0) {
         if (Client.LOG.isDebugEnabled()) {
           Client.LOG.debug("removing client from cache: " + client);
         }
         clients.remove(client.getSocketFactory());
       }
     }
-    if (client.isZeroReference()) {
+    if (count == 0) {
       if (Client.LOG.isDebugEnabled()) {
         Client.LOG.debug("stopping actual client because no more references remain: "
             + client);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org