You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2020/11/28 13:16:05 UTC

[hadoop] 17/21: MAPREDUCE-7304. Enhance the map-reduce Job end notifier to be able to notify the given URL via a custom class. Contributed by Zoltan Erdmann

This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit e10af53b68aad6408dfbf68ec9060b5307b97922
Author: Peter Bacsko <pb...@cloudera.com>
AuthorDate: Fri Nov 20 13:15:47 2020 +0100

    MAPREDUCE-7304. Enhance the map-reduce Job end notifier to be able to notify the given URL via a custom class. Contributed by Zoltan Erdmann
---
 .../hadoop/mapreduce/v2/app/JobEndNotifier.java    | 51 ++++++++++++++++++-
 .../mapreduce/v2/app/TestJobEndNotifier.java       | 42 ++++++++++++++++
 .../java/org/apache/hadoop/mapred/JobConf.java     | 46 +++++++++++++++++
 .../hadoop/mapreduce/CustomJobEndNotifier.java     | 57 ++++++++++++++++++++++
 .../org/apache/hadoop/mapreduce/MRJobConfig.java   |  3 ++
 .../src/main/resources/mapred-default.xml          | 17 +++++++
 6 files changed, 215 insertions(+), 1 deletion(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
index 3bf0542..ed49f82 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
@@ -25,9 +25,11 @@ import java.net.MalformedURLException;
 import java.net.Proxy;
 import java.net.URL;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.CustomJobEndNotifier;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.eclipse.jetty.util.log.Log;
@@ -57,6 +59,9 @@ public class JobEndNotifier implements Configurable {
   protected int timeout; // Timeout (ms) on the connection and notification
   protected URL urlToNotify; //URL to notify read from the config
   protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
+  // A custom notifier implementation
+  // (see org.apache.hadoop.mapreduce.CustomJobEndNotifier)
+  private String customJobEndNotifierClassName;
 
   /**
    * Parse the URL that needs to be notified of the end of the job, along
@@ -84,6 +89,9 @@ public class JobEndNotifier implements Configurable {
 
     proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
 
+    customJobEndNotifierClassName = StringUtils.stripToNull(
+        conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS));
+
     //Configure the proxy to use if its set. It should be set like
     //proxyType@proxyHostname:port
     if(proxyConf != null && !proxyConf.equals("") &&
@@ -115,11 +123,22 @@ public class JobEndNotifier implements Configurable {
   public Configuration getConf() {
     return conf;
   }
-  
+
   /**
    * Notify the URL just once. Use best effort.
    */
   protected boolean notifyURLOnce() {
+    if (customJobEndNotifierClassName == null) {
+      return notifyViaBuiltInNotifier();
+    } else {
+      return notifyViaCustomNotifier();
+    }
+  }
+
+  /**
+   * Uses a simple HttpURLConnection to do the Job end notification.
+   */
+  private boolean notifyViaBuiltInNotifier() {
     boolean success = false;
     try {
       Log.getLog().info("Job end notification trying " + urlToNotify);
@@ -146,6 +165,36 @@ public class JobEndNotifier implements Configurable {
   }
 
   /**
+   * Uses the custom Job end notifier class to do the Job end notification.
+   */
+  private boolean notifyViaCustomNotifier() {
+    try {
+      Log.getLog().info("Will be using " + customJobEndNotifierClassName
+                        + " for Job end notification");
+
+      final Class<? extends CustomJobEndNotifier> customJobEndNotifierClass =
+              Class.forName(customJobEndNotifierClassName)
+                      .asSubclass(CustomJobEndNotifier.class);
+      final CustomJobEndNotifier customJobEndNotifier =
+              customJobEndNotifierClass.getDeclaredConstructor().newInstance();
+
+      boolean success = customJobEndNotifier.notifyOnce(urlToNotify, conf);
+      if (success) {
+        Log.getLog().info("Job end notification to " + urlToNotify
+                          + " succeeded");
+      } else {
+        Log.getLog().warn("Job end notification to " + urlToNotify
+                          + " failed");
+      }
+      return success;
+    } catch (Exception e) {
+      Log.getLog().warn("Job end notification to " + urlToNotify
+                        + " failed", e);
+      return false;
+    }
+  }
+
+  /**
    * Notify a server of the completion of a submitted job. The user must have
    * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
    * @param jobReport JobReport used to read JobId and JobStatus
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
index 5af79d6..1cd6255 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
@@ -31,6 +31,7 @@ import java.io.PrintStream;
 import java.net.Proxy;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.nio.channels.ClosedChannelException;
 
 import javax.servlet.ServletException;
@@ -42,7 +43,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.CustomJobEndNotifier;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -299,6 +302,45 @@ public class TestJobEndNotifier extends JobEndNotifier {
     server.stop();
   }
 
+  @Test
+  public void testCustomNotifierClass() throws InterruptedException {
+    JobConf conf = new JobConf();
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL,
+             "http://example.com?jobId=$jobId&jobStatus=$jobStatus");
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS,
+             CustomNotifier.class.getName());
+    this.setConf(conf);
+
+    JobReport jobReport = mock(JobReport.class);
+    JobId jobId = mock(JobId.class);
+    when(jobId.toString()).thenReturn("mock-Id");
+    when(jobReport.getJobId()).thenReturn(jobId);
+    when(jobReport.getJobState()).thenReturn(JobState.SUCCEEDED);
+
+    CustomNotifier.urlToNotify = null;
+    this.notify(jobReport);
+    final URL urlToNotify = CustomNotifier.urlToNotify;
+
+    Assert.assertEquals("http://example.com?jobId=mock-Id&jobStatus=SUCCEEDED",
+                        urlToNotify.toString());
+  }
+
+  public static final class CustomNotifier implements CustomJobEndNotifier {
+
+    /**
+     * Once notifyOnce has been invoked, the URL it received is stored in
+     * this variable so the test can assert on it.
+     */
+    private static URL urlToNotify = null;
+
+    @Override
+    public boolean notifyOnce(final URL url, final Configuration jobConf) {
+      urlToNotify = url;
+      return true;
+    }
+
+  }
+
   private static HttpServer2 startHttpServer() throws Exception {
     new File(System.getProperty(
         "build.webapps", "build/webapps") + "/test").mkdirs();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
index c0dd650..2334364 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
@@ -1887,6 +1887,52 @@ public class JobConf extends Configuration {
   }
 
   /**
+   * Returns the name of the class to be invoked in order to send a
+   * notification after the job has completed (success/failure).
+   *
+   * @return the fully-qualified name of the class which implements
+   * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier} set through the
+   * {@link org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS}
+   * property
+   *
+   * @see JobConf#setJobEndNotificationCustomNotifierClass(java.lang.String)
+   * @see org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS
+   */
+  public String getJobEndNotificationCustomNotifierClass() {
+    return get(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS);
+  }
+
+  /**
+   * Sets the name of the class to be invoked in order to send a notification
+   * after the job has completed (success/failure).
+   *
+   * A notification url still has to be set which will be passed to
+   * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier#notifyOnce(
+   * java.net.URL, org.apache.hadoop.conf.Configuration)}
+   * along with the Job's conf.
+   *
+   * If this is set, then instead of using a simple HttpURLConnection
+   * we'll create a new instance of this class,
+   * which should be an implementation of
+   * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier},
+   * and we'll invoke that.
+   *
+   * @param customNotifierClassName the fully-qualified name of the class
+   *     which implements
+   *     {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier}
+   *
+   * @see JobConf#setJobEndNotificationURI(java.lang.String)
+   * @see
+   * org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS
+   */
+  public void setJobEndNotificationCustomNotifierClass(
+          String customNotifierClassName) {
+
+    set(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS,
+            customNotifierClassName);
+  }
+
+  /**
    * Get job-specific shared directory for use as scratch space
    * 
    * <p>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CustomJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CustomJobEndNotifier.java
new file mode 100644
index 0000000..e5a6b9f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CustomJobEndNotifier.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.URL;
+
+/**
+ * An interface for implementing a custom Job end notifier. The built-in
+ * Job end notifier uses a simple HTTP connection to notify the Job end status.
+ * By implementing this interface and setting the
+ * {@link MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS} property
+ * in the map-reduce Job configuration you can have your own
+ * notification mechanism. For now this still only works with HTTP/HTTPS URLs,
+ * but by implementing this interface you can choose how you want to make the
+ * notification itself. For example you can choose to use a custom
+ * HTTP library, or do a delegation token authentication, maybe set a
+ * custom SSL context on the connection, etc. This means you still have to set
+ * the {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL} property
+ * in the Job's conf.
+ */
+public interface CustomJobEndNotifier {
+
+  /**
+   * The implementation should try to do a Job end notification only once.
+   *
+   * See {@link MRJobConfig#MR_JOB_END_RETRY_ATTEMPTS},
+   * {@link MRJobConfig#MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS}
+   * and org.apache.hadoop.mapreduce.v2.app.JobEndNotifier on how exactly
+   * this method will be invoked.
+   *
+   * @param url the URL which needs to be notified
+   *           (see {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL})
+   * @param jobConf the map-reduce Job's configuration
+   *
+   * @return true if the notification was successful
+   */
+  boolean notifyOnce(URL url, Configuration jobConf) throws Exception;
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 71b1ef2..4f005df 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -1074,6 +1074,9 @@ public interface MRJobConfig {
   public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
     "mapreduce.job.end-notification.max.retry.interval";
 
+  String MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS =
+      "mapreduce.job.end-notification.custom-notifier-class";
+
   public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT =
       5000;
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index e5da41f..bba382c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1373,6 +1373,23 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.job.end-notification.custom-notifier-class</name>
+  <description>A class to be invoked in order to send a notification after the
+               job has completed (success/failure). The class must implement
+               org.apache.hadoop.mapreduce.CustomJobEndNotifier. A notification
+               url still has to be set which will be passed to the notifyOnce
+               method of your implementation along with the Job's configuration.
+               If this is set, then instead of using a simple HttpURLConnection
+               we'll create a new instance of this class. For now this still only works
+               with HTTP/HTTPS URLs, but by implementing this class you can choose
+               how you want to make the notification itself. For example you can
+               choose to use a custom HTTP library, or do a delegation token
+               authentication, maybe set a custom SSL context on the connection, etc.
+               The class needs to have a no-arg constructor.
+  </description>
+</property>
+
   <property>
     <name>mapreduce.job.log4j-properties-file</name>
     <value></value>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org