You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2017/02/08 07:15:18 UTC

svn commit: r1782110 - in /pig/branches/branch-0.16: ./ shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/ shims/src/hadoop23/org/apache/pig/backend/hadoop/ shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/ sr...

Author: daijy
Date: Wed Feb  8 07:15:18 2017
New Revision: 1782110

URL: http://svn.apache.org/viewvc?rev=1782110&view=rev
Log:
PIG-5121: Backport PIG-4916, PIG-4921 and PIG-4957 to 0.16 branch

Modified:
    pig/branches/branch-0.16/CHANGES.txt
    pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
    pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/branches/branch-0.16/src/org/apache/pig/Main.java
    pig/branches/branch-0.16/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
    pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java

Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Wed Feb  8 07:15:18 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-5121: Backport PIG-4916, PIG-4921 and PIG-4957 to 0.16 branch (daijy)
+
 PIG-5119: SkewedJoin_15 is unstable (daijy)
 
 PIG-5118: Script fails with Invalid dag containing 0 vertices (rohini)

Modified: pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Wed Feb  8 07:15:18 2017
@@ -204,4 +204,14 @@ public class HadoopShims {
     public static boolean isHadoopYARN() {
         return false;
     }
+
+    /**
+     * Add shutdown hook that runs before the FileSystem cache shutdown happens.
+     *
+     * @param hook code to execute during shutdown
+     * @param priority ignored in Hadoop 1
+     */
+    public static void addShutdownHookWithPriority(Runnable hook, int priority) {
+        Runtime.getRuntime().addShutdownHook(new Thread(hook));
+    }
 }

Modified: pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java (original)
+++ pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java Wed Feb  8 07:15:18 2017
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.tools.pigstats.ScriptState;
@@ -67,14 +68,14 @@ public class PigATSClient {
             timelineClient.init(yarnConf);
             timelineClient.start();
         }
-        Runtime.getRuntime().addShutdownHook(new Thread() {
+        HadoopShims.addShutdownHookWithPriority(new Runnable() {
             @Override
             public void run() {
                 timelineClient.stop();
                 executor.shutdownNow();
                 executor = null;
             }
-        });
+        }, PigImplConstants.SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY);
         log.info("Created ATS Hook");
     }
 

Modified: pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Wed Feb  8 07:15:18 2017
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -242,4 +243,15 @@ public class HadoopShims {
     public static boolean isHadoopYARN() {
         return true;
     }
+
+    /**
+     * Add shutdown hook that runs before the FileSystem cache shutdown happens.
+     *
+     * @param hook code to execute during shutdown
+     * @param priority offset added to FileSystem.SHUTDOWN_HOOK_PRIORITY
+     */
+    public static void addShutdownHookWithPriority(Runnable hook, int priority) {
+        ShutdownHookManager.get().addShutdownHook(hook,
+                FileSystem.SHUTDOWN_HOOK_PRIORITY + priority);
+    }
 }

Modified: pig/branches/branch-0.16/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/Main.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/Main.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/Main.java Wed Feb  8 07:15:18 2017
@@ -59,8 +59,10 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.impl.PigContext;
@@ -100,13 +102,12 @@ import com.google.common.io.Closeables;
 public class Main {
 
     static {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-
+        HadoopShims.addShutdownHookWithPriority(new Runnable() {
             @Override
             public void run() {
                 FileLocalizer.deleteTempResourceFiles();
             }
-        });
+        }, PigImplConstants.SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY);
     }
 
     private final static Log log = LogFactory.getLog(Main.class);
