You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:16:54 UTC
svn commit: r1077456 [10/10] - in
/hadoop/common/branches/branch-0.20-security-patches/src: c++/pipes/
c++/pipes/impl/ c++/utils/ c++/utils/m4/ examples/pipes/
mapred/org/apache/hadoop/mapred/pipes/
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java?rev=1077456&r1=1077455&r2=1077456&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java Fri Mar 4 04:16:53 2011
@@ -26,11 +26,18 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+
+import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
@@ -41,6 +48,11 @@ import org.apache.hadoop.mapred.RecordRe
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -82,6 +94,18 @@ class Application<K1 extends WritableCom
env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
env.put("hadoop.pipes.command.port",
Integer.toString(serverSocket.getLocalPort()));
+
+ //Add token to the environment if security is enabled
+ Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf
+ .getCredentials());
+ // This password is used as shared secret key between this application and
+ // child pipes process
+ byte[] password = jobToken.getPassword();
+ String localPasswordFile = conf.getJobLocalDir() + Path.SEPARATOR
+ + "jobTokenPassword";
+ writePasswordToLocalFile(localPasswordFile, password, conf);
+ env.put("hadoop.pipes.shared.secret.location", localPasswordFile);
+
List<String> cmd = new ArrayList<String>();
String interpretor = conf.get("hadoop.pipes.executable.interpretor");
if (interpretor != null) {
@@ -107,17 +131,52 @@ class Application<K1 extends WritableCom
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
- handler = new OutputHandler<K2, V2>(output, reporter, recordReader);
+
+ String challenge = getSecurityChallenge();
+ String digestToSend = createDigest(password, challenge);
+ String digestExpected = createDigest(password, digestToSend);
+
+ handler = new OutputHandler<K2, V2>(output, reporter, recordReader,
+ digestExpected);
K2 outputKey = (K2)
ReflectionUtils.newInstance(outputKeyClass, conf);
V2 outputValue = (V2)
ReflectionUtils.newInstance(outputValueClass, conf);
downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler,
outputKey, outputValue, conf);
+
+ downlink.authenticate(digestToSend, challenge);
+ waitForAuthentication();
+ LOG.debug("Authentication succeeded");
downlink.start();
downlink.setJobConf(conf);
}
+  /**
+   * Build the random challenge string that is sent to the child pipes
+   * process in the authentication request.
+   *
+   * The challenge is the decimal concatenation of four non-negative random
+   * integers, each below 0x7fffffff. A java.security.SecureRandom is used
+   * instead of a java.util.Random seeded with the current time, because a
+   * time-based seed makes the challenge predictable.
+   *
+   * @return the challenge string
+   */
+  private String getSecurityChallenge() {
+    // Fully qualified so that no additional import is required;
+    // SecureRandom is a subclass of java.util.Random.
+    Random rand = new java.security.SecureRandom();
+    StringBuilder strBuilder = new StringBuilder();
+    // Four draws, same format as before: each value appended in decimal.
+    for (int i = 0; i < 4; i++) {
+      strBuilder.append(rand.nextInt(0x7fffffff));
+    }
+    return strBuilder.toString();
+  }
+
+  /**
+   * Write the job token password to a file on the local file system; the
+   * path of this file is what the caller exports to the child process.
+   *
+   * If the path already refers to a file, nothing is written. Otherwise the
+   * file is created with permission "400", and the output stream is closed
+   * even when the write fails.
+   *
+   * @param localPasswordFile path of the password file
+   * @param password the bytes to store in the file
+   * @param conf job configuration used to obtain the local file system
+   * @throws IOException if the file cannot be created, written or closed
+   */
+  private void writePasswordToLocalFile(String localPasswordFile,
+      byte[] password, JobConf conf) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path localPath = new Path(localPasswordFile);
+    if (localFs.isFile(localPath)) {
+      LOG.debug("Password file is already created by previous path");
+      return;
+    }
+    FSDataOutputStream out = FileSystem.create(localFs, localPath,
+        new FsPermission("400"));
+    try {
+      out.write(password);
+    } finally {
+      // Release the file handle even if the write throws.
+      out.close();
+    }
+  }
+
/**
* Get the downward protocol object that can send commands down to the
* application.
@@ -126,7 +185,19 @@ class Application<K1 extends WritableCom
DownwardProtocol<K1, V1> getDownlink() {
return downlink;
}
-
+
+  /**
+   * Flush the downlink, then block until the handler reports that the
+   * authentication response has arrived.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void waitForAuthentication()
+      throws IOException, InterruptedException {
+    // Push any buffered downlink data out before blocking on the handler.
+    downlink.flush();
+    LOG.debug("Waiting for authentication response");
+    handler.waitForAuthentication();
+  }
+
/**
* Wait for the application to finish
* @return did the application finish correctly?
@@ -190,5 +261,11 @@ class Application<K1 extends WritableCom
Process result = builder.start();
return result;
}
+
+  /**
+   * Hash a string with a secret key created from the given password.
+   * @param password bytes from which the secret key is created
+   * @param data the string to hash
+   * @return the value returned by SecureShuffleUtils.hashFromString for
+   *         the data and the created key
+   * @throws IOException
+   */
+  public static String createDigest(byte[] password, String data)
+      throws IOException {
+    return SecureShuffleUtils.hashFromString(data,
+        JobTokenSecretManager.createSecretKey(password));
+  }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=1077456&r1=1077455&r2=1077456&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Fri Mar 4 04:16:53 2011
@@ -24,6 +24,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import javax.crypto.SecretKey;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
@@ -34,6 +36,8 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.util.StringUtils;
/**
@@ -69,13 +73,15 @@ class BinaryProtocol<K1 extends Writable
REDUCE_VALUE(7),
CLOSE(8),
ABORT(9),
+ AUTHENTICATION_REQ(10),
OUTPUT(50),
PARTITIONED_OUTPUT(51),
STATUS(52),
PROGRESS(53),
DONE(54),
REGISTER_COUNTER(55),
- INCREMENT_COUNTER(56);
+ INCREMENT_COUNTER(56),
+ AUTHENTICATION_RESP(57);
final int code;
MessageType(int code) {
this.code = code;
@@ -90,6 +96,7 @@ class BinaryProtocol<K1 extends Writable
private UpwardProtocol<K2, V2> handler;
private K2 key;
private V2 value;
+ private boolean authPending = true;
public UplinkReaderThread(InputStream stream,
UpwardProtocol<K2, V2> handler,
@@ -113,7 +120,14 @@ class BinaryProtocol<K1 extends Writable
}
int cmd = WritableUtils.readVInt(inStream);
LOG.debug("Handling uplink command " + cmd);
- if (cmd == MessageType.OUTPUT.code) {
+ if (cmd == MessageType.AUTHENTICATION_RESP.code) {
+ String digest = Text.readString(inStream);
+ authPending = !handler.authenticate(digest);
+ } else if (authPending) {
+ LOG.warn("Message " + cmd + " received before authentication is "
+ + "complete. Ignoring");
+ continue;
+ } else if (cmd == MessageType.OUTPUT.code) {
readObject(key);
readObject(value);
handler.output(key, value);
@@ -244,6 +258,15 @@ class BinaryProtocol<K1 extends Writable
uplink.interrupt();
uplink.join();
}
+
+  /**
+   * Write an AUTHENTICATION_REQ message to the downlink stream: the message
+   * code, followed by the digest and then the challenge.
+   * @param digest digest written after the message code
+   * @param challenge challenge written after the digest
+   * @throws IOException
+   */
+  public void authenticate(String digest, String challenge)
+      throws IOException {
+    final int requestCode = MessageType.AUTHENTICATION_REQ.code;
+    LOG.debug("Sending AUTHENTICATION_REQ, digest=" + digest + ", challenge="
+        + challenge);
+    WritableUtils.writeVInt(stream, requestCode);
+    Text.writeString(stream, digest);
+    Text.writeString(stream, challenge);
+  }
public void start() throws IOException {
LOG.debug("starting downlink");
@@ -344,5 +367,4 @@ class BinaryProtocol<K1 extends Writable
stream.write(buffer.getData(), 0, length);
}
}
-
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/DownwardProtocol.java?rev=1077456&r1=1077455&r2=1077456&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/DownwardProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/DownwardProtocol.java Fri Mar 4 04:16:53 2011
@@ -32,6 +32,12 @@ import org.apache.hadoop.mapred.JobConf;
*/
interface DownwardProtocol<K extends WritableComparable, V extends Writable> {
/**
+ * Request authentication from the application.
+ * @param digest digest sent along with the challenge
+ * @param challenge challenge string sent with the request
+ * @throws IOException
+ */
+ void authenticate(String digest, String challenge) throws IOException;
+
+ /**
* Start communication
* @throws IOException
*/
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java?rev=1077456&r1=1077455&r2=1077456&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java Fri Mar 4 04:16:53 2011
@@ -44,21 +44,26 @@ class OutputHandler<K extends WritableCo
private OutputCollector<K, V> collector;
private float progressValue = 0.0f;
private boolean done = false;
+
private Throwable exception = null;
RecordReader<FloatWritable,NullWritable> recordReader = null;
private Map<Integer, Counters.Counter> registeredCounters =
new HashMap<Integer, Counters.Counter>();
+ private String expectedDigest = null;
+ private boolean digestReceived = false;
/**
* Create a handler that will handle any records output from the application.
* @param collector the "real" collector that takes the output
* @param reporter the reporter for reporting progress
+ * @param recordReader stored on the handler (NOTE(review): its use is not
+ *        visible in this hunk - confirm)
+ * @param expectedDigest digest that the received authentication digest is
+ *        compared against
*/
public OutputHandler(OutputCollector<K, V> collector, Reporter reporter,
- RecordReader<FloatWritable,NullWritable> recordReader) {
+ RecordReader<FloatWritable,NullWritable> recordReader,
+ String expectedDigest) {
this.reporter = reporter;
this.collector = collector;
this.recordReader = recordReader;
+ this.expectedDigest = expectedDigest;
}
/**
@@ -155,5 +160,32 @@ class OutputHandler<K extends WritableCo
throw new IOException("Invalid counter with id: " + id);
}
}
+
+  /**
+   * Compare the digest received in the authentication response with the
+   * expected digest and wake up a thread waiting on this handler.
+   * On a mismatch an IOException is recorded in the exception field.
+   * @param digest the digest received in the authentication response
+   * @return true if the received digest equals the expected digest
+   * @throws IOException
+   */
+  public synchronized boolean authenticate(String digest) throws IOException {
+    boolean success = true;
+    if (!expectedDigest.equals(digest)) {
+      // Report the digest that was actually received, not the
+      // digestReceived flag (which is still false at this point).
+      exception = new IOException("Authentication Failed: Expected digest="
+          + expectedDigest + ", received=" + digest);
+      success = false;
+    }
+    // Mark the response as seen before waking the waiter.
+    digestReceived = true;
+    notify();
+    return success;
+  }
+  /**
+   * This is called by Application and blocks the thread until
+   * authentication response is received.
+   * @throws IOException if an exception has been recorded on this handler;
+   *         the recorded exception is attached as the cause
+   * @throws InterruptedException if the wait is interrupted
+   */
+  synchronized void waitForAuthentication()
+      throws IOException, InterruptedException {
+    // Loop to guard against wake-ups that happen before either condition holds.
+    while (!digestReceived && exception == null) {
+      wait();
+    }
+    if (exception != null) {
+      // Keep the original message but preserve the stack trace as the cause.
+      IOException failure = new IOException(exception.getMessage());
+      failure.initCause(exception);
+      throw failure;
+    }
+  }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java?rev=1077456&r1=1077455&r2=1077456&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java Fri Mar 4 04:16:53 2011
@@ -88,4 +88,14 @@ interface UpwardProtocol<K extends Writa
* @throws IOException
*/
void incrementCounter(int id, long amount) throws IOException;
+
+ /**
+ * Handles authentication response from client.
+ * It must notify the threads waiting for authentication response.
+ * @param digest the digest carried by the authentication response
+ * @return true if authentication is successful
+ * @throws IOException
+ */
+ boolean authenticate(String digest) throws IOException;
+
}