You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ek...@apache.org on 2012/10/10 20:35:32 UTC

svn commit: r1396722 [1/2] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/comm/netty/ src/main/java/org/apache/giraph/comm/netty/handler/ src/main/java/org/apache/giraph/com...

Author: ekoontz
Date: Wed Oct 10 18:35:31 2012
New Revision: 1396722

URL: http://svn.apache.org/viewvc?rev=1396722&view=rev
Log:
GIRAPH-211: Add secure authentication to Netty IPC

Added:
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
Modified:
    giraph/trunk/pom.xml
    giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java

Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Wed Oct 10 18:35:31 2012
@@ -534,8 +534,13 @@ under the License.
           <version>${hadoop.version}</version>
           <scope>provided</scope>
         </dependency>
+	<dependency>
+	  <groupId>commons-net</groupId>
+	  <artifactId>commons-net</artifactId>
+	  <scope>provided</scope>
+	  <version>3.1</version>
+	</dependency>
       </dependencies>
-
       <build>
         <plugins>
           <plugin>
@@ -553,7 +558,7 @@ under the License.
             </executions>
 	    <!-- profile: hadoop_0.20.203 -->
             <configuration>
-              <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
+              <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY</symbols>
             </configuration>
           </plugin>
         </plugins>
@@ -578,6 +583,12 @@ under the License.
           <version>${hadoop.version}</version>
           <scope>provided</scope>
         </dependency>
+	<dependency>
+	  <groupId>commons-net</groupId>
+	  <artifactId>commons-net</artifactId>
+	  <scope>provided</scope>
+	  <version>3.1</version>
+	</dependency>
       </dependencies>
 
       <build>
@@ -597,7 +608,7 @@ under the License.
             </executions>
 	    <!-- profile: hadoop_1.0 -->
             <configuration>
-              <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
+              <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY</symbols>
             </configuration>
           </plugin>
         </plugins>
@@ -647,6 +658,16 @@ under the License.
             <configuration>
 	      <!-- profile: hadoop_non_secure -->
               <symbols>HADOOP_NON_SECURE,HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
+	      <excludes>**/comm/SecureRPCCommunications.java,
+                        **/comm/netty/SaslNettyClient.java,
+                        **/comm/netty/SaslNettyServer.java,
+                        **/comm/netty/handler/AuthorizeServerHandler.java,
+                        **/comm/netty/handler/SaslClientHandler.java,
+                        **/comm/netty/handler/SaslServerHandler.java,
+                        **/comm/requests/SaslCompleteRequest.java,
+                        **/comm/requests/SaslTokenMessageRequest.java,
+                        **/comm/SaslConnectionTest.java,
+                        **/hadoop/BspTokenSelector.java</excludes>
             </configuration>
           </plugin>
           <plugin>
@@ -654,10 +675,6 @@ under the License.
             <artifactId>maven-compiler-plugin</artifactId>
             <version>${maven-compiler-plugin.version}</version>
             <configuration>
-              <excludes>
-                <exclude>**/BspTokenSelector.java</exclude>
-                <exclude>**/SecureRPCCommunications.java</exclude>
-              </excludes>
               <source>${compileSource}</source>
               <target>${compileSource}</target>
             </configuration>
@@ -706,7 +723,10 @@ under the License.
             <excludes>
               <exclude>BspTokenSelector.java</exclude>
 	      <exclude>SecureRPCCommunications.java</exclude>
-            </excludes>
+	    </excludes>
+	  </resource>
+	  <resource>
+            <directory>${basedir}/src/main/java/org/apache/giraph/comm/netty</directory>
           </resource>
         </resources>
         <plugins>
@@ -739,6 +759,16 @@ under the License.
 	    <!-- profile: hadoop_facebook -->
             <configuration>
               <symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
+	      <excludes>**/comm/SecureRPCCommunications.java,
+                        **/comm/netty/SaslNettyClient.java,
+                        **/comm/netty/SaslNettyServer.java,
+                        **/comm/netty/handler/AuthorizeServerHandler.java,
+                        **/comm/netty/handler/SaslClientHandler.java,
+                        **/comm/netty/handler/SaslServerHandler.java,
+                        **/comm/requests/SaslCompleteRequest.java,
+                        **/comm/requests/SaslTokenMessageRequest.java,
+                        **/comm/SaslConnectionTest.java,
+                        **/hadoop/BspTokenSelector.java</excludes>
             </configuration>
           </plugin>
           <plugin>
@@ -826,6 +856,69 @@ under the License.
     </profile>
 
     <profile>
+      <id>hadoop_2.0.1</id>
+       <activation>
+        <property>
+          <name>hadoop</name>
+          <value>2.0.1</value>
+        </property>
+      </activation>
+      <properties>
+        <hadoop.version>2.0.1-alpha</hadoop.version>
+      </properties>
+      <dependencies>
+	<dependency>
+	  <groupId>org.apache.hadoop</groupId>
+	  <artifactId>hadoop-common</artifactId>
+	  <version>${hadoop.version}</version>
+	  <scope>provided</scope>
+	</dependency>
+	<dependency>
+	  <groupId>org.apache.hadoop</groupId>
+	  <artifactId>hadoop-mapreduce-client-core</artifactId>
+	  <version>${hadoop.version}</version>
+	  <scope>provided</scope>
+	</dependency>
+	<dependency>
+	  <groupId>org.apache.hadoop</groupId>
+	  <artifactId>hadoop-mapreduce-client-common</artifactId>
+	  <version>${hadoop.version}</version>
+	</dependency>
+      </dependencies>
+    </profile>
+
+    <profile>
+      <id>hadoop_2.0.2</id>
+       <activation>
+        <property>
+          <name>hadoop</name>
+          <value>2.0.2</value>
+        </property>
+      </activation>
+      <properties>
+        <hadoop.version>2.0.2-alpha</hadoop.version>
+      </properties>
+      <dependencies>
+	<dependency>
+	  <groupId>org.apache.hadoop</groupId>
+	  <artifactId>hadoop-common</artifactId>
+	  <version>${hadoop.version}</version>
+	  <scope>provided</scope>
+	</dependency>
+	<dependency>
+	  <groupId>org.apache.hadoop</groupId>
+	  <artifactId>hadoop-mapreduce-client-core</artifactId>
+	  <version>${hadoop.version}</version>
+	  <scope>provided</scope>
+	</dependency>
+	<dependency>
+	  <groupId>org.apache.hadoop</groupId>
+	  <artifactId>hadoop-mapreduce-client-common</artifactId>
+	  <version>${hadoop.version}</version>
+	</dependency>
+      </dependencies>
+    </profile>
+    <profile>
       <id>hadoop_trunk</id>
        <activation>
         <property>

Modified: giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java Wed Oct 10 18:35:31 2012
@@ -563,6 +563,14 @@ public class GiraphConfiguration extends
   public static final String DEFAULT_ZOOKEEPER_SKIP_ACL = "yes";
 
   /**
+   * Whether to use SASL with DIGEST and Hadoop Job Tokens to authenticate
+   * and authorize Netty BSP Clients to Servers.
+   */
+  public static final String AUTHENTICATE = "giraph.authenticate";
+  /** Default is not to do authentication and authorization with Netty. */
+  public static final boolean DEFAULT_AUTHENTICATE = false;
+
+  /**
    * Constructor that creates the configuration
    */
   public GiraphConfiguration() { }
@@ -841,4 +849,13 @@ public class GiraphConfiguration extends
     }
     return mapTasks;
   }
+
+  /**
+   * Use authentication? (if supported)
+   *
+   * @return True if should authenticate, false otherwise
+   */
+  public boolean authenticate() {
+    return getBoolean(AUTHENTICATE, DEFAULT_AUTHENTICATE);
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Wed Oct 10 18:35:31 2012
@@ -580,8 +580,18 @@ public abstract class BasicRPCCommunicat
     return myAddress.getPort();
   }
 
