Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2021/04/09 03:11:08 UTC

[hadoop] branch branch-3.3 updated: MAPREDUCE-7329: HadoopPipes task may fail when the Linux kernel version changes from 3.x to 4.x (#2775)

This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 068f114  MAPREDUCE-7329: HadoopPipes task may fail when the Linux kernel version changes from 3.x to 4.x (#2775)
068f114 is described below

commit 068f114066f262d643b096302b47f5dc3dc3b278
Author: lichaojacobs <li...@tju.edu.cn>
AuthorDate: Fri Apr 9 10:58:53 2021 +0800

    MAPREDUCE-7329: HadoopPipes task may fail when the Linux kernel version changes from 3.x to 4.x (#2775)
    
    (cherry picked from commit 663ca14a769bd8fa124c1aff4ac6630491dbb425)
---
 .../apache/hadoop/mapred/pipes/Application.java    | 52 ++++++++++++++
 .../hadoop/mapred/pipes/TestPipeApplication.java   | 83 ++++++++++++++++++++++
 2 files changed, 135 insertions(+)
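
In short, the patch adds a daemon thread (PingSocketCleaner) that accepts
the ping connections the pipes client opens against the parent's server
socket and drains each one until the client closes it, rather than leaving
them unaccepted in the backlog, which per the subject line can fail the
task on 4.x kernels. Below is a minimal standalone sketch of that
accept-and-drain pattern; the class name, ephemeral port, and 60-second
timeout are illustrative choices, not part of the commit.

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.ServerSocket;
    import java.net.Socket;

    // Sketch only: accept "ping" connections and discard their bytes
    // until the peer closes, then go back to accepting.
    public class PingDrainSketch {
      public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(0); // ephemeral port
        Thread drainer = new Thread(() -> {
          while (!Thread.currentThread().isInterrupted()) {
            try (Socket ping = server.accept()) {
              ping.setSoTimeout(60_000); // give up on silent peers
              InputStream in = ping.getInputStream();
              while (in.read() != -1) {
                // ping payload is irrelevant; just consume it
              }
            } catch (IOException e) {
              // timeout or reset: drop this connection, keep serving
            }
          }
        }, "ping-drain-sketch");
        drainer.setDaemon(true);
        drainer.start();
        // (in Application.java the task's main thread keeps the JVM alive;
        // here main returns and the daemon thread exits with it)
      }
    }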

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
index 83d2509..5416d26 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
@@ -30,12 +30,14 @@ import java.util.Random;
 
 import javax.crypto.SecretKey;
 
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -52,6 +54,7 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -66,6 +69,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
   private static final Logger LOG =
       LoggerFactory.getLogger(Application.class.getName());
   private ServerSocket serverSocket;
+  private PingSocketCleaner socketCleaner;
   private Process process;
   private Socket clientSocket;
   private OutputHandler<K2, V2> handler;
@@ -133,6 +137,13 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
+    // start ping socket cleaner
+    int soTimeout = conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
+        CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
+    socketCleaner = new PingSocketCleaner("ping-socket-cleaner", serverSocket,
+                                          soTimeout);
+    socketCleaner.setDaemon(true);
+    socketCleaner.start();
     
     String challenge = getSecurityChallenge();
     String digestToSend = createDigest(password, challenge);
@@ -237,6 +248,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     serverSocket.close();
     try {
       downlink.close();
+      socketCleaner.interrupt();
     } catch (InterruptedException ie) {
       Thread.currentThread().interrupt();
     }      
@@ -266,4 +278,44 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     return SecureShuffleUtils.hashFromString(data, key);
   }
 
+  @VisibleForTesting
+  public static class PingSocketCleaner extends Thread {
+    private final ServerSocket serverSocket;
+    private final int soTimeout;
+
+    PingSocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
+      super(name);
+      this.serverSocket = serverSocket;
+      this.soTimeout = soTimeout;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("PingSocketCleaner started...");
+      while (!Thread.currentThread().isInterrupted()) {
+        Socket clientSocket = null;
+        try {
+          clientSocket = serverSocket.accept();
+          clientSocket.setSoTimeout(soTimeout);
+          LOG.debug("Connection received from {}",
+                    clientSocket.getInetAddress());
+          int readData = 0;
+          while (readData != -1) {
+            readData = clientSocket.getInputStream().read();
+          }
+          LOG.debug("close socket cause client has closed.");
+          closeSocketInternal(clientSocket);
+        } catch (IOException exception) {
+          LOG.error("PingSocketCleaner exception", exception);
+        } finally {
+          closeSocketInternal(clientSocket);
+        }
+      }
+    }
+
+    @VisibleForTesting
+    protected void closeSocketInternal(Socket clientSocket) {
+      IOUtils.closeSocket(clientSocket);
+    }
+  }
 }
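
Note that the cleaner's read timeout above is not a new knob: it reuses the
IPC ping interval (CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, i.e.
ipc.ping.interval, 60 seconds by default in current releases). A hedged
snippet showing how a job could shorten it, assuming the usual JobConf
setup; the 10-second value is an arbitrary example:

    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.mapred.JobConf;

    JobConf conf = new JobConf();
    // Example value only: this also shortens the IPC ping interval itself,
    // since the patch piggybacks on the existing key.
    conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 10 * 1000);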
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
index f0b383a..79c0bc1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
@@ -28,12 +28,15 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.net.ServerSocket;
+import java.net.Socket;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FsConstants;
@@ -59,7 +62,9 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapred.pipes.Application.PingSocketCleaner;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -455,6 +460,84 @@ public class TestPipeApplication {
     assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2));
   }
 
+  @Test
+  public void testSocketCleaner() throws Exception {
+    ServerSocket serverSocket = setupServerSocket();
+    SocketCleaner cleaner = setupCleaner(serverSocket);
+    // Mock the ping thread: connect to the server socket once per second.
+    int expectedClosedCount = 5;
+    for (int i = 0; i < expectedClosedCount; i++) {
+      try {
+        Thread.sleep(1000);
+        Socket clientSocket = new Socket(serverSocket.getInetAddress(),
+                                         serverSocket.getLocalPort());
+        clientSocket.close();
+      } catch (Exception exception) {
+        // ignored...
+        exception.printStackTrace();
+      }
+    }
+    GenericTestUtils.waitFor(
+        () -> expectedClosedCount == cleaner.getCloseSocketCount(), 100, 5000);
+  }
+
+  @Test
+  public void testSocketTimeout() throws Exception {
+    ServerSocket serverSocket = setupServerSocket();
+    SocketCleaner cleaner = setupCleaner(serverSocket, 100);
+    try {
+      new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort());
+      Thread.sleep(1000);
+    } catch (Exception exception) {
+      // ignored...
+    }
+    GenericTestUtils.waitFor(() -> 1 == cleaner.getCloseSocketCount(), 100,
+        5000);
+  }
+
+  private SocketCleaner setupCleaner(ServerSocket serverSocket) {
+    return setupCleaner(serverSocket,
+                        CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
+  }
+
+  private SocketCleaner setupCleaner(ServerSocket serverSocket, int soTimeout) {
+    // start socket cleaner.
+    SocketCleaner cleaner = new SocketCleaner("test-ping-socket-cleaner",
+                                              serverSocket, soTimeout);
+    cleaner.setDaemon(true);
+    cleaner.start();
+
+    return cleaner;
+  }
+
+  private static class SocketCleaner extends PingSocketCleaner {
+    private int closeSocketCount = 0;
+
+    SocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
+      super(name, serverSocket, soTimeout);
+    }
+
+    @Override
+    public void run() {
+      super.run();
+    }
+
+    protected void closeSocketInternal(Socket clientSocket) {
+      if (!clientSocket.isClosed()) {
+        closeSocketCount++;
+      }
+      super.closeSocketInternal(clientSocket);
+    }
+
+    public int getCloseSocketCount() {
+      return closeSocketCount;
+    }
+  }
+
+  private ServerSocket setupServerSocket() throws Exception {
+    return new ServerSocket(0, 1);
+  }
+
   /**
    * clean previous std error and outs
    */

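For anyone who wants to exercise PingSocketCleaner outside the test suite,
here is a hedged, self-contained driver in the spirit of testSocketCleaner
above. The cleaner's constructor is package-private, so the sketch assumes
it lives in org.apache.hadoop.mapred.pipes; the loop count and sleep are
arbitrary.

    package org.apache.hadoop.mapred.pipes;

    import java.net.ServerSocket;
    import java.net.Socket;

    import org.apache.hadoop.mapred.pipes.Application.PingSocketCleaner;

    public class CleanerDriverSketch {
      public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(0, 1);
        PingSocketCleaner cleaner =
            new PingSocketCleaner("sketch-cleaner", server, 60 * 1000);
        cleaner.setDaemon(true);
        cleaner.start();
        for (int i = 0; i < 5; i++) {
          // Open and immediately close a connection, like a finished ping;
          // the cleaner should reap each one after reading EOF.
          try (Socket ping = new Socket(server.getInetAddress(),
                                        server.getLocalPort())) {
            // nothing to send
          }
          Thread.sleep(200);
        }
      }
    }
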
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org