You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/07/23 00:35:55 UTC

svn commit: r966884 [10/10] - in /hadoop/mapreduce/trunk: ./ src/c++/pipes/ src/c++/pipes/impl/ src/c++/utils/ src/c++/utils/m4/ src/examples/pipes/ src/examples/pipes/conf/ src/java/org/apache/hadoop/mapred/pipes/

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=966884&r1=966883&r2=966884&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Thu Jul 22 22:35:55 2010
@@ -26,12 +26,18 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -42,6 +48,11 @@ import org.apache.hadoop.mapred.RecordRe
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -83,6 +94,18 @@ class Application<K1 extends WritableCom
     env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
     env.put(Submitter.PORT, 
             Integer.toString(serverSocket.getLocalPort()));
+    
+    //Add token to the environment if security is enabled
+    Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf
+        .getCredentials());
+    // This password is used as shared secret key between this application and
+    // child pipes process
+    byte[]  password = jobToken.getPassword();
+    String localPasswordFile = conf.getJobLocalDir() + Path.SEPARATOR
+        + "jobTokenPassword";
+    writePasswordToLocalFile(localPasswordFile, password, conf);
+    env.put("hadoop.pipes.shared.secret.location", localPasswordFile);
+ 
     List<String> cmd = new ArrayList<String>();
     String interpretor = conf.get(Submitter.INTERPRETOR);
     if (interpretor != null) {
@@ -109,17 +132,52 @@ class Application<K1 extends WritableCom
 
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
-    handler = new OutputHandler<K2, V2>(output, reporter, recordReader);
+    
+    String challenge = getSecurityChallenge();
+    String digestToSend = createDigest(password, challenge);
+    String digestExpected = createDigest(password, digestToSend);
+    
+    handler = new OutputHandler<K2, V2>(output, reporter, recordReader, 
+        digestExpected);
     K2 outputKey = (K2)
       ReflectionUtils.newInstance(outputKeyClass, conf);
     V2 outputValue = (V2) 
       ReflectionUtils.newInstance(outputValueClass, conf);
     downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler, 
                                   outputKey, outputValue, conf);
+    
+    downlink.authenticate(digestToSend, challenge);
+    waitForAuthentication();
+    LOG.debug("Authentication succeeded");
     downlink.start();
     downlink.setJobConf(conf);
   }
 
