You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:34 UTC

[22/50] [abbrv] hadoop git commit: MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner. Contributed by Zhihai Xu

MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner. Contributed by Zhihai Xu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cbb24953
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cbb24953
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cbb24953

Branch: refs/heads/HDFS-7285
Commit: cbb249534aa72ff6c290c4f99766415aeea9d6f5
Parents: b6ceee9
Author: Zhihai Xu <zx...@apache.org>
Authored: Fri Aug 28 12:13:23 2015 -0700
Committer: Zhihai Xu <zx...@apache.org>
Committed: Fri Aug 28 12:13:23 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  7 +++++
 .../apache/hadoop/mapred/LocalJobRunner.java    | 27 ++++++++++++++++++++
 .../hadoop/mapred/TestLocalJobSubmission.java   | 25 ++++++++++++++++++
 3 files changed, 59 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cbb24953/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 361a19b..27af9f9 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -559,6 +559,13 @@ Release 2.8.0 - UNRELEASED
     committing is not utilized when input path is absolute.
     (Dustin Cote via aajisaka)
 
+    MAPREDUCE-6357. MultipleOutputs.write() API should document that output
+    committing is not utilized when input path is absolute.
+    (Dustin Cote via aajisaka)
+
+    MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner.
+    (Zhihai Xu)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cbb24953/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index b685502..45d3cc5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -24,6 +24,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,6 +37,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.crypto.KeyGenerator;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -47,7 +50,9 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
@@ -55,6 +60,7 @@ import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -84,6 +90,8 @@ public class LocalJobRunner implements ClientProtocol {
   public static final String LOCAL_MAX_REDUCES =
     "mapreduce.local.reduce.tasks.maximum";
 
+  public static final String INTERMEDIATE_DATA_ENCRYPTION_ALGO = "HmacSHA1";
+
   private FileSystem fs;
   private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
   private JobConf conf;
@@ -188,6 +196,25 @@ public class LocalJobRunner implements ClientProtocol {
 
       jobs.put(id, this);
 
+      if (CryptoUtils.isEncryptedSpillEnabled(job)) {
+        try {
+          int keyLen = conf.getInt(
+              MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
+              MRJobConfig
+                  .DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS);
+          KeyGenerator keyGen =
+              KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO);
+          keyGen.init(keyLen);
+          Credentials creds =
+              UserGroupInformation.getCurrentUser().getCredentials();
+          TokenCache.setEncryptedSpillKey(keyGen.generateKey().getEncoded(),
+              creds);
+          UserGroupInformation.getCurrentUser().addCredentials(creds);
+        } catch (NoSuchAlgorithmException e) {
+          throw new IOException("Error generating encrypted spill key", e);
+        }
+      }
+
       this.start();
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cbb24953/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
index d73ee4b..8b02857 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.After;
@@ -81,6 +82,30 @@ public class TestLocalJobSubmission {
     assertEquals("dist job res is not 0:", 0, res);
   }
 
+  /**
+   * Tests local job submission with
+   * intermediate data encryption enabled.
+   * @throws IOException
+   */
+  @Test
+  public void testLocalJobEncryptedIntermediateData() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(MRConfig.FRAMEWORK_NAME, "local");
+    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+    final String[] args = {
+        "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+    };
+    int res = -1;
+    try {
+      res = ToolRunner.run(conf, new SleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with " + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    assertEquals("dist job res is not 0:", 0, res);
+  }
+
   private Path makeJar(Path p) throws IOException {
     FileOutputStream fos = new FileOutputStream(new File(p.toString()));
     JarOutputStream jos = new JarOutputStream(fos);