@@ -660,6 +661,7 @@ public class Main {
             if(!gruntCalled) {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
             }
+            killRunningJobsIfInterrupted(e, pigContext);
         } catch (Throwable e) {
             rc = ReturnCode.THROWABLE_EXCEPTION;
             PigStatsUtil.setErrorMessage(e.getMessage());
@@ -668,6 +670,7 @@ public class Main {
             if(!gruntCalled) {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
             }
+            killRunningJobsIfInterrupted(e, pigContext);
         } finally {
             if (printScriptRunTime) {
                 printScriptRunTime(startTime);
@@ -694,6 +697,22 @@ public class Main {
                 + " (" + duration.getMillis() + " ms)");
     }
 
+    private static void killRunningJobsIfInterrupted(Throwable e, PigContext pigContext) {
+        Throwable cause = e.getCause();
+        // Kill running jobs when we get an InterruptedException.
+        // The Pig thread is interrupted by MapReduce when the Oozie launcher job is killed.
+        // The shutdown hook kills running jobs, but sometimes the NodeManager can issue
+        // a SIGKILL after the AM unregisters and before the shutdown hook gets to execute,
+        // causing orphaned jobs that continue to run.
+        if (e instanceof InterruptedException || (cause != null && cause instanceof InterruptedException)) {
+            try {
+                pigContext.getExecutionEngine().kill();
+            } catch (BackendException be) {
+                log.error("Error while killing running jobs", be);
+            }
+        }
+    }
+
     protected static PigProgressNotificationListener makeListener(Properties properties) {
 
         try {

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Wed Feb  8 07:15:18 2017
@@ -183,6 +183,14 @@ public interface ExecutionEngine {
     public ExecutableManager getExecutableManager();
 
     /**
+     * This method is called when a user requests to kill all jobs
+     * associated with the execution engine.
+     *
+     * @throws BackendException
+     */
+    public void kill() throws BackendException;
+
+    /**
      * This method is called when a user requests to kill a job associated with
      * the given job id. If it is not possible for a user to kill a job, throw a
      * exception. It is imperative for the job id's being displayed to be unique

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Feb  8 07:15:18 2017
@@ -378,6 +378,13 @@ public abstract class HExecutionEngine i
     }
 
     @Override
+    public void kill() throws BackendException {
+        if (launcher != null) {
+            launcher.kill();
+        }
+    }
+
+    @Override
     public void killJob(String jobID) throws BackendException {
         if (launcher != null) {
             launcher.killJob(jobID, getJobConf());

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Wed Feb  8 07:15:18 2017
@@ -76,7 +76,7 @@ public abstract class Launcher {
     protected Map<FileSpec, Exception> failureMap;
     protected JobControl jc = null;
 
-    class HangingJobKiller extends Thread {
+    protected class HangingJobKiller extends Thread {
         public HangingJobKiller() {}
 
         @Override
@@ -90,7 +90,6 @@ public abstract class Launcher {
     }
 
     protected Launcher() {
-        Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
         // handle the windows portion of \r
         if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
             newLine = "\r\n";
@@ -104,7 +103,6 @@ public abstract class Launcher {
     public void reset() {
         failureMap = Maps.newHashMap();
         totalHadoopTimeSpent = 0;
-        jc = null;
     }
 
     /**

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Feb  8 07:15:18 2017
@@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -65,6 +67,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -86,7 +89,7 @@ import org.apache.pig.tools.pigstats.map
  * Main class that launches pig for Map Reduce
  *
  */
-public class MapReduceLauncher extends Launcher{
+public class MapReduceLauncher extends Launcher {
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
 
@@ -94,14 +97,23 @@ public class MapReduceLauncher extends L
 
     private boolean aggregateWarning = false;
 
+    public MapReduceLauncher() {
+        super();
+        HadoopShims.addShutdownHookWithPriority(new HangingJobKiller(),
+                PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
+    }
+
     @Override
     public void kill() {
         try {
-            log.debug("Receive kill signal");
-            if (jc!=null) {
+            if (jc != null && jc.getRunningJobs().size() > 0) {
+                log.info("Received kill signal");
                 for (Job job : jc.getRunningJobs()) {
                     HadoopShims.killJob(job);
                     log.info("Job " + job.getAssignedJobID() + " killed");
+                    String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+                            .format(Calendar.getInstance().getTime());
+                    System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed");
                 }
             }
         } catch (Exception e) {
@@ -301,8 +313,7 @@ public class MapReduceLauncher extends L
                 // Now wait, till we are finished.
                 while(!jc.allFinished()){
 
-                    try { jcThread.join(sleepTime); }
-                    catch (InterruptedException e) {}
+                    jcThread.join(sleepTime);
 
                     List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
 

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Wed Feb  8 07:15:18 2017
@@ -18,7 +18,9 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -30,9 +32,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.client.TezAppMasterStatus;
@@ -47,13 +51,13 @@ public class TezSessionManager {
     private static final Log log = LogFactory.getLog(TezSessionManager.class);
 
     static {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
+        HadoopShims.addShutdownHookWithPriority(new Runnable() {
 
             @Override
             public void run() {
                 TezSessionManager.shutdown();
             }
-        });
+        }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
     }
 
     private static ReentrantReadWriteLock sessionPoolLock = new ReentrantReadWriteLock();
@@ -273,6 +277,11 @@ public class TezSessionManager {
                 synchronized (sessionInfo) {
                     if (sessionInfo.session == session) {
                         log.info("Stopping Tez session " + session);
+                        String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+                                    .format(Calendar.getInstance().getTime());
+                        System.err.println(timeStamp + " Shutting down Tez session "
+                                + ", sessionName=" + session.getClientName()
+                                + ", applicationId=" + session.getAppMasterApplicationId());
                         session.stop();
                         sessionToRemove = sessionInfo;
                         break;
@@ -299,19 +308,30 @@ public class TezSessionManager {
             shutdown = true;
             for (SessionInfo sessionInfo : sessionPool) {
                 synchronized (sessionInfo) {
+                    TezClient session = sessionInfo.session;
                     try {
-                        if (sessionInfo.session.getAppMasterStatus().equals(
+                        String timeStamp = new SimpleDateFormat(
+                                "yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
+                        if (session.getAppMasterStatus().equals(
                                 TezAppMasterStatus.SHUTDOWN)) {
                             log.info("Tez session is already shutdown "
-                                    + sessionInfo.session);
+                                    + session);
+                            System.err.println(timeStamp
+                                    + " Tez session is already shutdown " + session
+                                    + ", sessionName=" + session.getClientName()
+                                    + ", applicationId=" + session.getAppMasterApplicationId());
                             continue;
                         }
-                        log.info("Shutting down Tez session "
-                                + sessionInfo.session);
-                        sessionInfo.session.stop();
+                        log.info("Shutting down Tez session " + session);
+                        // Since Hadoop calls org.apache.log4j.LogManager.shutdown(),
+                        // the log.info message is not displayed from the shutdown hook in Oozie
+                        System.err.println(timeStamp + " Shutting down Tez session "
+                                + ", sessionName=" + session.getClientName()
+                                + ", applicationId=" + session.getAppMasterApplicationId());
+                        session.stop();
                     } catch (Exception e) {
                         log.error("Error shutting down Tez session "
-                                + sessionInfo.session, e);
+                                + session, e);
                     }
                 }
             }

Modified: pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java?rev=1782110&r1=1782109&r2=1782110&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java Wed Feb  8 07:15:18 2017
@@ -84,4 +84,9 @@ public class PigImplConstants {
      * A unique id for a Pig session used as callerId for underlining component
      */
     public static final String PIG_AUDIT_ID = "pig.script.id";
+
+    // Kill the jobs before cleaning up tmp files
+    public static int SHUTDOWN_HOOK_JOB_KILL_PRIORITY = 3;
+    public static int SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY = 2;
+    public static int SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY = 1;
 }