You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2021/04/09 02:59:17 UTC
[hadoop] branch trunk updated: MAPREDUCE-7329: HadoopPipes task may
fail when linux kernel version change from 3.x to 4.x (#2775)
This is an automated email from the ASF dual-hosted git repository.
aajisaka pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 663ca14 MAPREDUCE-7329: HadoopPipes task may fail when linux kernel version change from 3.x to 4.x (#2775)
663ca14 is described below
commit 663ca14a769bd8fa124c1aff4ac6630491dbb425
Author: lichaojacobs <li...@tju.edu.cn>
AuthorDate: Fri Apr 9 10:58:53 2021 +0800
MAPREDUCE-7329: HadoopPipes task may fail when linux kernel version change from 3.x to 4.x (#2775)
---
.../apache/hadoop/mapred/pipes/Application.java | 52 ++++++++++++++
.../hadoop/mapred/pipes/TestPipeApplication.java | 83 ++++++++++++++++++++++
2 files changed, 135 insertions(+)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
index 83d2509..5416d26 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
@@ -30,12 +30,14 @@ import java.util.Random;
import javax.crypto.SecretKey;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -52,6 +54,7 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
@@ -66,6 +69,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
private static final Logger LOG =
LoggerFactory.getLogger(Application.class.getName());
private ServerSocket serverSocket;
+ private PingSocketCleaner socketCleaner;
private Process process;
private Socket clientSocket;
private OutputHandler<K2, V2> handler;
@@ -133,6 +137,13 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
+ // start ping socket cleaner
+ int soTimeout = conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
+ CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
+ socketCleaner = new PingSocketCleaner("ping-socket-cleaner", serverSocket,
+ soTimeout);
+ socketCleaner.setDaemon(true);
+ socketCleaner.start();
String challenge = getSecurityChallenge();
String digestToSend = createDigest(password, challenge);
@@ -237,6 +248,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
serverSocket.close();
try {
downlink.close();
+ socketCleaner.interrupt();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
@@ -266,4 +278,44 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
return SecureShuffleUtils.hashFromString(data, key);
}
+  /**
+   * Background thread that keeps accepting connections on the given server
+   * socket, discards whatever each peer sends, and closes the accepted
+   * socket once the peer closes its end or a read times out.
+   *
+   * NOTE(review): presumably the extra connections come from the pipes
+   * client's ping thread (see the class name) -- confirm against the
+   * native client.
+   *
+   * The loop ends when the thread is interrupted or when the server socket
+   * has been closed.
+   */
+  @VisibleForTesting
+  public static class PingSocketCleaner extends Thread {
+    private final ServerSocket serverSocket;
+    private final int soTimeout;
+
+    /**
+     * @param name thread name
+     * @param serverSocket socket on which connections are accepted
+     * @param soTimeout read timeout applied to every accepted socket via
+     *                  {@link Socket#setSoTimeout(int)}
+     */
+    PingSocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
+      super(name);
+      this.serverSocket = serverSocket;
+      this.soTimeout = soTimeout;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("PingSocketCleaner started...");
+      while (!Thread.currentThread().isInterrupted()) {
+        Socket clientSocket = null;
+        try {
+          clientSocket = serverSocket.accept();
+          clientSocket.setSoTimeout(soTimeout);
+          LOG.debug("Connection received from {}",
+              clientSocket.getInetAddress());
+          // Drain the stream until the peer closes it (-1). A read that
+          // exceeds soTimeout throws an IOException handled below.
+          int readData = 0;
+          while (readData != -1) {
+            readData = clientSocket.getInputStream().read();
+          }
+          LOG.debug("close socket cause client has closed.");
+        } catch (IOException exception) {
+          if (serverSocket.isClosed()) {
+            // accept() can never succeed again; leave instead of spinning
+            // and logging an error on every iteration.
+            LOG.debug("Server socket closed, PingSocketCleaner exiting.");
+            break;
+          }
+          LOG.error("PingSocketCleaner exception", exception);
+        } finally {
+          // Single close point for both the normal and the error path;
+          // clientSocket is still null when accept() itself failed.
+          if (clientSocket != null) {
+            closeSocketInternal(clientSocket);
+          }
+        }
+      }
+    }
+
+    /**
+     * Closes an accepted socket. Separate method so tests can observe the
+     * close by overriding it.
+     *
+     * @param clientSocket the accepted socket to close
+     */
+    @VisibleForTesting
+    protected void closeSocketInternal(Socket clientSocket) {
+      IOUtils.closeSocket(clientSocket);
+    }
+  }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
index f0b383a..79c0bc1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
@@ -28,12 +28,15 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
+import java.net.ServerSocket;
+import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsConstants;
@@ -59,7 +62,9 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapred.pipes.Application.PingSocketCleaner;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -455,6 +460,84 @@ public class TestPipeApplication {
assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2));
}
+  /**
+   * Opens and immediately closes several connections to the server socket
+   * and expects the cleaner to close one accepted socket per connection.
+   */
+  @Test
+  public void testSocketCleaner() throws Exception {
+    ServerSocket serverSocket = setupServerSocket();
+    SocketCleaner cleaner = setupCleaner(serverSocket);
+    // mock ping thread, connect to server socket once per second.
+    int expectedClosedCount = 5;
+    for (int i = 0; i < expectedClosedCount; i++) {
+      Thread.sleep(1000);
+      // Connect/close failures and interrupts propagate so the test fails
+      // with the real cause instead of a later waitFor timeout.
+      Socket clientSocket = new Socket(serverSocket.getInetAddress(),
+          serverSocket.getLocalPort());
+      clientSocket.close();
+    }
+    GenericTestUtils.waitFor(
+        () -> expectedClosedCount == cleaner.getCloseSocketCount(), 100, 5000);
+  }
+
+  /**
+   * Connects to the server socket, sends nothing, and expects the cleaner
+   * (started with a 100 ms read timeout) to close the accepted socket on
+   * its own.
+   */
+  @Test
+  public void testSocketTimeout() throws Exception {
+    ServerSocket serverSocket = setupServerSocket();
+    SocketCleaner cleaner = setupCleaner(serverSocket, 100);
+    // The client stays open and silent while waiting, so the close counted
+    // below can only come from the cleaner's read timeout. try-with-resources
+    // releases the client socket whether or not the wait succeeds.
+    try (Socket clientSocket = new Socket(serverSocket.getInetAddress(),
+        serverSocket.getLocalPort())) {
+      GenericTestUtils.waitFor(() -> 1 == cleaner.getCloseSocketCount(), 100,
+          5000);
+    }
+  }
+
+  /**
+   * Starts a cleaner whose read timeout is
+   * {@link CommonConfigurationKeys#IPC_PING_INTERVAL_DEFAULT}.
+   */
+  private SocketCleaner setupCleaner(ServerSocket serverSocket) {
+    final int defaultSoTimeout =
+        CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT;
+    return setupCleaner(serverSocket, defaultSoTimeout);
+  }
+
+  /**
+   * Creates a counting cleaner for the given server socket, marks it as a
+   * daemon thread and starts it.
+   *
+   * @param serverSocket socket the cleaner accepts connections on
+   * @param soTimeout read timeout handed to the cleaner
+   * @return the already running cleaner
+   */
+  private SocketCleaner setupCleaner(ServerSocket serverSocket, int soTimeout) {
+    final String threadName = "test-ping-socket-cleaner";
+    SocketCleaner socketCleaner =
+        new SocketCleaner(threadName, serverSocket, soTimeout);
+    socketCleaner.setDaemon(true);
+    socketCleaner.start();
+    return socketCleaner;
+  }
+
+  /**
+   * {@link PingSocketCleaner} that counts how many still-open sockets it
+   * was asked to close, so tests can wait for a given number of closes.
+   */
+  private static class SocketCleaner extends PingSocketCleaner {
+    // Incremented only by the cleaner thread and polled by the test thread;
+    // volatile makes each update visible to the poller.
+    private volatile int closeSocketCount = 0;
+
+    SocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
+      super(name, serverSocket, soTimeout);
+    }
+
+    @Override
+    protected void closeSocketInternal(Socket clientSocket) {
+      // Count only real closes: skip a null reference and sockets that were
+      // already closed by an earlier call.
+      if (clientSocket != null && !clientSocket.isClosed()) {
+        closeSocketCount++;
+      }
+      super.closeSocketInternal(clientSocket);
+    }
+
+    /** @return number of open sockets closed so far */
+    public int getCloseSocketCount() {
+      return closeSocketCount;
+    }
+  }
+
+  /** Opens a server socket on an automatically chosen port, backlog 1. */
+  private ServerSocket setupServerSocket() throws Exception {
+    final int anyFreePort = 0;
+    final int backlog = 1;
+    return new ServerSocket(anyFreePort, backlog);
+  }
+
/**
* clean previous std error and outs
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org