You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2017/02/08 07:15:18 UTC
svn commit: r1782110 - in /pig/branches/branch-0.16: ./
shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/
shims/src/hadoop23/org/apache/pig/backend/hadoop/
shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/ sr...
Author: daijy
Date: Wed Feb 8 07:15:18 2017
New Revision: 1782110
URL: http://svn.apache.org/viewvc?rev=1782110&view=rev
Log:
PIG-5121: Backport PIG-4916, PIG-4921 and PIG-4957 to 0.16 branch
Modified:
pig/branches/branch-0.16/CHANGES.txt
pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/branches/branch-0.16/src/org/apache/pig/Main.java
pig/branches/branch-0.16/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java
Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Wed Feb 8 07:15:18 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-5121: Backport PIG-4916, PIG-4921 and PIG-4957 to 0.16 branch (daijy)
+
PIG-5119: SkewedJoin_15 is unstable (daijy)
PIG-5118: Script fails with Invalid dag containing 0 vertices (rohini)
Modified: pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Wed Feb 8 07:15:18 2017
@@ -204,4 +204,14 @@ public class HadoopShims {
public static boolean isHadoopYARN() {
return false;
}
+
+ /**
+ * Add shutdown hook that runs before the FileSystem cache shutdown happens.
+ *
+ * @param hook code to execute during shutdown
+ * @param priority ignored in Hadoop 1
+ */
+ public static void addShutdownHookWithPriority(Runnable hook, int priority) {
+ Runtime.getRuntime().addShutdownHook(new Thread(hook));
+ }
}
Modified: pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java (original)
+++ pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java Wed Feb 8 07:15:18 2017
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.tools.pigstats.ScriptState;
@@ -67,14 +68,14 @@ public class PigATSClient {
timelineClient.init(yarnConf);
timelineClient.start();
}
- Runtime.getRuntime().addShutdownHook(new Thread() {
+ HadoopShims.addShutdownHookWithPriority(new Runnable() {
@Override
public void run() {
timelineClient.stop();
executor.shutdownNow();
executor = null;
}
- });
+ }, PigImplConstants.SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY);
log.info("Created ATS Hook");
}
Modified: pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Wed Feb 8 07:15:18 2017
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -242,4 +243,15 @@ public class HadoopShims {
public static boolean isHadoopYARN() {
return true;
}
+
+ /**
+ * Add shutdown hook that runs before the FileSystem cache shutdown happens.
+ *
+ * @param hook code to execute during shutdown
+ * @param priority Priority over the FileSystem.SHUTDOWN_HOOK_PRIORITY
+ */
+ public static void addShutdownHookWithPriority(Runnable hook, int priority) {
+ ShutdownHookManager.get().addShutdownHook(hook,
+ FileSystem.SHUTDOWN_HOOK_PRIORITY + priority);
+ }
}
Modified: pig/branches/branch-0.16/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/Main.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/Main.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/Main.java Wed Feb 8 07:15:18 2017
@@ -59,8 +59,10 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.impl.PigContext;
@@ -100,13 +102,12 @@ import com.google.common.io.Closeables;
public class Main {
static {
- Runtime.getRuntime().addShutdownHook(new Thread() {
-
+ HadoopShims.addShutdownHookWithPriority(new Runnable() {
@Override
public void run() {
FileLocalizer.deleteTempResourceFiles();
}
- });
+ }, PigImplConstants.SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY);
}
private final static Log log = LogFactory.getLog(Main.class);
@@ -660,6 +661,7 @@ public class Main {
if(!gruntCalled) {
LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
}
+ killRunningJobsIfInterrupted(e, pigContext);
} catch (Throwable e) {
rc = ReturnCode.THROWABLE_EXCEPTION;
PigStatsUtil.setErrorMessage(e.getMessage());
@@ -668,6 +670,7 @@ public class Main {
if(!gruntCalled) {
LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
}
+ killRunningJobsIfInterrupted(e, pigContext);
} finally {
if (printScriptRunTime) {
printScriptRunTime(startTime);
@@ -694,6 +697,22 @@ public class Main {
+ " (" + duration.getMillis() + " ms)");
}
+ private static void killRunningJobsIfInterrupted(Throwable e, PigContext pigContext) {
+ Throwable cause = e.getCause();
+ // Kill running job when we get InterruptedException
+ // Pig thread is interrupted by mapreduce when Oozie launcher job is killed
+ // Shutdown hook kills running jobs, but sometimes NodeManager can issue
+ // a SIGKILL after AM unregisters and before shutdown hook gets to execute
+ // causing orphaned jobs that continue to run.
+ if (e instanceof InterruptedException || (cause != null && cause instanceof InterruptedException)) {
+ try {
+ pigContext.getExecutionEngine().kill();
+ } catch (BackendException be) {
+ log.error("Error while killing running jobs", be);
+ }
+ }
+ }
+
protected static PigProgressNotificationListener makeListener(Properties properties) {
try {
Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Wed Feb 8 07:15:18 2017
@@ -183,6 +183,14 @@ public interface ExecutionEngine {
public ExecutableManager getExecutableManager();
/**
+ * This method is called when user requests to kill all jobs
+ * associated with the execution engine
+ *
+ * @throws BackendException
+ */
+ public void kill() throws BackendException;
+
+ /**
* This method is called when a user requests to kill a job associated with
* the given job id. If it is not possible for a user to kill a job, throw a
* exception. It is imperative for the job id's being displayed to be unique
Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Feb 8 07:15:18 2017
@@ -378,6 +378,13 @@ public abstract class HExecutionEngine i
}
@Override
+ public void kill() throws BackendException {
+ if (launcher != null) {
+ launcher.kill();
+ }
+ }
+
+ @Override
public void killJob(String jobID) throws BackendException {
if (launcher != null) {
launcher.killJob(jobID, getJobConf());
Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Wed Feb 8 07:15:18 2017
@@ -76,7 +76,7 @@ public abstract class Launcher {
protected Map<FileSpec, Exception> failureMap;
protected JobControl jc = null;
- class HangingJobKiller extends Thread {
+ protected class HangingJobKiller extends Thread {
public HangingJobKiller() {}
@Override
@@ -90,7 +90,6 @@ public abstract class Launcher {
}
protected Launcher() {
- Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
// handle the windows portion of \r
if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
newLine = "\r\n";
@@ -104,7 +103,6 @@ public abstract class Launcher {
public void reset() {
failureMap = Maps.newHashMap();
totalHadoopTimeSpent = 0;
- jc = null;
}
/**
Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Feb 8 07:15:18 2017
@@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.PrintStream;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -65,6 +67,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -86,7 +89,7 @@ import org.apache.pig.tools.pigstats.map
* Main class that launches pig for Map Reduce
*
*/
-public class MapReduceLauncher extends Launcher{
+public class MapReduceLauncher extends Launcher {
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
@@ -94,14 +97,23 @@ public class MapReduceLauncher extends L
private boolean aggregateWarning = false;
+ public MapReduceLauncher() {
+ super();
+ HadoopShims.addShutdownHookWithPriority(new HangingJobKiller(),
+ PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
+ }
+
@Override
public void kill() {
try {
- log.debug("Receive kill signal");
- if (jc!=null) {
+ if (jc != null && jc.getRunningJobs().size() > 0) {
+ log.info("Received kill signal");
for (Job job : jc.getRunningJobs()) {
HadoopShims.killJob(job);
log.info("Job " + job.getAssignedJobID() + " killed");
+ String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+ .format(Calendar.getInstance().getTime());
+ System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed");
}
}
} catch (Exception e) {
@@ -301,8 +313,7 @@ public class MapReduceLauncher extends L
// Now wait, till we are finished.
while(!jc.allFinished()){
- try { jcThread.join(sleepTime); }
- catch (InterruptedException e) {}
+ jcThread.join(sleepTime);
List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Wed Feb 8 07:15:18 2017
@@ -18,7 +18,9 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -30,9 +32,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.tez.client.TezAppMasterStatus;
@@ -47,13 +51,13 @@ public class TezSessionManager {
private static final Log log = LogFactory.getLog(TezSessionManager.class);
static {
- Runtime.getRuntime().addShutdownHook(new Thread() {
+ HadoopShims.addShutdownHookWithPriority(new Runnable() {
@Override
public void run() {
TezSessionManager.shutdown();
}
- });
+ }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
}
private static ReentrantReadWriteLock sessionPoolLock = new ReentrantReadWriteLock();
@@ -273,6 +277,11 @@ public class TezSessionManager {
synchronized (sessionInfo) {
if (sessionInfo.session == session) {
log.info("Stopping Tez session " + session);
+ String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+ .format(Calendar.getInstance().getTime());
+ System.err.println(timeStamp + " Shutting down Tez session "
+ + ", sessionName=" + session.getClientName()
+ + ", applicationId=" + session.getAppMasterApplicationId());
session.stop();
sessionToRemove = sessionInfo;
break;
@@ -299,19 +308,30 @@ public class TezSessionManager {
shutdown = true;
for (SessionInfo sessionInfo : sessionPool) {
synchronized (sessionInfo) {
+ TezClient session = sessionInfo.session;
try {
- if (sessionInfo.session.getAppMasterStatus().equals(
+ String timeStamp = new SimpleDateFormat(
+ "yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
+ if (session.getAppMasterStatus().equals(
TezAppMasterStatus.SHUTDOWN)) {
log.info("Tez session is already shutdown "
- + sessionInfo.session);
+ + session);
+ System.err.println(timeStamp
+ + " Tez session is already shutdown " + session
+ + ", sessionName=" + session.getClientName()
+ + ", applicationId=" + session.getAppMasterApplicationId());
continue;
}
- log.info("Shutting down Tez session "
- + sessionInfo.session);
- sessionInfo.session.stop();
+ log.info("Shutting down Tez session " + session);
+ // Since hadoop calls org.apache.log4j.LogManager.shutdown();
+ // the log.info message is not displayed with shutdown hook in Oozie
+ System.err.println(timeStamp + " Shutting down Tez session "
+ + ", sessionName=" + session.getClientName()
+ + ", applicationId=" + session.getAppMasterApplicationId());
+ session.stop();
} catch (Exception e) {
log.error("Error shutting down Tez session "
- + sessionInfo.session, e);
+ + session, e);
}
}
}
Modified: pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java Wed Feb 8 07:15:18 2017
@@ -84,4 +84,9 @@ public class PigImplConstants {
* A unique id for a Pig session used as callerId for underlining component
*/
public static final String PIG_AUDIT_ID = "pig.script.id";
+
+ // Kill the jobs before cleaning up tmp files
+ public static int SHUTDOWN_HOOK_JOB_KILL_PRIORITY = 3;
+ public static int SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY = 2;
+ public static int SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY = 1;
}