+  /**
+   * Connect all BSP workers with each other.
+   * @param authenticate ignored if using Hadoop RPC, since authentication is
+   * handled at the level of Hadoop RPC. Only when Netty is used is this
+   * significant.
+   */
   @Override
+/*if[HADOOP_NON_SECURE]
   public void setup() {
+else[HADOOP_NON_SECURE]*/
+  public void setup(boolean authenticate) {
+/*end[HADOOP_NON_SECURE]*/
     try {
       connectAllRPCProxys(this.jobId, this.jobToken);
     } catch (IOException e) {
@@ -1297,6 +1307,15 @@ public abstract class BasicRPCCommunicat
     }
   }
 
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+  @Override
+  public void authenticate() {
+    // nothing done here if using Hadoop RPC: authentication is
+    // handled at Hadoop library-level
+  }
+/*end[HADOOP_NON_SECURE]*/
+
   @Override
   public String getName() {
     return myName;

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java Wed Oct 10 18:35:31 2012
@@ -19,11 +19,6 @@
 package org.apache.giraph.comm;
 
 import org.apache.giraph.graph.Vertex;
-/*if[HADOOP_NON_SECURE]
- else[HADOOP_NON_SECURE]*/
-import org.apache.giraph.hadoop.BspTokenSelector;
-import org.apache.hadoop.security.token.TokenInfo;
-/*end[HADOOP_NON_SECURE]*/
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.VersionedProtocol;
@@ -40,10 +35,6 @@ import java.io.IOException;
  * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-/*if[HADOOP_NON_SECURE]
- else[HADOOP_NON_SECURE]*/
-@TokenInfo(BspTokenSelector.class)
-/*end[HADOOP_NON_SECURE]*/
 public interface CommunicationsInterface<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends VersionedProtocol {

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java Wed Oct 10 18:35:31 2012
@@ -46,7 +46,6 @@ import org.apache.giraph.bsp.Centralized
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.hadoop.BspPolicyProvider;
 
-
 /*if[HADOOP_NON_INTERVERSIONED_RPC]
 else[HADOOP_NON_INTERVERSIONED_RPC]*/
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -148,8 +147,7 @@ public class SecureRPCCommunications<I e
       }
     }
 
-    // TODO: make munge tag more specific: just use HADOOP_1 maybe.
-    /*if[HADOOP_1_AUTHORIZATION]
+/*if[HADOOP_1_SECURITY]
     // Hadoop 1-style authorization.
     Server server = RPC.getServer(this,
       myAddress.getHostName(), myAddress.getPort(),
@@ -160,7 +158,7 @@ public class SecureRPCCommunications<I e
     if (conf.getBoolean(hadoopSecurityAuthorization, false)) {
       ServiceAuthorizationManager.refresh(conf, new BspPolicyProvider());
     }
-    else[HADOOP_1_AUTHORIZATION]*/
+else[HADOOP_1_SECURITY]*/
     // Hadoop 2+-style authorization.
     Server server = RPC.getServer(this,
       myAddress.getHostName(), myAddress.getPort(),
@@ -172,7 +170,7 @@ public class SecureRPCCommunications<I e
       ServiceAuthorizationManager sam = new ServiceAuthorizationManager();
       sam.refresh(conf, new BspPolicyProvider());
     }
-    /*end[HADOOP_1_AUTHORIZATION]*/
+/*end[HADOOP_1_SECURITY]*/
     return server;
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java Wed Oct 10 18:35:31 2012
@@ -39,10 +39,21 @@ import java.io.IOException;
 @SuppressWarnings("rawtypes")
 public interface WorkerClient<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> {
+
   /**
    *  Setup the client.
    */
+/*if[HADOOP_NON_SECURE]
   void setup();
+else[HADOOP_NON_SECURE]*/
+  /**
+   * Setup the client.
+   *
+   * @param authenticate whether to SASL authenticate with server or not:
+   * set by giraph.authenticate configuration option.
+   */
+  void setup(boolean authenticate);
+/*end[HADOOP_NON_SECURE]*/
 
   /**
    * Fix changes to the workers and the mapping between partitions and
@@ -123,4 +134,14 @@ public interface WorkerClient<I extends 
    * @throws IOException
    */
   void closeConnections() throws IOException;
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+  /**
+   * Authenticates, as client, with another BSP worker, as server.
+   *
+   * @throws IOException
+   */
+  void authenticate() throws IOException;
+/*end[HADOOP_NON_SECURE]*/
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Wed Oct 10 18:35:31 2012
@@ -22,6 +22,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +40,12 @@ import org.apache.giraph.comm.netty.hand
 import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
 import org.apache.giraph.comm.netty.handler.RequestEncoder;
 import org.apache.giraph.comm.netty.handler.RequestInfo;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.netty.handler.SaslClientHandler;
+import org.apache.giraph.comm.requests.RequestType;
+import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
+/*end[HADOOP_NON_SECURE]*/
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.utils.TimedLogger;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -47,13 +55,15 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelConfig;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelLocal;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
 import org.jboss.netty.handler.execution.ExecutionHandler;
 import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
-
 import static org.jboss.netty.channel.Channels.pipeline;
 
 /**
@@ -74,6 +84,12 @@ public class NettyClient {
   public static final int MAX_REQUESTS_TO_LIST = 10;
   /** 30 seconds to connect by default */
   public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+  /** Used to authenticate with other workers acting as servers */
+  public static final ChannelLocal<SaslNettyClient> SASL =
+      new ChannelLocal<SaslNettyClient>();
+/*end[HADOOP_NON_SECURE]*/
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyClient.class);
   /** Context used to report progress */
