Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2012/07/26 15:23:07 UTC

svn commit: r1365979 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-core...

Author: tucu
Date: Thu Jul 26 13:23:05 2012
New Revision: 1365979

URL: http://svn.apache.org/viewvc?rev=1365979&view=rev
Log:
MAPREDUCE-4417. add support for encrypted shuffle (tucu)

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Jul 26 13:23:05 2012
@@ -135,6 +135,8 @@ Branch-2 ( Unreleased changes )
     MAPREDUCE-987. Exposing MiniDFS and MiniMR clusters as a single process 
     command-line. (ahmed via tucu)
 
+    MAPREDUCE-4417. add support for encrypted shuffle (tucu)
+
   IMPROVEMENTS
 
     MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved

Modified: hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Thu Jul 26 13:23:05 2012
@@ -473,5 +473,10 @@
    <!--
      The above 2 fields are accessed locally and only via methods that are synchronized. 
      -->
-  
+ 
+   <Match>
+     <Class name="org.apache.hadoop.mapred.ShuffleHandler" />
+      <Field name="sslFileBufferSize" />
+     <Bug pattern="IS2_INCONSISTENT_SYNC" />
+   </Match> 
  </FindBugsFilter>

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Jul 26 13:23:05 2012
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -108,7 +110,8 @@ public abstract class TaskImpl implement
   private long scheduledTime;
   
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  
+
+  protected boolean encryptedShuffle;
   protected Credentials credentials;
   protected Token<JobTokenIdentifier> jobToken;
   
@@ -274,6 +277,8 @@ public abstract class TaskImpl implement
     this.jobToken = jobToken;
     this.metrics = metrics;
     this.appContext = appContext;
+    this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+                                            MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
 
     // See if this is from a previous generation.
     if (completedTasksFromPreviousRun != null
@@ -637,9 +642,10 @@ public abstract class TaskImpl implement
       TaskAttemptCompletionEvent tce = recordFactory
           .newRecordInstance(TaskAttemptCompletionEvent.class);
       tce.setEventId(-1);
-      tce.setMapOutputServerAddress("http://"
-          + attempt.getNodeHttpAddress().split(":")[0] + ":"
-          + attempt.getShufflePort());
+      String scheme = (encryptedShuffle) ? "https://" : "http://";
+      tce.setMapOutputServerAddress(scheme
+         + attempt.getNodeHttpAddress().split(":")[0] + ":"
+         + attempt.getShufflePort());
       tce.setStatus(status);
       tce.setAttemptId(attempt.getID());
       int runTime = 0;
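
The scheme selection above boils down to the following self-contained logic
(a minimal sketch; the class name, host, port and flag values are illustrative
stand-ins for what TaskImpl reads from the attempt and the job configuration):

  public class ShuffleAddressSketch {
    public static void main(String[] args) {
      String nodeHttpAddress = "node1.example.com:8042"; // attempt.getNodeHttpAddress()
      int shufflePort = 8080;                            // attempt.getShufflePort()
      boolean encryptedShuffle = true;                   // mapreduce.shuffle.ssl.enabled
      String scheme = encryptedShuffle ? "https://" : "http://";
      String mapOutputServerAddress =
          scheme + nodeHttpAddress.split(":")[0] + ":" + shufflePort;
      System.out.println(mapOutputServerAddress); // prints https://node1.example.com:8080
    }
  }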

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Thu Jul 26 13:23:05 2012
@@ -79,4 +79,9 @@ public interface MRConfig {
   public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
   public static final String MAX_BLOCK_LOCATIONS_KEY =
     "mapreduce.job.max.split.locations";
+
+  public static final String SHUFFLE_SSL_ENABLED_KEY =
+    "mapreduce.shuffle.ssl.enabled";
+
+  public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
 }
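
A minimal sketch of driving the new constants from client code, mirroring what
the test added below does to enable the feature per-job (the key and default
names are the ones introduced in this hunk; the class name is illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.MRConfig;

  public class EnableEncryptedShuffle {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Equivalent to setting mapreduce.shuffle.ssl.enabled=true in mapred-site.xml.
      conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, true);
      System.out.println(conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
                                         MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT));
    }
  }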

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Jul 26 13:23:05 2012
@@ -25,11 +25,13 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.HttpURLConnection;
 import java.net.URLConnection;
+import java.security.GeneralSecurityException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import javax.crypto.SecretKey;
+import javax.net.ssl.HttpsURLConnection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,9 +44,11 @@ import org.apache.hadoop.mapred.Counters
 import org.apache.hadoop.mapred.IFileInputStream;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -92,6 +96,9 @@ class Fetcher<K,V> extends Thread {
 
   private volatile boolean stopped = false;
 
+  private static boolean sslShuffle;
+  private static SSLFactory sslFactory;
+
   public Fetcher(JobConf job, TaskAttemptID reduceId, 
                  ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
                  Reporter reporter, ShuffleClientMetrics metrics,
@@ -135,6 +142,20 @@ class Fetcher<K,V> extends Thread {
     
     setName("fetcher#" + id);
     setDaemon(true);
+
+    synchronized (Fetcher.class) {
+      sslShuffle = job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+                                  MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
+      if (sslShuffle && sslFactory == null) {
+        sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
+        try {
+          sslFactory.init();
+        } catch (Exception ex) {
+          sslFactory.destroy();
+          throw new RuntimeException(ex);
+        }
+      }
+    }
   }
   
   public void run() {
@@ -173,8 +194,25 @@ class Fetcher<K,V> extends Thread {
     } catch (InterruptedException ie) {
       LOG.warn("Got interrupt while joining " + getName(), ie);
     }
+    if (sslFactory != null) {
+      sslFactory.destroy();
+    }
   }
 
+  protected HttpURLConnection openConnection(URL url) throws IOException {
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    if (sslShuffle) {
+      HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
+      try {
+        httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
+      } catch (GeneralSecurityException ex) {
+        throw new IOException(ex);
+      }
+      httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
+    }
+    return conn;
+  }
+
   /**
    * The crux of the matter...
    * 
@@ -205,7 +243,7 @@ class Fetcher<K,V> extends Thread {
     
     try {
       URL url = getMapOutputURL(host, maps);
-      HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+      HttpURLConnection connection = openConnection(url);
       
       // generate hash of the url
       String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
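
Distilled from the hunks above: the fetcher builds one CLIENT-mode SSLFactory
per JVM, then decorates each map-output connection with its socket factory and
hostname verifier. A minimal standalone sketch of that wiring (the URL and
class name are illustrative; error handling trimmed):

  import java.net.URL;
  import javax.net.ssl.HttpsURLConnection;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.security.ssl.SSLFactory;

  public class FetcherSslSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
      sslFactory.init(); // reads the ssl-client.xml keystore/truststore settings
      try {
        URL url = new URL("https://node1.example.com:8080/mapOutput"); // illustrative
        HttpsURLConnection conn = (HttpsURLConnection) url.openConnection();
        conn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
        conn.setHostnameVerifier(sslFactory.getHostnameVerifier());
        conn.connect();
      } finally {
        sslFactory.destroy(); // as the fetcher does when shutting down
      }
    }
  }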

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Jul 26 13:23:05 2012
@@ -513,6 +513,21 @@
 </property>
 
 <property>
+  <name>mapreduce.shuffle.ssl.enabled</name>
+  <value>false</value>
+  <description>
+    Whether to use SSL for the Shuffle HTTP endpoints.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.shuffle.ssl.file.buffer.size</name>
+  <value>65536</value>
+  <description>Buffer size for reading spills from file when using SSL.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.reduce.markreset.buffer.percent</name>
   <value>0.0</value>
   <description>The percentage of memory -relative to the maximum heap size- to
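
Deployments override these defaults in mapred-site.xml; a minimal sketch of a
site file that switches the shuffle to HTTPS and pins the spill-read buffer
size explicitly:

  <configuration>
    <property>
      <name>mapreduce.shuffle.ssl.enabled</name>
      <value>true</value>
    </property>
    <property>
      <name>mapreduce.shuffle.ssl.file.buffer.size</name>
      <value>65536</value>
    </property>
  </configuration>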

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java?rev=1365979&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java Thu Jul 26 13:23:05 2012
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security.ssl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRClientCluster;
+import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
+import org.apache.hadoop.mapred.RunningJob;
+
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.URL;
+
+public class TestEncryptedShuffle {
+
+  private static final String BASEDIR =
+    System.getProperty("test.build.dir", "target/test-dir") + "/" +
+    TestEncryptedShuffle.class.getSimpleName();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    File base = new File(BASEDIR);
+    FileUtil.fullyDelete(base);
+    base.mkdirs();
+  }
+
+  @Before
+  public void createCustomYarnClasspath() throws Exception {
+    String classpathDir =
+      KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
+
+    URL url = Thread.currentThread().getContextClassLoader().
+      getResource("mrapp-generated-classpath");
+    File f = new File(url.getPath());
+    BufferedReader reader = new BufferedReader(new FileReader(f));
+    String cp = reader.readLine();
+    cp = cp + ":" + classpathDir;
+    f = new File(classpathDir, "mrapp-generated-classpath");
+    Writer writer = new FileWriter(f);
+    writer.write(cp);
+    writer.close();
+    new File(classpathDir, "core-site.xml").delete();
+  }
+
+  @After
+  public void cleanUpMiniClusterSpecialConfig() throws Exception {
+    String classpathDir =
+      KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
+    new File(classpathDir, "mrapp-generated-classpath").delete();
+    new File(classpathDir, "core-site.xml").delete();
+    String keystoresDir = new File(BASEDIR).getAbsolutePath();
+    KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, classpathDir);
+  }
+
+  private MiniDFSCluster dfsCluster = null;
+  private MiniMRClientCluster mrCluster = null;
+
+  private void startCluster(Configuration  conf) throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      System.setProperty("hadoop.log.dir", "target/test-dir");
+    }
+    conf.set("dfs.block.access.token.enable", "false");
+    conf.set("dfs.permissions", "true");
+    conf.set("hadoop.security.authentication", "simple");
+    dfsCluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fileSystem = dfsCluster.getFileSystem();
+    fileSystem.mkdirs(new Path("/tmp"));
+    fileSystem.mkdirs(new Path("/user"));
+    fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
+    fileSystem.setPermission(
+      new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
+    fileSystem.setPermission(
+      new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
+    fileSystem.setPermission(
+      new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
+    FileSystem.setDefaultUri(conf, fileSystem.getUri());
+    mrCluster = MiniMRClientClusterFactory.create(this.getClass(), 1, conf);
+
+    // so the minicluster conf is avail to the containers.
+    String classpathDir =
+      KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
+    Writer writer = new FileWriter(classpathDir + "/core-site.xml");
+    mrCluster.getConfig().writeXml(writer);
+    writer.close();
+  }
+
+  private void stopCluster() throws Exception {
+    if (mrCluster != null) {
+      mrCluster.stop();
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
+
+  protected JobConf getJobConf() throws IOException {
+    return new JobConf(mrCluster.getConfig());
+  }
+
+  private void encryptedShuffleWithCerts(boolean useClientCerts)
+    throws Exception {
+    try {
+      Configuration conf = new Configuration();
+      String keystoresDir = new File(BASEDIR).getAbsolutePath();
+      String sslConfsDir =
+        KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
+      KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfsDir, conf,
+                                      useClientCerts);
+      conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, true);
+      startCluster(conf);
+      FileSystem fs = FileSystem.get(getJobConf());
+      Path inputDir = new Path("input");
+      fs.mkdirs(inputDir);
+      Writer writer =
+        new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+      writer.write("hello");
+      writer.close();
+
+      Path outputDir = new Path("output", "output");
+
+      JobConf jobConf = new JobConf(getJobConf());
+      jobConf.setInt("mapred.map.tasks", 1);
+      jobConf.setInt("mapred.map.max.attempts", 1);
+      jobConf.setInt("mapred.reduce.max.attempts", 1);
+      jobConf.set("mapred.input.dir", inputDir.toString());
+      jobConf.set("mapred.output.dir", outputDir.toString());
+      JobClient jobClient = new JobClient(jobConf);
+      RunningJob runJob = jobClient.submitJob(jobConf);
+      runJob.waitForCompletion();
+      Assert.assertTrue(runJob.isComplete());
+      Assert.assertTrue(runJob.isSuccessful());
+    } finally {
+      stopCluster();
+    }
+  }
+
+  @Test
+  public void encryptedShuffleWithClientCerts() throws Exception {
+    encryptedShuffleWithCerts(true);
+  }
+
+  @Test
+  public void encryptedShuffleWithoutClientCerts() throws Exception {
+    encryptedShuffleWithCerts(false);
+  }
+
+}
+
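
The test exercises both TLS modes end to end on a one-node MiniDFS/MiniMR pair.
Assuming a regular dev checkout, it should be runnable on its own from the
hadoop-mapreduce-client-jobclient module with the standard surefire selector:

  mvn test -Dtest=TestEncryptedShuffle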

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Jul 26 13:23:05 2012
@@ -55,7 +55,9 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -101,6 +103,8 @@ import org.jboss.netty.handler.codec.htt
 import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedFile;
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
 
@@ -114,6 +118,8 @@ public class ShuffleHandler extends Abst
   private int port;
   private ChannelFactory selector;
   private final ChannelGroup accepted = new DefaultChannelGroup();
+  private HttpPipelineFactory pipelineFact;
+  private int sslFileBufferSize;
 
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
       "mapreduce.shuffle";
@@ -126,6 +132,11 @@ public class ShuffleHandler extends Abst
   public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
   public static final int DEFAULT_SHUFFLE_PORT = 8080;
 
+  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+    "mapreduce.shuffle.ssl.file.buffer.size";
+
+  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
   @Metrics(about="Shuffle output metrics", context="mapred")
   static class ShuffleMetrics implements ChannelFutureListener {
     @Metric("Shuffle output in bytes")
@@ -249,7 +260,11 @@ public class ShuffleHandler extends Abst
   public synchronized void start() {
     Configuration conf = getConfig();
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf);
+    try {
+      pipelineFact = new HttpPipelineFactory(conf);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
     bootstrap.setPipelineFactory(pipelineFact);
     port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
     Channel ch = bootstrap.bind(new InetSocketAddress(port));
@@ -259,6 +274,9 @@ public class ShuffleHandler extends Abst
     pipelineFact.SHUFFLE.setPort(port);
     LOG.info(getName() + " listening on port " + port);
     super.start();
+
+    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
   }
 
   @Override
@@ -266,6 +284,7 @@ public class ShuffleHandler extends Abst
     accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
     bootstrap.releaseExternalResources();
+    pipelineFact.destroy();
     super.stop();
   }
 
@@ -283,22 +302,38 @@ public class ShuffleHandler extends Abst
   class HttpPipelineFactory implements ChannelPipelineFactory {
 
     final Shuffle SHUFFLE;
+    private SSLFactory sslFactory;
 
-    public HttpPipelineFactory(Configuration conf) {
+    public HttpPipelineFactory(Configuration conf) throws Exception {
       SHUFFLE = new Shuffle(conf);
+      if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+                          MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
+        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+        sslFactory.init();
+      }
+    }
+
+    public void destroy() {
+      if (sslFactory != null) {
+        sslFactory.destroy();
+      }
     }
 
     @Override
     public ChannelPipeline getPipeline() throws Exception {
-        return Channels.pipeline(
-            new HttpRequestDecoder(),
-            new HttpChunkAggregator(1 << 16),
-            new HttpResponseEncoder(),
-            new ChunkedWriteHandler(),
-            SHUFFLE);
-        // TODO factor security manager into pipeline
-        // TODO factor out encode/decode to permit binary shuffle
-        // TODO factor out decode of index to permit alt. models
+      ChannelPipeline pipeline = Channels.pipeline();
+      if (sslFactory != null) {
+        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+      }
+      pipeline.addLast("decoder", new HttpRequestDecoder());
+      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+      pipeline.addLast("encoder", new HttpResponseEncoder());
+      pipeline.addLast("chunking", new ChunkedWriteHandler());
+      pipeline.addLast("shuffle", SHUFFLE);
+      return pipeline;
+      // TODO factor security manager into pipeline
+      // TODO factor out encode/decode to permit binary shuffle
+      // TODO factor out decode of index to permit alt. models
     }
 
   }
@@ -483,17 +518,25 @@ public class ShuffleHandler extends Abst
         LOG.info(spillfile + " not found");
         return null;
       }
-      final FileRegion partition = new DefaultFileRegion(
-          spill.getChannel(), info.startOffset, info.partLength);
-      ChannelFuture writeFuture = ch.write(partition);
-      writeFuture.addListener(new ChannelFutureListener() {
-          // TODO error handling; distinguish IO/connection failures,
-          //      attribute to appropriate spill output
-          @Override
-          public void operationComplete(ChannelFuture future) {
-            partition.releaseExternalResources();
-          }
-        });
+      ChannelFuture writeFuture;
+      if (ch.getPipeline().get(SslHandler.class) == null) {
+        final FileRegion partition = new DefaultFileRegion(
+            spill.getChannel(), info.startOffset, info.partLength);
+        writeFuture = ch.write(partition);
+        writeFuture.addListener(new ChannelFutureListener() {
+            // TODO error handling; distinguish IO/connection failures,
+            //      attribute to appropriate spill output
+            @Override
+            public void operationComplete(ChannelFuture future) {
+              partition.releaseExternalResources();
+            }
+          });
+      } else {
+        // HTTPS cannot be done with zero copy.
+        writeFuture = ch.write(new ChunkedFile(spill, info.startOffset,
+                                               info.partLength,
+                                               sslFileBufferSize));
+      }
       metrics.shuffleConnections.incr();
       metrics.shuffleOutputBytes.incr(info.partLength); // optimistic
       return writeFuture;
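
A note on the branch above: DefaultFileRegion uses the kernel's zero-copy
transfer (sendfile), so the spill bytes never surface in user space and an
SslHandler in the pipeline would have nothing to encrypt. Falling back to
ChunkedFile reads the spill through a user-space buffer (sized by
mapreduce.shuffle.ssl.file.buffer.size), which is what makes encryption
possible, at the cost of the extra copy.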

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm?rev=1365979&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm Thu Jul 26 13:23:05 2012
@@ -0,0 +1,320 @@
+~~ Licensed under the Apache License, Version 2.0 (the "License");
+~~ you may not use this file except in compliance with the License.
+~~ You may obtain a copy of the License at
+~~
+~~   http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License. See accompanying LICENSE file.
+
+  ---
+  Hadoop Map Reduce Next Generation-${project.version} - Encrypted Shuffle
+  ---
+  ---
+  ${maven.build.timestamp}
+
+Hadoop MapReduce Next Generation - Encrypted Shuffle
+
+  \[ {{{./index.html}Go Back}} \]
+
+* {Introduction}
+
+  The Encrypted Shuffle capability allows encryption of the MapReduce shuffle
+  using HTTPS, with optional client authentication (also known as
+  bi-directional HTTPS, or HTTPS with client certificates). It comprises:
+
+  * A Hadoop configuration setting for toggling the shuffle between HTTP and
+    HTTPS.
+
+  * Hadoop configuration settings for specifying the keystore and truststore
+    properties (location, type, passwords) used by the shuffle service and the
+    reducer tasks fetching shuffle data.
+
+  * A way to re-load truststores across the cluster (when a node is added or
+    removed).
+
+* {Configuration}
+
+**  <<<core-site.xml>>> Properties
+
+  To enable encrypted shuffle, set the following properties in core-site.xml of
+  all nodes in the cluster:
+
+*--------------------------------------+---------------------+-----------------+
+| <<Property>>                         | <<Default Value>>   | <<Explanation>> |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.require.client.cert>>> | <<<false>>>         | Whether client certificates are required |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.hostname.verifier>>>   | <<<DEFAULT>>>       | The hostname verifier to provide for HttpsURLConnections. Valid values are: <<DEFAULT>>, <<STRICT>>, <<STRICT_IE6>>, <<DEFAULT_AND_LOCALHOST>> and <<ALLOW_ALL>> |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.keystores.factory.class>>> | <<<org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory>>> | The KeyStoresFactory implementation to use |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.server.conf>>>         | <<<ssl-server.xml>>> | Resource file from which ssl server keystore information will be extracted. This file is looked up in the classpath; typically it should be in the Hadoop conf/ directory |
+*--------------------------------------+---------------------+-----------------+
+| <<<hadoop.ssl.client.conf>>>         | <<<ssl-client.xml>>> | Resource file from which ssl client keystore information will be extracted. This file is looked up in the classpath; typically it should be in the Hadoop conf/ directory |
+*--------------------------------------+---------------------+-----------------+
+
+  <<IMPORTANT:>> Currently requiring client certificates should be set to false.
+  Refer to the {{{ClientCertificates}Client Certificates}} section for details.
+
+  <<IMPORTANT:>> All these properties should be marked as final in the cluster
+  configuration files.
+
+*** Example:
+
+------
+    ...
+    <property>
+      <name>hadoop.ssl.require.client.cert</name>
+      <value>false</value>
+      <final>true</final>
+    </property>
+
+    <property>
+      <name>hadoop.ssl.hostname.verifier</name>
+      <value>DEFAULT</value>
+      <final>true</final>
+    </property>
+
+    <property>
+      <name>hadoop.ssl.keystores.factory.class</name>
+      <value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
+      <final>true</final>
+    </property>
+
+    <property>
+      <name>hadoop.ssl.server.conf</name>
+      <value>ssl-server.xml</value>
+      <final>true</final>
+    </property>
+
+    <property>
+      <name>hadoop.ssl.client.conf</name>
+      <value>ssl-client.xml</value>
+      <final>true</final>
+    </property>
+    ...
+------
+
+**  <<<mapred-site.xml>>> Properties
+
+  To enable encrypted shuffle, set the following property in mapred-site.xml
+  of all nodes in the cluster:
+
+*--------------------------------------+---------------------+-----------------+
+| <<Property>>                         | <<Default Value>>   | <<Explanation>> |
+*--------------------------------------+---------------------+-----------------+
+| <<<mapreduce.shuffle.ssl.enabled>>>  | <<<false>>>         | Whether encrypted shuffle is enabled |
+*--------------------------------------+---------------------+-----------------+
+
+  <<IMPORTANT:>> This property should be marked as final in the cluster
+  configuration files.
+
+*** Example:
+
+------
+    ...
+    <property>
+      <name>mapreduce.shuffle.ssl.enabled</name>
+      <value>true</value>
+      <final>true</final>
+    </property>
+    ...
+------
+
+  The Linux container executor should be used to prevent job tasks from
+  reading the server keystore information and gaining access to the shuffle
+  server certificates.
+
+  Refer to Hadoop Kerberos configuration for details on how to do this.
+
+* {Keystore and Truststore Settings}
+
+  Currently <<<FileBasedKeyStoresFactory>>> is the only <<<KeyStoresFactory>>>
+  implementation. The <<<FileBasedKeyStoresFactory>>> implementation uses the
+  following properties, in the <<<ssl-server.xml>>> and <<<ssl-client.xml>>> files,
+  to configure the keystores and truststores.
+
+** <<<ssl-server.xml>>> (Shuffle server) Configuration:
+
+  The mapred user should own the <<<ssl-server.xml>>> file and have exclusive
+  read access to it.
+
+*---------------------------------------------+---------------------+-----------------+
+| <<Property>>                                | <<Default Value>>   | <<Explanation>> |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.keystore.type>>>              | <<<jks>>>           | Keystore file type |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.keystore.location>>>          | NONE                | Keystore file location. The mapred user should own this file and have exclusive read access to it. |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.keystore.password>>>          | NONE                | Keystore file password |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.truststore.type>>>            | <<<jks>>>           | Truststore file type |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.truststore.location>>>        | NONE                | Truststore file location. The mapred user should own this file and have exclusive read access to it. |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.truststore.password>>>        | NONE                | Truststore file password |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.server.truststore.reload.interval>>> | 10000               | Truststore reload interval, in milliseconds |
+*---------------------------------------------+---------------------+-----------------+
+
+*** Example:
+
+------
+<configuration>
+
+  <!-- Server Certificate Store -->
+  <property>
+    <name>ssl.server.keystore.type</name>
+    <value>jks</value>
+  </property>
+  <property>
+    <name>ssl.server.keystore.location</name>
+    <value>${user.home}/keystores/server-keystore.jks</value>
+  </property>
+  <property>
+    <name>ssl.server.keystore.password</name>
+    <value>serverfoo</value>
+  </property>
+
+  <!-- Server Trust Store -->
+  <property>
+    <name>ssl.server.truststore.type</name>
+    <value>jks</value>
+  </property>
+  <property>
+    <name>ssl.server.truststore.location</name>
+    <value>${user.home}/keystores/truststore.jks</value>
+  </property>
+  <property>
+    <name>ssl.server.truststore.password</name>
+    <value>clientserverbar</value>
+  </property>
+  <property>
+    <name>ssl.server.truststore.reload.interval</name>
+    <value>10000</value>
+  </property>
+</configuration>
+------
+
+** <<<ssl-client.xml>>> (Reducer/Fetcher) Configuration:
+
+  The mapred user should own the <<<ssl-client.xml>>> file and it should have
+  default permissions.
+
+*---------------------------------------------+---------------------+-----------------+
+| <<Property>>                                | <<Default Value>>   | <<Explanation>> |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.keystore.type>>>              | <<<jks>>>           | Keystore file type |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.keystore.location>>>          | NONE                | Keystore file location. The mapred user should own this file and it should have default permissions. |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.keystore.password>>>          | NONE                | Keystore file password |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.type>>>            | <<<jks>>>           | Truststore file type |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.location>>>        | NONE                | Truststore file location. The mapred user should own this file and it should have default permissions. |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.password>>>        | NONE                | Truststore file password |
+*---------------------------------------------+---------------------+-----------------+
+| <<<ssl.client.truststore.reload.interval>>> | 10000               | Truststore reload interval, in milliseconds |
+*---------------------------------------------+---------------------+-----------------+
+
+*** Example:
+
+------
+<configuration>
+
+  <!-- Client certificate Store -->
+  <property>
+    <name>ssl.client.keystore.type</name>
+    <value>jks</value>
+  </property>
+  <property>
+    <name>ssl.client.keystore.location</name>
+    <value>${user.home}/keystores/client-keystore.jks</value>
+  </property>
+  <property>
+    <name>ssl.client.keystore.password</name>
+    <value>clientfoo</value>
+  </property>
+
+  <!-- Client Trust Store -->
+  <property>
+    <name>ssl.client.truststore.type</name>
+    <value>jks</value>
+  </property>
+  <property>
+    <name>ssl.client.truststore.location</name>
+    <value>${user.home}/keystores/truststore.jks</value>
+  </property>
+  <property>
+    <name>ssl.client.truststore.password</name>
+    <value>clientserverbar</value>
+  </property>
+  <property>
+    <name>ssl.client.truststore.reload.interval</name>
+    <value>10000</value>
+  </property>
+</configuration>
+------
+
+* Activating Encrypted Shuffle
+
+  When you have made the above configuration changes, activate Encrypted
+  Shuffle by restarting all NodeManagers.
+
+  <<IMPORTANT:>> Using encrypted shuffle will incur a significant performance
+  impact. Users should profile this and potentially reserve one or more cores
+  for encrypted shuffle.
+
+* {ClientCertificates} Client Certificates
+
+  Using Client Certificates does not fully ensure that the client is a reducer
+  task for the job. Currently, the keystore files containing the Client
+  Certificates (and their private keys) must be readable by all users
+  submitting jobs to the cluster. This means that a rogue job could read those
+  keystore files and use the client certificates in them to establish a secure
+  connection with a Shuffle server. However, unless the rogue job has a proper
+  JobToken, it won't be able to retrieve shuffle data from the Shuffle server.
+  A job, using its own JobToken, can only retrieve shuffle data that belongs to itself.
+
+* Reloading Truststores
+
+  By default the truststores will reload their configuration every 10 seconds.
+  If a new truststore file is copied over the old one, it will be re-read,
+  and its certificates will replace the old ones. This mechanism is useful for
+  adding or removing nodes from the cluster, or for adding or removing trusted
+  clients. In these cases, the client or NodeManager certificate is added to
+  (or removed from) all the truststore files in the system, and the new
+  configuration will be picked up without you having to restart the NodeManager
+  daemons.
+
+* Debugging
+
+  <<NOTE:>> Enable debugging only for troubleshooting, and then only for jobs
+  running on small amounts of data. It is very verbose and slows down jobs by
+  several orders of magnitude. (You might need to increase mapred.task.timeout
+  to prevent jobs from failing because tasks run so slowly.)
+
+  To enable SSL debugging in the reducers, set <<<-Djavax.net.debug=all>>> in
+  the <<<mapreduce.reduce.child.java.opts>>> property; for example:
+
+------
+  <property>
+    <name>mapreduce.reduce.child.java.opts</name>
+    <value>-Xmx200m -Djavax.net.debug=all</value>
+  </property>
+------
+
+  You can do this on a per-job basis, or by means of a cluster-wide setting in
+  the <<<mapred-site.xml>>> file.
+
+  To set this property in the NodeManager, set it in the <<<yarn-env.sh>>> file:
+
+------
+  YARN_NODEMANAGER_OPTS="-Djavax.net.debug=all $YARN_NODEMANAGER_OPTS"
+------
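
The keystores and truststores referenced in the examples above have to be
created before the configuration can take effect; a minimal keytool sketch
matching the example paths and passwords (all aliases, hostnames, paths and
passwords are illustrative):

  # create the shuffle server's key pair
  keytool -genkey -alias server -keyalg RSA -keysize 2048 \
      -dname "CN=node1.example.com" -keypass serverfoo \
      -keystore ${HOME}/keystores/server-keystore.jks -storepass serverfoo
  # export its certificate
  keytool -export -alias server -storepass serverfoo \
      -keystore ${HOME}/keystores/server-keystore.jks -file server.cer
  # import the certificate into the shared truststore
  keytool -import -alias server -storepass clientserverbar -noprompt \
      -keystore ${HOME}/keystores/truststore.jks -file server.cer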

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm?rev=1365979&r1=1365978&r2=1365979&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm Thu Jul 26 13:23:05 2012
@@ -51,3 +51,5 @@ MapReduce NextGen aka YARN aka MRv2
 
   * {{{./CLIMiniCluster.html}CLI MiniCluster}}
 
+  * {{{./EncryptedShuffle.html}Encrypted Shuffle}}
+