You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by "steveloughran (via GitHub)" <gi...@apache.org> on 2023/02/08 11:12:17 UTC

[GitHub] [hadoop] steveloughran commented on a diff in pull request #5366: HDFS-16853. IPC shutdown failures.

steveloughran commented on code in PR #5366:
URL: https://github.com/apache/hadoop/pull/5366#discussion_r1099987282


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -1181,9 +1181,69 @@ public void sendRpcRequest(final Call call)
       final ResponseBuffer buf = new ResponseBuffer();
       header.writeDelimitedTo(buf);
       RpcWritable.wrap(call.rpcRequest).writeTo(buf);
-      rpcRequestQueue.put(Pair.of(call, buf));
+      queueIfActive(call, buf);
+    }
+
+    /**
+     * Queue an operation into the request queue,
+     * waiting if necessary for the queue to have a thread to process it.
+     * If the connection is closed, downgrades to a no-op
+     * @param call call to queue
+     * @param buf buffer for response
+     * @throws InterruptedException interrupted while waiting for a free thread.
+     */
+    private void queueIfActive(
+        final Call call,
+        final ResponseBuffer buf) throws InterruptedException {
+      // Get the request queue.
+      // done in a synchronized block to avoid a race condition where
+      // a call is queued after the connection has been closed
+      final SynchronousQueue<Pair<Call, ResponseBuffer>> queue =
+          acquireActiveRequestQueue();
+      if (queue != null) {
+        try {
+          queue.put(Pair.of(call, buf));
+        } finally {
+          // release the reservation afterwards.
+          releaseQueueReservation();
+        }
+      } else {
+        LOG.debug("Discarding queued call as IPC client is stopped");
+      }
+    }
+
+    /**
+     * Get the active rpc request queue.
+     * If the connection is closed, returns null.
+     * This method is synchronized, as are the operations to set
+     * the {@link #shouldCloseConnection} and {@link #running}
+     * atomic booleans, therefore this entire method will complete in the
+     * same block. However, the returned queue may be used outside of
+     * a synchronous block, where this guarantee no longer holds.
+     * A queue reservation counter is used to track this.
+     * Callers MUST invoke {@link #releaseQueueReservation()} afterwards.
+     * @return the queue or null.
+     */
+    private synchronized SynchronousQueue<Pair<Call, ResponseBuffer>> acquireActiveRequestQueue() {
+      if (shouldCloseConnection.get() || !running.get()) {
+        LOG.debug("IPC client is stopped");

Review Comment:
   i was thinking that myself



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org