You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2020/11/28 13:16:05 UTC
[hadoop] 17/21: MAPREDUCE-7304. Enhance the map-reduce Job end
notifier to be able to notify the given URL via a custom class. Contributed
by Zoltan Erdmann
This is an automated email from the ASF dual-hosted git repository.
hexiaoqiao pushed a commit to branch branch-3.2.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit e10af53b68aad6408dfbf68ec9060b5307b97922
Author: Peter Bacsko <pb...@cloudera.com>
AuthorDate: Fri Nov 20 13:15:47 2020 +0100
MAPREDUCE-7304. Enhance the map-reduce Job end notifier to be able to notify the given URL via a custom class. Contributed by Zoltan Erdmann
---
.../hadoop/mapreduce/v2/app/JobEndNotifier.java | 51 ++++++++++++++++++-
.../mapreduce/v2/app/TestJobEndNotifier.java | 42 ++++++++++++++++
.../java/org/apache/hadoop/mapred/JobConf.java | 46 +++++++++++++++++
.../hadoop/mapreduce/CustomJobEndNotifier.java | 57 ++++++++++++++++++++++
.../org/apache/hadoop/mapreduce/MRJobConfig.java | 3 ++
.../src/main/resources/mapred-default.xml | 17 +++++++
6 files changed, 215 insertions(+), 1 deletion(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
index 3bf0542..ed49f82 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
@@ -25,9 +25,11 @@ import java.net.MalformedURLException;
import java.net.Proxy;
import java.net.URL;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.CustomJobEndNotifier;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.eclipse.jetty.util.log.Log;
@@ -57,6 +59,9 @@ public class JobEndNotifier implements Configurable {
protected int timeout; // Timeout (ms) on the connection and notification
protected URL urlToNotify; //URL to notify read from the config
protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
+ // A custom notifier implementation
+ // (see org.apache.hadoop.mapreduce.CustomJobEndNotifier)
+ private String customJobEndNotifierClassName;
/**
* Parse the URL that needs to be notified of the end of the job, along
@@ -84,6 +89,9 @@ public class JobEndNotifier implements Configurable {
proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
+ customJobEndNotifierClassName = StringUtils.stripToNull(
+ conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS));
+
//Configure the proxy to use if its set. It should be set like
//proxyType@proxyHostname:port
if(proxyConf != null && !proxyConf.equals("") &&
@@ -115,11 +123,22 @@ public class JobEndNotifier implements Configurable {
public Configuration getConf() {
return conf;
}
-
+
/**
* Notify the URL just once. Use best effort.
*/
protected boolean notifyURLOnce() {
+ if (customJobEndNotifierClassName == null) {
+ return notifyViaBuiltInNotifier();
+ } else {
+ return notifyViaCustomNotifier();
+ }
+ }
+
+ /**
+ * Uses a simple HttpURLConnection to do the Job end notification.
+ */
+ private boolean notifyViaBuiltInNotifier() {
boolean success = false;
try {
Log.getLog().info("Job end notification trying " + urlToNotify);
@@ -146,6 +165,36 @@ public class JobEndNotifier implements Configurable {
}
/**
+ * Uses the custom Job end notifier class to do the Job end notification.
+ */
+ private boolean notifyViaCustomNotifier() {
+ try {
+ Log.getLog().info("Will be using " + customJobEndNotifierClassName
+ + " for Job end notification");
+
+ final Class<? extends CustomJobEndNotifier> customJobEndNotifierClass =
+ Class.forName(customJobEndNotifierClassName)
+ .asSubclass(CustomJobEndNotifier.class);
+ final CustomJobEndNotifier customJobEndNotifier =
+ customJobEndNotifierClass.getDeclaredConstructor().newInstance();
+
+ boolean success = customJobEndNotifier.notifyOnce(urlToNotify, conf);
+ if (success) {
+ Log.getLog().info("Job end notification to " + urlToNotify
+ + " succeeded");
+ } else {
+ Log.getLog().warn("Job end notification to " + urlToNotify
+ + " failed");
+ }
+ return success;
+ } catch (Exception e) {
+ Log.getLog().warn("Job end notification to " + urlToNotify
+ + " failed", e);
+ return false;
+ }
+ }
+
+ /**
* Notify a server of the completion of a submitted job. The user must have
* configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
* @param jobReport JobReport used to read JobId and JobStatus
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
index 5af79d6..1cd6255 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
@@ -31,6 +31,7 @@ import java.io.PrintStream;
import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
import java.nio.channels.ClosedChannelException;
import javax.servlet.ServletException;
@@ -42,7 +43,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.CustomJobEndNotifier;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -299,6 +302,45 @@ public class TestJobEndNotifier extends JobEndNotifier {
server.stop();
}
+ @Test
+ public void testCustomNotifierClass() throws InterruptedException {
+ JobConf conf = new JobConf();
+ conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL,
+ "http://example.com?jobId=$jobId&jobStatus=$jobStatus");
+ conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS,
+ CustomNotifier.class.getName());
+ this.setConf(conf);
+
+ JobReport jobReport = mock(JobReport.class);
+ JobId jobId = mock(JobId.class);
+ when(jobId.toString()).thenReturn("mock-Id");
+ when(jobReport.getJobId()).thenReturn(jobId);
+ when(jobReport.getJobState()).thenReturn(JobState.SUCCEEDED);
+
+ CustomNotifier.urlToNotify = null;
+ this.notify(jobReport);
+ final URL urlToNotify = CustomNotifier.urlToNotify;
+
+ Assert.assertEquals("http://example.com?jobId=mock-Id&jobStatus=SUCCEEDED",
+ urlToNotify.toString());
+ }
+
+ public static final class CustomNotifier implements CustomJobEndNotifier {
+
+ /**
+ * Once notifyOnce was invoked we'll store the URL in this variable
+ * so we can assert on it.
+ */
+ private static URL urlToNotify = null;
+
+ @Override
+ public boolean notifyOnce(final URL url, final Configuration jobConf) {
+ urlToNotify = url;
+ return true;
+ }
+
+ }
+
private static HttpServer2 startHttpServer() throws Exception {
new File(System.getProperty(
"build.webapps", "build/webapps") + "/test").mkdirs();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
index c0dd650..2334364 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
@@ -1887,6 +1887,52 @@ public class JobConf extends Configuration {
}
/**
+ * Returns the class to be invoked in order to send a notification
+ * after the job has completed (success/failure).
+ *
+ * @return the fully-qualified name of the class which implements
+ * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier} set through the
+ * {@link org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS}
+ * property
+ *
+ * @see JobConf#setJobEndNotificationCustomNotifierClass(java.lang.String)
+ * @see org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS
+ */
+ public String getJobEndNotificationCustomNotifierClass() {
+ return get(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS);
+ }
+
+ /**
+ * Sets the class to be invoked in order to send a notification after the job
+ * has completed (success/failure).
+ *
+ * A notification url still has to be set which will be passed to
+ * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier#notifyOnce(
+ * java.net.URL, org.apache.hadoop.conf.Configuration)}
+ * along with the Job's conf.
+ *
+ * If this is set, then instead of using a simple HttpURLConnection
+ * we'll create a new instance of this class
+ * which should be an implementation of
+ * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier},
+ * and we'll invoke that.
+ *
+ * @param customNotifierClassName the fully-qualified name of the class
+ * which implements
+ * {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier}
+ *
+ * @see JobConf#setJobEndNotificationURI(java.lang.String)
+ * @see
+ * org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS
+ */
+ public void setJobEndNotificationCustomNotifierClass(
+ String customNotifierClassName) {
+
+ set(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS,
+ customNotifierClassName);
+ }
+
+ /**
* Get job-specific shared directory for use as scratch space
*
* <p>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CustomJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CustomJobEndNotifier.java
new file mode 100644
index 0000000..e5a6b9f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CustomJobEndNotifier.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.URL;
+
+/**
+ * An interface for implementing a custom Job end notifier. The built-in
+ * Job end notifier uses a simple HTTP connection to notify the Job end status.
+ * By implementing this interface and setting the
+ * {@link MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS} property
+ * in the map-reduce Job configuration you can have your own
+ * notification mechanism. For now this still only works with HTTP/HTTPS URLs,
+ * but by implementing this interface you can choose how you want to make the
+ * notification itself. For example you can choose to use a custom
+ * HTTP library, or do a delegation token authentication, maybe set a
+ * custom SSL context on the connection, etc. This means you still have to set
+ * the {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL} property
+ * in the Job's conf.
+ */
+public interface CustomJobEndNotifier {
+
+ /**
+ * The implementation should try to do a Job end notification only once.
+ *
+ * See {@link MRJobConfig#MR_JOB_END_RETRY_ATTEMPTS},
+ * {@link MRJobConfig#MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS}
+ * and org.apache.hadoop.mapreduce.v2.app.JobEndNotifier on how exactly
+ * this method will be invoked.
+ *
+ * @param url the URL which needs to be notified
+ * (see {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL})
+ * @param jobConf the map-reduce Job's configuration
+ *
+ * @return true if the notification was successful
+ */
+ boolean notifyOnce(URL url, Configuration jobConf) throws Exception;
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 71b1ef2..4f005df 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -1074,6 +1074,9 @@ public interface MRJobConfig {
public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
"mapreduce.job.end-notification.max.retry.interval";
+ String MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS =
+ "mapreduce.job.end-notification.custom-notifier-class";
+
public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT =
5000;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index e5da41f..bba382c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1373,6 +1373,23 @@
</description>
</property>
+<property>
+ <name>mapreduce.job.end-notification.custom-notifier-class</name>
+ <description>A class to be invoked in order to send a notification after the
+ job has completed (success/failure). The class must implement
+ org.apache.hadoop.mapreduce.CustomJobEndNotifier. A notification
+ url still has to be set which will be passed to the notifyOnce
+ method of your implementation along with the Job's configuration.
+ If this is set, then instead of using a simple HttpURLConnection we'll
+ create a new instance of this class. For now this still only works
+ with HTTP/HTTPS URLs, but by implementing the interface you can choose
+ how you want to make the notification itself. For example you can
+ choose to use a custom HTTP library, or do a delegation token
+ authentication, maybe set a custom SSL context on the connection, etc.
+ The class needs to have a no-arg constructor.
+ </description>
+</property>
+
<property>
<name>mapreduce.job.log4j-properties-file</name>
<value></value>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org