@@ -234,31 +250,62 @@ public class NettyClient {
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline pipeline = pipeline();
-        pipeline.addLast("clientByteCounter", byteCounter);
-        pipeline.addLast("responseFrameDecoder",
-            new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
-        pipeline.addLast("requestEncoder", new RequestEncoder(conf.getInt(
-            GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
-            GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
-        pipeline.addLast("responseClientHandler",
-            new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
-
-        if (executionHandler != null) {
-          pipeline.addAfter(handlerBeforeExecutionHandler,
-              "executionHandler", executionHandler);
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+        if (conf.authenticate()) {
+          LOG.info("Using Netty with authentication.");
+
+          // Our pipeline starts with just byteCounter, and then we use
+          // addLast() to incrementally add pipeline elements, so that we can
+          // name them for identification for removal or replacement after
+          // client is authenticated by server.
+          // After authentication is complete, the pipeline's SASL-specific
+          // functionality is removed, restoring the pipeline to exactly the
+          // same configuration as it would be without authentication.
+          ChannelPipeline pipeline = Channels.pipeline(
+              byteCounter);
+          // The following pipeline component is needed to decode the server's
+          // SASL tokens. It is replaced with a FixedLengthFrameDecoder (same
+          // as used with the non-authenticated pipeline) after authentication
+          // completes (as in non-auth pipeline below).
+          pipeline.addLast("length-field-based-frame-decoder",
+              new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
+          pipeline.addLast("request-encoder", new RequestEncoder(conf.getInt(
+              GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
+              GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+          // The following pipeline component responds to the server's SASL
+          // tokens with its own responses. Both client and server share the
+          // same Hadoop Job token, which is used to create the SASL tokens to
+          // authenticate with each other.
+          // After authentication finishes, this pipeline component is removed.
+          pipeline.addLast("sasl-client-handler",
+              new SaslClientHandler(conf));
+          pipeline.addLast("response-handler",
+              new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
+          return pipeline;
+        } else {
+          LOG.info("Using Netty without authentication.");
+/*end[HADOOP_NON_SECURE]*/
+          ChannelPipeline pipeline = pipeline();
+          pipeline.addLast("clientByteCounter", byteCounter);
+          pipeline.addLast("responseFrameDecoder",
+              new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
+          pipeline.addLast("requestEncoder", new RequestEncoder(conf.getInt(
+              GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
+              GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+          pipeline.addLast("responseClientHandler",
+              new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
+          if (executionHandler != null) {
+            pipeline.addAfter(handlerBeforeExecutionHandler,
+                "executionHandler", executionHandler);
+          }
+          return pipeline;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
         }
-        return pipeline;
+/*end[HADOOP_NON_SECURE]*/
       }
     });
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("NettyClient: Started client" +
-          " with up to " + maxPoolSize + " threads" +
-          " with sendBufferSize = " + sendBufferSize +
-          " receiveBufferSize = " + receiveBufferSize +
-          " maxRequestMilliseconds = " + maxRequestMilliseconds);
-    }
   }
 
   /**
@@ -379,6 +426,86 @@ public class NettyClient {
     }
   }
 
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+  /**
+   * Authenticate with all servers in addressChannelMap.
+   */
+  public void authenticate() {
+    LOG.info("authenticate: NettyClient starting authentication with " +
+        "servers.");
+    for (InetSocketAddress address: addressChannelMap.keySet()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("authenticate: Authenticating with address:" + address);
+      }
+      ChannelRotater channelRotater = addressChannelMap.get(address);
+      for (Channel channel: channelRotater.getChannels()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("authenticate: Authenticating with server on channel: " +
+              channel);
+        }
+        authenticateOnChannel(channelRotater.getTaskId(), channel);
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("authenticate: NettyClient successfully authenticated with " +
+          addressChannelMap.size() + " server" +
+          ((addressChannelMap.size() != 1) ? "s" : "") +
+          " - continuing with normal work.");
+    }
+  }
+
+  /**
+   * Authenticate with server connected at given channel.
+   *
+   * @param taskId Task id of the channel
+   * @param channel Connection to server to authenticate with.
+   */
+  private void authenticateOnChannel(Integer taskId, Channel channel) {
+    try {
+      SaslNettyClient saslNettyClient = SASL.get(channel);
+      if (SASL.get(channel) == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("authenticateOnChannel: Creating saslNettyClient now " +
+              "for channel: " + channel);
+        }
+        saslNettyClient = new SaslNettyClient();
+        SASL.set(channel, saslNettyClient);
+      }
+      if (!saslNettyClient.isComplete()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("authenticateOnChannel: Waiting for authentication " +
+              "to complete..");
+        }
+        SaslTokenMessageRequest saslTokenMessage = saslNettyClient.firstToken();
+        sendWritableRequest(taskId, (InetSocketAddress) channel
+            .getRemoteAddress(),
+          saslTokenMessage);
+        // We now wait for Netty's thread pool to communicate over this
+        // channel to authenticate with another worker acting as a server.
+        try {
+          synchronized (saslNettyClient.getAuthenticated()) {
+            while (!saslNettyClient.isComplete()) {
+              saslNettyClient.getAuthenticated().wait();
+            }
+          }
+        } catch (InterruptedException e) {
+          LOG.error("authenticateOnChannel: Interrupted while waiting for " +
+              "authentication.");
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("authenticateOnChannel: Authentication on channel: " +
+            channel + " has completed successfully.");
+      }
+    } catch (IOException e) {
+      LOG.error("authenticateOnChannel: Failed to authenticate with server " +
+          "due to error: " + e);
+    }
+    return;
+  }
+/*end[HADOOP_NON_SECURE]*/
+
   /**
    * Stop the client.
    */
@@ -436,7 +563,7 @@ public class NettyClient {
     // Get rid of the failed channel
     addressChannelMap.get(remoteServer).removeLast();
     if (LOG.isInfoEnabled()) {
-      LOG.info("checkAndFixChannel: Fixing disconnected channel to " +
+      LOG.info("getNextChannel: Fixing disconnected channel to " +
           remoteServer + ", open = " + channel.isOpen() + ", " +
           "bound = " + channel.isBound());
     }
@@ -446,24 +573,24 @@ public class NettyClient {
       connectionFuture.awaitUninterruptibly();
       if (connectionFuture.isSuccess()) {
         if (LOG.isInfoEnabled()) {
-          LOG.info("checkAndFixChannel: Connected to " + remoteServer + "!");
+          LOG.info("getNextChannel: Connected to " + remoteServer + "!");
         }
         addressChannelMap.get(remoteServer).addChannel(
             connectionFuture.getChannel());
         return connectionFuture.getChannel();
       }
       ++reconnectFailures;
-      LOG.warn("checkAndFixChannel: Failed to reconnect to " +  remoteServer +
+      LOG.warn("getNextChannel: Failed to reconnect to " +  remoteServer +
           " on attempt " + reconnectFailures + " out of " +
           maxConnectionFailures + " max attempts, sleeping for 5 secs",
           connectionFuture.getCause());
       try {
         Thread.sleep(5000);
       } catch (InterruptedException e) {
-        LOG.warn("blockingConnect: Unexpected interrupted exception", e);
+        LOG.warn("getNextChannel: Unexpected interrupted exception", e);
       }
     }
-    throw new IllegalStateException("checkAndFixChannel: Failed to connect " +
+    throw new IllegalStateException("getNextChannel: Failed to connect " +
         "to " + remoteServer + " in " + reconnectFailures +
         " connect attempts");
   }
@@ -481,22 +608,30 @@ public class NettyClient {
     if (clientRequestIdRequestInfoMap.isEmpty()) {
       byteCounter.resetAll();
     }
+    boolean registerRequest = true;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+    if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
+      registerRequest = false;
+    }
+/*end[HADOOP_NON_SECURE]*/
 
     Channel channel = getNextChannel(remoteServer);
-    request.setClientId(clientId);
-    request.setRequestId(
-        addressRequestIdGenerator.getNextRequestId(remoteServer));
-
     RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
-    RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
-        new ClientRequestId(destWorkerId, request.getRequestId()),
-            newRequestInfo);
-    if (oldRequestInfo != null) {
-      throw new IllegalStateException("sendWritableRequest: Impossible to " +
+    if (registerRequest) {
+      request.setClientId(clientId);
+      request.setRequestId(
+        addressRequestIdGenerator.getNextRequestId(remoteServer));
+      ClientRequestId clientRequestId =
+        new ClientRequestId(destWorkerId, request.getRequestId());
+      RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
+        clientRequestId, newRequestInfo);
+      if (oldRequestInfo != null) {
+        throw new IllegalStateException("sendWritableRequest: Impossible to " +
           "have a previous request id = " + request.getRequestId() + ", " +
           "request info of " + oldRequestInfo);
+      }
     }
-
     ChannelFuture writeFuture = channel.write(request);
     newRequestInfo.setWriteFuture(writeFuture);
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Wed Oct 10 18:35:31 2012
@@ -24,18 +24,29 @@ import java.net.UnknownHostException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import org.apache.giraph.GiraphConfiguration;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
+/*end[HADOOP_NON_SECURE]*/
 import org.apache.giraph.comm.netty.handler.WorkerRequestReservedMap;
 import org.apache.giraph.comm.netty.handler.RequestDecoder;
 import org.apache.giraph.comm.netty.handler.RequestServerHandler;
+import org.apache.giraph.comm.netty.handler.ResponseEncoder;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.netty.handler.SaslServerHandler;
+/*end[HADOOP_NON_SECURE]*/
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.log4j.Logger;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelLocal;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
@@ -46,12 +57,23 @@ import org.jboss.netty.handler.execution
 import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
 
 import static org.jboss.netty.channel.Channels.pipeline;
+
 /**
  * This server uses Netty and will implement all Giraph communication
  */
 public class NettyServer {
   /** Default maximum thread pool size */
   public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
+
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+  /** Used to authenticate with netty clients */
+  public static final ChannelLocal<SaslNettyServer>
+  CHANNEL_SASL_NETTY_SERVERS =
+    new ChannelLocal<SaslNettyServer>();
+/*end[HADOOP_NON_SECURE]*/
+
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyServer.class);
   /** Configuration */
@@ -70,6 +92,11 @@ public class NettyServer {
   private final int tcpBacklog;
   /** Factory for {@link RequestServerHandler} */
   private final RequestServerHandler.Factory requestServerHandlerFactory;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+  /** Factory for {@link SaslServerHandler} */
+  private SaslServerHandler.Factory saslServerHandlerFactory;
+/*end[HADOOP_NON_SECURE]*/
   /** Server bootstrap */
   private ServerBootstrap bootstrap;
   /** Byte counter for this client */
@@ -99,7 +126,10 @@ public class NettyServer {
       RequestServerHandler.Factory requestServerHandlerFactory) {
     this.conf = conf;
     this.requestServerHandlerFactory = requestServerHandlerFactory;
-
+    /*if[HADOOP_NON_SECURE]
+    else[HADOOP_NON_SECURE]*/
+    this.saslServerHandlerFactory = new SaslServerHandler.Factory();
+    /*end[HADOOP_NON_SECURE]*/
     sendBufferSize = conf.getInt(
         GiraphConfiguration.SERVER_SEND_BUFFER_SIZE,
         GiraphConfiguration.DEFAULT_SERVER_SEND_BUFFER_SIZE);
@@ -156,6 +186,23 @@ public class NettyServer {
     }
   }
 
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+  /**
+   * Constructor for creating the server with a caller-supplied SASL handler
+   * factory, which replaces the default one set by the two-argument form.
+   * @param conf Configuration to use
+   * @param requestServerHandlerFactory Factory for request handlers
+   * @param saslServerHandlerFactory  Factory for SASL handlers
+   */
+  public NettyServer(ImmutableClassesGiraphConfiguration conf,
+                     RequestServerHandler.Factory requestServerHandlerFactory,
+                     SaslServerHandler.Factory saslServerHandlerFactory) {
+    this(conf, requestServerHandlerFactory);
+    this.saslServerHandlerFactory = saslServerHandlerFactory;
+  }
+/*end[HADOOP_NON_SECURE]*/
+
   /**
    * Start the server with the appropriate port
    */
@@ -173,31 +220,62 @@ public class NettyServer {
             receiveBufferSize,
             receiveBufferSize));
 
+    /**
+     * Pipeline setup: depends on whether configured to use authentication
+     * or not.
+     */
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline pipeline = pipeline();
-
-        pipeline.addLast("serverByteCounter", byteCounter);
-        pipeline.addLast("requestFrameDecoder",
-            new LengthFieldBasedFrameDecoder(
-                1024 * 1024 * 1024, 0, 4, 0, 4));
-        pipeline.addLast("requestDecoder",
-            new RequestDecoder(conf, byteCounter));
-        pipeline.addLast("requestProcessor",
-            requestServerHandlerFactory.newHandler(
-                workerRequestReservedMap, conf));
-        if (executionHandler != null) {
-          pipeline.addAfter(handlerBeforeExecutionHandler,
-              "executionHandler", executionHandler);
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+        if (conf.authenticate()) {
+          LOG.info("start: Will use Netty pipeline with " +
+              "authentication and authorization of clients.");
+          // After a client authenticates, the two authentication-specific
+          // pipeline components SaslServerHandler and ResponseEncoder are
+          // removed, leaving the pipeline the same as in the non-authenticated
+          // configuration except for the presence of the Authorize component.
+          return Channels.pipeline(
+              byteCounter,
+              new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
+              new RequestDecoder(conf, byteCounter),
+              // Removed after authentication completes:
+              saslServerHandlerFactory.newHandler(conf),
+              new AuthorizeServerHandler(),
+              requestServerHandlerFactory.newHandler(workerRequestReservedMap,
+                  conf),
+              // Removed after authentication completes:
+              new ResponseEncoder());
+        } else {
+          LOG.info("start: Using Netty without authentication.");
+/*end[HADOOP_NON_SECURE]*/
+          ChannelPipeline pipeline = pipeline();
+
+          pipeline.addLast("serverByteCounter", byteCounter);
+          pipeline.addLast("requestFrameDecoder",
+              new LengthFieldBasedFrameDecoder(
+                  1024 * 1024 * 1024, 0, 4, 0, 4));
+          pipeline.addLast("requestDecoder",
+              new RequestDecoder(conf, byteCounter));
+          pipeline.addLast("requestProcessor",
+              requestServerHandlerFactory.newHandler(
+                  workerRequestReservedMap, conf));
+          if (executionHandler != null) {
+            pipeline.addAfter(handlerBeforeExecutionHandler,
+                "executionHandler", executionHandler);
+          }
+          return pipeline;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
         }
-        return pipeline;
+/*end[HADOOP_NON_SECURE]*/
       }
     });
 
     int taskId = conf.getTaskPartition();
     int numTasks = conf.getInt("mapred.map.tasks", 1);
-    // number of workers + 1 for master
+    // Number of workers + 1 for master
     int numServers = conf.getInt(GiraphConfiguration.MAX_WORKERS, numTasks) + 1;
     int portIncrementConstant =
         (int) Math.pow(10, Math.ceil(Math.log10(numServers)));
@@ -220,7 +298,7 @@ public class NettyServer {
       this.myAddress = new InetSocketAddress(localHostname, bindPort);
       if (failFirstPortBindingAttempt && bindAttempts == 0) {
         if (LOG.isInfoEnabled()) {
-          LOG.info("NettyServer: Intentionally fail first " +
+          LOG.info("start: Intentionally fail first " +
               "binding attempt as giraph.failFirstRpcPortBindAttempt " +
               "is true, port " + bindPort);
         }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Wed Oct 10 18:35:31 2012
@@ -69,7 +69,7 @@ public class NettyWorkerClient<I extends
   /** Class logger */
   private static final Logger LOG =
     Logger.getLogger(NettyWorkerClient.class);
-  /** signal for getInetSocketAddress() to use WorkerInfo's address */
+  /** Signal for getInetSocketAddress() to use WorkerInfo's address */
   private static final int NO_PARTITION_ID = Integer.MIN_VALUE;
   /** Hadoop configuration */
   private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
@@ -470,8 +470,26 @@ public class NettyWorkerClient<I extends
     nettyClient.stop();
   }
 
+/*if[HADOOP_NON_SECURE]
   @Override
   public void setup() {
     fixPartitionIdToSocketAddrMap();
   }
+else[HADOOP_NON_SECURE]*/
+  @Override
+  public void setup(boolean authenticate) {
+    fixPartitionIdToSocketAddrMap();
+    if (authenticate) {
+      authenticate();
+    }
+  }
+/*end[HADOOP_NON_SECURE]*/
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+  @Override
+  public void authenticate() {
+    nettyClient.authenticate();
+  }
+/*end[HADOOP_NON_SECURE]*/
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java Wed Oct 10 18:35:31 2012
@@ -31,6 +31,7 @@ import org.apache.giraph.graph.partition
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 
@@ -46,6 +47,9 @@ import java.io.IOException;
 public class NettyWorkerClientServer<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements WorkerClientServer<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(NettyWorkerClientServer.class);
   /** Client that sends requests */
   private final WorkerClient<I, V, E, M> client;
   /** Server that processes requests */
@@ -65,8 +69,7 @@ public class NettyWorkerClientServer<I e
     server = new NettyWorkerServer<I, V, E, M>(
         configuration, service, context);
     client = new NettyWorkerClient<I, V, E, M>(context,
-        configuration, service,
-       ((NettyWorkerServer<I, V, E, M>) server).getServerData());
+        configuration, service, server.getServerData());
   }
 
   @Override
@@ -123,9 +126,30 @@ public class NettyWorkerClientServer<I e
   }
 
   @Override
+/*if[HADOOP_NON_SECURE]
   public void setup() {
     client.fixPartitionIdToSocketAddrMap();
   }
+else[HADOOP_NON_SECURE]*/
+  public void setup(boolean authenticateWithServer) {
+    client.fixPartitionIdToSocketAddrMap();
+    if (authenticateWithServer) {
+      try {
+        client.authenticate();
+      } catch (IOException e) {
+        LOG.error("setup: Failed to authenticate : " + e);
+      }
+    }
+  }
+/*end[HADOOP_NON_SECURE]*/
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+  @Override
+  public void authenticate() throws IOException {
+    client.authenticate();
+  }
+/*end[HADOOP_NON_SECURE]*/
 
   @Override
   public void prepareSuperstep() {

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+
+/**
+ * Implements SASL logic for Giraph BSP worker clients.
+ */
+public class SaslNettyClient {
+  /** Class logger */
+  public static final Logger LOG = Logger.getLogger(SaslNettyClient.class);
+
+  /**
+   * Used to synchronize client requests: client's work-related requests must
+   * wait until SASL authentication completes.
+   */
+  private Object authenticated = new Object();
+
+  /**
+   * Used to respond to server's counterpart, SaslServer with SASL tokens
+   * represented as byte arrays.
+   */
+  private SaslClient saslClient;
+
+  /**
+   * Create a SaslNettyClient for BSP servers; saslClient is null on failure.
+   */
+  public SaslNettyClient() {
+    try {
+      Token<? extends TokenIdentifier> token =
+          createJobToken(new Configuration());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SaslNettyClient: Creating SASL " +
+            AuthMethod.DIGEST.getMechanismName() +
+            " client to authenticate to service at " + token.getService());
+      }
+      saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST
+          .getMechanismName() }, null, null, SaslRpcServer.SASL_DEFAULT_REALM,
+          SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(token));
+    } catch (IOException e) { // Includes SaslException from createSaslClient
+      LOG.error("SaslNettyClient: Could not obtain job token for Netty " +
+          "Client to use to authenticate with a Netty Server.", e);
+      saslClient = null;
+    }
+  }
+
+  public Object getAuthenticated() {
+    return authenticated;
+  }
+
+  /**
+   * Obtain JobToken, which we'll use as a credential for SASL authentication
+   * when connecting to other Giraph BSPWorkers.
+   *
+   * @param conf Configuration
+   * @return a JobToken containing username and password so that client can
+   * authenticate with a server.
+   * @throws IOException if the token file environment variable is not set.
+   */
+  private Token<JobTokenIdentifier> createJobToken(Configuration conf)
+    throws IOException {
+    String localJobTokenFile =
+        System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+    if (localJobTokenFile == null) {
+      throw new IOException("createJobToken: Cannot obtain authentication " +
+          "credentials for job: environment variable '" +
+          UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION + "' is not set");
+    }
+    JobConf jobConf = new JobConf(conf);
+    Credentials credentials =
+        TokenCache.loadTokens(localJobTokenFile, jobConf);
+    return TokenCache.getJobToken(credentials);
+  }
+
+  /**
+   * Build the opening message of the SASL handshake; used by
+   * authenticateOnChannel().
+   * @return SaslTokenMessageRequest message to be sent to server.
+   * @throws IOException if evaluating the initial challenge fails
+   */
+  public SaslTokenMessageRequest firstToken() throws IOException {
+    // Stays empty unless the SASL client has an initial response to offer.
+    byte[] initialToken = new byte[0];
+    if (saslClient.hasInitialResponse()) {
+      initialToken = saslClient.evaluateChallenge(initialToken);
+    }
+    SaslTokenMessageRequest request = new SaslTokenMessageRequest();
+    request.setSaslToken(initialToken);
+    return request;
+  }
+
+  public boolean isComplete() {
+    return saslClient.isComplete();
+  }
+
+  /**
+   * Respond to server's SASL token.
+   * @param saslTokenMessage contains server's SASL token
+   * @return client's response SASL token, or null if the server's token
+   *         could not be evaluated
+   */
+  public byte[] saslResponse(SaslTokenMessageRequest saslTokenMessage) {
+    try {
+      byte[] serverToken = saslTokenMessage.getSaslToken();
+      return saslClient.evaluateChallenge(serverToken);
+    } catch (SaslException e) {
+      LOG.error("saslResponse: Failed to respond to SASL server's token:", e);
+      return null;
+    }
+  }
+
+  /**
+   * Implementation of javax.security.auth.callback.CallbackHandler
+   * that works with Hadoop JobTokens.
+   */
+  private static class SaslClientCallbackHandler implements CallbackHandler {
+    /** Generated username contained in JobToken */
+    private final String userName;
+    /** Generated password contained in JobToken */
+    private final char[] userPassword;
+
+    /**
+     * Set private members using token.
+     * @param token Hadoop JobToken.
+     */
+    public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+      this.userName = SaslNettyServer.encodeIdentifier(token.getIdentifier());
+      this.userPassword = SaslNettyServer.encodePassword(token.getPassword());
+    }
+
+    /**
+     * Hand the job token's credentials to the SASL client: user name,
+     * password, and the default realm, whichever the callbacks request.
+     *
+     * @param callbacks objects that indicate what credential information the
+     *                  server's SaslServer requires from the client.
+     * @throws UnsupportedCallbackException on an unrecognized callback type
+     */
+    public void handle(Callback[] callbacks)
+      throws UnsupportedCallbackException {
+      NameCallback nameCallback = null;
+      PasswordCallback passwordCallback = null;
+      RealmCallback realmCallback = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof NameCallback) {
+          nameCallback = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          passwordCallback = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          realmCallback = (RealmCallback) callback;
+        } else if (!(callback instanceof RealmChoiceCallback)) {
+          // Realm choices are skipped; any other callback type is rejected.
+          throw new UnsupportedCallbackException(callback,
+              "handle: Unrecognized SASL client callback");
+        }
+      }
+      if (nameCallback != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL client callback: setting username: " +
+              userName);
+        }
+        nameCallback.setName(userName);
+      }
+      if (passwordCallback != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL client callback: setting userPassword");
+        }
+        passwordCallback.setPassword(userPassword);
+      }
+      if (realmCallback != null) {
+        String realm = realmCallback.getDefaultText();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL client callback: setting realm: " + realm);
+        }
+        realmCallback.setText(realm);
+      }
+    }
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm.netty;
+
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.classification.InterfaceStability;
+/*if[HADOOP_1_SECURITY]
+else[HADOOP_1_SECURITY]*/
+import org.apache.hadoop.ipc.StandbyException;
+/*end[HADOOP_1_SECURITY]*/
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.log4j.Logger;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+
+/**
+ * Encapsulates SASL server logic for Giraph BSP worker servers.
+ */
+public class SaslNettyServer extends SaslRpcServer {
+  /** Logger */
+  public static final Logger LOG = Logger.getLogger(SaslNettyServer.class);
+
+  /**
+   * Actual SASL work done by this object from javax.security.sasl.
+   * Initialized below in constructor.
+   */
+  private SaslServer saslServer;
+
+  /**
+   * Constructor; saslServer stays null if the SASL server cannot be created.
+   *
+   * @param secretManager supplied by SaslServerHandler.
+   */
+  public SaslNettyServer(JobTokenSecretManager secretManager) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SaslNettyServer: Secret manager is: " + secretManager);
+    }
+/*if[HADOOP_1_SECURITY]
+else[HADOOP_1_SECURITY]*/
+    try {
+      secretManager.checkAvailableForRead();
+    } catch (StandbyException e) {
+      LOG.error("SaslNettyServer: Could not read secret manager: " + e, e);
+    }
+/*end[HADOOP_1_SECURITY]*/
+
+    try {
+      SaslDigestCallbackHandler ch =
+          new SaslNettyServer.SaslDigestCallbackHandler(secretManager);
+      saslServer = Sasl.createSaslServer(SaslNettyServer.AuthMethod.DIGEST
+          .getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM,
+          SaslRpcServer.SASL_PROPS, ch);
+    } catch (SaslException e) {
+      LOG.error("SaslNettyServer: Could not create SaslServer: " + e, e);
+    }
+  }
+
+  public boolean isComplete() {
+    return saslServer.isComplete();
+  }
+
+  public String getUserName() {
+    return saslServer.getAuthorizationID();
+  }
+
+  /**
+   * Evaluate a client's SASL token and compute the server's reply.
+   * @param token Client's SASL token
+   * @return token to send back to the client, possibly null.
+   */
+  public byte[] response(byte[] token) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("response: Responding to input token of length: " +
+            token.length);
+      }
+      byte[] retval = saslServer.evaluateResponse(token);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("response: Response token length: " +
+            (retval == null ? 0 : retval.length)); // null reply is legal
+      }
+      return retval;
+    } catch (SaslException e) {
+      LOG.error("response: Failed to evaluate client token of length: " +
+          token.length + " : " + e, e);
+      return null;
+    }
+  }
+
+  /**
+   * Encode a byte[] identifier as a Base64-encoded string.
+   *
+   * @param identifier identifier to encode
+   * @return Base64-encoded string
+   */
+  static String encodeIdentifier(byte[] identifier) {
+    return new String(Base64.encodeBase64(identifier));
+  }
+
+  /**
+   * Encode a password as a base64-encoded char[] array.
+   * @param password as a byte array.
+   * @return password as a char array.
+   */
+  static char[] encodePassword(byte[] password) {
+    return new String(Base64.encodeBase64(password)).toCharArray();
+  }
+
+  /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+  @InterfaceStability.Evolving
+  public static class SaslDigestCallbackHandler implements CallbackHandler {
+    /** Used to authenticate the clients */
+    private JobTokenSecretManager secretManager;
+
+    /**
+     * Constructor
+     *
+     * @param secretManager used to authenticate clients
+     */
+    public SaslDigestCallbackHandler(
+        JobTokenSecretManager secretManager) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SaslDigestCallback: Creating SaslDigestCallback handler " +
+            "with secret manager: " + secretManager);
+      }
+      this.secretManager = secretManager;
+    }
+
+    /**
+     * Look up the client's password for DIGEST-MD5 and authorize the client
+     * only when its authentication and authorization IDs are equal.
+     * {@inheritDoc}
+     */
+    @Override
+    public void handle(Callback[] callbacks) throws IOException,
+        UnsupportedCallbackException {
+      NameCallback nameCallback = null;
+      PasswordCallback passwordCallback = null;
+      AuthorizeCallback authorizeCallback = null;
+      // Sort the callbacks by type before acting on any of them.
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          authorizeCallback = (AuthorizeCallback) callback;
+        } else if (callback instanceof NameCallback) {
+          nameCallback = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          passwordCallback = (PasswordCallback) callback;
+        } else if (!(callback instanceof RealmCallback)) {
+          // The realm is ignored; any other callback type is rejected.
+          throw new UnsupportedCallbackException(callback,
+              "handle: Unrecognized SASL DIGEST-MD5 Callback");
+        }
+      }
+      if (passwordCallback != null) {
+        // The name is expected to be the client's encoded token identifier.
+        JobTokenIdentifier tokenIdentifier =
+            getIdentifier(nameCallback.getDefaultName(), secretManager);
+        char[] password =
+            encodePassword(secretManager.retrievePassword(tokenIdentifier));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL server DIGEST-MD5 callback: setting " +
+              "password for client: " + tokenIdentifier.getUser());
+        }
+        passwordCallback.setPassword(password);
+      }
+      if (authorizeCallback != null) {
+        String authenticationId = authorizeCallback.getAuthenticationID();
+        String authorizationId = authorizeCallback.getAuthorizationID();
+        // A client may only act as the identity it authenticated with.
+        authorizeCallback.setAuthorized(
+            authenticationId.equals(authorizationId));
+        if (authorizeCallback.isAuthorized()) {
+          if (LOG.isDebugEnabled()) {
+            String username = getIdentifier(authorizationId, secretManager)
+                .getUser().getUserName();
+            LOG.debug("handle: SASL server DIGEST-MD5 callback: setting " +
+                "canonicalized client ID: " + username);
+          }
+          authorizeCallback.setAuthorizedID(authorizationId);
+        }
+      }
+    }
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.SaslNettyServer;
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+/**
+ * Forwards a client's request upstream only if SASL authentication has
+ * completed on its channel; otherwise the request is logged and dropped.
+ */
+public class AuthorizeServerHandler extends
+    SimpleChannelUpstreamHandler {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(AuthorizeServerHandler.class);
+
+  /**
+   * Constructor.
+   */
+  public AuthorizeServerHandler() {
+  }
+
+  @Override
+  public void messageReceived(
+      ChannelHandlerContext ctx, MessageEvent e) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("messageReceived: Got " + e.getMessage().getClass());
+    }
+    // Authorize: client is allowed to doRequest() if and only if the client
+    // has successfully authenticated with this server.
+    SaslNettyServer saslNettyServer =
+        NettyServer.CHANNEL_SASL_NETTY_SERVERS.get(ctx.getChannel());
+    if (saslNettyServer == null) {
+      LOG.warn("messageReceived: This client is *NOT* authorized to perform " +
+          "this action since there's no saslNettyServer to " +
+          "authenticate the client: " +
+          "refusing to perform requested action: " + e.getMessage());
+      // No SASL state for this channel: drop without sending upstream.
+      return;
+    }
+
+    if (!saslNettyServer.isComplete()) {
+      LOG.warn("messageReceived: This client is *NOT* authorized to perform " +
+          "this action because SASL authentication did not complete: " +
+          "refusing to perform requested action: " + e.getMessage());
+      // SASL handshake unfinished: drop *WITHOUT* sending upstream.
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("messageReceived: authenticated client: " +
+          saslNettyServer.getUserName() + " is authorized to do request " +
+          "on server.");
+    }
+    // We call sendUpstream() since the client is allowed to perform this
+    // request. The client's request will now proceed to the next
+    // pipeline component, namely, RequestServerHandler.
+    ctx.sendUpstream(e);
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java Wed Oct 10 18:35:31 2012
@@ -81,7 +81,7 @@ public class RequestEncoder extends OneT
     if (LOG.isDebugEnabled()) {
       LOG.debug("encode: Client " + writableRequest.getClientId() + ", " +
           "requestId " + writableRequest.getRequestId() +
-          ", size = " + encodedBuffer.writerIndex() +
+          ", size = " + encodedBuffer.writerIndex() + ", " +
           writableRequest.getType() + " took " +
           SystemTime.getInstance().getNanosecondsSince(
               startEncodingNanoseconds) + " ns");

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.comm.requests.RequestType;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+/**
+ * How a server should respond to a client. Currently only used for
+ * responding to client's SASL messages, and removed after client
+ * authenticates.
+ */
+public class ResponseEncoder extends OneToOneEncoder {
+  /** Class logger. */
+  private static final Logger LOG = Logger.getLogger(ResponseEncoder.class);
+  /** Holds the place of the message length until known. */
+  private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
+
+  /**
+   * Encode a request as: 4-byte length, 1-byte type ordinal, request body.
+   *
+   * @param ctx Channel handler context
+   * @param channel Channel (only logged; ctx.getChannel() is used instead)
+   * @param msg Message to encode; must be a {@link WritableRequest}
+   * @return Buffer holding the length-prefixed encoded message
+   * @throws Exception if msg is not a WritableRequest or cannot be written
+   */
+  @Override
+  protected Object encode(ChannelHandlerContext ctx,
+      Channel channel, Object msg) throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("encode(" + ctx + "," + channel + "," + msg + ")");
+    }
+    if (!(msg instanceof WritableRequest)) {
+      throw new IllegalArgumentException(
+          "encode: cannot encode message of type " +
+              (msg == null ? null : msg.getClass()) +
+              " since it is not an instance of an implementation of " +
+              " WritableRequest.");
+    }
+    WritableRequest writableRequest = (WritableRequest) msg;
+    ChannelBufferOutputStream outputStream =
+      new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
+        10, ctx.getChannel().getConfig().getBufferFactory()));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("encode: Encoding a message of type " + msg.getClass());
+    }
+    outputStream.write(LENGTH_PLACEHOLDER);
+    // write type of object.
+    outputStream.writeByte(writableRequest.getType().ordinal());
+    // write the object itself.
+    writableRequest.write(outputStream);
+    outputStream.flush();
+    // Set the correct size at the end (the length field itself is excluded).
+    ChannelBuffer encodedBuffer = outputStream.buffer();
+    encodedBuffer.setInt(0, encodedBuffer.writerIndex() - 4);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("encode: Encoded a message of type " + msg.getClass());
+    }
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+    if (writableRequest.getType() == RequestType.SASL_COMPLETE_REQUEST) {
+      // This is the SASL_COMPLETE response created by the SaslServer handler,
+      // which removed itself from the pipeline after creating it; now the
+      // ResponseEncoder removes itself as well.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("encode: Removing ResponseEncoder handler: no longer " +
+            "needed, since client: " + ctx.getChannel().getRemoteAddress() +
+            " has completed authenticating.");
+      }
+      ctx.getPipeline().remove(this);
+    }
+/*end[HADOOP_NON_SECURE]*/
+    return encodedBuffer;
+  }
+}
+

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.SaslNettyClient;
+import org.apache.giraph.comm.requests.RequestType;
+import org.apache.giraph.comm.requests.SaslCompleteRequest;
+import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
+import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
+
+import java.io.IOException;
+
+/**
+ * Client-side Netty pipeline component that allows authentication with a
+ * server.
+ */
+public class SaslClientHandler extends OneToOneDecoder {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(SaslClientHandler.class);
+  /** Configuration */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   */
+  public SaslClientHandler(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Drive the client side of the SASL handshake: answer the server's SASL
+   * tokens and finish when the server sends a SaslCompleteRequest.
+   *
+   * @param ctx Channel handler context
+   * @param evt Upstream event; non-message events are passed upstream
+   * @throws Exception if the handshake state is inconsistent or decoding fails
+   */
+  @Override
+  public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent evt)
+    throws Exception {
+    if (!(evt instanceof MessageEvent)) {
+      ctx.sendUpstream(evt);
+      return;
+    }
+    Object decodedMessage =
+      decode(ctx, ctx.getChannel(), ((MessageEvent) evt).getMessage());
+    // Generate SASL response to server using Channel-local SASL client.
+    SaslNettyClient saslNettyClient = NettyClient.SASL.get(ctx.getChannel());
+    if (saslNettyClient == null) {
+      throw new IllegalStateException("handleUpstream: saslNettyClient " +
+          "was unexpectedly null for channel: " + ctx.getChannel());
+    }
+    if (decodedMessage.getClass() == SaslCompleteRequest.class) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("handleUpstream: Server has sent us the SaslComplete " +
+            "message. Allowing normal work to proceed.");
+      }
+      synchronized (saslNettyClient.getAuthenticated()) {
+        saslNettyClient.getAuthenticated().notify();
+      }
+      if (!saslNettyClient.isComplete()) {
+        String error = "handleUpstream: Server returned a Sasl-complete " +
+            "message, but as far as we can tell, we are not authenticated yet.";
+        LOG.error(error);
+        throw new IllegalStateException(error);
+      }
+      // Authentication is done: remove this handler from the client pipeline
+      // and swap the length-field frame decoder for a fixed-length one.
+      ctx.getPipeline().remove(this);
+      ctx.getPipeline().replace("length-field-based-frame-decoder",
+          "fixed-length-frame-decoder",
+          new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
+      return;
+    }
+    SaslTokenMessageRequest serverToken =
+      (SaslTokenMessageRequest) decodedMessage;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("handleUpstream: Responding to server's token of length: " +
+          serverToken.getSaslToken().length);
+    }
+    // Generate SASL response (it is only sent if it is non-null).
+    byte[] responseToServer = saslNettyClient.saslResponse(serverToken);
+    if (responseToServer == null) {
+      // A null response should mean authentication is complete: send nothing.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("handleUpstream: Response to server is null: " +
+            "authentication should now be complete.");
+      }
+      if (!saslNettyClient.isComplete()) {
+        LOG.warn("handleUpstream: Generated a null response, " +
+            "but authentication is not complete.");
+      }
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("handleUpstream: Response to server token has length:" +
+          responseToServer.length);
+    }
+    // Wrap the SASL response in a message and send it to the server.
+    ctx.getChannel().write(new SaslTokenMessageRequest(responseToServer));
+  }
+
+  /**
+   * Deserialize a server response: one type byte followed by the fields of
+   * the corresponding {@link WritableRequest}.
+   *
+   * @param ctx Channel handler context
+   * @param channel Channel the message arrived on
+   * @param msg Message; must be a {@link ChannelBuffer}
+   * @return The deserialized server response
+   * @throws Exception if msg is not a ChannelBuffer or cannot be read
+   */
+  @Override
+  protected Object decode(ChannelHandlerContext ctx,
+                          Channel channel, Object msg) throws Exception {
+    if (!(msg instanceof ChannelBuffer)) {
+      throw new IllegalStateException("decode: Got illegal message " + msg);
+    }
+    ChannelBufferInputStream inputStream =
+      new ChannelBufferInputStream((ChannelBuffer) msg);
+    // The first byte is the message type (SASL token or SASL complete).
+    RequestType type = RequestType.values()[inputStream.readByte()];
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("decode: Got a response of type " + type + " from server:" +
+        channel.getRemoteAddress());
+    }
+    WritableRequest serverResponse =
+      ReflectionUtils.newInstance(type.getRequestClass(), conf);
+    try {
+      serverResponse.readFields(inputStream);
+    } catch (IOException e) {
+      // Never hand a half-read SASL message to the handshake logic.
+      LOG.error("decode: Exception when trying to read server response", e);
+      throw e;
+    }
+    return serverResponse;
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.SaslNettyServer;
+import org.apache.giraph.comm.requests.RequestType;
+import org.apache.giraph.comm.requests.SaslCompleteRequest;
+import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Generate SASL response tokens to client SASL tokens, allowing clients to
+ * authenticate themselves with this server.
+ */
+public class SaslServerHandler extends
+    SimpleChannelUpstreamHandler {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SaslServerHandler.class);
+
+  // TODO: Move out into a separate, dedicated handler ("FirstRequestHandler").
+  /** Already closed first request? */
+  private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
+
+  /** Close connection on first request (used for simulating failure) */
+  private final boolean closeFirstRequest;
+  /** Used to store Hadoop Job Tokens to authenticate clients. */
+  private JobTokenSecretManager secretManager;
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   * @throws IOException if the job token secret manager cannot be set up
+   */
+  public SaslServerHandler(
+      Configuration conf) throws IOException {
+    SaslNettyServer.init(conf);
+    setupSecretManager(conf);
+    closeFirstRequest = conf.getBoolean(
+        GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
+        GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
+  }
+
+  /**
+   * Answer SASL token messages on this channel, sending a SaslCompleteRequest
+   * once authentication completes; other messages are passed upstream.
+   *
+   * @param ctx Channel handler context
+   * @param e Message event; its message is cast to WritableRequest
+   */
+  @Override
+  public void messageReceived(
+      ChannelHandlerContext ctx, MessageEvent e) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("messageReceived: Got " + e.getMessage().getClass());
+    }
+
+    WritableRequest writableRequest = (WritableRequest) e.getMessage();
+    // Simulate a closed connection on the first request (if desired)
+    // TODO: Move out into a separate, dedicated handler.
+    if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
+      LOG.info("messageReceived: Simulating closing channel on first " +
+          "request " + writableRequest.getRequestId() + " from " +
+          writableRequest.getClientId());
+      ALREADY_CLOSED_FIRST_REQUEST = true;
+      ctx.getChannel().close();
+      return;
+    }
+
+    if (writableRequest.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
+      // Initialize the server-side SASL state for this channel if this is
+      // the first SASL message received from the client.
+      SaslNettyServer saslNettyServer =
+          NettyServer.CHANNEL_SASL_NETTY_SERVERS.get(ctx.getChannel());
+      if (saslNettyServer == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No saslNettyServer for " + ctx.getChannel() +
+              " yet; creating now, with secret manager: " + secretManager);
+        }
+        saslNettyServer = new SaslNettyServer(secretManager);
+        NettyServer.CHANNEL_SASL_NETTY_SERVERS.set(ctx.getChannel(),
+            saslNettyServer);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found existing saslNettyServer on server:" +
+              ctx.getChannel().getLocalAddress() + " for client " +
+              ctx.getChannel().getRemoteAddress());
+        }
+      }
+
+      ((SaslTokenMessageRequest) writableRequest).processToken(saslNettyServer);
+      // Send response to client.
+      ctx.getChannel().write(writableRequest);
+      if (saslNettyServer.isComplete()) {
+        // Authentication is complete: also send SASL-Complete to the client.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL authentication is complete for client with " +
+              "username: " + saslNettyServer.getUserName());
+        }
+        SaslCompleteRequest saslComplete = new SaslCompleteRequest();
+        ctx.getChannel().write(saslComplete);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing SaslServerHandler from pipeline since SASL " +
+              "authentication is complete.");
+        }
+        ctx.getPipeline().remove(this);
+      }
+      // SASL_TOKEN_MESSAGE_REQUEST is fully handled here: not sent upstream.
+      return;
+    } else {
+      // Clients should not send non-SASL messages while this handler is still
+      // in the pipeline. The next handler upstream (the Authorize channel
+      // handler) denies such requests if SASL authentication has not completed.
+      LOG.warn("Sending upstream an unexpected non-SASL message :  " +
+          writableRequest);
+      ctx.sendUpstream(e);
+    }
+  }
+
+  /**
+   * Load Hadoop Job Token into secret manager.
+   *
+   * @param conf Configuration
+   * @throws IOException if the token file variable is unset or loading fails
+   */
+  private void setupSecretManager(Configuration conf) throws IOException {
+    secretManager = new JobTokenSecretManager();
+    String localJobTokenFile = System.getenv().get(
+        UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+    if (localJobTokenFile == null) {
+      throw new IOException("Could not find job credentials: environment " +
+          "variable: " + UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION +
+          " was not defined.");
+    }
+    JobConf jobConf = new JobConf(conf);
+
+    // Add each job token found in the token file to the secretManager.
+    Credentials credentials =
+        TokenCache.loadTokens(localJobTokenFile, jobConf);
+    Collection<Token<? extends TokenIdentifier>> collection =
+        credentials.getAllTokens();
+    for (Token<? extends TokenIdentifier> token:  collection) {
+      TokenIdentifier tokenIdentifier = decodeIdentifier(token,
+          JobTokenIdentifier.class);
+      // NOTE(review): decodeIdentifier() always instantiates the class it is
+      // given, so this check looks always-true for every token -- confirm.
+      if (tokenIdentifier instanceof JobTokenIdentifier) {
+        Token<JobTokenIdentifier> theToken =
+            (Token<JobTokenIdentifier>) token;
+        JobTokenIdentifier jobTokenIdentifier =
+            (JobTokenIdentifier) tokenIdentifier;
+        secretManager.addTokenForJob(
+            jobTokenIdentifier.getJobId().toString(), theToken);
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("loaded JobToken credentials: " + credentials + " from " +
+          "localJobTokenFile: " + localJobTokenFile);
+    }
+  }
+
+  /**
+   * Decode the token's identifier bytes into a new instance of cls. Needed
+   * on Hadoop 2.0.0 (older Hadoop2 too? verify); Hadoop 2.0.1 and newer have
+   * Token.decodeIdentifier(), so a munge flag could select between them.
+   *
+   * @param token the token to decode into a TokenIdentifier
+   * @param cls the subclass of TokenIdentifier to decode the token into.
+   * @return the token identifier.
+   * @throws IOException if the identifier bytes cannot be read
+   */
+  @SuppressWarnings("unchecked")
+  private TokenIdentifier decodeIdentifier(
+      Token<? extends TokenIdentifier> token,
+      Class<? extends TokenIdentifier> cls) throws IOException {
+    TokenIdentifier tokenIdentifier = ReflectionUtils.newInstance(cls, null);
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    tokenIdentifier.readFields(in);
+    in.close();
+    return tokenIdentifier;
+  }
+
+  /** Factory for {@link SaslServerHandler} */
+  public static class Factory {
+    /** Constructor */
+    public Factory() {
+    }
+    /**
+     * Create new {@link SaslServerHandler}
+     *
+     * @param conf Configuration to use
+     * @return New {@link SaslServerHandler}
+     * @throws IOException if the handler's constructor fails
+     */
+    public SaslServerHandler newHandler(
+        Configuration conf) throws IOException {
+      return new SaslServerHandler(conf);
+    }
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java Wed Oct 10 18:35:31 2012
@@ -22,6 +22,16 @@ package org.apache.giraph.comm.requests;
  * Type of the request
  */
 public enum RequestType {
+  /*if[HADOOP_NON_SECURE]
+  else[HADOOP_NON_SECURE]*/
+  /** Exchange authentication information between clients and servers */
+  SASL_TOKEN_MESSAGE_REQUEST(SaslTokenMessageRequest.class),
+  /**
+   * Used by servers to acknowledge SASL authentication completion with
+   * client, so client can modify its pipeline afterwards.
+   */
+  SASL_COMPLETE_REQUEST(SaslCompleteRequest.class),
+  /*end[HADOOP_NON_SECURE]*/
   /** Sending vertices request */
   SEND_VERTEX_REQUEST(SendVertexRequest.class),
   /** Sending a partition of messages for next superstep */

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Reply from server to client after SASL authentication completes.
+ */
+public class SaslCompleteRequest extends WritableRequest {
+  /** Constructor used for reflection and sending. */
+  public SaslCompleteRequest() {
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SASL_COMPLETE_REQUEST;
+  }
+
+  @Override
+  public void readFieldsRequest(DataInput input) throws IOException {
+    // This request declares no fields, so there is nothing to read.
+  }
+
+  @Override
+  public void writeRequest(DataOutput output) throws IOException {
+    // This request declares no fields, so there is nothing to write.
+  }
+}