You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2019/02/26 23:16:45 UTC
[hadoop] branch trunk updated: HADOOP-16127. In ipc.Client, put a new connection could happen after stop.
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9192f71 HADOOP-16127. In ipc.Client, put a new connection could happen after stop.
9192f71 is described below
commit 9192f71e21847ad86bc9ff23847d8957dfe8ae58
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Feb 26 15:14:21 2019 -0800
HADOOP-16127. In ipc.Client, put a new connection could happen after stop.
---
.../main/java/org/apache/hadoop/ipc/Client.java | 122 ++++++++++++---------
.../java/org/apache/hadoop/ipc/ClientCache.java | 7 +-
2 files changed, 73 insertions(+), 56 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 2219dec..0121967 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -71,6 +71,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
@@ -84,9 +85,7 @@ import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
@Public
@InterfaceStability.Evolving
public class Client implements AutoCloseable {
-
public static final Logger LOG = LoggerFactory.getLogger(Client.class);
- private static final int STOP_SLEEP_TIME_MS = 10;
/** A counter for generating call IDs. */
private static final AtomicInteger callIdCounter = new AtomicInteger();
@@ -124,15 +123,17 @@ public class Client implements AutoCloseable {
EXTERNAL_CALL_HANDLER.set(externalHandler);
}
- private ConcurrentMap<ConnectionId, Connection> connections =
+ private final ConcurrentMap<ConnectionId, Connection> connections =
new ConcurrentHashMap<>();
+ private final Object putLock = new Object();
+ private final Object emptyCondition = new Object();
+ private final AtomicBoolean running = new AtomicBoolean(true);
private Class<? extends Writable> valueClass; // class of call values
- private AtomicBoolean running = new AtomicBoolean(true); // if client runs
final private Configuration conf;
private SocketFactory socketFactory; // how to create sockets
- private int refCount = 1;
+ private final AtomicInteger refCount = new AtomicInteger(1);
private final int connectionTimeout;
@@ -207,7 +208,7 @@ public class Client implements AutoCloseable {
return clientExecutor;
}
- };
+ }
/**
* set the ping interval value in configuration
@@ -281,29 +282,19 @@ public class Client implements AutoCloseable {
public static final ExecutorService getClientExecutor() {
return Client.clientExcecutorFactory.clientExecutor;
}
+
/**
* Increment this client's reference count
- *
*/
- synchronized void incCount() {
- refCount++;
+ void incCount() {
+ refCount.incrementAndGet();
}
/**
* Decrement this client's reference count
- *
*/
- synchronized void decCount() {
- refCount--;
- }
-
- /**
- * Return if this client has no reference
- *
- * @return true if this client has no reference; false otherwise
- */
- synchronized boolean isZeroReference() {
- return refCount==0;
+ int decAndGetCount() {
+ return refCount.decrementAndGet();
}
/** Check the rpc response header. */
@@ -452,17 +443,13 @@ public class Client implements AutoCloseable {
private final Object sendRpcRequestLock = new Object();
private AtomicReference<Thread> connectingThread = new AtomicReference<>();
+ private final Consumer<Connection> removeMethod;
- public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
+ Connection(ConnectionId remoteId, int serviceClass,
+ Consumer<Connection> removeMethod) {
this.remoteId = remoteId;
this.server = remoteId.getAddress();
- if (server.isUnresolved()) {
- throw NetUtils.wrapException(server.getHostName(),
- server.getPort(),
- null,
- 0,
- new UnknownHostException());
- }
+
this.maxResponseLength = remoteId.conf.getInt(
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
@@ -481,7 +468,12 @@ public class Client implements AutoCloseable {
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
RpcConstants.INVALID_RETRY_COUNT, clientId);
- pingHeader.writeDelimitedTo(buf);
+ try {
+ pingHeader.writeDelimitedTo(buf);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to write to buf for "
+ + remoteId + " in " + Client.this + " due to " + e, e);
+ }
pingRequest = buf.toByteArray();
}
this.pingInterval = remoteId.getPingInterval();
@@ -494,6 +486,8 @@ public class Client implements AutoCloseable {
this.soTimeout = pingInterval;
}
this.serviceClass = serviceClass;
+ this.removeMethod = removeMethod;
+
if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is " + this.pingInterval + " ms.");
}
@@ -1253,7 +1247,7 @@ public class Client implements AutoCloseable {
// We have marked this connection as closed. Other thread could have
// already known it and replace this closedConnection with a new one.
// We should only remove this closedConnection.
- connections.remove(remoteId, this);
+ removeMethod.accept(this);
// close the streams and therefore the socket
IOUtils.closeStream(ipcStreams);
@@ -1325,7 +1319,13 @@ public class Client implements AutoCloseable {
public Client(Class<? extends Writable> valueClass, Configuration conf) {
this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
}
-
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "-"
+ + StringUtils.byteToHexString(clientId);
+ }
+
/** Return the socket factory of this client
*
* @return this client's socket factory
@@ -1340,11 +1340,12 @@ public class Client implements AutoCloseable {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping client");
}
-
- if (!running.compareAndSet(true, false)) {
- return;
+ synchronized (putLock) { // synchronized to avoid put after stop
+ if (!running.compareAndSet(true, false)) {
+ return;
+ }
}
-
+
// wake up all connections
for (Connection conn : connections.values()) {
conn.interrupt();
@@ -1352,13 +1353,15 @@ public class Client implements AutoCloseable {
}
// wait until all connections are closed
- while (!connections.isEmpty()) {
- try {
- Thread.sleep(STOP_SLEEP_TIME_MS);
- } catch (InterruptedException e) {
+ synchronized (emptyCondition) {
+ // Hold the emptyCondition monitor across both the isEmpty() check and wait() so that a notify cannot fire in between and be missed.
+ while (!connections.isEmpty()) {
+ try {
+ emptyCondition.wait();
+ } catch (InterruptedException e) {
+ }
}
}
-
clientExcecutorFactory.unrefAndCleanup();
}
@@ -1569,24 +1572,37 @@ public class Client implements AutoCloseable {
private Connection getConnection(ConnectionId remoteId,
Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
- if (!running.get()) {
- // the client is stopped
- throw new IOException("The client is stopped");
- }
+ final InetSocketAddress address = remoteId.getAddress();
+ if (address.isUnresolved()) {
+ throw NetUtils.wrapException(address.getHostName(),
+ address.getPort(),
+ null,
+ 0,
+ new UnknownHostException());
+ }
+
+ final Consumer<Connection> removeMethod = c -> {
+ final boolean removed = connections.remove(remoteId, c);
+ if (removed && connections.isEmpty()) {
+ synchronized (emptyCondition) {
+ emptyCondition.notify();
+ }
+ }
+ };
+
Connection connection;
/* we could avoid this allocation for each RPC by having a
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
while (true) {
- // These lines below can be shorten with computeIfAbsent in Java8
- connection = connections.get(remoteId);
- if (connection == null) {
- connection = new Connection(remoteId, serviceClass);
- Connection existing = connections.putIfAbsent(remoteId, connection);
- if (existing != null) {
- connection = existing;
+ synchronized (putLock) { // synchronized to avoid put after stop
+ if (!running.get()) {
+ throw new IOException("Failed to get connection for " + remoteId
+ + ", " + call + ": " + this + " is already stopped");
}
+ connection = connections.computeIfAbsent(remoteId,
+ id -> new Connection(id, serviceClass, removeMethod));
}
if (connection.addCall(call)) {
@@ -1596,7 +1612,7 @@ public class Client implements AutoCloseable {
// have already known this closedConnection, and replace it with a new
// connection. So we should call conditional remove to make sure we only
// remove this closedConnection.
- connections.remove(remoteId, connection);
+ removeMethod.accept(connection);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java
index 00d9a79..a0720d4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java
@@ -96,16 +96,17 @@ public class ClientCache {
if (Client.LOG.isDebugEnabled()) {
Client.LOG.debug("stopping client from cache: " + client);
}
+ final int count;
synchronized (this) {
- client.decCount();
- if (client.isZeroReference()) {
+ count = client.decAndGetCount();
+ if (count == 0) {
if (Client.LOG.isDebugEnabled()) {
Client.LOG.debug("removing client from cache: " + client);
}
clients.remove(client.getSocketFactory());
}
}
- if (client.isZeroReference()) {
+ if (count == 0) {
if (Client.LOG.isDebugEnabled()) {
Client.LOG.debug("stopping actual client because no more references remain: "
+ client);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org