+  private String getSecurityChallenge() {
+    Random rand = new Random(System.currentTimeMillis());
+    //Use 4 random integers so as to have 16 random bytes.
+    StringBuilder strBuilder = new StringBuilder();
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    return strBuilder.toString();
+  }
+
+  private void writePasswordToLocalFile(String localPasswordFile,
+      byte[] password, JobConf conf) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path localPath = new Path(localPasswordFile);
+    if (localFs.isFile(localPath)) {
+      LOG.debug("Password file is already created by previous path");
+      return;
+    }
+    FSDataOutputStream out = FileSystem.create(localFs, localPath,
+        new FsPermission("400"));
+    out.write(password);
+    out.close();
+  }
+
   /**
    * Get the downward protocol object that can send commands down to the
    * application.
@@ -128,7 +186,19 @@ class Application<K1 extends WritableCom
   DownwardProtocol<K1, V1> getDownlink() {
     return downlink;
   }
-
+  
+  /**
+   * Wait for authentication response.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void waitForAuthentication() throws IOException,
+      InterruptedException {
+    downlink.flush();
+    LOG.debug("Waiting for authentication response");
+    handler.waitForAuthentication();
+  }
+  
   /**
    * Wait for the application to finish
    * @return did the application finish correctly?
@@ -192,5 +262,11 @@ class Application<K1 extends WritableCom
     Process result = builder.start();
     return result;
   }
+  
+  public static String createDigest(byte[] password, String data)
+      throws IOException {
+    SecretKey key = JobTokenSecretManager.createSecretKey(password);
+    return SecureShuffleUtils.hashFromString(data, key);
+  }
 
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=966884&r1=966883&r2=966884&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Thu Jul 22 22:35:55 2010
@@ -24,6 +24,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import javax.crypto.SecretKey;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
@@ -34,6 +36,8 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -69,13 +73,15 @@ class BinaryProtocol<K1 extends Writable
                                     REDUCE_VALUE(7),
                                     CLOSE(8),
                                     ABORT(9),
+                                    AUTHENTICATION_REQ(10),
                                     OUTPUT(50),
                                     PARTITIONED_OUTPUT(51),
                                     STATUS(52),
                                     PROGRESS(53),
                                     DONE(54),
                                     REGISTER_COUNTER(55),
-                                    INCREMENT_COUNTER(56);
+                                    INCREMENT_COUNTER(56),
+                                    AUTHENTICATION_RESP(57);
     final int code;
     MessageType(int code) {
       this.code = code;
@@ -90,6 +96,7 @@ class BinaryProtocol<K1 extends Writable
     private UpwardProtocol<K2, V2> handler;
     private K2 key;
     private V2 value;
+    private boolean authPending = true;
     
     public UplinkReaderThread(InputStream stream,
                               UpwardProtocol<K2, V2> handler, 
@@ -113,7 +120,14 @@ class BinaryProtocol<K1 extends Writable
           }
           int cmd = WritableUtils.readVInt(inStream);
           LOG.debug("Handling uplink command " + cmd);
-          if (cmd == MessageType.OUTPUT.code) {
+          if (cmd == MessageType.AUTHENTICATION_RESP.code) {
+            String digest = Text.readString(inStream);
+            authPending = !handler.authenticate(digest);
+          } else if (authPending) {
+            LOG.warn("Message " + cmd + " received before authentication is "
+                + "complete. Ignoring");
+            continue;
+          } else if (cmd == MessageType.OUTPUT.code) {
             readObject(key);
             readObject(value);
             handler.output(key, value);
@@ -244,6 +258,15 @@ class BinaryProtocol<K1 extends Writable
     uplink.interrupt();
     uplink.join();
   }
+  
+  public void authenticate(String digest, String challenge)
+      throws IOException {
+    LOG.debug("Sending AUTHENTICATION_REQ, digest=" + digest + ", challenge="
+        + challenge);
+    WritableUtils.writeVInt(stream, MessageType.AUTHENTICATION_REQ.code);
+    Text.writeString(stream, digest);
+    Text.writeString(stream, challenge);
+  }
 
   public void start() throws IOException {
     LOG.debug("starting downlink");
@@ -344,5 +367,4 @@ class BinaryProtocol<K1 extends Writable
       stream.write(buffer.getData(), 0, length);
     }
   }
-  
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java?rev=966884&r1=966883&r2=966884&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java Thu Jul 22 22:35:55 2010
@@ -32,6 +32,12 @@ import org.apache.hadoop.mapred.JobConf;
  */
 interface DownwardProtocol<K extends WritableComparable, V extends Writable> {
   /**
+   * request authentication
+   * @throws IOException
+   */
+  void authenticate(String digest, String challenge) throws IOException;
+  
+  /**
    * Start communication
    * @throws IOException
    */

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java?rev=966884&r1=966883&r2=966884&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java Thu Jul 22 22:35:55 2010
@@ -44,21 +44,26 @@ class OutputHandler<K extends WritableCo
   private OutputCollector<K, V> collector;
   private float progressValue = 0.0f;
   private boolean done = false;
+  
   private Throwable exception = null;
   RecordReader<FloatWritable,NullWritable> recordReader = null;
   private Map<Integer, Counters.Counter> registeredCounters = 
     new HashMap<Integer, Counters.Counter>();
 
+  private String expectedDigest = null;
+  private boolean digestReceived = false;
   /**
    * Create a handler that will handle any records output from the application.
    * @param collector the "real" collector that takes the output
    * @param reporter the reporter for reporting progress
    */
   public OutputHandler(OutputCollector<K, V> collector, Reporter reporter, 
-                       RecordReader<FloatWritable,NullWritable> recordReader) {
+                       RecordReader<FloatWritable,NullWritable> recordReader,
+                       String expectedDigest) {
     this.reporter = reporter;
     this.collector = collector;
     this.recordReader = recordReader;
+    this.expectedDigest = expectedDigest;
   }
 
   /**
@@ -155,5 +160,32 @@ class OutputHandler<K extends WritableCo
       throw new IOException("Invalid counter with id: " + id);
     }
   }
+  
+  public synchronized boolean authenticate(String digest) throws IOException {
+    boolean success = true;
+    if (!expectedDigest.equals(digest)) {
+      exception = new IOException("Authentication Failed: Expected digest="
+          + expectedDigest + ", received=" + digestReceived);
+      success = false;
+    }
+    digestReceived = true;
+    notify();
+    return success;
+  }
 
+  /**
+   * This is called by Application and blocks the thread until
+   * authentication response is received.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  synchronized void waitForAuthentication()
+      throws IOException, InterruptedException {
+    while (digestReceived == false && exception == null) {
+      wait();
+    }
+    if (exception != null) {
+      throw new IOException(exception.getMessage());
+    }
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java?rev=966884&r1=966883&r2=966884&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java Thu Jul 22 22:35:55 2010
@@ -88,4 +88,14 @@ interface UpwardProtocol<K extends Writa
    * @throws IOException
    */
   void incrementCounter(int id, long amount) throws IOException;
+
+  /**
+   * Handles authentication response from client.
+   * It must notify the threads waiting for authentication response.
+   * @param digest
+   * @return true if authentication is successful
+   * @throws IOException
+   */
+  boolean authenticate(String digest) throws IOException;
+
 }