Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2012/07/26 15:23:07 UTC
svn commit: r1365979 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
dev-support/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-core...
Author: tucu
Date: Thu Jul 26 13:23:05 2012
New Revision: 1365979
URL: http://svn.apache.org/viewvc?rev=1365979&view=rev
Log:
MAPREDUCE-4417. add support for encrypted shuffle (tucu)
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Jul 26 13:23:05 2012
@@ -135,6 +135,8 @@ Branch-2 ( Unreleased changes )
MAPREDUCE-987. Exposing MiniDFS and MiniMR clusters as a single process
command-line. (ahmed via tucu)
+ MAPREDUCE-4417. add support for encrypted shuffle (tucu)
+
IMPROVEMENTS
MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved
Modified: hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Thu Jul 26 13:23:05 2012
@@ -473,5 +473,10 @@
<!--
The above 2 fields are accessed locally and only via methods that are synchronized.
-->
-
+
+ <Match>
+ <Class name="org.apache.hadoop.mapred.ShuffleHandler" />
+ <Field name="sslFileBufferSize" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
</FindBugsFilter>
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Jul 26 13:23:05 2012
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TypeConverter;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -108,7 +110,8 @@ public abstract class TaskImpl implement
private long scheduledTime;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
+
+ protected boolean encryptedShuffle;
protected Credentials credentials;
protected Token<JobTokenIdentifier> jobToken;
@@ -274,6 +277,8 @@ public abstract class TaskImpl implement
this.jobToken = jobToken;
this.metrics = metrics;
this.appContext = appContext;
+ this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+ MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
// See if this is from a previous generation.
if (completedTasksFromPreviousRun != null
@@ -637,9 +642,10 @@ public abstract class TaskImpl implement
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1);
- tce.setMapOutputServerAddress("http://"
- + attempt.getNodeHttpAddress().split(":")[0] + ":"
- + attempt.getShufflePort());
+ String scheme = (encryptedShuffle) ? "https://" : "http://";
+ tce.setMapOutputServerAddress(scheme
+ + attempt.getNodeHttpAddress().split(":")[0] + ":"
+ + attempt.getShufflePort());
tce.setStatus(status);
tce.setAttemptId(attempt.getID());
int runTime = 0;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Thu Jul 26 13:23:05 2012
@@ -79,4 +79,9 @@ public interface MRConfig {
public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
public static final String MAX_BLOCK_LOCATIONS_KEY =
"mapreduce.job.max.split.locations";
+
+ public static final String SHUFFLE_SSL_ENABLED_KEY =
+ "mapreduce.shuffle.ssl.enabled";
+
+ public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Jul 26 13:23:05 2012
@@ -25,11 +25,13 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.net.HttpURLConnection;
import java.net.URLConnection;
+import java.security.GeneralSecurityException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.crypto.SecretKey;
+import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,9 +44,11 @@ import org.apache.hadoop.mapred.Counters
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
@@ -92,6 +96,9 @@ class Fetcher<K,V> extends Thread {
private volatile boolean stopped = false;
+ private static boolean sslShuffle;
+ private static SSLFactory sslFactory;
+
public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
@@ -135,6 +142,20 @@ class Fetcher<K,V> extends Thread {
setName("fetcher#" + id);
setDaemon(true);
+
+ synchronized (Fetcher.class) {
+ sslShuffle = job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+ MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
+ if (sslShuffle && sslFactory == null) {
+ sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
+ try {
+ sslFactory.init();
+ } catch (Exception ex) {
+ sslFactory.destroy();
+ throw new RuntimeException(ex);
+ }
+ }
+ }
}
public void run() {
@@ -173,8 +194,25 @@ class Fetcher<K,V> extends Thread {
} catch (InterruptedException ie) {
LOG.warn("Got interrupt while joining " + getName(), ie);
}
+ if (sslFactory != null) {
+ sslFactory.destroy();
+ }
}
+ protected HttpURLConnection openConnection(URL url) throws IOException {
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ if (sslShuffle) {
+ HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
+ try {
+ httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
+ } catch (GeneralSecurityException ex) {
+ throw new IOException(ex);
+ }
+ httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
+ }
+ return conn;
+ }
+
/**
* The crux of the matter...
*
@@ -205,7 +243,7 @@ class Fetcher<K,V> extends Thread {
try {
URL url = getMapOutputURL(host, maps);
- HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+ HttpURLConnection connection = openConnection(url);
// generate hash of the url
String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Jul 26 13:23:05 2012
@@ -513,6 +513,21 @@
</property>
<property>
+ <name>mapreduce.shuffle.ssl.enabled</name>
+ <value>false</value>
+ <description>
+ Whether to use SSL for the Shuffle HTTP endpoints.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.shuffle.ssl.file.buffer.size</name>
+ <value>65536</value>
+ <description>Buffer size for reading spills from file when using SSL.
+ </description>
+</property>
+
+<property>
<name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value>
<description>The percentage of memory -relative to the maximum heap size- to
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java?rev=1365979&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java Thu Jul 26 13:23:05 2012
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security.ssl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRClientCluster;
+import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
+import org.apache.hadoop.mapred.RunningJob;
+
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.URL;
+
+public class TestEncryptedShuffle {
+
+ private static final String BASEDIR =
+ System.getProperty("test.build.dir", "target/test-dir") + "/" +
+ TestEncryptedShuffle.class.getSimpleName();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ File base = new File(BASEDIR);
+ FileUtil.fullyDelete(base);
+ base.mkdirs();
+ }
+
+ @Before
+ public void createCustomYarnClasspath() throws Exception {
+ String classpathDir =
+ KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
+
+ URL url = Thread.currentThread().getContextClassLoader().
+ getResource("mrapp-generated-classpath");
+ File f = new File(url.getPath());
+ BufferedReader reader = new BufferedReader(new FileReader(f));
+ String cp = reader.readLine();
+ cp = cp + ":" + classpathDir;
+ f = new File(classpathDir, "mrapp-generated-classpath");
+ Writer writer = new FileWriter(f);
+ writer.write(cp);
+ writer.close();
+ new File(classpathDir, "core-site.xml").delete();
+ }
+
+ @After
+ public void cleanUpMiniClusterSpecialConfig() throws Exception {
+ String classpathDir =
+ KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
+ new File(classpathDir, "mrapp-generated-classpath").delete();
+ new File(classpathDir, "core-site.xml").delete();
+ String keystoresDir = new File(BASEDIR).getAbsolutePath();
+ KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, classpathDir);
+ }
+
+ private MiniDFSCluster dfsCluster = null;
+ private MiniMRClientCluster mrCluster = null;
+
+ private void startCluster(Configuration conf) throws Exception {
+ if (System.getProperty("hadoop.log.dir") == null) {
+ System.setProperty("hadoop.log.dir", "target/test-dir");
+ }
+ conf.set("dfs.block.access.token.enable", "false");
+ conf.set("dfs.permissions", "true");
+ conf.set("hadoop.security.authentication", "simple");
+ dfsCluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fileSystem = dfsCluster.getFileSystem();
+ fileSystem.mkdirs(new Path("/tmp"));
+ fileSystem.mkdirs(new Path("/user"));
+ fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
+ fileSystem.setPermission(
+ new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
+ fileSystem.setPermission(
+ new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
+ fileSystem.setPermission(
+ new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
+ FileSystem.setDefaultUri(conf, fileSystem.getUri());
+ mrCluster = MiniMRClientClusterFactory.create(this.getClass(), 1, conf);
+
+ // so the minicluster conf is avail to the containers.
+ String classpathDir =
+ KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
+ Writer writer = new FileWriter(classpathDir + "/core-site.xml");
+ mrCluster.getConfig().writeXml(writer);
+ writer.close();
+ }
+
+ private void stopCluster() throws Exception {
+ if (mrCluster != null) {
+ mrCluster.stop();
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ }
+
+ protected JobConf getJobConf() throws IOException {
+ return new JobConf(mrCluster.getConfig());
+ }
+
+ private void encryptedShuffleWithCerts(boolean useClientCerts)
+ throws Exception {
+ try {
+ Configuration conf = new Configuration();
+ String keystoresDir = new File(BASEDIR).getAbsolutePath();
+ String sslConfsDir =
+ KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
+ KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfsDir, conf,
+ useClientCerts);
+ conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, true);
+ startCluster(conf);
+ FileSystem fs = FileSystem.get(getJobConf());
+ Path inputDir = new Path("input");
+ fs.mkdirs(inputDir);
+ Writer writer =
+ new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ writer.write("hello");
+ writer.close();
+
+ Path outputDir = new Path("output", "output");
+
+ JobConf jobConf = new JobConf(getJobConf());
+ jobConf.setInt("mapred.map.tasks", 1);
+ jobConf.setInt("mapred.map.max.attempts", 1);
+ jobConf.setInt("mapred.reduce.max.attempts", 1);
+ jobConf.set("mapred.input.dir", inputDir.toString());
+ jobConf.set("mapred.output.dir", outputDir.toString());
+ JobClient jobClient = new JobClient(jobConf);
+ RunningJob runJob = jobClient.submitJob(jobConf);
+ runJob.waitForCompletion();
+ Assert.assertTrue(runJob.isComplete());
+ Assert.assertTrue(runJob.isSuccessful());
+ } finally {
+ stopCluster();
+ }
+ }
+
+ @Test
+ public void encryptedShuffleWithClientCerts() throws Exception {
+ encryptedShuffleWithCerts(true);
+ }
+
+ @Test
+ public void encryptedShuffleWithoutClientCerts() throws Exception {
+ encryptedShuffleWithCerts(false);
+ }
+
+}
+
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Jul 26 13:23:05 2012
@@ -55,7 +55,9 @@ import org.apache.hadoop.fs.LocalDirAllo
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -101,6 +103,8 @@ import org.jboss.netty.handler.codec.htt
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedFile;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;
@@ -114,6 +118,8 @@ public class ShuffleHandler extends Abst
private int port;
private ChannelFactory selector;
private final ChannelGroup accepted = new DefaultChannelGroup();
+ private HttpPipelineFactory pipelineFact;
+ private int sslFileBufferSize;
public static final String MAPREDUCE_SHUFFLE_SERVICEID =
"mapreduce.shuffle";
@@ -126,6 +132,11 @@ public class ShuffleHandler extends Abst
public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
public static final int DEFAULT_SHUFFLE_PORT = 8080;
+ public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+ "mapreduce.shuffle.ssl.file.buffer.size";
+
+ public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
@Metrics(about="Shuffle output metrics", context="mapred")
static class ShuffleMetrics implements ChannelFutureListener {
@Metric("Shuffle output in bytes")
@@ -249,7 +260,11 @@ public class ShuffleHandler extends Abst
public synchronized void start() {
Configuration conf = getConfig();
ServerBootstrap bootstrap = new ServerBootstrap(selector);
- HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf);
+ try {
+ pipelineFact = new HttpPipelineFactory(conf);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
bootstrap.setPipelineFactory(pipelineFact);
port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
Channel ch = bootstrap.bind(new InetSocketAddress(port));
@@ -259,6 +274,9 @@ public class ShuffleHandler extends Abst
pipelineFact.SHUFFLE.setPort(port);
LOG.info(getName() + " listening on port " + port);
super.start();
+
+ sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+ DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
}
@Override
@@ -266,6 +284,7 @@ public class ShuffleHandler extends Abst
accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
ServerBootstrap bootstrap = new ServerBootstrap(selector);
bootstrap.releaseExternalResources();
+ pipelineFact.destroy();
super.stop();
}
@@ -283,22 +302,38 @@ public class ShuffleHandler extends Abst
class HttpPipelineFactory implements ChannelPipelineFactory {
final Shuffle SHUFFLE;
+ private SSLFactory sslFactory;
- public HttpPipelineFactory(Configuration conf) {
+ public HttpPipelineFactory(Configuration conf) throws Exception {
SHUFFLE = new Shuffle(conf);
+ if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+ MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
+ sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+ sslFactory.init();
+ }
+ }
+
+ public void destroy() {
+ if (sslFactory != null) {
+ sslFactory.destroy();
+ }
}
@Override
public ChannelPipeline getPipeline() throws Exception {
- return Channels.pipeline(
- new HttpRequestDecoder(),
- new HttpChunkAggregator(1 << 16),
- new HttpResponseEncoder(),
- new ChunkedWriteHandler(),
- SHUFFLE);
- // TODO factor security manager into pipeline
- // TODO factor out encode/decode to permit binary shuffle
- // TODO factor out decode of index to permit alt. models
+ ChannelPipeline pipeline = Channels.pipeline();
+ if (sslFactory != null) {
+ pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+ }
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("chunking", new ChunkedWriteHandler());
+ pipeline.addLast("shuffle", SHUFFLE);
+ return pipeline;
+ // TODO factor security manager into pipeline
+ // TODO factor out encode/decode to permit binary shuffle
+ // TODO factor out decode of index to permit alt. models
}
}
@@ -483,17 +518,25 @@ public class ShuffleHandler extends Abst
LOG.info(spillfile + " not found");
return null;
}
- final FileRegion partition = new DefaultFileRegion(
- spill.getChannel(), info.startOffset, info.partLength);
- ChannelFuture writeFuture = ch.write(partition);
- writeFuture.addListener(new ChannelFutureListener() {
- // TODO error handling; distinguish IO/connection failures,
- // attribute to appropriate spill output
- @Override
- public void operationComplete(ChannelFuture future) {
- partition.releaseExternalResources();
- }
- });
+ ChannelFuture writeFuture;
+ if (ch.getPipeline().get(SslHandler.class) == null) {
+ final FileRegion partition = new DefaultFileRegion(
+ spill.getChannel(), info.startOffset, info.partLength);
+ writeFuture = ch.write(partition);
+ writeFuture.addListener(new ChannelFutureListener() {
+ // TODO error handling; distinguish IO/connection failures,
+ // attribute to appropriate spill output
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ partition.releaseExternalResources();
+ }
+ });
+ } else {
+ // HTTPS cannot be done with zero copy.
+ writeFuture = ch.write(new ChunkedFile(spill, info.startOffset,
+ info.partLength,
+ sslFileBufferSize));
+ }
metrics.shuffleConnections.incr();
metrics.shuffleOutputBytes.incr(info.partLength); // optimistic
return writeFuture;
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm?rev=1365979&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm Thu Jul 26 13:23:05 2012
@@ -0,0 +1,320 @@
+~~ Licensed under the Apache License, Version 2.0 (the "License");
+~~ you may not use this file except in compliance with the License.
+~~ You may obtain a copy of the License at
+~~
+~~ http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License. See accompanying LICENSE file.
+
+ ---
+ Hadoop Map Reduce Next Generation-${project.version} - Encrypted Shuffle
+ ---
+ ---
+ ${maven.build.timestamp}
+
+Hadoop MapReduce Next Generation - Encrypted Shuffle
+
+ \[ {{{./index.html}Go Back}} \]
+
+* {Introduction}
+
+ The Encrypted Shuffle capability allows encryption of the MapReduce shuffle
+ using HTTPS, with optional client authentication (also known as
+ bi-directional HTTPS, or HTTPS with client certificates). It comprises:
+
+ * A Hadoop configuration setting for toggling the shuffle between HTTP and
+ HTTPS.
+
+ * Hadoop configuration settings for specifying the keystore and truststore
+ properties (location, type, passwords) used by the shuffle service and the
+ reducer tasks fetching shuffle data.
+
+ * A way to re-load truststores across the cluster (when a node is added or
+ removed).
+
+* {Configuration}
+
+** <<core-site.xml>> Properties
+
+ To enable encrypted shuffle, set the following properties in core-site.xml of
+ all nodes in the cluster:
+
+*--------------------------------------+---------------------+-----------------+
+| <<Property>> | <<Default Value>> | <<Explanation>> |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.require.client.cert>>> | <<<false>>> | Whether client certificates are required |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.hostname.verifier>>> | <<<DEFAULT>>> | The hostname verifier to provide for HttpsURLConnections. Valid values are: <<DEFAULT>>, <<STRICT>>, <<STRICT_IE6>>, <<DEFAULT_AND_LOCALHOST>> and <<ALLOW_ALL>> |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.keystores.factory.class>>> | <<<org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory>>> | The KeyStoresFactory implementation to use |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.server.conf>>> | <<<ssl-server.xml>>> | Resource file from which ssl server keystore information will be extracted. This file is looked up in the classpath; typically it should be in the Hadoop conf/ directory |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.client.conf>>> | <<<ssl-client.xml>>> | Resource file from which ssl client keystore information will be extracted. This file is looked up in the classpath; typically it should be in the Hadoop conf/ directory |
+*--------------------------------------+---------------------+-----------------+
+
+ <<IMPORTANT:>> Currently, <<<hadoop.ssl.require.client.cert>>> should be
+ set to <<<false>>>. Refer to the {{{ClientCertificates}Client Certificates}}
+ section for details.
+
+ <<IMPORTANT:>> All these properties should be marked as final in the cluster
+ configuration files.
+
+*** Example:
+
+------
+ ...
+ <property>
+ <name>hadoop.ssl.require.client.cert</name>
+ <value>false</value>
+ <final>true</final>
+ </property>
+
+ <property>
+ <name>hadoop.ssl.hostname.verifier</name>
+ <value>DEFAULT</value>
+ <final>true</final>
+ </property>
+
+ <property>
+ <name>hadoop.ssl.keystores.factory.class</name>
+ <value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
+ <final>true</final>
+ </property>
+
+ <property>
+ <name>hadoop.ssl.server.conf</name>
+ <value>ssl-server.xml</value>
+ <final>true</final>
+ </property>
+
+ <property>
+ <name>hadoop.ssl.client.conf</name>
+ <value>ssl-client.xml</value>
+ <final>true</final>
+ </property>
+ ...
+------
+
+** <<<mapred-site.xml>>> Properties
+
+ To enable encrypted shuffle, set the following property in mapred-site.xml
+ of all nodes in the cluster:
+
+*--------------------------------------+---------------------+-----------------+
+| <<Property>> | <<Default Value>> | <<Explanation>> |
+*--------------------------------------+---------------------+-----------------+
+| <<<mapreduce.shuffle.ssl.enabled>>> | <<<false>>> | Whether encrypted shuffle is enabled |
+*--------------------------------------+---------------------+-----------------+
+
+ <<IMPORTANT:>> This property should be marked as final in the cluster
+ configuration files.
+
+*** Example:
+
+------
+ ...
+ <property>
+ <name>mapreduce.shuffle.ssl.enabled</name>
+ <value>true</value>
+ <final>true</final>
+ </property>
+ ...
+------
+
+ The Linux container executor should be set to prevent job tasks from
+ reading the server keystore information and gaining access to the shuffle
+ server certificates.
+
+ Refer to Hadoop Kerberos configuration for details on how to do this.
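+
+ For example, a minimal <<<yarn-site.xml>>> fragment enabling the Linux
+ container executor might look as follows (an illustrative sketch; the
+ executor additionally requires the <<<container-executor>>> binary and
+ group settings covered in the Kerberos configuration documentation):
+
+------
+ <property>
+ <name>yarn.nodemanager.container-executor.class</name>
+ <value>org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor</value>
+ </property>
+------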
+
+* {Keystore and Truststore Settings}
+
+ Currently <<<FileBasedKeyStoresFactory>>> is the only <<<KeyStoresFactory>>>
+ implementation. The <<<FileBasedKeyStoresFactory>>> implementation uses the
+ following properties, in the <<ssl-server.xml>> and <<ssl-client.xml>> files,
+ to configure the keystores and truststores.
+
+** <<<ssl-server.xml>>> (Shuffle server) Configuration:
+
+ The mapred user should own the <<ssl-server.xml>> file and have exclusive
+ read access to it.
+
+*---------------------------------------------+---------------------+-----------------+
+| <<Property>> | <<Default Value>> | <<Explanation>> |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.keystore.type>>> | <<<jks>>> | Keystore file type |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.keystore.location>>> | NONE | Keystore file location. The mapred user should own this file and have exclusive read access to it. |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.keystore.password>>> | NONE | Keystore file password |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.truststore.type>>> | <<<jks>>> | Truststore file type |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.truststore.location>>> | NONE | Truststore file location. The mapred user should own this file and have exclusive read access to it. |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.truststore.password>>> | NONE | Truststore file password |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.truststore.reload.interval>>> | 10000 | Truststore reload interval, in milliseconds |
+*---------------------------------------------+---------------------+-----------------+
+
+*** Example:
+
+------
+<configuration>
+
+ <!-- Server Certificate Store -->
+ <property>
+ <name>ssl.server.keystore.type</name>
+ <value>jks</value>
+ </property>
+ <property>
+ <name>ssl.server.keystore.location</name>
+ <value>${user.home}/keystores/server-keystore.jks</value>
+ </property>
+ <property>
+ <name>ssl.server.keystore.password</name>
+ <value>serverfoo</value>
+ </property>
+
+ <!-- Server Trust Store -->
+ <property>
+ <name>ssl.server.truststore.type</name>
+ <value>jks</value>
+ </property>
+ <property>
+ <name>ssl.server.truststore.location</name>
+ <value>${user.home}/keystores/truststore.jks</value>
+ </property>
+ <property>
+ <name>ssl.server.truststore.password</name>
+ <value>clientserverbar</value>
+ </property>
+ <property>
+ <name>ssl.server.truststore.reload.interval</name>
+ <value>10000</value>
+ </property>
+</configuration>
+------
+
+** <<<ssl-client.xml>>> (Reducer/Fetcher) Configuration:
+
+ The mapred user should own the <<ssl-client.xml>> file and it should have
+ default permissions.
+
+*---------------------------------------------+---------------------+-----------------+
+| <<Property>> | <<Default Value>> | <<Explanation>> |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.keystore.type>>> | <<<jks>>> | Keystore file type |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.keystore.location>>> | NONE | Keystore file location. The mapred user should own this file and it should have default permissions. |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.keystore.password>>> | NONE | Keystore file password |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.type>>> | <<<jks>>> | Truststore file type |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.location>>> | NONE | Truststore file location. The mapred user should own this file and it should have default permissions. |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.password>>> | NONE | Truststore file password |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.reload.interval>>> | 10000 | Truststore reload interval, in milliseconds |
+*---------------------------------------------+---------------------+-----------------+
+
+*** Example:
+
+------
+<configuration>
+
+ <!-- Client certificate Store -->
+ <property>
+ <name>ssl.client.keystore.type</name>
+ <value>jks</value>
+ </property>
+ <property>
+ <name>ssl.client.keystore.location</name>
+ <value>${user.home}/keystores/client-keystore.jks</value>
+ </property>
+ <property>
+ <name>ssl.client.keystore.password</name>
+ <value>clientfoo</value>
+ </property>
+
+ <!-- Client Trust Store -->
+ <property>
+ <name>ssl.client.truststore.type</name>
+ <value>jks</value>
+ </property>
+ <property>
+ <name>ssl.client.truststore.location</name>
+ <value>${user.home}/keystores/truststore.jks</value>
+ </property>
+ <property>
+ <name>ssl.client.truststore.password</name>
+ <value>clientserverbar</value>
+ </property>
+ <property>
+ <name>ssl.client.truststore.reload.interval</name>
+ <value>10000</value>
+ </property>
+</configuration>
+------
+
+* Activating Encrypted Shuffle
+
+ When you have made the above configuration changes, activate Encrypted
+ Shuffle by re-starting all NodeManagers.
+
+ <<IMPORTANT:>> Using encrypted shuffle incurs a significant performance
+ impact. Users should profile this and potentially reserve one or more
+ cores for encrypted shuffle.
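+
+ As a quick sanity check (an illustrative command, not part of Hadoop;
+ substitute your NodeManager host and the configured shuffle port, which
+ defaults to 8080), you can confirm that the shuffle endpoint now speaks
+ TLS:
+
+------
+ openssl s_client -connect nodemanager.example.com:8080 < /dev/null
+------
+
+ A successful handshake that prints the shuffle server certificate chain
+ indicates that encrypted shuffle is active.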
+
+* {ClientCertificates} Client Certificates
+
+ Using Client Certificates does not fully ensure that the client is a
+ reducer task for the job. Currently, Client Certificates (their private key)
+ keystore files must be readable by all users submitting jobs to the cluster.
+ This means that a rogue job could read those keystore files and use
+ the client certificates in them to establish a secure connection with a
+ Shuffle server. However, unless the rogue job has a proper JobToken, it won't
+ be able to retrieve shuffle data from the Shuffle server. A job, using its
+ own JobToken, can only retrieve shuffle data that belongs to itself.
+
+* Reloading Truststores
+
+ By default the truststores will reload their configuration every 10 seconds.
+ If a new truststore file is copied over the old one, it will be re-read,
+ and its certificates will replace the old ones. This mechanism is useful for
+ adding or removing nodes from the cluster, or for adding or removing trusted
+ clients. In these cases, the client or NodeManager certificate is added to
+ (or removed from) all the truststore files in the system, and the new
+ configuration will be picked up without you having to restart the NodeManager
+ daemons.
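+
+ For example, to trust a new node's certificate and roll it out without
+ restarting daemons (an illustrative sketch; the alias, file names and
+ password are placeholders):
+
+------
+ # import the new certificate into a staging copy of the truststore
+ keytool -import -noprompt -alias newnode -file newnode.crt \
+   -keystore truststore-new.jks -storepass clientserverbar
+
+ # replace the live truststore; it is re-read within the configured
+ # ssl.server.truststore.reload.interval
+ mv truststore-new.jks ${user.home}/keystores/truststore.jks
+------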
+
+* Debugging
+
+ <<NOTE:>> Enable debugging only for troubleshooting, and then only for jobs
+ running on small amounts of data. It is very verbose and slows down jobs by
+ several orders of magnitude. (You might need to increase mapred.task.timeout
+ to prevent jobs from failing because tasks run so slowly.)
+
+ To enable SSL debugging in the reducers, set <<<-Djavax.net.debug=all>>> in
+ the <<<mapreduce.reduce.child.java.opts>>> property; for example:
+
+------
+ <property>
+ <name>mapreduce.reduce.child.java.opts</name>
+ <value>-Xmx200m -Djavax.net.debug=all</value>
+ </property>
+------
+
+ You can do this on a per-job basis, or by means of a cluster-wide setting in
+ the <<<mapred-site.xml>>> file.
+
+ To set this property on the NodeManager, set it in the <<<yarn-env.sh>>> file:
+
+------
+ YARN_NODEMANAGER_OPTS="-Djavax.net.debug=all $YARN_NODEMANAGER_OPTS"
+------
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm Thu Jul 26 13:23:05 2012
@@ -51,3 +51,5 @@ MapReduce NextGen aka YARN aka MRv2
* {{{./CLIMiniCluster.html}CLI MiniCluster}}
+ * {{{./EncryptedShuffle.html}Encrypted Shuffle}}
+