You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2014/01/21 20:27:58 UTC
svn commit: r1560149 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
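hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/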
Author: jlowe
Date: Tue Jan 21 19:27:57 2014
New Revision: 1560149
URL: http://svn.apache.org/r1560149
Log:
svn merge -c 1560148 FIXES: MAPREDUCE-5693. Restore MRv1 behavior for log flush. Contributed by Gera Shegalov
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1560149&r1=1560148&r2=1560149&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Jan 21 19:27:57 2014
@@ -137,6 +137,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5729. mapred job -list throws NPE (kasha)
+ MAPREDUCE-5693. Restore MRv1 behavior for log flush (Gera Shegalov via
+ jlowe)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1560149&r1=1560148&r2=1560149&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Tue Jan 21 19:27:57 2014
@@ -27,6 +27,7 @@ import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.api.Applic
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.log4j.LogManager;
/**
* The main() for MapReduce task processes.
@@ -123,6 +123,7 @@ class YarnChild {
LOG.debug("PID: " + System.getenv().get("JVM_PID"));
Task task = null;
UserGroupInformation childUGI = null;
+ ScheduledExecutorService logSyncer = null;
try {
int idleLoopCount = 0;
@@ -155,6 +156,8 @@ class YarnChild {
// set job classloader if configured before invoking the task
MRApps.setJobClassLoader(job);
+ logSyncer = TaskLog.createLogSyncer();
+
// Create a final reference to the task for the doAs block
final Task taskFinal = task;
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@@ -208,10 +211,7 @@ class YarnChild {
} finally {
RPC.stopProxy(umbilical);
DefaultMetricsSystem.shutdown();
- // Shutting down log4j of the child-vm...
- // This assumes that on return from Task.run()
- // there is no more logging done.
- LogManager.shutdown();
+ TaskLog.syncLogsShutdown(logSyncer);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1560149&r1=1560148&r2=1560149&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Jan 21 19:27:57 2014
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
@@ -45,6 +46,7 @@ import org.apache.hadoop.mapred.FileOutp
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -209,6 +211,7 @@ public class MRAppMaster extends Composi
boolean errorHappenedShutDown = false;
private String shutDownMessage = null;
JobStateInternal forcedState = null;
+ private final ScheduledExecutorService logSyncer;
private long recoveredJobStartTime = 0;
@@ -237,6 +240,7 @@ public class MRAppMaster extends Composi
this.nmHttpPort = nmHttpPort;
this.metrics = MRAppMetrics.create();
this.maxAppAttempts = maxAppAttempts;
+ logSyncer = TaskLog.createLogSyncer();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@@ -1064,6 +1068,12 @@ public class MRAppMaster extends Composi
// All components have started, start the job.
startJobs();
}
+
+ @Override
+ public void stop() {
+ super.stop();
+ TaskLog.syncLogsShutdown(logSyncer);
+ }
private void processRecovery() {
if (appAttemptID.getAttemptId() == 1) {
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1560149&r1=1560148&r2=1560149&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Tue Jan 21 19:27:57 2014
@@ -23,12 +23,17 @@ import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,6 +49,8 @@ import org.apache.hadoop.io.SecureIOUtil
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
@@ -262,7 +269,86 @@ public class TaskLog {
}
writeToIndexFile(logLocation, isCleanup);
}
-
+
+ public static synchronized void syncLogsShutdown(
+ ScheduledExecutorService scheduler)
+ {
+ // flush standard streams
+ //
+ System.out.flush();
+ System.err.flush();
+
+ if (scheduler != null) {
+ scheduler.shutdownNow();
+ }
+
+ // flush & close all appenders
+ LogManager.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ public static synchronized void syncLogs() {
+ // flush standard streams
+ //
+ System.out.flush();
+ System.err.flush();
+
+ // flush flushable appenders
+ //
+ final Logger rootLogger = Logger.getRootLogger();
+ flushAppenders(rootLogger);
+ final Enumeration<Logger> allLoggers = rootLogger.getLoggerRepository().
+ getCurrentLoggers();
+ while (allLoggers.hasMoreElements()) {
+ final Logger l = allLoggers.nextElement();
+ flushAppenders(l);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void flushAppenders(Logger l) {
+ final Enumeration<Appender> allAppenders = l.getAllAppenders();
+ while (allAppenders.hasMoreElements()) {
+ final Appender a = allAppenders.nextElement();
+ if (a instanceof Flushable) {
+ try {
+ ((Flushable) a).flush();
+ } catch (IOException ioe) {
+ System.err.println(a + ": Failed to flush!"
+ + StringUtils.stringifyException(ioe));
+ }
+ }
+ }
+ }
+
+ public static ScheduledExecutorService createLogSyncer() {
+ final ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ final Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ t.setName("Thread for syncLogs");
+ return t;
+ }
+ });
+ ShutdownHookManager.get().addShutdownHook(new Runnable() {
+ @Override
+ public void run() {
+ TaskLog.syncLogsShutdown(scheduler);
+ }
+ }, 50);
+ scheduler.scheduleWithFixedDelay(
+ new Runnable() {
+ @Override
+ public void run() {
+ TaskLog.syncLogs();
+ }
+ }, 0L, 5L, TimeUnit.SECONDS);
+ return scheduler;
+ }
+
/**
* The filter for userlogs.
*/
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java?rev=1560149&r1=1560148&r2=1560149&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java Tue Jan 21 19:27:57 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
+import java.io.Flushable;
import java.util.LinkedList;
import java.util.Queue;
@@ -31,7 +32,7 @@ import org.apache.log4j.spi.LoggingEvent
*
*/
@InterfaceStability.Unstable
-public class TaskLogAppender extends FileAppender {
+public class TaskLogAppender extends FileAppender implements Flushable {
private String taskId; //taskId should be managed as String rather than TaskID object
//so that log4j can configure it from the configuration(log4j.properties).
private Integer maxEvents;
@@ -92,6 +93,7 @@ public class TaskLogAppender extends Fil
}
}
+ @Override
public void flush() {
if (qw != null) {
qw.flush();