Posted to common-issues@hadoop.apache.org by "virajjasani (via GitHub)" <gi...@apache.org> on 2023/02/08 00:01:40 UTC

[GitHub] [hadoop] virajjasani commented on a diff in pull request #5366: HDFS-16853. IPC shutdown failures.

virajjasani commented on code in PR #5366:
URL: https://github.com/apache/hadoop/pull/5366#discussion_r1099397087


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -1181,9 +1181,69 @@ public void sendRpcRequest(final Call call)
       final ResponseBuffer buf = new ResponseBuffer();
       header.writeDelimitedTo(buf);
       RpcWritable.wrap(call.rpcRequest).writeTo(buf);
-      rpcRequestQueue.put(Pair.of(call, buf));
+      queueIfActive(call, buf);
+    }
+
+    /**
+     * Queue an operation into the request queue,
+     * waiting if necessary for the queue to have a thread to process it.
+     * If the connection is closed, downgrades to a no-op
+     * @param call call to queue
+     * @param buf buffer for response
+     * @throws InterruptedException interrupted while waiting for a free thread.
+     */
+    private void queueIfActive(
+        final Call call,
+        final ResponseBuffer buf) throws InterruptedException {
+      // Get the request queue.
+      // done in a synchronized block to avoid a race condition where
+      // a call is queued after the connection has been closed
+      final SynchronousQueue<Pair<Call, ResponseBuffer>> queue =
+          acquireActiveRequestQueue();
+      if (queue != null) {
+        try {
+          queue.put(Pair.of(call, buf));
+        } finally {
+          // release the reservation afterwards.
+          releaseQueueReservation();
+        }
+      } else {
+        LOG.debug("Discarding queued call as IPC client is stopped");
+      }
+    }
+
+    /**
+     * Get the active rpc request queue.
+     * If the connection is closed, returns null.
+     * This method is synchronized, as are the operations to set
+     * the {@link #shouldCloseConnection} and {@link #running}
+     * atomic booleans, therefore this entire method will complete in the
+     * same block. However, the returned queue may be used outside of
+     * a synchronous block, where this guarantee no longer holds.
+     * A queue reservation counter is used to track this.
+     * Callers MUST invoke {@link #releaseQueueReservation()} afterwards.
+     * @return the queue or null.
+     */
+    private synchronized SynchronousQueue<Pair<Call, ResponseBuffer>> acquireActiveRequestQueue() {
+      if (shouldCloseConnection.get() || !running.get()) {
+        LOG.debug("IPC client is stopped");

Review Comment:
   nit: `LOG.debug("IPC client {} is stopped", this)` would be better here, so the log shows which instance was stopped.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -1241,9 +1301,28 @@ private void receiveRpcResponse() {
         markClosed(e);
       }
     }
-    
+
+    /**
+     * Mark the connection as closed due to an exception.
+     * Sets the {@link #shouldCloseConnection} boolean to true,
+     * and, if it was false earlier, drains the queue before
+     * notifying any waiting objects.
+     * @param e exception which triggered the closure; may be null.
+     */
     private synchronized void markClosed(IOException e) {
       if (shouldCloseConnection.compareAndSet(false, true)) {
+        Pair<Call, ResponseBuffer> request;
+        while ((request = rpcRequestQueue.poll()) != null) {
+          LOG.debug("Clean {} from RpcRequestQueue.", request.getLeft());
+        }
+        if (queueReservations.get() > 0) {
+          // there's still an active reservation.
+          // either a new put() is about to happen (bad),
+          // or it has happened but the finally {} clause has not been invoked (good).
+          // without knowing which, print a warning message so at least logs on
+          // a deadlock are meaningful.
+          LOG.warn("Possible overlap in queue shutdown and request");

Review Comment:
   We could also include the value of `queueReservations.get()` in this log message.
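
To illustrate the suggestion, here is a minimal standalone sketch of the reservation pattern with the count included in the warning text. It is not the Hadoop code: the class name `ReservationSketch`, the `String` payload, and the message wording are hypothetical, and it assumes that `acquireActiveRequestQueue()` increments the reservation counter, which the quoted hunk does not show. The real code would log through `LOG.warn` rather than return the message.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the connection's queue handling; not the Hadoop class. */
public class ReservationSketch {
  private final SynchronousQueue<String> queue = new SynchronousQueue<>();
  private final AtomicBoolean shouldClose = new AtomicBoolean(false);
  private final AtomicInteger queueReservations = new AtomicInteger();

  /** Returns the queue and takes a reservation (assumed), or null once closed. */
  public synchronized SynchronousQueue<String> acquireActiveRequestQueue() {
    if (shouldClose.get()) {
      return null;
    }
    queueReservations.incrementAndGet();
    return queue;
  }

  /** Releases a reservation taken by acquireActiveRequestQueue(). */
  public void releaseQueueReservation() {
    queueReservations.decrementAndGet();
  }

  /**
   * Marks the sketch as closed and drains the queue.
   * Returns the warning text if reservations are still outstanding, else null.
   */
  public synchronized String markClosed() {
    if (shouldClose.compareAndSet(false, true)) {
      while (queue.poll() != null) {
        // discard any request offered by a waiting producer
      }
      int pending = queueReservations.get();
      if (pending > 0) {
        return "Possible overlap in queue shutdown and request;"
            + " active reservations: " + pending;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    ReservationSketch c = new ReservationSketch();
    c.acquireActiveRequestQueue();      // reservation taken, not yet released
    System.out.println(c.markClosed()); // warning text reporting 1 reservation
    c.releaseQueueReservation();
    System.out.println(c.acquireActiveRequestQueue()); // null: already closed
  }
}
```

With the count in the message, a log taken during a suspected deadlock shows how many callers were between acquiring the queue and releasing their reservation at the moment of shutdown.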



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -1181,9 +1181,69 @@ public void sendRpcRequest(final Call call)
       final ResponseBuffer buf = new ResponseBuffer();
       header.writeDelimitedTo(buf);
       RpcWritable.wrap(call.rpcRequest).writeTo(buf);
-      rpcRequestQueue.put(Pair.of(call, buf));
+      queueIfActive(call, buf);
+    }
+
+    /**
+     * Queue an operation into the request queue,
+     * waiting if necessary for the queue to have a thread to process it.
+     * If the connection is closed, downgrades to a no-op
+     * @param call call to queue
+     * @param buf buffer for response
+     * @throws InterruptedException interrupted while waiting for a free thread.
+     */
+    private void queueIfActive(
+        final Call call,
+        final ResponseBuffer buf) throws InterruptedException {
+      // Get the request queue.
+      // done in a synchronized block to avoid a race condition where
+      // a call is queued after the connection has been closed
+      final SynchronousQueue<Pair<Call, ResponseBuffer>> queue =
+          acquireActiveRequestQueue();
+      if (queue != null) {
+        try {
+          queue.put(Pair.of(call, buf));
+        } finally {
+          // release the reservation afterwards.
+          releaseQueueReservation();
+        }
+      } else {
+        LOG.debug("Discarding queued call as IPC client is stopped");
+      }
+    }
+
+    /**
+     * Get the active rpc request queue.
+     * If the connection is closed, returns null.
+     * This method is synchronized, as are the operations to set
+     * the {@link #shouldCloseConnection} and {@link #running}
+     * atomic booleans, therefore this entire method will complete in the
+     * same block. However, the returned queue may be used outside of
+     * a synchronous block, where this guarantee no longer holds.
+     * A queue reservation counter is used to track this.
+     * Callers MUST invoke {@link #releaseQueueReservation()} afterwards.
+     * @return the queue or null.
+     */
+    private synchronized SynchronousQueue<Pair<Call, ResponseBuffer>> acquireActiveRequestQueue() {
+      if (shouldCloseConnection.get() || !running.get()) {

Review Comment:
   We do not need additional synchronization on the `putLock` object while accessing `running`, correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

