Posted to commits@druid.apache.org by ji...@apache.org on 2019/01/25 23:43:11 UTC

[incubator-druid] branch master updated: Kill Hadoop MR task on kill of Hadoop ingestion task (#6828)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8492d94  Kill Hadoop MR task on kill of Hadoop ingestion task  (#6828)
8492d94 is described below

commit 8492d94f599da1f7851add2a0e7500515abd881d
Author: Ankit Kothari <an...@gmail.com>
AuthorDate: Fri Jan 25 15:43:06 2019 -0800

    Kill Hadoop MR task on kill of Hadoop ingestion task  (#6828)
    
    * Killing a task from the overlord UI now makes sure that the underlying MR job is terminated, saving unnecessary compute
    
    Jobby's run() is now split in two:
     1. submitAndGetHadoopJobId, which submits the job and returns the jobId as a string
     2. run, which monitors the submitted job for completion
    
    JobHelper writes this jobId to the path provided by HadoopIndexTask, which in turn is provided by the ForkingTaskRunner
    
    When a kill is requested, HadoopIndexTask reads this path to get the jobId and fires the kill command via the YARN API. This is handled in the stopGracefully method, which SingleTaskBackgroundRunner calls. The `canRestore` method now returns `true` for HadoopIndexTask so that stopGracefully is invoked.
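    
    Condensed, the kill path in stopGracefully looks roughly like this
    (a sketch; classloader plumbing and exception handling are elided):
    
        // Read the job id that JobHelper recorded at submit time...
        File jobIdFile = new File(getHadoopJobIdFileName());
        String jobId = jobIdFile.exists()
            ? HadoopDruidIndexerConfig.JSON_MAPPER.readValue(jobIdFile, String.class)
            : null;
        // ...and ask Hadoop to kill it. JobClient implements Tool, so this
        // is roughly equivalent to `mapred job -kill <jobId>` on the CLI.
        if (jobId != null) {
          ToolRunner.run(new JobClient(), new String[]{"-kill", jobId});
        }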
    
    The Hadoop*Job files have been changed to incorporate the changes to Jobby
    
    * Addressing PR comments
    
    * Addressing PR comments - Fix taskDir
    
    * Addressing PR comments - Change the contract of Task.stopGracefully():
    `SingleTaskBackgroundRunner` now calls stopGracefully() in stop() and then checks the canRestore condition to decide the status of the task
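    
    In sketch form (condensed from the SingleTaskBackgroundRunner change in
    this commit):
    
        // stop() now always gives the task a chance to clean up, then
        // decides the reported status from the restore settings.
        task.stopGracefully(taskConfig);
        if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
          // wait for run() to exit and report its status
        } else {
          // report failure
        }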
    
    * Addressing PR comments
     1. Formatting
     2. Removing `submitAndGetHadoopJobId` from `Jobby` and calling writeJobIdToFile in the job itself, as sketched below
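    
    In sketch form, each Hadoop*Job now records its job id right after
    submission (condensed; job setup and failure handling are elided):
    
        groupByJob.submit();
        // The job id exists once submit() returns; record it so the task
        // can kill the MR job later if needed.
        if (groupByJob.getJobID() != null) {
          JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(),
                                     groupByJob.getJobID().toString());
        }
        groupByJob.waitForCompletion(true);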
    
    * Addressing PR comments
     1. POM change. Moving hadoop dependency to indexing-hadoop
    
    * Addressing PR comments
     1. stopGracefully now accepts TaskConfig as a param
         Handling isRestoreOnRestart in stopGracefully for `AppenderatorDriverRealtimeIndexTask`, `RealtimeIndexTask`, and `SeekableStreamIndexTask` (see the sketch below)
         Changing tests to set the TaskConfig param isRestoreOnRestart to true
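    
    For the stream/realtime tasks the new contract reduces to this sketch
    (following the SeekableStreamIndexTask change in this commit):
    
        @Override
        public void stopGracefully(TaskConfig taskConfig)
        {
          // Restore-related shutdown only applies when tasks are restored
          // on restart; unconditional cleanup would go here regardless.
          if (taskConfig.isRestoreTasksOnRestart()) {
            runner.stopGracefully();
          }
        }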
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  6 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |  6 +-
 indexing-hadoop/pom.xml                            | 16 +++++
 .../indexer/DetermineHashedPartitionsJob.java      |  8 ++-
 .../druid/indexer/DeterminePartitionsJob.java      | 12 ++++
 .../HadoopDruidDetermineConfigurationJob.java      |  7 ++
 .../druid/indexer/HadoopDruidIndexerConfig.java    | 11 +++
 .../druid/indexer/HadoopDruidIndexerJob.java       |  8 ++-
 .../apache/druid/indexer/IndexGeneratorJob.java    |  5 ++
 .../java/org/apache/druid/indexer/JobHelper.java   | 20 ++++++
 .../druid/indexing/common/task/AbstractTask.java   | 10 ++-
 .../task/AppenderatorDriverRealtimeIndexTask.java  | 41 ++++++-----
 .../indexing/common/task/HadoopIndexTask.java      | 84 +++++++++++++++++++++-
 .../indexing/common/task/RealtimeIndexTask.java    | 41 ++++++-----
 .../apache/druid/indexing/common/task/Task.java    |  8 ++-
 .../overlord/SingleTaskBackgroundRunner.java       | 13 ++--
 .../seekablestream/SeekableStreamIndexTask.java    |  7 +-
 .../AppenderatorDriverRealtimeIndexTaskTest.java   | 10 +--
 .../common/task/RealtimeIndexTaskTest.java         | 10 +--
 .../overlord/SingleTaskBackgroundRunnerTest.java   |  2 +-
 .../druid/indexing/overlord/TestTaskRunner.java    |  2 +-
 pom.xml                                            | 18 +++++
 22 files changed, 269 insertions(+), 76 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 0b6e7c9..7bf83dd 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -345,7 +345,7 @@ public class KafkaIndexTaskTest
   {
     synchronized (runningTasks) {
       for (Task task : runningTasks) {
-        task.stopGracefully();
+        task.stopGracefully(toolboxFactory.build(task).getConfig());
       }
 
       runningTasks.clear();
@@ -1848,7 +1848,7 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(2, countEvents(task1));
 
     // Stop without publishing segment
-    task1.stopGracefully();
+    task1.stopGracefully(toolboxFactory.build(task1).getConfig());
     unlockAppenderatorBasePersistDirForTask(task1);
 
     Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
@@ -2339,7 +2339,7 @@ public class KafkaIndexTaskTest
         null,
         50000,
         null,
-        false,
+        true,
         null,
         null
     );
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 31bedd9..a62d45a 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -331,7 +331,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   {
     synchronized (runningTasks) {
       for (Task task : runningTasks) {
-        task.stopGracefully();
+        task.stopGracefully(toolboxFactory.build(task).getConfig());
       }
 
       runningTasks.clear();
@@ -2126,7 +2126,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     Assert.assertEquals(2, countEvents(task1));
 
     // Stop without publishing segment
-    task1.stopGracefully();
+    task1.stopGracefully(toolboxFactory.build(task1).getConfig());
     unlockAppenderatorBasePersistDirForTask(task1);
 
     Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
@@ -2651,7 +2651,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         null,
         50000,
         null,
-        false,
+        true,
         null,
         null
     );
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index bdf6c87..92cd1e7 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -131,6 +131,22 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-server</artifactId>
             <version>${project.parent.version}</version>
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
index c8696b5..976b78d 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
@@ -75,6 +75,7 @@ public class DetermineHashedPartitionsJob implements Jobby
   private final HadoopDruidIndexerConfig config;
   private String failureCause;
   private Job groupByJob;
+  private long startTime;
 
   public DetermineHashedPartitionsJob(
       HadoopDruidIndexerConfig config
@@ -91,7 +92,7 @@ public class DetermineHashedPartitionsJob implements Jobby
        * Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear
        * in the final segment.
        */
-      final long startTime = System.currentTimeMillis();
+      startTime = System.currentTimeMillis();
       groupByJob = Job.getInstance(
           new Configuration(),
           StringUtils.format("%s-determine_partitions_hashed-%s", config.getDataSource(), config.getIntervals())
@@ -125,6 +126,11 @@ public class DetermineHashedPartitionsJob implements Jobby
       groupByJob.submit();
       log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());
 
+      // Store the jobId in the file
+      if (groupByJob.getJobID() != null) {
+        JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), groupByJob.getJobID().toString());
+      }
+
       if (!groupByJob.waitForCompletion(true)) {
         log.error("Job failed: %s", groupByJob.getJobID());
         failureCause = Utils.getFailureMessage(groupByJob, config.JSON_MAPPER);
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
index 27cc9c3..d8c8ae2 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
@@ -161,6 +161,12 @@ public class DeterminePartitionsJob implements Jobby
         groupByJob.submit();
         log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());
 
+        // Store the jobId in the file
+        if (groupByJob.getJobID() != null) {
+          JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), groupByJob.getJobID().toString());
+        }
+
+
         if (!groupByJob.waitForCompletion(true)) {
           log.error("Job failed: %s", groupByJob.getJobID());
           failureCause = Utils.getFailureMessage(groupByJob, config.JSON_MAPPER);
@@ -218,6 +224,12 @@ public class DeterminePartitionsJob implements Jobby
           dimSelectionJob.getTrackingURL()
       );
 
+      // Store the jobId in the file
+      if (dimSelectionJob.getJobID() != null) {
+        JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), dimSelectionJob.getJobID().toString());
+      }
+
+
       if (!dimSelectionJob.waitForCompletion(true)) {
         log.error("Job failed: %s", dimSelectionJob.getJobID().toString());
         failureCause = Utils.getFailureMessage(dimSelectionJob, config.JSON_MAPPER);
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 294c676..10551ba 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -39,6 +39,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
   private static final Logger log = new Logger(HadoopDruidDetermineConfigurationJob.class);
   private final HadoopDruidIndexerConfig config;
   private Jobby job;
+  private String hadoopJobIdFile;
 
   @Inject
   public HadoopDruidDetermineConfigurationJob(
@@ -55,6 +56,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
 
     if (config.isDeterminingPartitions()) {
       job = config.getPartitionsSpec().getPartitionJob(config);
+      config.setHadoopJobIdFileName(hadoopJobIdFile);
       return JobHelper.runSingleJob(job, config);
     } else {
       int shardsPerInterval = config.getPartitionsSpec().getNumShards();
@@ -109,4 +111,9 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
 
     return job.getErrorMessage();
   }
+
+  public void setHadoopJobIdFile(String hadoopJobIdFile)
+  {
+    this.hadoopJobIdFile = hadoopJobIdFile;
+  }
 }
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
index 12228a4..22266ac 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
@@ -212,6 +212,7 @@ public class HadoopDruidIndexerConfig
 
   private HadoopIngestionSpec schema;
   private PathSpec pathSpec;
+  private String hadoopJobIdFileName;
   private final Map<Long, ShardSpecLookup> shardSpecLookups = new HashMap<>();
   private final Map<Long, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = new HashMap<>();
   private final Granularity rollupGran;
@@ -375,6 +376,16 @@ public class HadoopDruidIndexerConfig
     return schema.getTuningConfig().getMaxParseExceptions();
   }
 
+  public void setHadoopJobIdFileName(String hadoopJobIdFileName)
+  {
+    this.hadoopJobIdFileName = hadoopJobIdFileName;
+  }
+
+  public String getHadoopJobIdFileName()
+  {
+    return hadoopJobIdFileName;
+  }
+
   /**
    * Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
    * or via injected system properties) before this method is called.  The {@link PathSpec} may
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
index 331863a..6d20dbe 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
@@ -38,6 +38,7 @@ public class HadoopDruidIndexerJob implements Jobby
   private final MetadataStorageUpdaterJob metadataStorageUpdaterJob;
   private IndexGeneratorJob indexJob;
   private volatile List<DataSegment> publishedSegments = null;
+  private String hadoopJobIdFile;
 
   @Inject
   public HadoopDruidIndexerJob(
@@ -92,7 +93,7 @@ public class HadoopDruidIndexerJob implements Jobby
         }
     );
 
-
+    config.setHadoopJobIdFileName(hadoopJobIdFile);
     return JobHelper.runJobs(jobs, config);
   }
 
@@ -124,4 +125,9 @@ public class HadoopDruidIndexerJob implements Jobby
     }
     return publishedSegments;
   }
+
+  public void setHadoopJobIdFile(String hadoopJobIdFile)
+  {
+    this.hadoopJobIdFile = hadoopJobIdFile;
+  }
 }
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
index f2ae2b9..eed1906 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
@@ -208,6 +208,11 @@ public class IndexGeneratorJob implements Jobby
       job.submit();
       log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
 
+      // Store the jobId in the file
+      if (job.getJobID() != null) {
+        JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), job.getJobID().toString());
+      }
+
       boolean success = job.waitForCompletion(true);
 
       Counters counters = job.getCounters();
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
index da5b9e1..f5fb82a 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
@@ -59,8 +59,10 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.OutputStreamWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -347,6 +349,24 @@ public class JobHelper
     }
   }
 
+  public static void writeJobIdToFile(String hadoopJobIdFileName, String hadoopJobId)
+  {
+    if (hadoopJobId != null && hadoopJobIdFileName != null) {
+      try {
+        HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(
+            new OutputStreamWriter(new FileOutputStream(new File(hadoopJobIdFileName)), StandardCharsets.UTF_8),
+            hadoopJobId
+        );
+        log.info("MR job id [%s] is written to the file [%s]", hadoopJobId, hadoopJobIdFileName);
+      }
+      catch (IOException e) {
+        log.warn(e, "Error writing job id [%s] to the file [%s]", hadoopJobId, hadoopJobIdFileName);
+      }
+    } else {
+      log.info("Either job id or file name is null for the submitted job. Skipping writing the file [%s]", hadoopJobIdFileName);
+    }
+  }
+
   public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config)
   {
     boolean succeeded = job.run();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index cac4e5a..27e585e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -153,11 +154,14 @@ public abstract class AbstractTask implements Task
     return false;
   }
 
+  /**
+   * Should be called independently of canRestore so that resource cleanup can be performed.
+   * If resource cleanup is required, the concrete class should override this method.
+   */
   @Override
-  public void stopGracefully()
+  public void stopGracefully(TaskConfig taskConfig)
   {
-    // Should not be called when canRestore = false.
-    throw new UnsupportedOperationException("Cannot stop gracefully");
+    // Do nothing and let the concrete class handle it
   }
 
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 1dcf92b..1cb83cb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -50,6 +50,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
 import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMeters;
@@ -401,29 +402,31 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
   }
 
   @Override
-  public void stopGracefully()
+  public void stopGracefully(TaskConfig taskConfig)
   {
-    try {
-      synchronized (this) {
-        if (!gracefullyStopped) {
-          gracefullyStopped = true;
-          if (firehose == null) {
-            log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
-          } else if (finishingJob) {
-            log.info("stopGracefully: Interrupting finishJob.");
-            runThread.interrupt();
-          } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
-            log.info("stopGracefully: Draining firehose.");
-            firehose.close();
-          } else {
-            log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
-            runThread.interrupt();
+    if (taskConfig.isRestoreTasksOnRestart()) {
+      try {
+        synchronized (this) {
+          if (!gracefullyStopped) {
+            gracefullyStopped = true;
+            if (firehose == null) {
+              log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
+            } else if (finishingJob) {
+              log.info("stopGracefully: Interrupting finishJob.");
+              runThread.interrupt();
+            } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
+              log.info("stopGracefully: Draining firehose.");
+              firehose.close();
+            } else {
+              log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
+              runThread.interrupt();
+            }
           }
         }
       }
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
     }
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 104123f..398ed96 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -48,6 +48,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.LockAcquireAction;
 import org.apache.druid.indexing.common.actions.LockTryAcquireAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister;
 import org.apache.druid.java.util.common.DateTimes;
@@ -59,6 +60,8 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.util.ToolRunner;
 import org.joda.time.Interval;
 
 import javax.servlet.http.HttpServletRequest;
@@ -69,6 +72,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.io.File;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
@@ -80,6 +84,8 @@ import java.util.SortedSet;
 public class HadoopIndexTask extends HadoopTask implements ChatHandler
 {
   private static final Logger log = new Logger(HadoopIndexTask.class);
+  private static final String HADOOP_JOB_ID_FILENAME = "mapReduceJobId.json";
+  private TaskConfig taskConfig = null;
 
   private static String getTheDataSource(HadoopIngestionSpec spec)
   {
@@ -218,10 +224,16 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     return classpathPrefix;
   }
 
+  public String getHadoopJobIdFileName()
+  {
+    return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME).getAbsolutePath();
+  }
+
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
     try {
+      taskConfig = toolbox.getConfig();
       if (chatHandlerProvider.isPresent()) {
         log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
         chatHandlerProvider.get().register(getId(), this, false);
@@ -259,6 +271,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
   @SuppressWarnings("unchecked")
   private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
   {
+    String hadoopJobIdFile = getHadoopJobIdFileName();
     final ClassLoader loader = buildClassLoader(toolbox);
     boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
 
@@ -277,7 +290,8 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     String[] determinePartitionsInput = new String[]{
         toolbox.getObjectMapper().writeValueAsString(spec),
         toolbox.getConfig().getHadoopWorkingPath(),
-        toolbox.getSegmentPusher().getPathForHadoop()
+        toolbox.getSegmentPusher().getPathForHadoop(),
+        hadoopJobIdFile
     };
 
     HadoopIngestionSpec indexerSchema;
@@ -367,7 +381,8 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
 
     String[] buildSegmentsInput = new String[]{
         toolbox.getObjectMapper().writeValueAsString(indexerSchema),
-        version
+        version,
+        hadoopJobIdFile
     };
 
     Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
@@ -412,6 +427,57 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     }
   }
 
+  @Override
+  public void stopGracefully(TaskConfig taskConfig)
+  {
+    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+    File hadoopJobIdFile = new File(getHadoopJobIdFileName());
+    String jobId = null;
+
+    try {
+      if (hadoopJobIdFile.exists()) {
+        jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
+      }
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception while reading Hadoop Job ID from: %s", hadoopJobIdFile);
+    }
+
+    try {
+      if (jobId != null) {
+        ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
+            taskConfig.getDefaultHadoopCoordinates());
+
+        Object killMRJobInnerProcessingRunner = getForeignClassloaderObject(
+            "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
+            loader
+        );
+        String[] buildKillJobInput = new String[]{
+            "-kill",
+            jobId
+        };
+
+        Class<?> buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass();
+        Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass());
+
+        Thread.currentThread().setContextClassLoader(loader);
+        final String killStatusString = (String) innerProcessingRunTask.invoke(
+            killMRJobInnerProcessingRunner,
+            new Object[]{buildKillJobInput}
+        );
+
+        log.info("Tried killing job [%s], status: [%s]", jobId, killStatusString);
+      }
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(oldLoader);
+    }
+
+  }
+
   @GET
   @Path("/rowStats")
   @Produces(MediaType.APPLICATION_JSON)
@@ -540,6 +606,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
       final String schema = args[0];
       final String workingPath = args[1];
       final String segmentOutputPath = args[2];
+      final String hadoopJobIdFile = args[3];
 
       final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.JSON_MAPPER
           .readValue(
@@ -553,6 +620,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
       );
 
       job = new HadoopDruidDetermineConfigurationJob(config);
+      job.setHadoopJobIdFile(hadoopJobIdFile);
 
       log.info("Starting a hadoop determine configuration job...");
       if (job.run()) {
@@ -585,6 +653,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     {
       final String schema = args[0];
       String version = args[1];
+      final String hadoopJobIdFile = args[2];
 
       final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.JSON_MAPPER
           .readValue(
@@ -606,6 +675,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
         maybeHandler = null;
       }
       job = new HadoopDruidIndexerJob(config, maybeHandler);
+      job.setHadoopJobIdFile(hadoopJobIdFile);
 
       log.info("Starting a hadoop index generator job...");
       try {
@@ -649,6 +719,16 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     }
   }
 
+  @SuppressWarnings("unused")
+  public static class HadoopKillMRJobIdProcessingRunner
+  {
+    public String runTask(String[] args) throws Exception
+    {
+      int res = ToolRunner.run(new JobClient(), args);
+      return res == 0 ? "Success" : "Fail";
+    }
+  }
+
   public static class HadoopIndexGeneratorInnerProcessingStatus
   {
     private final List<DataSegment> dataSegments;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
index 83130e6..8a7fe6a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
@@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.LockAcquireAction;
 import org.apache.druid.indexing.common.actions.LockReleaseAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
@@ -494,29 +495,31 @@ public class RealtimeIndexTask extends AbstractTask
   }
 
   @Override
-  public void stopGracefully()
+  public void stopGracefully(TaskConfig taskConfig)
   {
-    try {
-      synchronized (this) {
-        if (!gracefullyStopped) {
-          gracefullyStopped = true;
-          if (firehose == null) {
-            log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
-          } else if (finishingJob) {
-            log.info("stopGracefully: Interrupting finishJob.");
-            runThread.interrupt();
-          } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
-            log.info("stopGracefully: Draining firehose.");
-            firehose.close();
-          } else {
-            log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
-            runThread.interrupt();
+    if (taskConfig.isRestoreTasksOnRestart()) {
+      try {
+        synchronized (this) {
+          if (!gracefullyStopped) {
+            gracefullyStopped = true;
+            if (firehose == null) {
+              log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
+            } else if (finishingJob) {
+              log.info("stopGracefully: Interrupting finishJob.");
+              runThread.interrupt();
+            } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
+              log.info("stopGracefully: Draining firehose.");
+              firehose.close();
+            } else {
+              log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
+              runThread.interrupt();
+            }
           }
         }
       }
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
     }
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 3ed4548..4523fc2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSubTask;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.query.Query;
@@ -163,11 +164,12 @@ public interface Task
   boolean canRestore();
 
   /**
-   * Asks a task to arrange for its "run" method to exit promptly. This method will only be called if
-   * {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with
+   * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be terminated with
    * extreme prejudice.
+   *
+   * @param taskConfig TaskConfig for this task
    */
-  void stopGracefully();
+  void stopGracefully(TaskConfig taskConfig);
 
   /**
    * Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index a9b6317..cb222ea 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -178,17 +178,15 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
     if (runningItem != null) {
       final Task task = runningItem.getTask();
       final long start = System.currentTimeMillis();
-      final boolean graceful;
       final long elapsed;
       boolean error = false;
 
-      if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
-        // Attempt graceful shutdown.
-        graceful = true;
-        log.info("Starting graceful shutdown of task[%s].", task.getId());
+      // stopGracefully for resource cleanup
+      log.info("Starting graceful shutdown of task[%s].", task.getId());
+      task.stopGracefully(taskConfig);
 
+      if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
         try {
-          task.stopGracefully();
           final TaskStatus taskStatus = runningItem.getResult().get(
               new Interval(DateTimes.utc(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
               TimeUnit.MILLISECONDS
@@ -213,7 +211,6 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
           TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
         }
       } else {
-        graceful = false;
         TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
       }
 
@@ -223,7 +220,7 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
           .builder()
           .setDimension("task", task.getId())
           .setDimension("dataSource", task.getDataSource())
-          .setDimension("graceful", String.valueOf(graceful))
+          .setDimension("graceful", "true") // for backward compatibility
           .setDimension("error", String.valueOf(error));
 
       emitter.emit(metricBuilder.build("task/interrupt/count", 1L));
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 7f27991..2918696 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.TaskResource;
@@ -179,9 +180,11 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   }
 
   @Override
-  public void stopGracefully()
+  public void stopGracefully(TaskConfig taskConfig)
   {
-    runner.stopGracefully();
+    if (taskConfig.isRestoreTasksOnRestart()) {
+      runner.stopGracefully();
+    }
   }
 
   @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 59899fe..0add56b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1029,7 +1029,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
       );
 
       // Trigger graceful shutdown.
-      task1.stopGracefully();
+      task1.stopGracefully(taskToolboxFactory.build(task1).getConfig());
 
       // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
       final TaskStatus taskStatus = statusFuture.get();
@@ -1129,7 +1129,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
       Assert.assertEquals(1, sumMetric(task1, null, "rows").longValue());
 
       // Trigger graceful shutdown.
-      task1.stopGracefully();
+      task1.stopGracefully(taskToolboxFactory.build(task1).getConfig());
 
       // Wait for the task to finish. The status doesn't really matter.
       while (!statusFuture.isDone()) {
@@ -1202,7 +1202,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
       );
 
       // Trigger graceful shutdown.
-      task1.stopGracefully();
+      task1.stopGracefully(taskToolboxFactory.build(task1).getConfig());
 
       // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
       final TaskStatus taskStatus = statusFuture.get();
@@ -1257,7 +1257,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
 
     final AppenderatorDriverRealtimeIndexTask task1 = makeRealtimeTask(null);
 
-    task1.stopGracefully();
+    task1.stopGracefully(taskToolboxFactory.build(task1).getConfig());
     final ListenableFuture<TaskStatus> statusFuture = runTask(task1);
 
     // Wait for the task to finish.
@@ -1517,7 +1517,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
         return result;
       }
     };
-    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
+    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);
 
     final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
         taskLockbox,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 5317b66..9a6a8e9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -600,7 +600,7 @@ public class RealtimeIndexTaskTest
       );
 
       // Trigger graceful shutdown.
-      task1.stopGracefully();
+      task1.stopGracefully(taskToolbox.getConfig());
 
       // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
       final TaskStatus taskStatus = statusFuture.get();
@@ -708,7 +708,7 @@ public class RealtimeIndexTaskTest
       Assert.assertEquals(1, sumMetric(task1, null, "rows").longValue());
 
       // Trigger graceful shutdown.
-      task1.stopGracefully();
+      task1.stopGracefully(taskToolbox.getConfig());
 
       // Wait for the task to finish. The status doesn't really matter.
       while (!statusFuture.isDone()) {
@@ -788,7 +788,7 @@ public class RealtimeIndexTaskTest
       );
 
       // Trigger graceful shutdown.
-      task1.stopGracefully();
+      task1.stopGracefully(taskToolbox.getConfig());
 
       // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
       final TaskStatus taskStatus = statusFuture.get();
@@ -837,9 +837,9 @@ public class RealtimeIndexTaskTest
     final File directory = tempFolder.newFolder();
     final RealtimeIndexTask task1 = makeRealtimeTask(null);
 
-    task1.stopGracefully();
     final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
     final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory);
+    task1.stopGracefully(taskToolbox.getConfig());
     final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);
 
     // Wait for the task to finish.
@@ -970,7 +970,7 @@ public class RealtimeIndexTaskTest
       final File directory
   )
   {
-    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
+    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);
     final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
     try {
       taskStorage.insert(task, TaskStatus.running(task.getId()));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 9f30ff9..c8f9380 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -195,7 +195,7 @@ public class SingleTaskBackgroundRunnerTest
     }
 
     @Override
-    public void stopGracefully()
+    public void stopGracefully(TaskConfig taskConfig)
     {
       gracefullyStopped.set();
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
index cb6cffe..c916361 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
@@ -161,7 +161,7 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
         log.info("Starting graceful shutdown of task[%s].", task.getId());
 
         try {
-          task.stopGracefully();
+          task.stopGracefully(taskConfig);
           final TaskStatus taskStatus = item.getResult().get(
               new Interval(DateTimes.utc(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
               TimeUnit.MILLISECONDS
diff --git a/pom.xml b/pom.xml
index 58642a7..668ba3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -634,6 +634,24 @@
                 </exclusions>
             </dependency>
             <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-common</artifactId>
+                <version>${hadoop.compile.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-mapreduce-client-core</artifactId>
+                <version>${hadoop.compile.version}</version>
+                <scope>provided</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>javax.servlet</groupId>
+                        <artifactId>servlet-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
                 <groupId>org.mapdb</groupId>
                 <artifactId>mapdb</artifactId>
                 <version>1.0.8</version>

