You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/07/23 00:35:55 UTC

svn commit: r966884 [3/10] - in /hadoop/mapreduce/trunk: ./ src/c++/pipes/ src/c++/pipes/impl/ src/c++/utils/ src/c++/utils/m4/ src/examples/pipes/ src/examples/pipes/conf/ src/java/org/apache/hadoop/mapred/pipes/

Modified: hadoop/mapreduce/trunk/src/c++/pipes/impl/HadoopPipes.cc
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/pipes/impl/HadoopPipes.cc?rev=966884&r1=966883&r2=966884&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/pipes/impl/HadoopPipes.cc (original)
+++ hadoop/mapreduce/trunk/src/c++/pipes/impl/HadoopPipes.cc Thu Jul 22 22:35:55 2010
@@ -32,6 +32,11 @@
 #include <strings.h>
 #include <sys/socket.h>
 #include <pthread.h>
+#include <iostream>
+#include <fstream>
+
+#include <openssl/hmac.h>
+#include <openssl/buffer.h>
 
 using std::map;
 using std::string;
@@ -290,9 +295,9 @@ namespace HadoopPipes {
 
   enum MESSAGE_TYPE {START_MESSAGE, SET_JOB_CONF, SET_INPUT_TYPES, RUN_MAP, 
                      MAP_ITEM, RUN_REDUCE, REDUCE_KEY, REDUCE_VALUE, 
-                     CLOSE, ABORT, 
+                     CLOSE, ABORT, AUTHENTICATION_REQ,
                      OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE,
-                     REGISTER_COUNTER, INCREMENT_COUNTER};
+                     REGISTER_COUNTER, INCREMENT_COUNTER, AUTHENTICATION_RESP};
 
   class BinaryUpwardProtocol: public UpwardProtocol {
   private:
@@ -303,6 +308,12 @@ namespace HadoopPipes {
       HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
     }
 
+    virtual void authenticate(const string &responseDigest) {  // send auth reply
+      serializeInt(AUTHENTICATION_RESP, *stream);  // message-type tag goes first
+      serializeString(responseDigest, *stream);    // then the digest payload
+      stream->flush();                             // flush once the reply is written
+    }
+
     virtual void output(const string& key, const string& value) {
       serializeInt(OUTPUT, *stream);
       serializeString(key, *stream);
@@ -359,6 +370,82 @@ namespace HadoopPipes {
     BinaryUpwardProtocol * uplink;
     string key;
     string value;
+    string password;
+    bool authDone;
+    // Loads up to 512 bytes of shared secret into `password` from the file
+    // named by env var "hadoop.pipes.shared.secret.location"; `password` is
+    // left untouched if the variable is unset or the file cannot be opened.
+    void getPassword(string &password) {
+      const char *passwordFile = getenv("hadoop.pipes.shared.secret.location");
+      if (passwordFile == NULL) {
+        return;
+      }
+      std::ifstream fstr(passwordFile, std::fstream::binary);
+      if (fstr.fail()) {
+        std::cerr << "Could not open the password file" << std::endl;
+        return;
+      }
+      // No terminator slot is needed: the byte count from gcount() is passed
+      // to assign(), so a file of 512+ bytes can no longer overrun the buffer.
+      char passBuff[512];
+      fstr.read(passBuff, sizeof(passBuff));
+      password.assign(passBuff, static_cast<size_t>(fstr.gcount()));
+    }
+
+    // Handles the server's AUTHENTICATION_REQ. With no password loaded the
+    // check is skipped (debug runs driven by a command file); otherwise the
+    // server's digest must match the digest of the challenge, and the digest
+    // of that digest is sent back upstream as this side's response.
+    void verifyDigestAndRespond(string& digest, string& challenge) {
+      const bool secured = !password.empty();
+      if (secured && !verifyDigest(password, digest, challenge)) {
+        std::cerr << "Server failed to authenticate. Exiting" << std::endl;
+        exit(-1);
+      }
+      authDone = true;
+      if (secured) {
+        string proof = createDigest(password, digest);
+        uplink->authenticate(proof);
+      }
+    }
+
+    // Recomputes the digest of `challenge` under `password` via createDigest
+    // and reports whether it equals the `digest` that was received. Returns
+    // true on a match, false otherwise.
+    bool verifyDigest(string &password, string& digest, string& challenge) {
+      const string expected = createDigest(password, challenge);
+      const bool matches = (expected == digest);
+      return matches;
+    }
+
+    // Returns base64(HMAC-SHA1(key = password, data = msg)), minus the final
+    // character of the encoder output (the base64 BIO's trailing newline).
+    string createDigest(string &password, string& msg) {
+      unsigned char digest[EVP_MAX_MD_SIZE];
+      unsigned int digestLen = 0;
+      // One-shot HMAC(): no caller-owned HMAC_CTX to initialise or clean up.
+      unsigned char *mac = HMAC(EVP_sha1(), password.data(),
+          (int) password.length(), (const unsigned char *) msg.data(),
+          msg.length(), digest, &digestLen);
+      HADOOP_ASSERT(mac != NULL, "problem computing HMAC digest");
+      //now apply base64 encoding
+      BIO *b64 = BIO_new(BIO_f_base64());
+      BIO *bmem = BIO_new(BIO_s_mem());
+      HADOOP_ASSERT(b64 != NULL && bmem != NULL, "problem allocating BIO");
+      b64 = BIO_push(b64, bmem);
+      BIO_write(b64, digest, digestLen);
+      (void) BIO_flush(b64);
+      BUF_MEM *bptr = NULL;
+      BIO_get_mem_ptr(b64, &bptr);
+      // Copy out of the BIO before freeing it; the length guard keeps an
+      // empty buffer from underflowing length-1 as the old memcpy would.
+      string encoded;
+      if (bptr != NULL && bptr->length > 0) {
+        encoded.assign(bptr->data, bptr->length - 1);
+      }
+      BIO_free_all(b64);
+      return encoded;
+    }
 
   public:
     BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
@@ -366,6 +453,8 @@ namespace HadoopPipes {
       downStream->open(down);
       uplink = new BinaryUpwardProtocol(up);
       handler = _handler;
+      authDone = false;
+      getPassword(password);
     }
 
     UpwardProtocol* getUplink() {
@@ -375,7 +464,22 @@ namespace HadoopPipes {
     virtual void nextEvent() {
       int32_t cmd;
       cmd = deserializeInt(*downStream);
+      if (!authDone && cmd != AUTHENTICATION_REQ) {
+        //Authentication request must be the first message if
+        //authentication is not complete
+        std::cerr << "Command:" << cmd << "received before authentication. " 
+            << "Exiting.." << std::endl;
+        exit(-1);
+      }
       switch (cmd) {
+      case AUTHENTICATION_REQ: {
+        string digest;
+        string challenge;
+        deserializeString(digest, *downStream);
+        deserializeString(challenge, *downStream);
+        verifyDigestAndRespond(digest, challenge);
+        break;
+      }
       case START_MESSAGE: {
         int32_t prot;
         prot = deserializeInt(*downStream);
@@ -1022,7 +1126,6 @@ namespace HadoopPipes {
         setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
         HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
                                      + strerror(errno));
-
         connection = new BinaryProtocol(stream, context, outStream);
       } else if (getenv("mapreduce.pipes.commandfile")) {
         char* filename = getenv("mapreduce.pipes.commandfile");

Modified: hadoop/mapreduce/trunk/src/c++/utils/Makefile.in
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/utils/Makefile.in?rev=966884&r1=966883&r2=966884&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/utils/Makefile.in (original)
+++ hadoop/mapreduce/trunk/src/c++/utils/Makefile.in Thu Jul 22 22:35:55 2010
@@ -1,4 +1,4 @@
-# Makefile.in generated by automake 1.9 from Makefile.am.
+# Makefile.in generated by automake 1.9.2 from Makefile.am.
 # @configure_input@
 
 # Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002,
@@ -37,13 +37,12 @@ POST_INSTALL = :
 NORMAL_UNINSTALL = :
 PRE_UNINSTALL = :
 POST_UNINSTALL = :
+build_triplet = @build@
 host_triplet = @host@
-DIST_COMMON = config.guess config.guess config.sub config.sub \
-	$(srcdir)/Makefile.in $(srcdir)/Makefile.am \
-	$(top_srcdir)/configure $(am__configure_deps) \
-	$(top_srcdir)/impl/config.h.in depcomp depcomp ltmain.sh \
-	ltmain.sh config.guess config.guess config.sub config.sub \
-	$(api_HEADERS)
+DIST_COMMON = config.guess config.sub $(srcdir)/Makefile.in \
+	$(srcdir)/Makefile.am $(top_srcdir)/configure \
+	$(am__configure_deps) $(top_srcdir)/impl/config.h.in depcomp \
+	ltmain.sh config.guess config.sub $(api_HEADERS)
 subdir = .
 ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
 am__aclocal_m4_deps = $(top_srcdir)/m4/hadoop_utils.m4 \
@@ -116,6 +115,7 @@ EGREP = @EGREP@
 EXEEXT = @EXEEXT@
 F77 = @F77@
 FFLAGS = @FFLAGS@
+GREP = @GREP@
 INSTALL_DATA = @INSTALL_DATA@
 INSTALL_PROGRAM = @INSTALL_PROGRAM@
 INSTALL_SCRIPT = @INSTALL_SCRIPT@
@@ -140,12 +140,9 @@ SET_MAKE = @SET_MAKE@
 SHELL = @SHELL@
 STRIP = @STRIP@
 VERSION = @VERSION@
-ac_ct_AR = @ac_ct_AR@
 ac_ct_CC = @ac_ct_CC@
 ac_ct_CXX = @ac_ct_CXX@
 ac_ct_F77 = @ac_ct_F77@
-ac_ct_RANLIB = @ac_ct_RANLIB@
-ac_ct_STRIP = @ac_ct_STRIP@
 am__fastdepCC_FALSE = @am__fastdepCC_FALSE@
 am__fastdepCC_TRUE = @am__fastdepCC_TRUE@
 am__fastdepCXX_FALSE = @am__fastdepCXX_FALSE@
@@ -162,23 +159,30 @@ build_cpu = @build_cpu@
 build_os = @build_os@
 build_vendor = @build_vendor@
 datadir = @datadir@
+datarootdir = @datarootdir@
+docdir = @docdir@
+dvidir = @dvidir@
 exec_prefix = @exec_prefix@
 host = @host@
 host_alias = @host_alias@
 host_cpu = @host_cpu@
 host_os = @host_os@
 host_vendor = @host_vendor@
+htmldir = @htmldir@
 includedir = @includedir@
 infodir = @infodir@
 install_sh = @install_sh@
 libdir = @libdir@
 libexecdir = @libexecdir@
+localedir = @localedir@
 localstatedir = @localstatedir@
 mandir = @mandir@
 mkdir_p = @mkdir_p@
 oldincludedir = @oldincludedir@
+pdfdir = @pdfdir@
 prefix = @prefix@
 program_transform_name = @program_transform_name@
+psdir = @psdir@
 sbindir = @sbindir@
 sharedstatedir = @sharedstatedir@
 sysconfdir = @sysconfdir@